mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-25 10:06:28 +00:00
Compare commits
2 Commits
levkk-dist
...
levkk-sear
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5872354c3e | ||
|
|
48bb6ebeef |
@@ -15,34 +15,14 @@ jobs:
|
|||||||
RUSTFLAGS: "-C instrument-coverage"
|
RUSTFLAGS: "-C instrument-coverage"
|
||||||
LLVM_PROFILE_FILE: "pgcat-%m.profraw"
|
LLVM_PROFILE_FILE: "pgcat-%m.profraw"
|
||||||
- image: postgres:14
|
- image: postgres:14
|
||||||
command: ["postgres", "-p", "5432", "-c", "shared_preload_libraries=pg_stat_statements"]
|
# auth:
|
||||||
|
# username: mydockerhub-user
|
||||||
|
# password: $DOCKERHUB_PASSWORD
|
||||||
environment:
|
environment:
|
||||||
POSTGRES_USER: postgres
|
POSTGRES_USER: postgres
|
||||||
POSTGRES_DB: postgres
|
POSTGRES_DB: postgres
|
||||||
POSTGRES_PASSWORD: postgres
|
POSTGRES_PASSWORD: postgres
|
||||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||||
- image: postgres:14
|
|
||||||
command: ["postgres", "-p", "7432", "-c", "shared_preload_libraries=pg_stat_statements"]
|
|
||||||
environment:
|
|
||||||
POSTGRES_USER: postgres
|
|
||||||
POSTGRES_DB: postgres
|
|
||||||
POSTGRES_PASSWORD: postgres
|
|
||||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
|
||||||
- image: postgres:14
|
|
||||||
command: ["postgres", "-p", "8432", "-c", "shared_preload_libraries=pg_stat_statements"]
|
|
||||||
environment:
|
|
||||||
POSTGRES_USER: postgres
|
|
||||||
POSTGRES_DB: postgres
|
|
||||||
POSTGRES_PASSWORD: postgres
|
|
||||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
|
||||||
- image: postgres:14
|
|
||||||
command: ["postgres", "-p", "9432", "-c", "shared_preload_libraries=pg_stat_statements"]
|
|
||||||
environment:
|
|
||||||
POSTGRES_USER: postgres
|
|
||||||
POSTGRES_DB: postgres
|
|
||||||
POSTGRES_PASSWORD: postgres
|
|
||||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
|
||||||
|
|
||||||
# Add steps to the job
|
# Add steps to the job
|
||||||
# See: https://circleci.com/docs/2.0/configuration-reference/#steps
|
# See: https://circleci.com/docs/2.0/configuration-reference/#steps
|
||||||
steps:
|
steps:
|
||||||
|
|||||||
@@ -108,6 +108,7 @@ servers = [
|
|||||||
]
|
]
|
||||||
# Database name (e.g. "postgres")
|
# Database name (e.g. "postgres")
|
||||||
database = "shard0"
|
database = "shard0"
|
||||||
|
search_path = "\"$user\",public"
|
||||||
|
|
||||||
[pools.sharded_db.shards.1]
|
[pools.sharded_db.shards.1]
|
||||||
servers = [
|
servers = [
|
||||||
|
|||||||
@@ -16,9 +16,6 @@ function start_pgcat() {
|
|||||||
|
|
||||||
# Setup the database with shards and user
|
# Setup the database with shards and user
|
||||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql
|
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql
|
||||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 7432 -U postgres -f tests/sharding/query_routing_setup.sql
|
|
||||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 8432 -U postgres -f tests/sharding/query_routing_setup.sql
|
|
||||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 9432 -U postgres -f tests/sharding/query_routing_setup.sql
|
|
||||||
|
|
||||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard0 -i
|
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard0 -i
|
||||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
|
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
|
||||||
@@ -29,7 +26,7 @@ wget -O toxiproxy-2.4.0.deb https://github.com/Shopify/toxiproxy/releases/downlo
|
|||||||
sudo dpkg -i toxiproxy-2.4.0.deb
|
sudo dpkg -i toxiproxy-2.4.0.deb
|
||||||
|
|
||||||
# Start Toxiproxy
|
# Start Toxiproxy
|
||||||
LOG_LEVEL=error toxiproxy-server &
|
toxiproxy-server &
|
||||||
sleep 1
|
sleep 1
|
||||||
|
|
||||||
# Create a database at port 5433, forward it to Postgres
|
# Create a database at port 5433, forward it to Postgres
|
||||||
@@ -90,8 +87,7 @@ kill -SIGHUP $(pgrep pgcat) # Reload config again
|
|||||||
cd tests/ruby
|
cd tests/ruby
|
||||||
sudo gem install bundler
|
sudo gem install bundler
|
||||||
bundle install
|
bundle install
|
||||||
bundle exec ruby tests.rb || exit 1
|
ruby tests.rb
|
||||||
bundle exec rspec *_spec.rb || exit 1
|
|
||||||
cd ../..
|
cd ../..
|
||||||
|
|
||||||
#
|
#
|
||||||
@@ -99,7 +95,7 @@ cd ../..
|
|||||||
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
|
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
|
||||||
#
|
#
|
||||||
pip3 install -r tests/python/requirements.txt
|
pip3 install -r tests/python/requirements.txt
|
||||||
python3 tests/python/tests.py || exit 1
|
python3 tests/python/tests.py
|
||||||
|
|
||||||
start_pgcat "info"
|
start_pgcat "info"
|
||||||
|
|
||||||
@@ -109,9 +105,9 @@ psql -U admin_user -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/n
|
|||||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RELOAD' > /dev/null
|
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RELOAD' > /dev/null
|
||||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW CONFIG' > /dev/null
|
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW CONFIG' > /dev/null
|
||||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES' > /dev/null
|
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES' > /dev/null
|
||||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgcat -c 'SHOW LISTS' > /dev/null
|
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW LISTS' > /dev/null
|
||||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgcat -c 'SHOW POOLS' > /dev/null
|
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW POOLS' > /dev/null
|
||||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgcat -c 'SHOW VERSION' > /dev/null
|
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW VERSION' > /dev/null
|
||||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c "SET client_encoding TO 'utf8'" > /dev/null # will ignore
|
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c "SET client_encoding TO 'utf8'" > /dev/null # will ignore
|
||||||
(! psql -U admin_user -e -h 127.0.0.1 -p 6432 -d random_db -c 'SHOW STATS' > /dev/null)
|
(! psql -U admin_user -e -h 127.0.0.1 -p 6432 -d random_db -c 'SHOW STATS' > /dev/null)
|
||||||
export PGPASSWORD=sharding_user
|
export PGPASSWORD=sharding_user
|
||||||
|
|||||||
17
README.md
17
README.md
@@ -1,11 +1,8 @@
|
|||||||

|
# PgCat
|
||||||
|
|
||||||
##### PgCat: PostgreSQL at petabyte scale
|
|
||||||
|
|
||||||
[](https://circleci.com/gh/levkk/pgcat/tree/main)
|
[](https://circleci.com/gh/levkk/pgcat/tree/main)
|
||||||
<a href="https://discord.gg/DmyJP3qJ7U" target="_blank">
|
|
||||||
<img src="https://img.shields.io/discord/1013868243036930099" alt="Join our Discord!" />
|

|
||||||
</a>
|
|
||||||
|
|
||||||
PostgreSQL pooler (like PgBouncer) with sharding, load balancing and failover support.
|
PostgreSQL pooler (like PgBouncer) with sharding, load balancing and failover support.
|
||||||
|
|
||||||
@@ -90,14 +87,6 @@ pgbench -t 1000 -p 6432 -h 127.0.0.1 --protocol extended
|
|||||||
|
|
||||||
See [sharding README](./tests/sharding/README.md) for sharding logic testing.
|
See [sharding README](./tests/sharding/README.md) for sharding logic testing.
|
||||||
|
|
||||||
Run `cargo test` to run Rust tests.
|
|
||||||
|
|
||||||
Run the following commands to run Integration tests locally.
|
|
||||||
```
|
|
||||||
cd tests/docker/
|
|
||||||
docker compose up --exit-code-from main # This will also produce coverage report under ./cov/
|
|
||||||
```
|
|
||||||
|
|
||||||
| **Feature** | **Tested in CI** | **Tested manually** | **Comments** |
|
| **Feature** | **Tested in CI** | **Tested manually** | **Comments** |
|
||||||
|-----------------------|--------------------|---------------------|--------------------------------------------------------------------------------------------------------------------------|
|
|-----------------------|--------------------|---------------------|--------------------------------------------------------------------------------------------------------------------------|
|
||||||
| Transaction pooling | :white_check_mark: | :white_check_mark: | Used by default for all tests. |
|
| Transaction pooling | :white_check_mark: | :white_check_mark: | Used by default for all tests. |
|
||||||
|
|||||||
396
src/client.rs
396
src/client.rs
@@ -59,7 +59,6 @@ pub struct Client<S, T> {
|
|||||||
client_server_map: ClientServerMap,
|
client_server_map: ClientServerMap,
|
||||||
|
|
||||||
/// Client parameters, e.g. user, client_encoding, etc.
|
/// Client parameters, e.g. user, client_encoding, etc.
|
||||||
#[allow(dead_code)]
|
|
||||||
parameters: HashMap<String, String>,
|
parameters: HashMap<String, String>,
|
||||||
|
|
||||||
/// Statistics
|
/// Statistics
|
||||||
@@ -83,14 +82,8 @@ pub struct Client<S, T> {
|
|||||||
/// Postgres user for this client (This comes from the user in the connection string)
|
/// Postgres user for this client (This comes from the user in the connection string)
|
||||||
username: String,
|
username: String,
|
||||||
|
|
||||||
/// Application name for this client (defaults to pgcat)
|
|
||||||
application_name: String,
|
|
||||||
|
|
||||||
/// Used to notify clients about an impending shutdown
|
/// Used to notify clients about an impending shutdown
|
||||||
shutdown: Receiver<()>,
|
shutdown: Receiver<()>,
|
||||||
|
|
||||||
// Sharding key column position
|
|
||||||
sharding_key_column: Option<usize>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Client entrypoint.
|
/// Client entrypoint.
|
||||||
@@ -98,7 +91,7 @@ pub async fn client_entrypoint(
|
|||||||
mut stream: TcpStream,
|
mut stream: TcpStream,
|
||||||
client_server_map: ClientServerMap,
|
client_server_map: ClientServerMap,
|
||||||
shutdown: Receiver<()>,
|
shutdown: Receiver<()>,
|
||||||
drain: Sender<i32>,
|
drain: Sender<i8>,
|
||||||
admin_only: bool,
|
admin_only: bool,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
// Figure out if the client wants TLS or not.
|
// Figure out if the client wants TLS or not.
|
||||||
@@ -207,7 +200,7 @@ pub async fn client_entrypoint(
|
|||||||
Ok(mut client) => {
|
Ok(mut client) => {
|
||||||
info!("Client {:?} connected (plain)", addr);
|
info!("Client {:?} connected (plain)", addr);
|
||||||
|
|
||||||
if !client.is_admin() {
|
if client.is_admin() {
|
||||||
let _ = drain.send(1).await;
|
let _ = drain.send(1).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -232,7 +225,7 @@ pub async fn client_entrypoint(
|
|||||||
Ok(mut client) => {
|
Ok(mut client) => {
|
||||||
info!("Client {:?} issued a cancel query request", addr);
|
info!("Client {:?} issued a cancel query request", addr);
|
||||||
|
|
||||||
if !client.is_admin() {
|
if client.is_admin() {
|
||||||
let _ = drain.send(1).await;
|
let _ = drain.send(1).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -372,11 +365,6 @@ where
|
|||||||
None => return Err(Error::ClientError),
|
None => return Err(Error::ClientError),
|
||||||
};
|
};
|
||||||
|
|
||||||
let application_name = match parameters.get("application_name") {
|
|
||||||
Some(application_name) => application_name,
|
|
||||||
None => "pgcat",
|
|
||||||
};
|
|
||||||
|
|
||||||
let admin = ["pgcat", "pgbouncer"]
|
let admin = ["pgcat", "pgbouncer"]
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|db| *db == &pool_name)
|
.filter(|db| *db == &pool_name)
|
||||||
@@ -505,10 +493,8 @@ where
|
|||||||
last_server_id: None,
|
last_server_id: None,
|
||||||
pool_name: pool_name.clone(),
|
pool_name: pool_name.clone(),
|
||||||
username: username.clone(),
|
username: username.clone(),
|
||||||
application_name: application_name.to_string(),
|
|
||||||
shutdown,
|
shutdown,
|
||||||
connected_to_server: false,
|
connected_to_server: false,
|
||||||
sharding_key_column: None,
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -540,10 +526,8 @@ where
|
|||||||
last_server_id: None,
|
last_server_id: None,
|
||||||
pool_name: String::from("undefined"),
|
pool_name: String::from("undefined"),
|
||||||
username: String::from("undefined"),
|
username: String::from("undefined"),
|
||||||
application_name: String::from("undefined"),
|
|
||||||
shutdown,
|
shutdown,
|
||||||
connected_to_server: false,
|
connected_to_server: false,
|
||||||
sharding_key_column: None,
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -616,23 +600,10 @@ where
|
|||||||
message_result = read_message(&mut self.read) => message_result?
|
message_result = read_message(&mut self.read) => message_result?
|
||||||
};
|
};
|
||||||
|
|
||||||
match message[0] as char {
|
// Avoid taking a server if the client just wants to disconnect.
|
||||||
// Buffer extended protocol messages even if we do not have
|
if message[0] as char == 'X' {
|
||||||
// a server connection yet. Hopefully, when we get the S message
|
debug!("Client disconnecting");
|
||||||
// we'll be able to allocate a connection. Also, clients do not expect
|
return Ok(());
|
||||||
// the server to respond to these messages so even if we were not able to
|
|
||||||
// allocate a connection, we wouldn't be able to send back an error message
|
|
||||||
// to the client so we buffer them and defer the decision to error out or not
|
|
||||||
// to when we get the S message
|
|
||||||
'P' | 'B' | 'D' | 'E' => {
|
|
||||||
self.buffer.put(&message[..]);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
'X' => {
|
|
||||||
debug!("Client disconnecting");
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
_ => (),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle admin database queries.
|
// Handle admin database queries.
|
||||||
@@ -673,8 +644,8 @@ where
|
|||||||
|
|
||||||
// SET SHARD TO
|
// SET SHARD TO
|
||||||
Some((Command::SetShard, _)) => {
|
Some((Command::SetShard, _)) => {
|
||||||
// Selected shard is not configured.
|
let shard = query_router.shard();
|
||||||
if query_router.shard() >= pool.shards() {
|
if shard >= pool.shards() {
|
||||||
// Set the shard back to what it was.
|
// Set the shard back to what it was.
|
||||||
query_router.set_shard(current_shard);
|
query_router.set_shard(current_shard);
|
||||||
|
|
||||||
@@ -682,7 +653,7 @@ where
|
|||||||
&mut self.write,
|
&mut self.write,
|
||||||
&format!(
|
&format!(
|
||||||
"shard {} is more than configured {}, staying on shard {}",
|
"shard {} is more than configured {}, staying on shard {}",
|
||||||
query_router.shard(),
|
shard,
|
||||||
pool.shards(),
|
pool.shards(),
|
||||||
current_shard,
|
current_shard,
|
||||||
),
|
),
|
||||||
@@ -729,13 +700,6 @@ where
|
|||||||
show_response(&mut self.write, "primary reads", &value).await?;
|
show_response(&mut self.write, "primary reads", &value).await?;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// COPY .. SHARDING_KEY_COLUMN ..
|
|
||||||
Some((Command::StartShardedCopy, value)) => {
|
|
||||||
custom_protocol_response_ok(&mut self.write, "SHARDED_COPY").await?;
|
|
||||||
self.sharding_key_column = Some(value.parse::<usize>().unwrap());
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
debug!("Waiting for connection from pool");
|
debug!("Waiting for connection from pool");
|
||||||
@@ -750,19 +714,22 @@ where
|
|||||||
conn
|
conn
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
// Client is attempting to get results from the server,
|
// Clients do not expect to get SystemError followed by ReadyForQuery in the middle
|
||||||
// but we were unable to grab a connection from the pool
|
// of extended protocol submission. So we will hold off on sending the actual error
|
||||||
// We'll send back an error message and clean the extended
|
// message to the client until we get 'S' message
|
||||||
// protocol buffer
|
match message[0] as char {
|
||||||
if message[0] as char == 'S' {
|
'P' | 'B' | 'E' | 'D' => (),
|
||||||
error!("Got Sync message but failed to get a connection from the pool");
|
_ => {
|
||||||
self.buffer.clear();
|
error_response(
|
||||||
}
|
&mut self.write,
|
||||||
error_response(&mut self.write, "could not get connection from the pool")
|
"could not get connection from the pool",
|
||||||
.await?;
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
error!("Could not get connection from pool: {:?}", err);
|
||||||
|
|
||||||
error!("Could not get connection from pool: {{ pool_name: {:?}, username: {:?}, shard: {:?}, role: \"{:?}\", error: \"{:?}\" }}",
|
|
||||||
self.pool_name.clone(), self.username.clone(), query_router.shard(), query_router.role(), err);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -792,10 +759,13 @@ where
|
|||||||
server.address()
|
server.address()
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Set application_name if any.
|
||||||
// TODO: investigate other parameters and set them too.
|
// TODO: investigate other parameters and set them too.
|
||||||
|
if self.parameters.contains_key("application_name") {
|
||||||
// Set application_name.
|
server
|
||||||
server.set_name(&self.application_name).await?;
|
.set_name(&self.parameters["application_name"])
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
// Transaction loop. Multiple queries can be issued by the client here.
|
// Transaction loop. Multiple queries can be issued by the client here.
|
||||||
// The connection belongs to the client until the transaction is over,
|
// The connection belongs to the client until the transaction is over,
|
||||||
@@ -804,7 +774,7 @@ where
|
|||||||
// If the client is in session mode, no more custom protocol
|
// If the client is in session mode, no more custom protocol
|
||||||
// commands will be accepted.
|
// commands will be accepted.
|
||||||
loop {
|
loop {
|
||||||
let message = if message.len() == 0 {
|
let mut message = if message.len() == 0 {
|
||||||
trace!("Waiting for message inside transaction or in session mode");
|
trace!("Waiting for message inside transaction or in session mode");
|
||||||
|
|
||||||
match read_message(&mut self.read).await {
|
match read_message(&mut self.read).await {
|
||||||
@@ -812,7 +782,12 @@ where
|
|||||||
Err(err) => {
|
Err(err) => {
|
||||||
// Client disconnected inside a transaction.
|
// Client disconnected inside a transaction.
|
||||||
// Clean up the server and re-use it.
|
// Clean up the server and re-use it.
|
||||||
server.checkin_cleanup().await?;
|
// This prevents connection thrashing by bad clients.
|
||||||
|
if server.in_transaction() {
|
||||||
|
server.query("ROLLBACK").await?;
|
||||||
|
server.query("DISCARD ALL").await?;
|
||||||
|
server.set_name("pgcat").await?;
|
||||||
|
}
|
||||||
|
|
||||||
return Err(err);
|
return Err(err);
|
||||||
}
|
}
|
||||||
@@ -823,174 +798,157 @@ where
|
|||||||
msg
|
msg
|
||||||
};
|
};
|
||||||
|
|
||||||
match self.handle_message(&pool, server, &address, message).await? {
|
// The message will be forwarded to the server intact. We still would like to
|
||||||
Some(done) => if done { break; },
|
// parse it below to figure out what to do with it.
|
||||||
None => return Ok(()),
|
let original = message.clone();
|
||||||
};
|
|
||||||
|
|
||||||
|
let code = message.get_u8() as char;
|
||||||
|
let _len = message.get_i32() as usize;
|
||||||
|
|
||||||
|
trace!("Message: {}", code);
|
||||||
|
|
||||||
|
match code {
|
||||||
|
// ReadyForQuery
|
||||||
|
'Q' => {
|
||||||
|
debug!("Sending query to server");
|
||||||
|
|
||||||
|
self.send_and_receive_loop(code, original, server, &address, &pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if !server.in_transaction() {
|
||||||
|
// Report transaction executed statistics.
|
||||||
|
self.stats.transaction(self.process_id, address.id);
|
||||||
|
|
||||||
|
// Release server back to the pool if we are in transaction mode.
|
||||||
|
// If we are in session mode, we keep the server until the client disconnects.
|
||||||
|
if self.transaction_mode {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Terminate
|
||||||
|
'X' => {
|
||||||
|
// Client closing. Rollback and clean up
|
||||||
|
// connection before releasing into the pool.
|
||||||
|
// Pgbouncer closes the connection which leads to
|
||||||
|
// connection thrashing when clients misbehave.
|
||||||
|
if server.in_transaction() {
|
||||||
|
server.query("ROLLBACK").await?;
|
||||||
|
server.query("DISCARD ALL").await?;
|
||||||
|
server.set_name("pgcat").await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.release();
|
||||||
|
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse
|
||||||
|
// The query with placeholders is here, e.g. `SELECT * FROM users WHERE email = $1 AND active = $2`.
|
||||||
|
'P' => {
|
||||||
|
self.buffer.put(&original[..]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bind
|
||||||
|
// The placeholder's replacements are here, e.g. 'user@email.com' and 'true'
|
||||||
|
'B' => {
|
||||||
|
self.buffer.put(&original[..]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Describe
|
||||||
|
// Command a client can issue to describe a previously prepared named statement.
|
||||||
|
'D' => {
|
||||||
|
self.buffer.put(&original[..]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute
|
||||||
|
// Execute a prepared statement prepared in `P` and bound in `B`.
|
||||||
|
'E' => {
|
||||||
|
self.buffer.put(&original[..]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sync
|
||||||
|
// Frontend (client) is asking for the query result now.
|
||||||
|
'S' => {
|
||||||
|
debug!("Sending query to server");
|
||||||
|
|
||||||
|
self.buffer.put(&original[..]);
|
||||||
|
|
||||||
|
self.send_and_receive_loop(
|
||||||
|
code,
|
||||||
|
self.buffer.clone(),
|
||||||
|
server,
|
||||||
|
&address,
|
||||||
|
&pool,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
self.buffer.clear();
|
||||||
|
|
||||||
|
if !server.in_transaction() {
|
||||||
|
self.stats.transaction(self.process_id, address.id);
|
||||||
|
|
||||||
|
// Release server back to the pool if we are in transaction mode.
|
||||||
|
// If we are in session mode, we keep the server until the client disconnects.
|
||||||
|
if self.transaction_mode {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CopyData
|
||||||
|
'd' => {
|
||||||
|
// Forward the data to the server,
|
||||||
|
// don't buffer it since it can be rather large.
|
||||||
|
self.send_server_message(server, original, &address, &pool)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// CopyDone or CopyFail
|
||||||
|
// Copy is done, successfully or not.
|
||||||
|
'c' | 'f' => {
|
||||||
|
self.send_server_message(server, original, &address, &pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let response = self.receive_server_message(server, &address, &pool).await?;
|
||||||
|
|
||||||
|
match write_all_half(&mut self.write, response).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(err) => {
|
||||||
|
server.mark_bad();
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if !server.in_transaction() {
|
||||||
|
self.stats.transaction(self.process_id, address.id);
|
||||||
|
|
||||||
|
// Release server back to the pool if we are in transaction mode.
|
||||||
|
// If we are in session mode, we keep the server until the client disconnects.
|
||||||
|
if self.transaction_mode {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Some unexpected message. We either did not implement the protocol correctly
|
||||||
|
// or this is not a Postgres client we're talking to.
|
||||||
|
_ => {
|
||||||
|
error!("Unexpected code: {}", code);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// The server is no longer bound to us, we can't cancel it's queries anymore.
|
// The server is no longer bound to us, we can't cancel it's queries anymore.
|
||||||
debug!("Releasing server back into the pool");
|
debug!("Releasing server back into the pool");
|
||||||
server.checkin_cleanup().await?;
|
|
||||||
self.stats.server_idle(server.process_id(), address.id);
|
self.stats.server_idle(server.process_id(), address.id);
|
||||||
self.connected_to_server = false;
|
self.connected_to_server = false;
|
||||||
|
|
||||||
self.release();
|
self.release();
|
||||||
self.stats.client_idle(self.process_id, address.id);
|
self.stats.client_idle(self.process_id, address.id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_message(&mut self, pool: &ConnectionPool, server: &mut Server, address: &Address, mut message: BytesMut) -> Result<Option<bool>, Error> {
|
|
||||||
let original = message.clone();
|
|
||||||
let code = message.get_u8() as char;
|
|
||||||
let _len = message.get_i32() as usize;
|
|
||||||
|
|
||||||
trace!("Message: {}", code);
|
|
||||||
|
|
||||||
match code {
|
|
||||||
// ReadyForQuery
|
|
||||||
'Q' => {
|
|
||||||
debug!("Sending query to server");
|
|
||||||
|
|
||||||
self.send_and_receive_loop(code, original, server, &address, &pool)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
if !server.in_transaction() {
|
|
||||||
// Report transaction executed statistics.
|
|
||||||
self.stats.transaction(self.process_id, address.id);
|
|
||||||
|
|
||||||
// Release server back to the pool if we are in transaction mode.
|
|
||||||
// If we are in session mode, we keep the server until the client disconnects.
|
|
||||||
if self.transaction_mode {
|
|
||||||
return Ok(Some(true));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Terminate
|
|
||||||
'X' => {
|
|
||||||
server.checkin_cleanup().await?;
|
|
||||||
self.release();
|
|
||||||
|
|
||||||
return Ok(None);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Parse
|
|
||||||
// The query with placeholders is here, e.g. `SELECT * FROM users WHERE email = $1 AND active = $2`.
|
|
||||||
'P' => {
|
|
||||||
self.buffer.put(&original[..]);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Bind
|
|
||||||
// The placeholder's replacements are here, e.g. 'user@email.com' and 'true'
|
|
||||||
'B' => {
|
|
||||||
self.buffer.put(&original[..]);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Describe
|
|
||||||
// Command a client can issue to describe a previously prepared named statement.
|
|
||||||
'D' => {
|
|
||||||
self.buffer.put(&original[..]);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Execute
|
|
||||||
// Execute a prepared statement prepared in `P` and bound in `B`.
|
|
||||||
'E' => {
|
|
||||||
self.buffer.put(&original[..]);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sync
|
|
||||||
// Frontend (client) is asking for the query result now.
|
|
||||||
'S' => {
|
|
||||||
debug!("Sending query to server");
|
|
||||||
|
|
||||||
self.buffer.put(&original[..]);
|
|
||||||
|
|
||||||
// Clone after freeze does not allocate
|
|
||||||
let first_message_code = (*self.buffer.get(0).unwrap_or(&0)) as char;
|
|
||||||
|
|
||||||
// Almost certainly true
|
|
||||||
if first_message_code == 'P' {
|
|
||||||
// Message layout
|
|
||||||
// P followed by 32 int followed by null-terminated statement name
|
|
||||||
// So message code should be in offset 0 of the buffer, first character
|
|
||||||
// in prepared statement name would be index 5
|
|
||||||
let first_char_in_name = *self.buffer.get(5).unwrap_or(&0);
|
|
||||||
if first_char_in_name != 0 {
|
|
||||||
// This is a named prepared statement
|
|
||||||
// Server connection state will need to be cleared at checkin
|
|
||||||
server.mark_dirty();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
self.send_and_receive_loop(
|
|
||||||
code,
|
|
||||||
self.buffer.clone(),
|
|
||||||
server,
|
|
||||||
&address,
|
|
||||||
&pool,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
self.buffer.clear();
|
|
||||||
|
|
||||||
if !server.in_transaction() {
|
|
||||||
self.stats.transaction(self.process_id, address.id);
|
|
||||||
|
|
||||||
// Release server back to the pool if we are in transaction mode.
|
|
||||||
// If we are in session mode, we keep the server until the client disconnects.
|
|
||||||
if self.transaction_mode {
|
|
||||||
return Ok(Some(true));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// CopyData
|
|
||||||
'd' => {
|
|
||||||
// Forward the data to the server,
|
|
||||||
// don't buffer it since it can be rather large.
|
|
||||||
self.send_server_message(server, original, &address, &pool)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// CopyDone or CopyFail
|
|
||||||
// Copy is done, successfully or not.
|
|
||||||
'c' | 'f' => {
|
|
||||||
self.send_server_message(server, original, &address, &pool)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let response = self.receive_server_message(server, &address, &pool).await?;
|
|
||||||
|
|
||||||
match write_all_half(&mut self.write, response).await {
|
|
||||||
Ok(_) => (),
|
|
||||||
Err(err) => {
|
|
||||||
server.mark_bad();
|
|
||||||
return Err(err);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if !server.in_transaction() {
|
|
||||||
self.stats.transaction(self.process_id, address.id);
|
|
||||||
|
|
||||||
// Release server back to the pool if we are in transaction mode.
|
|
||||||
// If we are in session mode, we keep the server until the client disconnects.
|
|
||||||
if self.transaction_mode {
|
|
||||||
return Ok(Some(true));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Some unexpected message. We either did not implement the protocol correctly
|
|
||||||
// or this is not a Postgres client we're talking to.
|
|
||||||
_ => {
|
|
||||||
error!("Unexpected code: {}", code);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(Some(false))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Release the server from the client: it can't cancel its queries anymore.
|
/// Release the server from the client: it can't cancel its queries anymore.
|
||||||
pub fn release(&self) {
|
pub fn release(&self) {
|
||||||
let mut guard = self.client_server_map.lock();
|
let mut guard = self.client_server_map.lock();
|
||||||
|
|||||||
@@ -72,6 +72,9 @@ pub struct Address {
|
|||||||
/// The name of the Postgres database.
|
/// The name of the Postgres database.
|
||||||
pub database: String,
|
pub database: String,
|
||||||
|
|
||||||
|
/// Default search_path.
|
||||||
|
pub search_path: Option<String>,
|
||||||
|
|
||||||
/// Server role: replica, primary.
|
/// Server role: replica, primary.
|
||||||
pub role: Role,
|
pub role: Role,
|
||||||
|
|
||||||
@@ -98,6 +101,7 @@ impl Default for Address {
|
|||||||
address_index: 0,
|
address_index: 0,
|
||||||
replica_number: 0,
|
replica_number: 0,
|
||||||
database: String::from("database"),
|
database: String::from("database"),
|
||||||
|
search_path: None,
|
||||||
role: Role::Replica,
|
role: Role::Replica,
|
||||||
username: String::from("username"),
|
username: String::from("username"),
|
||||||
pool_name: String::from("pool_name"),
|
pool_name: String::from("pool_name"),
|
||||||
@@ -206,6 +210,7 @@ impl Default for Pool {
|
|||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||||
pub struct Shard {
|
pub struct Shard {
|
||||||
pub database: String,
|
pub database: String,
|
||||||
|
pub search_path: Option<String>,
|
||||||
pub servers: Vec<(String, u16, String)>,
|
pub servers: Vec<(String, u16, String)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -213,6 +218,7 @@ impl Default for Shard {
|
|||||||
fn default() -> Shard {
|
fn default() -> Shard {
|
||||||
Shard {
|
Shard {
|
||||||
servers: vec![(String::from("localhost"), 5432, String::from("primary"))],
|
servers: vec![(String::from("localhost"), 5432, String::from("primary"))],
|
||||||
|
search_path: None,
|
||||||
database: String::from("postgres"),
|
database: String::from("postgres"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
82
src/main.rs
82
src/main.rs
@@ -74,8 +74,7 @@ use crate::stats::{Collector, Reporter, REPORTER};
|
|||||||
|
|
||||||
#[tokio::main(worker_threads = 4)]
|
#[tokio::main(worker_threads = 4)]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
env_logger::builder().format_timestamp_micros().init();
|
env_logger::init();
|
||||||
|
|
||||||
info!("Welcome to PgCat! Meow. (Version {})", VERSION);
|
info!("Welcome to PgCat! Meow. (Version {})", VERSION);
|
||||||
|
|
||||||
if !query_router::QueryRouter::setup() {
|
if !query_router::QueryRouter::setup() {
|
||||||
@@ -155,31 +154,12 @@ async fn main() {
|
|||||||
|
|
||||||
info!("Config autoreloader: {}", config.general.autoreload);
|
info!("Config autoreloader: {}", config.general.autoreload);
|
||||||
|
|
||||||
let mut autoreload_interval = tokio::time::interval(tokio::time::Duration::from_millis(15_000));
|
|
||||||
let autoreload_client_server_map = client_server_map.clone();
|
|
||||||
tokio::task::spawn(async move {
|
|
||||||
loop {
|
|
||||||
autoreload_interval.tick().await;
|
|
||||||
if config.general.autoreload {
|
|
||||||
info!("Automatically reloading config");
|
|
||||||
|
|
||||||
match reload_config(autoreload_client_server_map.clone()).await {
|
|
||||||
Ok(changed) => {
|
|
||||||
if changed {
|
|
||||||
get_config().show()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(_) => (),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let mut term_signal = unix_signal(SignalKind::terminate()).unwrap();
|
let mut term_signal = unix_signal(SignalKind::terminate()).unwrap();
|
||||||
let mut interrupt_signal = unix_signal(SignalKind::interrupt()).unwrap();
|
let mut interrupt_signal = unix_signal(SignalKind::interrupt()).unwrap();
|
||||||
let mut sighup_signal = unix_signal(SignalKind::hangup()).unwrap();
|
let mut sighup_signal = unix_signal(SignalKind::hangup()).unwrap();
|
||||||
|
let mut autoreload_interval = tokio::time::interval(tokio::time::Duration::from_millis(15_000));
|
||||||
let (shutdown_tx, _) = broadcast::channel::<()>(1);
|
let (shutdown_tx, _) = broadcast::channel::<()>(1);
|
||||||
let (drain_tx, mut drain_rx) = mpsc::channel::<i32>(2048);
|
let (drain_tx, mut drain_rx) = mpsc::channel::<i8>(2048);
|
||||||
let (exit_tx, mut exit_rx) = mpsc::channel::<()>(1);
|
let (exit_tx, mut exit_rx) = mpsc::channel::<()>(1);
|
||||||
|
|
||||||
info!("Waiting for clients");
|
info!("Waiting for clients");
|
||||||
@@ -202,6 +182,21 @@ async fn main() {
|
|||||||
get_config().show();
|
get_config().show();
|
||||||
},
|
},
|
||||||
|
|
||||||
|
_ = autoreload_interval.tick() => {
|
||||||
|
if config.general.autoreload {
|
||||||
|
info!("Automatically reloading config");
|
||||||
|
|
||||||
|
match reload_config(client_server_map.clone()).await {
|
||||||
|
Ok(changed) => {
|
||||||
|
if changed {
|
||||||
|
get_config().show()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => (),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
// Initiate graceful shutdown sequence on sig int
|
// Initiate graceful shutdown sequence on sig int
|
||||||
_ = interrupt_signal.recv() => {
|
_ = interrupt_signal.recv() => {
|
||||||
info!("Got SIGINT, waiting for client connection drain now");
|
info!("Got SIGINT, waiting for client connection drain now");
|
||||||
@@ -222,16 +217,13 @@ async fn main() {
|
|||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
|
|
||||||
// We're done waiting.
|
// We're done waiting.
|
||||||
error!("Graceful shutdown timed out. {} active clients being closed", total_clients);
|
error!("Timed out waiting for clients");
|
||||||
|
|
||||||
let _ = exit_tx.send(()).await;
|
let _ = exit_tx.send(()).await;
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
|
|
||||||
_ = term_signal.recv() => {
|
_ = term_signal.recv() => break,
|
||||||
info!("Got SIGTERM, closing with {} clients active", total_clients);
|
|
||||||
break;
|
|
||||||
},
|
|
||||||
|
|
||||||
new_client = listener.accept() => {
|
new_client = listener.accept() => {
|
||||||
let (socket, addr) = match new_client {
|
let (socket, addr) = match new_client {
|
||||||
@@ -308,18 +300,34 @@ async fn main() {
|
|||||||
///
|
///
|
||||||
/// * `duration` - A duration of time
|
/// * `duration` - A duration of time
|
||||||
fn format_duration(duration: &chrono::Duration) -> String {
|
fn format_duration(duration: &chrono::Duration) -> String {
|
||||||
let milliseconds = format!("{:0>3}", duration.num_milliseconds() % 1000);
|
let seconds = {
|
||||||
|
let seconds = duration.num_seconds() % 60;
|
||||||
|
if seconds < 10 {
|
||||||
|
format!("0{}", seconds)
|
||||||
|
} else {
|
||||||
|
format!("{}", seconds)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let seconds = format!("{:0>2}", duration.num_seconds() % 60);
|
let minutes = {
|
||||||
|
let minutes = duration.num_minutes() % 60;
|
||||||
|
if minutes < 10 {
|
||||||
|
format!("0{}", minutes)
|
||||||
|
} else {
|
||||||
|
format!("{}", minutes)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let minutes = format!("{:0>2}", duration.num_minutes() % 60);
|
let hours = {
|
||||||
|
let hours = duration.num_hours() % 24;
|
||||||
let hours = format!("{:0>2}", duration.num_hours() % 24);
|
if hours < 10 {
|
||||||
|
format!("0{}", hours)
|
||||||
|
} else {
|
||||||
|
format!("{}", hours)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let days = duration.num_days().to_string();
|
let days = duration.num_days().to_string();
|
||||||
|
|
||||||
format!(
|
format!("{}d {}:{}:{}", days, hours, minutes, seconds)
|
||||||
"{}d {}:{}:{}.{}",
|
|
||||||
days, hours, minutes, seconds, milliseconds
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -111,7 +111,12 @@ where
|
|||||||
|
|
||||||
/// Send the startup packet the server. We're pretending we're a Pg client.
|
/// Send the startup packet the server. We're pretending we're a Pg client.
|
||||||
/// This tells the server which user we are and what database we want.
|
/// This tells the server which user we are and what database we want.
|
||||||
pub async fn startup(stream: &mut TcpStream, user: &str, database: &str) -> Result<(), Error> {
|
pub async fn startup(
|
||||||
|
stream: &mut TcpStream,
|
||||||
|
user: &str,
|
||||||
|
database: &str,
|
||||||
|
search_path: Option<&String>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
let mut bytes = BytesMut::with_capacity(25);
|
let mut bytes = BytesMut::with_capacity(25);
|
||||||
|
|
||||||
bytes.put_i32(196608); // Protocol number
|
bytes.put_i32(196608); // Protocol number
|
||||||
@@ -125,6 +130,17 @@ pub async fn startup(stream: &mut TcpStream, user: &str, database: &str) -> Resu
|
|||||||
bytes.put(&b"database\0"[..]);
|
bytes.put(&b"database\0"[..]);
|
||||||
bytes.put_slice(&database.as_bytes());
|
bytes.put_slice(&database.as_bytes());
|
||||||
bytes.put_u8(0);
|
bytes.put_u8(0);
|
||||||
|
|
||||||
|
// search_path
|
||||||
|
match search_path {
|
||||||
|
Some(search_path) => {
|
||||||
|
bytes.put(&b"options\0"[..]);
|
||||||
|
bytes.put_slice(&format!("-c search_path={}", search_path).as_bytes());
|
||||||
|
bytes.put_u8(0);
|
||||||
|
}
|
||||||
|
None => (),
|
||||||
|
};
|
||||||
|
|
||||||
bytes.put_u8(0); // Null terminator
|
bytes.put_u8(0); // Null terminator
|
||||||
|
|
||||||
let len = bytes.len() as i32 + 4i32;
|
let len = bytes.len() as i32 + 4i32;
|
||||||
|
|||||||
@@ -155,6 +155,7 @@ impl ConnectionPool {
|
|||||||
let address = Address {
|
let address = Address {
|
||||||
id: address_id,
|
id: address_id,
|
||||||
database: shard.database.clone(),
|
database: shard.database.clone(),
|
||||||
|
search_path: shard.search_path.clone(),
|
||||||
host: server.0.clone(),
|
host: server.0.clone(),
|
||||||
port: server.1 as u16,
|
port: server.1 as u16,
|
||||||
role: role,
|
role: role,
|
||||||
@@ -559,7 +560,11 @@ impl ManageConnection for ServerPool {
|
|||||||
|
|
||||||
/// Attempts to create a new connection.
|
/// Attempts to create a new connection.
|
||||||
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
|
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
|
||||||
info!("Creating a new server connection {:?}", self.address);
|
info!(
|
||||||
|
"Creating a new connection to {:?} using user {:?}",
|
||||||
|
self.address.name(),
|
||||||
|
self.user.username
|
||||||
|
);
|
||||||
|
|
||||||
// Put a temporary process_id into the stats
|
// Put a temporary process_id into the stats
|
||||||
// for server login.
|
// for server login.
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ use crate::pool::PoolSettings;
|
|||||||
use crate::sharding::Sharder;
|
use crate::sharding::Sharder;
|
||||||
|
|
||||||
/// Regexes used to parse custom commands.
|
/// Regexes used to parse custom commands.
|
||||||
const CUSTOM_SQL_REGEXES: [&str; 8] = [
|
const CUSTOM_SQL_REGEXES: [&str; 7] = [
|
||||||
r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$",
|
r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$",
|
||||||
r"(?i)^ *SET SHARD TO '?([0-9]+|ANY)'? *;? *$",
|
r"(?i)^ *SET SHARD TO '?([0-9]+|ANY)'? *;? *$",
|
||||||
r"(?i)^ *SHOW SHARD *;? *$",
|
r"(?i)^ *SHOW SHARD *;? *$",
|
||||||
@@ -21,7 +21,6 @@ const CUSTOM_SQL_REGEXES: [&str; 8] = [
|
|||||||
r"(?i)^ *SHOW SERVER ROLE *;? *$",
|
r"(?i)^ *SHOW SERVER ROLE *;? *$",
|
||||||
r"(?i)^ *SET PRIMARY READS TO '?(on|off|default)'? *;? *$",
|
r"(?i)^ *SET PRIMARY READS TO '?(on|off|default)'? *;? *$",
|
||||||
r"(?i)^ *SHOW PRIMARY READS *;? *$",
|
r"(?i)^ *SHOW PRIMARY READS *;? *$",
|
||||||
r"(?i)^ *SHARDED_COPY '?([0-9]+)'? *;? *$",
|
|
||||||
];
|
];
|
||||||
|
|
||||||
/// Custom commands.
|
/// Custom commands.
|
||||||
@@ -34,7 +33,6 @@ pub enum Command {
|
|||||||
ShowServerRole,
|
ShowServerRole,
|
||||||
SetPrimaryReads,
|
SetPrimaryReads,
|
||||||
ShowPrimaryReads,
|
ShowPrimaryReads,
|
||||||
StartShardedCopy,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Quickly test for match when a query is received.
|
/// Quickly test for match when a query is received.
|
||||||
@@ -59,9 +57,6 @@ pub struct QueryRouter {
|
|||||||
|
|
||||||
/// Pool configuration.
|
/// Pool configuration.
|
||||||
pool_settings: PoolSettings,
|
pool_settings: PoolSettings,
|
||||||
|
|
||||||
// Sharding key column
|
|
||||||
sharding_key_column: Option<usize>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl QueryRouter {
|
impl QueryRouter {
|
||||||
@@ -103,7 +98,6 @@ impl QueryRouter {
|
|||||||
query_parser_enabled: false,
|
query_parser_enabled: false,
|
||||||
primary_reads_enabled: false,
|
primary_reads_enabled: false,
|
||||||
pool_settings: PoolSettings::default(),
|
pool_settings: PoolSettings::default(),
|
||||||
sharding_key_column: None,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -151,7 +145,6 @@ impl QueryRouter {
|
|||||||
4 => Command::ShowServerRole,
|
4 => Command::ShowServerRole,
|
||||||
5 => Command::SetPrimaryReads,
|
5 => Command::SetPrimaryReads,
|
||||||
6 => Command::ShowPrimaryReads,
|
6 => Command::ShowPrimaryReads,
|
||||||
7 => Command::StartShardedCopy,
|
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -159,8 +152,7 @@ impl QueryRouter {
|
|||||||
Command::SetShardingKey
|
Command::SetShardingKey
|
||||||
| Command::SetShard
|
| Command::SetShard
|
||||||
| Command::SetServerRole
|
| Command::SetServerRole
|
||||||
| Command::SetPrimaryReads
|
| Command::SetPrimaryReads => {
|
||||||
| Command::StartShardedCopy => {
|
|
||||||
// Capture value. I know this re-runs the regex engine, but I haven't
|
// Capture value. I know this re-runs the regex engine, but I haven't
|
||||||
// figured out a better way just yet. I think I can write a single Regex
|
// figured out a better way just yet. I think I can write a single Regex
|
||||||
// that matches all 5 custom SQL patterns, but maybe that's not very legible?
|
// that matches all 5 custom SQL patterns, but maybe that's not very legible?
|
||||||
@@ -212,13 +204,6 @@ impl QueryRouter {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
Command::StartShardedCopy => {
|
|
||||||
self.sharding_key_column = match value.parse::<usize>() {
|
|
||||||
Ok(value) => Some(value),
|
|
||||||
Err(_) => return None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Command::SetServerRole => {
|
Command::SetServerRole => {
|
||||||
self.active_role = match value.to_ascii_lowercase().as_ref() {
|
self.active_role = match value.to_ascii_lowercase().as_ref() {
|
||||||
"primary" => {
|
"primary" => {
|
||||||
|
|||||||
@@ -1,8 +1,7 @@
|
|||||||
/// Implementation of the PostgreSQL server (database) protocol.
|
/// Implementation of the PostgreSQL server (database) protocol.
|
||||||
/// Here we are pretending to the a Postgres client.
|
/// Here we are pretending to the a Postgres client.
|
||||||
use bytes::{Buf, BufMut, BytesMut};
|
use bytes::{Buf, BufMut, BytesMut};
|
||||||
use log::{debug, error, info, trace, warn};
|
use log::{debug, error, info, trace};
|
||||||
use std::io::Read;
|
|
||||||
use std::time::SystemTime;
|
use std::time::SystemTime;
|
||||||
use tokio::io::{AsyncReadExt, BufReader};
|
use tokio::io::{AsyncReadExt, BufReader};
|
||||||
use tokio::net::{
|
use tokio::net::{
|
||||||
@@ -49,9 +48,6 @@ pub struct Server {
|
|||||||
/// Is the server broken? We'll remote it from the pool if so.
|
/// Is the server broken? We'll remote it from the pool if so.
|
||||||
bad: bool,
|
bad: bool,
|
||||||
|
|
||||||
/// If server connection requires a DISCARD ALL before checkin
|
|
||||||
needs_cleanup: bool,
|
|
||||||
|
|
||||||
/// Mapping of clients and servers used for query cancellation.
|
/// Mapping of clients and servers used for query cancellation.
|
||||||
client_server_map: ClientServerMap,
|
client_server_map: ClientServerMap,
|
||||||
|
|
||||||
@@ -90,7 +86,13 @@ impl Server {
|
|||||||
trace!("Sending StartupMessage");
|
trace!("Sending StartupMessage");
|
||||||
|
|
||||||
// StartupMessage
|
// StartupMessage
|
||||||
startup(&mut stream, &user.username, database).await?;
|
startup(
|
||||||
|
&mut stream,
|
||||||
|
&user.username,
|
||||||
|
database,
|
||||||
|
address.search_path.as_ref(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
let mut server_info = BytesMut::new();
|
let mut server_info = BytesMut::new();
|
||||||
let mut process_id: i32 = 0;
|
let mut process_id: i32 = 0;
|
||||||
@@ -320,7 +322,6 @@ impl Server {
|
|||||||
in_transaction: false,
|
in_transaction: false,
|
||||||
data_available: false,
|
data_available: false,
|
||||||
bad: false,
|
bad: false,
|
||||||
needs_cleanup: false,
|
|
||||||
client_server_map: client_server_map,
|
client_server_map: client_server_map,
|
||||||
connected_at: chrono::offset::Utc::now().naive_utc(),
|
connected_at: chrono::offset::Utc::now().naive_utc(),
|
||||||
stats: stats,
|
stats: stats,
|
||||||
@@ -445,29 +446,6 @@ impl Server {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// CommandComplete
|
|
||||||
'C' => {
|
|
||||||
let mut command_tag = String::new();
|
|
||||||
match message.reader().read_to_string(&mut command_tag) {
|
|
||||||
Ok(_) => {
|
|
||||||
// Non-exhaustive list of commands that are likely to change session variables/resources
|
|
||||||
// which can leak between clients. This is a best effort to block bad clients
|
|
||||||
// from poisoning a transaction-mode pool by setting inappropriate session variables
|
|
||||||
match command_tag.as_str() {
|
|
||||||
"SET\0" | "PREPARE\0" => {
|
|
||||||
debug!("Server connection marked for clean up");
|
|
||||||
self.needs_cleanup = true;
|
|
||||||
}
|
|
||||||
_ => (),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Err(err) => {
|
|
||||||
warn!("Encountered an error while parsing CommandTag {}", err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// DataRow
|
// DataRow
|
||||||
'D' => {
|
'D' => {
|
||||||
// More data is available after this message, this is not the end of the reply.
|
// More data is available after this message, this is not the end of the reply.
|
||||||
@@ -581,43 +559,14 @@ impl Server {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Perform any necessary cleanup before putting the server
|
|
||||||
/// connection back in the pool
|
|
||||||
pub async fn checkin_cleanup(&mut self) -> Result<(), Error> {
|
|
||||||
// Client disconnected with an open transaction on the server connection.
|
|
||||||
// Pgbouncer behavior is to close the server connection but that can cause
|
|
||||||
// server connection thrashing if clients repeatedly do this.
|
|
||||||
// Instead, we ROLLBACK that transaction before putting the connection back in the pool
|
|
||||||
if self.in_transaction() {
|
|
||||||
self.query("ROLLBACK").await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Client disconnected but it perfromed session-altering operations such as
|
|
||||||
// SET statement_timeout to 1 or create a prepared statement. We clear that
|
|
||||||
// to avoid leaking state between clients. For performance reasons we only
|
|
||||||
// send `DISCARD ALL` if we think the session is altered instead of just sending
|
|
||||||
// it before each checkin.
|
|
||||||
if self.needs_cleanup {
|
|
||||||
self.query("DISCARD ALL").await?;
|
|
||||||
self.needs_cleanup = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A shorthand for `SET application_name = $1`.
|
/// A shorthand for `SET application_name = $1`.
|
||||||
|
#[allow(dead_code)]
|
||||||
pub async fn set_name(&mut self, name: &str) -> Result<(), Error> {
|
pub async fn set_name(&mut self, name: &str) -> Result<(), Error> {
|
||||||
if self.application_name != name {
|
if self.application_name != name {
|
||||||
self.application_name = name.to_string();
|
self.application_name = name.to_string();
|
||||||
// We don't want `SET application_name` to mark the server connection
|
Ok(self
|
||||||
// as needing cleanup
|
|
||||||
let needs_cleanup_before = self.needs_cleanup;
|
|
||||||
|
|
||||||
let result = Ok(self
|
|
||||||
.query(&format!("SET application_name = '{}'", name))
|
.query(&format!("SET application_name = '{}'", name))
|
||||||
.await?);
|
.await?)
|
||||||
self.needs_cleanup = needs_cleanup_before;
|
|
||||||
return result;
|
|
||||||
} else {
|
} else {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -638,11 +587,6 @@ impl Server {
|
|||||||
pub fn last_activity(&self) -> SystemTime {
|
pub fn last_activity(&self) -> SystemTime {
|
||||||
self.last_activity
|
self.last_activity
|
||||||
}
|
}
|
||||||
|
|
||||||
// Marks a connection as needing DISCARD ALL at checkin
|
|
||||||
pub fn mark_dirty(&mut self) {
|
|
||||||
self.needs_cleanup = true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for Server {
|
impl Drop for Server {
|
||||||
@@ -669,8 +613,7 @@ impl Drop for Server {
|
|||||||
let duration = now - self.connected_at;
|
let duration = now - self.connected_at;
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"Server connection closed {:?}, session duration: {}",
|
"Server connection closed, session duration: {}",
|
||||||
self.address,
|
|
||||||
crate::format_duration(&duration)
|
crate::format_duration(&duration)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +0,0 @@
|
|||||||
FROM rust:bullseye
|
|
||||||
|
|
||||||
RUN apt-get update && apt-get install llvm-11 psmisc postgresql-contrib postgresql-client ruby ruby-dev libpq-dev python3 python3-pip lcov sudo curl -y
|
|
||||||
RUN cargo install cargo-binutils rustfilt
|
|
||||||
RUN rustup component add llvm-tools-preview
|
|
||||||
@@ -1,47 +0,0 @@
|
|||||||
version: "3"
|
|
||||||
services:
|
|
||||||
pg1:
|
|
||||||
image: postgres:14
|
|
||||||
network_mode: "service:main"
|
|
||||||
environment:
|
|
||||||
POSTGRES_USER: postgres
|
|
||||||
POSTGRES_DB: postgres
|
|
||||||
POSTGRES_PASSWORD: postgres
|
|
||||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
|
||||||
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "5432"]
|
|
||||||
pg2:
|
|
||||||
image: postgres:14
|
|
||||||
network_mode: "service:main"
|
|
||||||
environment:
|
|
||||||
POSTGRES_USER: postgres
|
|
||||||
POSTGRES_DB: postgres
|
|
||||||
POSTGRES_PASSWORD: postgres
|
|
||||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
|
||||||
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "7432"]
|
|
||||||
pg3:
|
|
||||||
image: postgres:14
|
|
||||||
network_mode: "service:main"
|
|
||||||
environment:
|
|
||||||
POSTGRES_USER: postgres
|
|
||||||
POSTGRES_DB: postgres
|
|
||||||
POSTGRES_PASSWORD: postgres
|
|
||||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
|
||||||
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "8432"]
|
|
||||||
pg4:
|
|
||||||
image: postgres:14
|
|
||||||
network_mode: "service:main"
|
|
||||||
environment:
|
|
||||||
POSTGRES_USER: postgres
|
|
||||||
POSTGRES_DB: postgres
|
|
||||||
POSTGRES_PASSWORD: postgres
|
|
||||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
|
||||||
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "9432"]
|
|
||||||
main:
|
|
||||||
build: .
|
|
||||||
command: ["bash", "/app/tests/docker/run.sh"]
|
|
||||||
environment:
|
|
||||||
RUSTFLAGS: "-C instrument-coverage"
|
|
||||||
LLVM_PROFILE_FILE: "pgcat-%m.profraw"
|
|
||||||
volumes:
|
|
||||||
- ../../:/app/
|
|
||||||
- /app/target/
|
|
||||||
@@ -1,21 +0,0 @@
|
|||||||
#!/bin/bash
|
|
||||||
|
|
||||||
rm /app/*.profraw || true
|
|
||||||
rm /app/pgcat.profdata || true
|
|
||||||
rm -rf /app/cov || true
|
|
||||||
|
|
||||||
cd /app/
|
|
||||||
|
|
||||||
cargo build
|
|
||||||
cargo test --tests
|
|
||||||
|
|
||||||
bash .circleci/run_tests.sh
|
|
||||||
|
|
||||||
rust-profdata merge -sparse pgcat-*.profraw -o pgcat.profdata
|
|
||||||
|
|
||||||
rust-cov export -ignore-filename-regex="rustc|registry" -Xdemangler=rustfilt -instr-profile=pgcat.profdata --object ./target/debug/pgcat --format lcov > ./lcov.info
|
|
||||||
|
|
||||||
genhtml lcov.info --output-directory cov --prefix $(pwd)
|
|
||||||
|
|
||||||
rm /app/*.profraw
|
|
||||||
rm /app/pgcat.profdata
|
|
||||||
@@ -18,14 +18,9 @@ def pgcat_start():
|
|||||||
|
|
||||||
|
|
||||||
def pg_cat_send_signal(signal: signal.Signals):
|
def pg_cat_send_signal(signal: signal.Signals):
|
||||||
try:
|
for proc in psutil.process_iter(["pid", "name"]):
|
||||||
for proc in psutil.process_iter(["pid", "name"]):
|
if "pgcat" == proc.name():
|
||||||
if "pgcat" == proc.name():
|
os.kill(proc.pid, signal)
|
||||||
os.kill(proc.pid, signal)
|
|
||||||
except Exception as e:
|
|
||||||
# The process can be gone when we send this signal
|
|
||||||
print(e)
|
|
||||||
|
|
||||||
if signal == signal.SIGTERM:
|
if signal == signal.SIGTERM:
|
||||||
# Returns 0 if pgcat process exists
|
# Returns 0 if pgcat process exists
|
||||||
time.sleep(2)
|
time.sleep(2)
|
||||||
|
|||||||
@@ -1,8 +1,6 @@
|
|||||||
source "https://rubygems.org"
|
source "https://rubygems.org"
|
||||||
|
|
||||||
gem "pg"
|
gem "pg"
|
||||||
gem "toml"
|
|
||||||
gem "rspec"
|
|
||||||
gem "rubocop"
|
|
||||||
gem "toxiproxy"
|
|
||||||
gem "activerecord"
|
gem "activerecord"
|
||||||
|
gem "rubocop"
|
||||||
|
gem "toml", "~> 0.3.0"
|
||||||
|
|||||||
@@ -13,7 +13,6 @@ GEM
|
|||||||
tzinfo (~> 2.0)
|
tzinfo (~> 2.0)
|
||||||
ast (2.4.2)
|
ast (2.4.2)
|
||||||
concurrent-ruby (1.1.10)
|
concurrent-ruby (1.1.10)
|
||||||
diff-lcs (1.5.0)
|
|
||||||
i18n (1.11.0)
|
i18n (1.11.0)
|
||||||
concurrent-ruby (~> 1.0)
|
concurrent-ruby (~> 1.0)
|
||||||
minitest (5.16.2)
|
minitest (5.16.2)
|
||||||
@@ -25,19 +24,6 @@ GEM
|
|||||||
rainbow (3.1.1)
|
rainbow (3.1.1)
|
||||||
regexp_parser (2.3.1)
|
regexp_parser (2.3.1)
|
||||||
rexml (3.2.5)
|
rexml (3.2.5)
|
||||||
rspec (3.11.0)
|
|
||||||
rspec-core (~> 3.11.0)
|
|
||||||
rspec-expectations (~> 3.11.0)
|
|
||||||
rspec-mocks (~> 3.11.0)
|
|
||||||
rspec-core (3.11.0)
|
|
||||||
rspec-support (~> 3.11.0)
|
|
||||||
rspec-expectations (3.11.0)
|
|
||||||
diff-lcs (>= 1.2.0, < 2.0)
|
|
||||||
rspec-support (~> 3.11.0)
|
|
||||||
rspec-mocks (3.11.1)
|
|
||||||
diff-lcs (>= 1.2.0, < 2.0)
|
|
||||||
rspec-support (~> 3.11.0)
|
|
||||||
rspec-support (3.11.0)
|
|
||||||
rubocop (1.29.0)
|
rubocop (1.29.0)
|
||||||
parallel (~> 1.10)
|
parallel (~> 1.10)
|
||||||
parser (>= 3.1.0.0)
|
parser (>= 3.1.0.0)
|
||||||
@@ -52,23 +38,19 @@ GEM
|
|||||||
ruby-progressbar (1.11.0)
|
ruby-progressbar (1.11.0)
|
||||||
toml (0.3.0)
|
toml (0.3.0)
|
||||||
parslet (>= 1.8.0, < 3.0.0)
|
parslet (>= 1.8.0, < 3.0.0)
|
||||||
toxiproxy (2.0.1)
|
|
||||||
tzinfo (2.0.4)
|
tzinfo (2.0.4)
|
||||||
concurrent-ruby (~> 1.0)
|
concurrent-ruby (~> 1.0)
|
||||||
unicode-display_width (2.1.0)
|
unicode-display_width (2.1.0)
|
||||||
|
|
||||||
PLATFORMS
|
PLATFORMS
|
||||||
aarch64-linux
|
|
||||||
arm64-darwin-21
|
arm64-darwin-21
|
||||||
x86_64-linux
|
x86_64-linux
|
||||||
|
|
||||||
DEPENDENCIES
|
DEPENDENCIES
|
||||||
activerecord
|
activerecord
|
||||||
pg
|
pg
|
||||||
rspec
|
|
||||||
rubocop
|
rubocop
|
||||||
toml
|
toml (~> 0.3.0)
|
||||||
toxiproxy
|
|
||||||
|
|
||||||
BUNDLED WITH
|
BUNDLED WITH
|
||||||
2.3.21
|
2.3.7
|
||||||
|
|||||||
@@ -1,82 +0,0 @@
|
|||||||
require 'pg'
|
|
||||||
require 'toxiproxy'
|
|
||||||
|
|
||||||
class PgInstance
|
|
||||||
attr_reader :port
|
|
||||||
attr_reader :username
|
|
||||||
attr_reader :password
|
|
||||||
attr_reader :database_name
|
|
||||||
|
|
||||||
def initialize(port, username, password, database_name)
|
|
||||||
@original_port = port
|
|
||||||
@toxiproxy_port = 10000 + port.to_i
|
|
||||||
@port = @toxiproxy_port
|
|
||||||
|
|
||||||
@username = username
|
|
||||||
@password = password
|
|
||||||
@database_name = database_name
|
|
||||||
@toxiproxy_name = "database_#{@original_port}"
|
|
||||||
Toxiproxy.populate([{
|
|
||||||
name: @toxiproxy_name,
|
|
||||||
listen: "0.0.0.0:#{@toxiproxy_port}",
|
|
||||||
upstream: "localhost:#{@original_port}",
|
|
||||||
}])
|
|
||||||
|
|
||||||
# Toxiproxy server will outlive our PgInstance objects
|
|
||||||
# so we want to destroy our proxies before exiting
|
|
||||||
# Ruby finalizer is ideal for doing this
|
|
||||||
ObjectSpace.define_finalizer(@toxiproxy_name, proc { Toxiproxy[@toxiproxy_name].destroy })
|
|
||||||
end
|
|
||||||
|
|
||||||
def with_connection
|
|
||||||
conn = PG.connect("postgres://#{@username}:#{@password}@localhost:#{port}/#{database_name}")
|
|
||||||
yield conn
|
|
||||||
ensure
|
|
||||||
conn&.close
|
|
||||||
end
|
|
||||||
|
|
||||||
def reset
|
|
||||||
reset_toxics
|
|
||||||
reset_stats
|
|
||||||
end
|
|
||||||
|
|
||||||
def toxiproxy
|
|
||||||
Toxiproxy[@toxiproxy_name]
|
|
||||||
end
|
|
||||||
|
|
||||||
def take_down
|
|
||||||
if block_given?
|
|
||||||
Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).apply { yield }
|
|
||||||
else
|
|
||||||
Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).toxics.each(&:save)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def add_latency(latency)
|
|
||||||
if block_given?
|
|
||||||
Toxiproxy[@toxiproxy_name].toxic(:latency, latency: latency).apply { yield }
|
|
||||||
else
|
|
||||||
Toxiproxy[@toxiproxy_name].toxic(:latency, latency: latency).toxics.each(&:save)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def delete_proxy
|
|
||||||
Toxiproxy[@toxiproxy_name].delete
|
|
||||||
end
|
|
||||||
|
|
||||||
def reset_toxics
|
|
||||||
Toxiproxy[@toxiproxy_name].toxics.each(&:destroy)
|
|
||||||
end
|
|
||||||
|
|
||||||
def reset_stats
|
|
||||||
with_connection { |c| c.async_exec("SELECT pg_stat_statements_reset()") }
|
|
||||||
end
|
|
||||||
|
|
||||||
def count_query(query)
|
|
||||||
with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query = '#{query}'")[0]["sum"].to_i }
|
|
||||||
end
|
|
||||||
|
|
||||||
def count_select_1_plus_2
|
|
||||||
with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query = 'SELECT $1 + $2'")[0]["sum"].to_i }
|
|
||||||
end
|
|
||||||
end
|
|
||||||
@@ -1,100 +0,0 @@
|
|||||||
require 'json'
|
|
||||||
require 'ostruct'
|
|
||||||
require_relative 'pgcat_process'
|
|
||||||
require_relative 'pg_instance'
|
|
||||||
|
|
||||||
module Helpers
|
|
||||||
module Pgcat
|
|
||||||
def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction")
|
|
||||||
user = {
|
|
||||||
"password" => "sharding_user",
|
|
||||||
"pool_size" => pool_size,
|
|
||||||
"statement_timeout" => 0,
|
|
||||||
"username" => "sharding_user"
|
|
||||||
}
|
|
||||||
|
|
||||||
pgcat = PgcatProcess.new("info")
|
|
||||||
primary0 = PgInstance.new(5432, user["username"], user["password"], "shard0")
|
|
||||||
primary1 = PgInstance.new(7432, user["username"], user["password"], "shard1")
|
|
||||||
primary2 = PgInstance.new(8432, user["username"], user["password"], "shard2")
|
|
||||||
|
|
||||||
pgcat_cfg = pgcat.current_config
|
|
||||||
pgcat_cfg["pools"] = {
|
|
||||||
"#{pool_name}" => {
|
|
||||||
"default_role" => "any",
|
|
||||||
"pool_mode" => pool_mode,
|
|
||||||
"primary_reads_enabled" => false,
|
|
||||||
"query_parser_enabled" => false,
|
|
||||||
"sharding_function" => "pg_bigint_hash",
|
|
||||||
"shards" => {
|
|
||||||
"0" => { "database" => "shard0", "servers" => [["localhost", primary0.port.to_s, "primary"]] },
|
|
||||||
"1" => { "database" => "shard1", "servers" => [["localhost", primary1.port.to_s, "primary"]] },
|
|
||||||
"2" => { "database" => "shard2", "servers" => [["localhost", primary2.port.to_s, "primary"]] },
|
|
||||||
},
|
|
||||||
"users" => { "0" => user }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pgcat.update_config(pgcat_cfg)
|
|
||||||
|
|
||||||
pgcat.start
|
|
||||||
pgcat.wait_until_ready
|
|
||||||
|
|
||||||
OpenStruct.new.tap do |struct|
|
|
||||||
struct.pgcat = pgcat
|
|
||||||
struct.shards = [primary0, primary1, primary2]
|
|
||||||
struct.all_databases = [primary0, primary1, primary2]
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction")
|
|
||||||
user = {
|
|
||||||
"password" => "sharding_user",
|
|
||||||
"pool_size" => pool_size,
|
|
||||||
"statement_timeout" => 0,
|
|
||||||
"username" => "sharding_user"
|
|
||||||
}
|
|
||||||
|
|
||||||
pgcat = PgcatProcess.new("info")
|
|
||||||
pgcat_cfg = pgcat.current_config
|
|
||||||
|
|
||||||
primary = PgInstance.new(5432, user["username"], user["password"], "shard0")
|
|
||||||
replica0 = PgInstance.new(7432, user["username"], user["password"], "shard0")
|
|
||||||
replica1 = PgInstance.new(8432, user["username"], user["password"], "shard0")
|
|
||||||
replica2 = PgInstance.new(9432, user["username"], user["password"], "shard0")
|
|
||||||
|
|
||||||
# Main proxy configs
|
|
||||||
pgcat_cfg["pools"] = {
|
|
||||||
"#{pool_name}" => {
|
|
||||||
"default_role" => "any",
|
|
||||||
"pool_mode" => pool_mode,
|
|
||||||
"primary_reads_enabled" => false,
|
|
||||||
"query_parser_enabled" => false,
|
|
||||||
"sharding_function" => "pg_bigint_hash",
|
|
||||||
"shards" => {
|
|
||||||
"0" => {
|
|
||||||
"database" => "shard0",
|
|
||||||
"servers" => [
|
|
||||||
["localhost", primary.port.to_s, "primary"],
|
|
||||||
["localhost", replica0.port.to_s, "replica"],
|
|
||||||
["localhost", replica1.port.to_s, "replica"],
|
|
||||||
["localhost", replica2.port.to_s, "replica"]
|
|
||||||
]
|
|
||||||
},
|
|
||||||
},
|
|
||||||
"users" => { "0" => user }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pgcat_cfg["general"]["port"] = pgcat.port
|
|
||||||
pgcat.update_config(pgcat_cfg)
|
|
||||||
pgcat.start
|
|
||||||
pgcat.wait_until_ready
|
|
||||||
|
|
||||||
OpenStruct.new.tap do |struct|
|
|
||||||
struct.pgcat = pgcat
|
|
||||||
struct.primary = primary
|
|
||||||
struct.replicas = [replica0, replica1, replica2]
|
|
||||||
struct.all_databases = [primary, replica0, replica1, replica2]
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
@@ -1,116 +0,0 @@
|
|||||||
require 'pg'
|
|
||||||
require 'toml'
|
|
||||||
require 'fileutils'
|
|
||||||
require 'securerandom'
|
|
||||||
|
|
||||||
class PgcatProcess
|
|
||||||
attr_reader :port
|
|
||||||
attr_reader :pid
|
|
||||||
|
|
||||||
def self.finalize(pid, log_filename, config_filename)
|
|
||||||
`kill #{pid}`
|
|
||||||
File.delete(config_filename) if File.exist?(config_filename)
|
|
||||||
File.delete(log_filename) if File.exist?(log_filename)
|
|
||||||
end
|
|
||||||
|
|
||||||
def initialize(log_level)
|
|
||||||
@env = {"RUST_LOG" => log_level}
|
|
||||||
@port = rand(20000..32760)
|
|
||||||
@log_level = log_level
|
|
||||||
@log_filename = "/tmp/pgcat_log_#{SecureRandom.urlsafe_base64}.log"
|
|
||||||
@config_filename = "/tmp/pgcat_cfg_#{SecureRandom.urlsafe_base64}.toml"
|
|
||||||
|
|
||||||
@command = "../../target/debug/pgcat #{@config_filename}"
|
|
||||||
|
|
||||||
FileUtils.cp("../../pgcat.toml", @config_filename)
|
|
||||||
cfg = current_config
|
|
||||||
cfg["general"]["port"] = @port.to_i
|
|
||||||
cfg["general"]["enable_prometheus_exporter"] = false
|
|
||||||
|
|
||||||
update_config(cfg)
|
|
||||||
end
|
|
||||||
|
|
||||||
def logs
|
|
||||||
File.read(@log_filename)
|
|
||||||
end
|
|
||||||
|
|
||||||
def update_config(config_hash)
|
|
||||||
@original_config = current_config
|
|
||||||
output_to_write = TOML::Generator.new(config_hash).body
|
|
||||||
output_to_write = output_to_write.gsub(/,\s*["|'](\d+)["|']\s*,/, ',\1,')
|
|
||||||
File.write(@config_filename, output_to_write)
|
|
||||||
end
|
|
||||||
|
|
||||||
def current_config
|
|
||||||
old_cfg = File.read(@config_filename)
|
|
||||||
loadable_string = old_cfg.gsub(/,\s*(\d+)\s*,/, ', "\1",')
|
|
||||||
TOML.load(loadable_string)
|
|
||||||
end
|
|
||||||
|
|
||||||
def reload_config
|
|
||||||
`kill -s HUP #{@pid}`
|
|
||||||
sleep 0.1
|
|
||||||
end
|
|
||||||
|
|
||||||
def start
|
|
||||||
raise StandardError, "Process is already started" unless @pid.nil?
|
|
||||||
@pid = Process.spawn(@env, @command, err: @log_filename, out: @log_filename)
|
|
||||||
ObjectSpace.define_finalizer(@log_filename, proc { PgcatProcess.finalize(@pid, @log_filename, @config_filename) })
|
|
||||||
|
|
||||||
return self
|
|
||||||
end
|
|
||||||
|
|
||||||
def wait_until_ready
|
|
||||||
exc = nil
|
|
||||||
10.times do
|
|
||||||
PG::connect(example_connection_string).close
|
|
||||||
|
|
||||||
return self
|
|
||||||
rescue => e
|
|
||||||
exc = e
|
|
||||||
sleep(0.5)
|
|
||||||
end
|
|
||||||
puts exc
|
|
||||||
raise StandardError, "Process #{@pid} never became ready. Logs #{logs}"
|
|
||||||
end
|
|
||||||
|
|
||||||
def stop
|
|
||||||
`kill #{@pid}`
|
|
||||||
sleep 0.1
|
|
||||||
end
|
|
||||||
|
|
||||||
def shutdown
|
|
||||||
stop
|
|
||||||
File.delete(@config_filename) if File.exist?(@config_filename)
|
|
||||||
File.delete(@log_filename) if File.exist?(@log_filename)
|
|
||||||
end
|
|
||||||
|
|
||||||
def admin_connection_string
|
|
||||||
cfg = current_config
|
|
||||||
username = cfg["general"]["admin_username"]
|
|
||||||
password = cfg["general"]["admin_password"]
|
|
||||||
|
|
||||||
"postgresql://#{username}:#{password}@0.0.0.0:#{@port}/pgcat"
|
|
||||||
end
|
|
||||||
|
|
||||||
def connection_string(pool_name, username)
|
|
||||||
cfg = current_config
|
|
||||||
|
|
||||||
user_idx, user_obj = cfg["pools"][pool_name]["users"].detect { |k, user| user["username"] == username }
|
|
||||||
password = user_obj["password"]
|
|
||||||
|
|
||||||
"postgresql://#{username}:#{password}@0.0.0.0:#{@port}/#{pool_name}"
|
|
||||||
end
|
|
||||||
|
|
||||||
def example_connection_string
|
|
||||||
cfg = current_config
|
|
||||||
first_pool_name = cfg["pools"].keys[0]
|
|
||||||
|
|
||||||
db_name = first_pool_name
|
|
||||||
|
|
||||||
username = cfg["pools"][first_pool_name]["users"]["0"]["username"]
|
|
||||||
password = cfg["pools"][first_pool_name]["users"]["0"]["password"]
|
|
||||||
|
|
||||||
"postgresql://#{username}:#{password}@0.0.0.0:#{@port}/#{db_name}"
|
|
||||||
end
|
|
||||||
end
|
|
||||||
@@ -1,61 +0,0 @@
|
|||||||
# frozen_string_literal: true
|
|
||||||
require_relative 'spec_helper'
|
|
||||||
|
|
||||||
describe "Load Balancing" do
|
|
||||||
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }
|
|
||||||
after do
|
|
||||||
processes.all_databases.map(&:reset)
|
|
||||||
processes.pgcat.shutdown
|
|
||||||
end
|
|
||||||
|
|
||||||
context "under regular circumstances" do
|
|
||||||
it "balances query volume between all instances" do
|
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
|
|
||||||
query_count = QUERY_COUNT
|
|
||||||
expected_share = query_count / processes.all_databases.count
|
|
||||||
failed_count = 0
|
|
||||||
|
|
||||||
query_count.times do
|
|
||||||
conn.async_exec("SELECT 1 + 2")
|
|
||||||
rescue
|
|
||||||
failed_count += 1
|
|
||||||
end
|
|
||||||
|
|
||||||
expect(failed_count).to eq(0)
|
|
||||||
processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
|
|
||||||
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
context "when some replicas are down" do
|
|
||||||
it "balances query volume between working instances" do
|
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
|
|
||||||
failed_count = 0
|
|
||||||
|
|
||||||
processes[:replicas][0].take_down do
|
|
||||||
processes[:replicas][1].take_down do
|
|
||||||
QUERY_COUNT.times do
|
|
||||||
conn.async_exec("SELECT 1 + 2")
|
|
||||||
rescue
|
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
failed_count += 1
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
expect(failed_count).to eq(2)
|
|
||||||
processes.all_databases.each do |instance|
|
|
||||||
queries_routed = instance.count_select_1_plus_2
|
|
||||||
if processes.replicas[0..1].include?(instance)
|
|
||||||
expect(queries_routed).to eq(0)
|
|
||||||
else
|
|
||||||
expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
@@ -1,193 +0,0 @@
|
|||||||
# frozen_string_literal: true
|
|
||||||
require_relative 'spec_helper'
|
|
||||||
|
|
||||||
describe "Miscellaneous" do
|
|
||||||
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }
|
|
||||||
after do
|
|
||||||
processes.all_databases.map(&:reset)
|
|
||||||
processes.pgcat.shutdown
|
|
||||||
end
|
|
||||||
|
|
||||||
describe "Extended Protocol handling" do
|
|
||||||
it "does not send packets that client does not expect during extended protocol sequence" do
|
|
||||||
new_configs = processes.pgcat.current_config
|
|
||||||
|
|
||||||
new_configs["general"]["connect_timeout"] = 500
|
|
||||||
new_configs["general"]["ban_time"] = 1
|
|
||||||
new_configs["general"]["shutdown_timeout"] = 1
|
|
||||||
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
|
|
||||||
|
|
||||||
processes.pgcat.update_config(new_configs)
|
|
||||||
processes.pgcat.reload_config
|
|
||||||
|
|
||||||
25.times do
|
|
||||||
Thread.new do
|
|
||||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
conn.async_exec("SELECT pg_sleep(5)") rescue PG::SystemError
|
|
||||||
ensure
|
|
||||||
conn&.close
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
sleep(0.5)
|
|
||||||
conn_under_test = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
stdout, stderr = with_captured_stdout_stderr do
|
|
||||||
15.times do |i|
|
|
||||||
conn_under_test.async_exec("SELECT 1") rescue PG::SystemError
|
|
||||||
conn_under_test.exec_params("SELECT #{i} + $1", [i]) rescue PG::SystemError
|
|
||||||
sleep 1
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
raise StandardError, "Libpq got unexpected messages while idle" if stderr.include?("arrived from server while idle")
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
describe "Pool recycling after config reload" do
|
|
||||||
let(:processes) { Helpers::Pgcat.three_shard_setup("sharded_db", 5) }
|
|
||||||
|
|
||||||
it "should update pools for new clients and clients that are no longer in transaction" do
|
|
||||||
server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
server_conn.async_exec("BEGIN")
|
|
||||||
|
|
||||||
# No config change yet, client should set old configs
|
|
||||||
current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
|
|
||||||
expect(current_datebase_from_pg).to eq('shard0')
|
|
||||||
|
|
||||||
# Swap shards
|
|
||||||
new_config = processes.pgcat.current_config
|
|
||||||
shard0 = new_config["pools"]["sharded_db"]["shards"]["0"]
|
|
||||||
shard1 = new_config["pools"]["sharded_db"]["shards"]["1"]
|
|
||||||
new_config["pools"]["sharded_db"]["shards"]["0"] = shard1
|
|
||||||
new_config["pools"]["sharded_db"]["shards"]["1"] = shard0
|
|
||||||
|
|
||||||
# Reload config
|
|
||||||
processes.pgcat.update_config(new_config)
|
|
||||||
processes.pgcat.reload_config
|
|
||||||
sleep 0.5
|
|
||||||
|
|
||||||
# Config changed but transaction is in progress, client should set old configs
|
|
||||||
current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
|
|
||||||
expect(current_datebase_from_pg).to eq('shard0')
|
|
||||||
server_conn.async_exec("COMMIT")
|
|
||||||
|
|
||||||
# Transaction finished, client should get new configs
|
|
||||||
current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
|
|
||||||
expect(current_datebase_from_pg).to eq('shard1')
|
|
||||||
|
|
||||||
# New connection should get new configs
|
|
||||||
server_conn.close()
|
|
||||||
server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
|
|
||||||
expect(current_datebase_from_pg).to eq('shard1')
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
describe "Clients closing connection in the middle of transaction" do
|
|
||||||
it "sends a rollback to the server" do
|
|
||||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
conn.async_exec("SET SERVER ROLE to 'primary'")
|
|
||||||
conn.async_exec("BEGIN")
|
|
||||||
conn.close
|
|
||||||
|
|
||||||
expect(processes.primary.count_query("ROLLBACK")).to eq(1)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
describe "Server version reporting" do
|
|
||||||
it "reports correct version for normal and admin databases" do
|
|
||||||
server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
expect(server_conn.server_version).not_to eq(0)
|
|
||||||
server_conn.close
|
|
||||||
|
|
||||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
|
||||||
expect(admin_conn.server_version).not_to eq(0)
|
|
||||||
admin_conn.close
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
describe "State clearance" do
|
|
||||||
context "session mode" do
|
|
||||||
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "session") }
|
|
||||||
|
|
||||||
it "Clears state before connection checkin" do
|
|
||||||
# Both modes of operation should not raise
|
|
||||||
# ERROR: prepared statement "prepared_q" already exists
|
|
||||||
15.times do
|
|
||||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
conn.async_exec("PREPARE prepared_q (int) AS SELECT $1")
|
|
||||||
conn.close
|
|
||||||
end
|
|
||||||
|
|
||||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
initial_value = conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]
|
|
||||||
conn.async_exec("SET statement_timeout to 1000")
|
|
||||||
current_value = conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]
|
|
||||||
expect(conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]).to eq("1s")
|
|
||||||
conn.close
|
|
||||||
end
|
|
||||||
|
|
||||||
it "Does not send DISCARD ALL unless necessary" do
|
|
||||||
10.times do
|
|
||||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
conn.async_exec("SET SERVER ROLE to 'primary'")
|
|
||||||
conn.async_exec("SELECT 1")
|
|
||||||
conn.close
|
|
||||||
end
|
|
||||||
|
|
||||||
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
|
|
||||||
|
|
||||||
10.times do
|
|
||||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
conn.async_exec("SET SERVER ROLE to 'primary'")
|
|
||||||
conn.async_exec("SELECT 1")
|
|
||||||
conn.async_exec("SET statement_timeout to 5000")
|
|
||||||
conn.close
|
|
||||||
end
|
|
||||||
|
|
||||||
expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
context "transaction mode" do
|
|
||||||
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction") }
|
|
||||||
it "Clears state before connection checkin" do
|
|
||||||
# Both modes of operation should not raise
|
|
||||||
# ERROR: prepared statement "prepared_q" already exists
|
|
||||||
15.times do
|
|
||||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
conn.async_exec("PREPARE prepared_q (int) AS SELECT $1")
|
|
||||||
conn.close
|
|
||||||
end
|
|
||||||
|
|
||||||
15.times do
|
|
||||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
conn.prepare("prepared_q", "SELECT $1")
|
|
||||||
conn.close
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
it "Does not send DISCARD ALL unless necessary" do
|
|
||||||
10.times do
|
|
||||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
conn.async_exec("SET SERVER ROLE to 'primary'")
|
|
||||||
conn.async_exec("SELECT 1")
|
|
||||||
conn.exec_params("SELECT $1", [1])
|
|
||||||
conn.close
|
|
||||||
end
|
|
||||||
|
|
||||||
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
|
|
||||||
|
|
||||||
10.times do
|
|
||||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
conn.async_exec("SET SERVER ROLE to 'primary'")
|
|
||||||
conn.async_exec("SELECT 1")
|
|
||||||
conn.async_exec("SET statement_timeout to 5000")
|
|
||||||
conn.close
|
|
||||||
end
|
|
||||||
|
|
||||||
expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
@@ -1,81 +0,0 @@
|
|||||||
# frozen_string_literal: true
|
|
||||||
require_relative 'spec_helper'
|
|
||||||
|
|
||||||
|
|
||||||
describe "Routing" do
|
|
||||||
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }
|
|
||||||
after do
|
|
||||||
processes.all_databases.map(&:reset)
|
|
||||||
processes.pgcat.shutdown
|
|
||||||
end
|
|
||||||
|
|
||||||
describe "SET ROLE" do
|
|
||||||
context "primary" do
|
|
||||||
it "routes queries only to primary" do
|
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
conn.async_exec("SET SERVER ROLE to 'primary'")
|
|
||||||
|
|
||||||
query_count = 30
|
|
||||||
failed_count = 0
|
|
||||||
|
|
||||||
query_count.times do
|
|
||||||
conn.async_exec("SELECT 1 + 2")
|
|
||||||
rescue
|
|
||||||
failed_count += 1
|
|
||||||
end
|
|
||||||
|
|
||||||
expect(failed_count).to eq(0)
|
|
||||||
processes.replicas.map(&:count_select_1_plus_2).each do |instance_share|
|
|
||||||
expect(instance_share).to eq(0)
|
|
||||||
end
|
|
||||||
|
|
||||||
expect(processes.primary.count_select_1_plus_2).to eq(query_count)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
context "replica" do
|
|
||||||
it "routes queries only to replicas" do
|
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
conn.async_exec("SET SERVER ROLE to 'replica'")
|
|
||||||
|
|
||||||
expected_share = QUERY_COUNT / processes.replicas.count
|
|
||||||
failed_count = 0
|
|
||||||
|
|
||||||
QUERY_COUNT.times do
|
|
||||||
conn.async_exec("SELECT 1 + 2")
|
|
||||||
rescue
|
|
||||||
failed_count += 1
|
|
||||||
end
|
|
||||||
|
|
||||||
expect(failed_count).to eq(0)
|
|
||||||
|
|
||||||
processes.replicas.map(&:count_select_1_plus_2).each do |instance_share|
|
|
||||||
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
|
||||||
end
|
|
||||||
|
|
||||||
expect(processes.primary.count_select_1_plus_2).to eq(0)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
context "any" do
|
|
||||||
it "routes queries to all instances" do
|
|
||||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
|
||||||
conn.async_exec("SET SERVER ROLE to 'any'")
|
|
||||||
|
|
||||||
expected_share = QUERY_COUNT / processes.all_databases.count
|
|
||||||
failed_count = 0
|
|
||||||
|
|
||||||
QUERY_COUNT.times do
|
|
||||||
conn.async_exec("SELECT 1 + 2")
|
|
||||||
rescue
|
|
||||||
failed_count += 1
|
|
||||||
end
|
|
||||||
|
|
||||||
expect(failed_count).to eq(0)
|
|
||||||
|
|
||||||
processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
|
|
||||||
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
@@ -1,21 +0,0 @@
|
|||||||
# frozen_string_literal: true
|
|
||||||
|
|
||||||
require 'pg'
|
|
||||||
require_relative 'helpers/pgcat_helper'
|
|
||||||
|
|
||||||
QUERY_COUNT = 300
|
|
||||||
MARGIN_OF_ERROR = 0.30
|
|
||||||
|
|
||||||
def with_captured_stdout_stderr
|
|
||||||
sout = STDOUT.clone
|
|
||||||
serr = STDERR.clone
|
|
||||||
STDOUT.reopen("/tmp/out.txt", "w+")
|
|
||||||
STDERR.reopen("/tmp/err.txt", "w+")
|
|
||||||
STDOUT.sync = true
|
|
||||||
STDERR.sync = true
|
|
||||||
yield
|
|
||||||
return File.read('/tmp/out.txt'), File.read('/tmp/err.txt')
|
|
||||||
ensure
|
|
||||||
STDOUT.reopen(sout)
|
|
||||||
STDERR.reopen(serr)
|
|
||||||
end
|
|
||||||
@@ -1,6 +1,93 @@
|
|||||||
# frozen_string_literal: true
|
# frozen_string_literal: true
|
||||||
require 'pg'
|
|
||||||
require 'active_record'
|
require 'active_record'
|
||||||
|
require 'pg'
|
||||||
|
require 'toml'
|
||||||
|
|
||||||
|
$stdout.sync = true
|
||||||
|
$stderr.sync = true
|
||||||
|
|
||||||
|
class ConfigEditor
|
||||||
|
def initialize
|
||||||
|
@original_config_text = File.read('../../.circleci/pgcat.toml')
|
||||||
|
text_to_load = @original_config_text.gsub("5432", "\"5432\"")
|
||||||
|
|
||||||
|
@original_configs = TOML.load(text_to_load)
|
||||||
|
end
|
||||||
|
|
||||||
|
def original_configs
|
||||||
|
TOML.load(TOML::Generator.new(@original_configs).body)
|
||||||
|
end
|
||||||
|
|
||||||
|
def with_modified_configs(new_configs)
|
||||||
|
text_to_write = TOML::Generator.new(new_configs).body
|
||||||
|
text_to_write = text_to_write.gsub("\"5432\"", "5432")
|
||||||
|
File.write('../../.circleci/pgcat.toml', text_to_write)
|
||||||
|
yield
|
||||||
|
ensure
|
||||||
|
File.write('../../.circleci/pgcat.toml', @original_config_text)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def with_captured_stdout_stderr
|
||||||
|
sout = STDOUT.clone
|
||||||
|
serr = STDERR.clone
|
||||||
|
STDOUT.reopen("/tmp/out.txt", "w+")
|
||||||
|
STDERR.reopen("/tmp/err.txt", "w+")
|
||||||
|
STDOUT.sync = true
|
||||||
|
STDERR.sync = true
|
||||||
|
yield
|
||||||
|
return File.read('/tmp/out.txt'), File.read('/tmp/err.txt')
|
||||||
|
ensure
|
||||||
|
STDOUT.reopen(sout)
|
||||||
|
STDERR.reopen(serr)
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
|
def test_extended_protocol_pooler_errors
|
||||||
|
admin_conn = PG::connect("postgres://admin_user:admin_pass@127.0.0.1:6432/pgcat")
|
||||||
|
|
||||||
|
conf_editor = ConfigEditor.new
|
||||||
|
new_configs = conf_editor.original_configs
|
||||||
|
|
||||||
|
# shorter timeouts
|
||||||
|
new_configs["general"]["connect_timeout"] = 500
|
||||||
|
new_configs["general"]["ban_time"] = 1
|
||||||
|
new_configs["general"]["shutdown_timeout"] = 1
|
||||||
|
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
|
||||||
|
new_configs["pools"]["sharded_db"]["users"]["1"]["pool_size"] = 1
|
||||||
|
|
||||||
|
conf_editor.with_modified_configs(new_configs) { admin_conn.async_exec("RELOAD") }
|
||||||
|
|
||||||
|
conn_str = "postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db"
|
||||||
|
10.times do
|
||||||
|
Thread.new do
|
||||||
|
conn = PG::connect(conn_str)
|
||||||
|
conn.async_exec("SELECT pg_sleep(5)") rescue PG::SystemError
|
||||||
|
ensure
|
||||||
|
conn&.close
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
sleep(0.5)
|
||||||
|
conn_under_test = PG::connect(conn_str)
|
||||||
|
stdout, stderr = with_captured_stdout_stderr do
|
||||||
|
5.times do |i|
|
||||||
|
conn_under_test.async_exec("SELECT 1") rescue PG::SystemError
|
||||||
|
conn_under_test.exec_params("SELECT #{i} + $1", [i]) rescue PG::SystemError
|
||||||
|
sleep 1
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
raise StandardError, "Libpq got unexpected messages while idle" if stderr.include?("arrived from server while idle")
|
||||||
|
puts "Pool checkout errors not breaking clients passed"
|
||||||
|
ensure
|
||||||
|
sleep 1
|
||||||
|
admin_conn.async_exec("RELOAD") # Reset state
|
||||||
|
conn_under_test&.close
|
||||||
|
end
|
||||||
|
|
||||||
|
test_extended_protocol_pooler_errors
|
||||||
|
|
||||||
# Uncomment these two to see all queries.
|
# Uncomment these two to see all queries.
|
||||||
# ActiveRecord.verbose_query_logs = true
|
# ActiveRecord.verbose_query_logs = true
|
||||||
@@ -111,3 +198,68 @@ begin
|
|||||||
rescue ActiveRecord::StatementInvalid
|
rescue ActiveRecord::StatementInvalid
|
||||||
puts 'OK'
|
puts 'OK'
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Test evil clients
|
||||||
|
def poorly_behaved_client
|
||||||
|
conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat")
|
||||||
|
conn.async_exec 'BEGIN'
|
||||||
|
conn.async_exec 'SELECT 1'
|
||||||
|
|
||||||
|
conn.close
|
||||||
|
puts 'Bad client ok'
|
||||||
|
end
|
||||||
|
|
||||||
|
25.times do
|
||||||
|
poorly_behaved_client
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
|
def test_server_parameters
|
||||||
|
server_conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat")
|
||||||
|
raise StandardError, "Bad server version" if server_conn.server_version == 0
|
||||||
|
server_conn.close
|
||||||
|
|
||||||
|
admin_conn = PG::connect("postgres://admin_user:admin_pass@127.0.0.1:6432/pgcat")
|
||||||
|
raise StandardError, "Bad server version" if admin_conn.server_version == 0
|
||||||
|
admin_conn.close
|
||||||
|
|
||||||
|
puts 'Server parameters ok'
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
|
def test_reload_pool_recycling
|
||||||
|
admin_conn = PG::connect("postgres://admin_user:admin_pass@127.0.0.1:6432/pgcat")
|
||||||
|
server_conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat")
|
||||||
|
|
||||||
|
server_conn.async_exec("BEGIN")
|
||||||
|
conf_editor = ConfigEditor.new
|
||||||
|
new_configs = conf_editor.original_configs
|
||||||
|
|
||||||
|
# swap shards
|
||||||
|
new_configs["pools"]["sharded_db"]["shards"]["0"]["database"] = "shard1"
|
||||||
|
new_configs["pools"]["sharded_db"]["shards"]["1"]["database"] = "shard0"
|
||||||
|
|
||||||
|
raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard0'
|
||||||
|
conf_editor.with_modified_configs(new_configs) { admin_conn.async_exec("RELOAD") }
|
||||||
|
raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard0'
|
||||||
|
server_conn.async_exec("COMMIT;")
|
||||||
|
|
||||||
|
# Transaction finished, client should get new configs
|
||||||
|
raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard1'
|
||||||
|
server_conn.close()
|
||||||
|
|
||||||
|
# New connection should get new configs
|
||||||
|
server_conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat")
|
||||||
|
raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard1'
|
||||||
|
|
||||||
|
ensure
|
||||||
|
admin_conn.async_exec("RELOAD") # Go back to old state
|
||||||
|
admin_conn.close
|
||||||
|
server_conn.close
|
||||||
|
puts "Pool Recycling okay!"
|
||||||
|
end
|
||||||
|
|
||||||
|
test_reload_pool_recycling
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -70,35 +70,23 @@ GRANT CONNECT ON DATABASE shard2 TO other_user;
|
|||||||
GRANT CONNECT ON DATABASE some_db TO simple_user;
|
GRANT CONNECT ON DATABASE some_db TO simple_user;
|
||||||
|
|
||||||
\c shard0
|
\c shard0
|
||||||
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
|
|
||||||
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO sharding_user;
|
|
||||||
GRANT ALL ON SCHEMA public TO sharding_user;
|
GRANT ALL ON SCHEMA public TO sharding_user;
|
||||||
GRANT ALL ON TABLE data TO sharding_user;
|
GRANT ALL ON TABLE data TO sharding_user;
|
||||||
GRANT ALL ON SCHEMA public TO other_user;
|
GRANT ALL ON SCHEMA public TO other_user;
|
||||||
GRANT ALL ON TABLE data TO other_user;
|
GRANT ALL ON TABLE data TO other_user;
|
||||||
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO other_user;
|
|
||||||
|
|
||||||
\c shard1
|
\c shard1
|
||||||
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
|
|
||||||
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO sharding_user;
|
|
||||||
GRANT ALL ON SCHEMA public TO sharding_user;
|
GRANT ALL ON SCHEMA public TO sharding_user;
|
||||||
GRANT ALL ON TABLE data TO sharding_user;
|
GRANT ALL ON TABLE data TO sharding_user;
|
||||||
GRANT ALL ON SCHEMA public TO other_user;
|
GRANT ALL ON SCHEMA public TO other_user;
|
||||||
GRANT ALL ON TABLE data TO other_user;
|
GRANT ALL ON TABLE data TO other_user;
|
||||||
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO other_user;
|
|
||||||
|
|
||||||
|
|
||||||
\c shard2
|
\c shard2
|
||||||
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
|
|
||||||
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO sharding_user;
|
|
||||||
GRANT ALL ON SCHEMA public TO sharding_user;
|
GRANT ALL ON SCHEMA public TO sharding_user;
|
||||||
GRANT ALL ON TABLE data TO sharding_user;
|
GRANT ALL ON TABLE data TO sharding_user;
|
||||||
GRANT ALL ON SCHEMA public TO other_user;
|
GRANT ALL ON SCHEMA public TO other_user;
|
||||||
GRANT ALL ON TABLE data TO other_user;
|
GRANT ALL ON TABLE data TO other_user;
|
||||||
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO other_user;
|
|
||||||
|
|
||||||
\c some_db
|
\c some_db
|
||||||
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
|
|
||||||
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO simple_user;
|
|
||||||
GRANT ALL ON SCHEMA public TO simple_user;
|
GRANT ALL ON SCHEMA public TO simple_user;
|
||||||
GRANT ALL ON TABLE data TO simple_user;
|
GRANT ALL ON TABLE data TO simple_user;
|
||||||
|
|||||||
Reference in New Issue
Block a user