mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 09:26:30 +00:00
Compare commits
5 Commits
levkk-dist
...
levkk-log-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
61b9756ded | ||
|
|
2cd9e15849 | ||
|
|
fd57fae280 | ||
|
|
a460a645f5 | ||
|
|
f7d33fba7a |
@@ -15,34 +15,14 @@ jobs:
|
||||
RUSTFLAGS: "-C instrument-coverage"
|
||||
LLVM_PROFILE_FILE: "pgcat-%m.profraw"
|
||||
- image: postgres:14
|
||||
command: ["postgres", "-p", "5432", "-c", "shared_preload_libraries=pg_stat_statements"]
|
||||
# auth:
|
||||
# username: mydockerhub-user
|
||||
# password: $DOCKERHUB_PASSWORD
|
||||
environment:
|
||||
POSTGRES_USER: postgres
|
||||
POSTGRES_DB: postgres
|
||||
POSTGRES_PASSWORD: postgres
|
||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||
- image: postgres:14
|
||||
command: ["postgres", "-p", "7432", "-c", "shared_preload_libraries=pg_stat_statements"]
|
||||
environment:
|
||||
POSTGRES_USER: postgres
|
||||
POSTGRES_DB: postgres
|
||||
POSTGRES_PASSWORD: postgres
|
||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||
- image: postgres:14
|
||||
command: ["postgres", "-p", "8432", "-c", "shared_preload_libraries=pg_stat_statements"]
|
||||
environment:
|
||||
POSTGRES_USER: postgres
|
||||
POSTGRES_DB: postgres
|
||||
POSTGRES_PASSWORD: postgres
|
||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||
- image: postgres:14
|
||||
command: ["postgres", "-p", "9432", "-c", "shared_preload_libraries=pg_stat_statements"]
|
||||
environment:
|
||||
POSTGRES_USER: postgres
|
||||
POSTGRES_DB: postgres
|
||||
POSTGRES_PASSWORD: postgres
|
||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||
|
||||
# Add steps to the job
|
||||
# See: https://circleci.com/docs/2.0/configuration-reference/#steps
|
||||
steps:
|
||||
|
||||
@@ -3,9 +3,6 @@
|
||||
set -e
|
||||
set -o xtrace
|
||||
|
||||
# non-zero exit code if we provide bad configs
|
||||
(! ./target/debug/pgcat "fake_configs" 2>/dev/null)
|
||||
|
||||
# Start PgCat with a particular log level
|
||||
# for inspection.
|
||||
function start_pgcat() {
|
||||
@@ -16,20 +13,17 @@ function start_pgcat() {
|
||||
|
||||
# Setup the database with shards and user
|
||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql
|
||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 7432 -U postgres -f tests/sharding/query_routing_setup.sql
|
||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 8432 -U postgres -f tests/sharding/query_routing_setup.sql
|
||||
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 9432 -U postgres -f tests/sharding/query_routing_setup.sql
|
||||
|
||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard0 -i
|
||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
|
||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
|
||||
|
||||
# Install Toxiproxy to simulate a downed/slow database
|
||||
wget -O toxiproxy-2.4.0.deb https://github.com/Shopify/toxiproxy/releases/download/v2.4.0/toxiproxy_2.4.0_linux_$(dpkg --print-architecture).deb
|
||||
sudo dpkg -i toxiproxy-2.4.0.deb
|
||||
wget -O toxiproxy-2.1.4.deb https://github.com/Shopify/toxiproxy/releases/download/v2.1.4/toxiproxy_2.1.4_amd64.deb
|
||||
sudo dpkg -i toxiproxy-2.1.4.deb
|
||||
|
||||
# Start Toxiproxy
|
||||
LOG_LEVEL=error toxiproxy-server &
|
||||
toxiproxy-server &
|
||||
sleep 1
|
||||
|
||||
# Create a database at port 5433, forward it to Postgres
|
||||
@@ -90,8 +84,7 @@ kill -SIGHUP $(pgrep pgcat) # Reload config again
|
||||
cd tests/ruby
|
||||
sudo gem install bundler
|
||||
bundle install
|
||||
bundle exec ruby tests.rb || exit 1
|
||||
bundle exec rspec *_spec.rb || exit 1
|
||||
ruby tests.rb
|
||||
cd ../..
|
||||
|
||||
#
|
||||
@@ -99,7 +92,7 @@ cd ../..
|
||||
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
|
||||
#
|
||||
pip3 install -r tests/python/requirements.txt
|
||||
python3 tests/python/tests.py || exit 1
|
||||
python3 tests/python/tests.py
|
||||
|
||||
start_pgcat "info"
|
||||
|
||||
@@ -109,9 +102,9 @@ psql -U admin_user -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/n
|
||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RELOAD' > /dev/null
|
||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW CONFIG' > /dev/null
|
||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES' > /dev/null
|
||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgcat -c 'SHOW LISTS' > /dev/null
|
||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgcat -c 'SHOW POOLS' > /dev/null
|
||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgcat -c 'SHOW VERSION' > /dev/null
|
||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW LISTS' > /dev/null
|
||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW POOLS' > /dev/null
|
||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW VERSION' > /dev/null
|
||||
psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c "SET client_encoding TO 'utf8'" > /dev/null # will ignore
|
||||
(! psql -U admin_user -e -h 127.0.0.1 -p 6432 -d random_db -c 'SHOW STATS' > /dev/null)
|
||||
export PGPASSWORD=sharding_user
|
||||
@@ -136,14 +129,11 @@ toxiproxy-cli toxic remove --toxicName latency_downstream postgres_replica
|
||||
start_pgcat "info"
|
||||
|
||||
# Test session mode (and config reload)
|
||||
sed -i '0,/simple_db/s/pool_mode = "transaction"/pool_mode = "session"/' .circleci/pgcat.toml
|
||||
sed -i 's/pool_mode = "transaction"/pool_mode = "session"/' .circleci/pgcat.toml
|
||||
|
||||
# Reload config test
|
||||
kill -SIGHUP $(pgrep pgcat)
|
||||
|
||||
# Revert settings after reload. Makes test runs idempotent
|
||||
sed -i '0,/simple_db/s/pool_mode = "session"/pool_mode = "transaction"/' .circleci/pgcat.toml
|
||||
|
||||
sleep 1
|
||||
|
||||
# Prepared statements that will only work in session mode
|
||||
|
||||
7
Cargo.lock
generated
7
Cargo.lock
generated
@@ -159,12 +159,6 @@ dependencies = [
|
||||
"termcolor",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "exitcode"
|
||||
version = "1.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "de853764b47027c2e862a995c34978ffa63c1501f2e15f987ba11bd4f9bba193"
|
||||
|
||||
[[package]]
|
||||
name = "fnv"
|
||||
version = "1.0.7"
|
||||
@@ -521,7 +515,6 @@ dependencies = [
|
||||
"bytes",
|
||||
"chrono",
|
||||
"env_logger",
|
||||
"exitcode",
|
||||
"hmac",
|
||||
"hyper",
|
||||
"log",
|
||||
|
||||
@@ -33,4 +33,3 @@ tokio-rustls = "0.23"
|
||||
rustls-pemfile = "1"
|
||||
hyper = { version = "0.14", features = ["full"] }
|
||||
phf = { version = "0.10", features = ["macros"] }
|
||||
exitcode = "1.1.2"
|
||||
|
||||
49
README.md
49
README.md
@@ -1,11 +1,8 @@
|
||||

|
||||
|
||||
##### PgCat: PostgreSQL at petabyte scale
|
||||
# PgCat
|
||||
|
||||
[](https://circleci.com/gh/levkk/pgcat/tree/main)
|
||||
<a href="https://discord.gg/DmyJP3qJ7U" target="_blank">
|
||||
<img src="https://img.shields.io/discord/1013868243036930099" alt="Join our Discord!" />
|
||||
</a>
|
||||
|
||||

|
||||
|
||||
PostgreSQL pooler (like PgBouncer) with sharding, load balancing and failover support.
|
||||
|
||||
@@ -18,7 +15,7 @@ PostgreSQL pooler (like PgBouncer) with sharding, load balancing and failover su
|
||||
| Session pooling | :white_check_mark: | Identical to PgBouncer. |
|
||||
| `COPY` support | :white_check_mark: | Both `COPY TO` and `COPY FROM` are supported. |
|
||||
| Query cancellation | :white_check_mark: | Supported both in transaction and session pooling modes. |
|
||||
| Load balancing of read queries | :white_check_mark: | Using random between replicas. Primary is included when `primary_reads_enabled` is enabled (default). |
|
||||
| Load balancing of read queries | :white_check_mark: | Using round-robin between replicas. Primary is included when `primary_reads_enabled` is enabled (default). |
|
||||
| Sharding | :white_check_mark: | Transactions are sharded using `SET SHARD TO` and `SET SHARDING KEY TO` syntax extensions; see examples below. |
|
||||
| Failover | :white_check_mark: | Replicas are tested with a health check. If a health check fails, remaining replicas are attempted; see below for algorithm description and examples. |
|
||||
| Statistics | :white_check_mark: | Statistics available in the admin database (`pgcat` and `pgbouncer`) with `SHOW STATS`, `SHOW POOLS` and others. |
|
||||
@@ -66,7 +63,7 @@ psql -h 127.0.0.1 -p 6432 -c 'SELECT 1'
|
||||
| `database` | The name of the database to connect to. This is the same on all servers that are part of one shard. | |
|
||||
| | | |
|
||||
| **`query_router`** | | |
|
||||
| `default_role` | Traffic is routed to this role by default (random), unless the client specifies otherwise. Default is `any`, for any role available. | `any`, `primary`, `replica` |
|
||||
| `default_role` | Traffic is routed to this role by default (round-robin), unless the client specifies otherwise. Default is `any`, for any role available. | `any`, `primary`, `replica` |
|
||||
| `query_parser_enabled` | Enable the query parser which will inspect incoming queries and route them to a primary or replicas. | `false` |
|
||||
| `primary_reads_enabled` | Enable this to allow read queries on the primary; otherwise read queries are routed to the replicas. | `true` |
|
||||
|
||||
@@ -90,14 +87,6 @@ pgbench -t 1000 -p 6432 -h 127.0.0.1 --protocol extended
|
||||
|
||||
See [sharding README](./tests/sharding/README.md) for sharding logic testing.
|
||||
|
||||
Run `cargo test` to run Rust tests.
|
||||
|
||||
Run the following commands to run Integration tests locally.
|
||||
```
|
||||
cd tests/docker/
|
||||
docker compose up --exit-code-from main # This will also produce coverage report under ./cov/
|
||||
```
|
||||
|
||||
| **Feature** | **Tested in CI** | **Tested manually** | **Comments** |
|
||||
|-----------------------|--------------------|---------------------|--------------------------------------------------------------------------------------------------------------------------|
|
||||
| Transaction pooling | :white_check_mark: | :white_check_mark: | Used by default for all tests. |
|
||||
@@ -123,7 +112,7 @@ In transaction mode, a client talks to one server for the duration of a single t
|
||||
This mode is enabled by default.
|
||||
|
||||
### Load balancing of read queries
|
||||
All queries are load balanced against the configured servers using the random algorithm. The most straight forward configuration example would be to put this pooler in front of several replicas and let it load balance all queries.
|
||||
All queries are load balanced against the configured servers using the round-robin algorithm. The most straight forward configuration example would be to put this pooler in front of several replicas and let it load balance all queries.
|
||||
|
||||
If the configuration includes a primary and replicas, the queries can be separated with the built-in query parser. The query parser will interpret the query and route all `SELECT` queries to a replica, while all other queries including explicit transactions will be routed to the primary.
|
||||
|
||||
@@ -162,18 +151,18 @@ Failover behavior can get pretty interesting (read complex) when multiple config
|
||||
|
||||
| **Query** | **`SET SERVER ROLE TO`** | **`query_parser_enabled`** | **`primary_reads_enabled`** | **Target state** | **Outcome** |
|
||||
|---------------------------|--------------------------|----------------------------|-----------------------------|------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| Read query, i.e. `SELECT` | unset (any) | false | false | up | Query is routed to the first instance in the random loop. |
|
||||
| Read query | unset (any) | true | false | up | Query is routed to the first replica instance in the random loop. |
|
||||
| Read query | unset (any) | true | true | up | Query is routed to the first instance in the random loop. |
|
||||
| Read query | replica | false | false | up | Query is routed to the first replica instance in the random loop. |
|
||||
| Read query, i.e. `SELECT` | unset (any) | false | false | up | Query is routed to the first instance in the round-robin loop. |
|
||||
| Read query | unset (any) | true | false | up | Query is routed to the first replica instance in the round-robin loop. |
|
||||
| Read query | unset (any) | true | true | up | Query is routed to the first instance in the round-robin loop. |
|
||||
| Read query | replica | false | false | up | Query is routed to the first replica instance in the round-robin loop. |
|
||||
| Read query | primary | false | false | up | Query is routed to the primary. |
|
||||
| Read query | unset (any) | false | false | down | First instance is banned for reads. Next target in the random loop is attempted. |
|
||||
| Read query | unset (any) | true | false | down | First replica instance is banned. Next replica instance is attempted in the random loop. |
|
||||
| Read query | unset (any) | true | true | down | First instance (even if primary) is banned for reads. Next instance is attempted in the random loop. |
|
||||
| Read query | replica | false | false | down | First replica instance is banned. Next replica instance is attempted in the random loop. |
|
||||
| Read query | unset (any) | false | false | down | First instance is banned for reads. Next target in the round-robin loop is attempted. |
|
||||
| Read query | unset (any) | true | false | down | First replica instance is banned. Next replica instance is attempted in the round-robin loop. |
|
||||
| Read query | unset (any) | true | true | down | First instance (even if primary) is banned for reads. Next instance is attempted in the round-robin loop. |
|
||||
| Read query | replica | false | false | down | First replica instance is banned. Next replica instance is attempted in the round-robin loop. |
|
||||
| Read query | primary | false | false | down | The query is attempted against the primary and fails. The client receives an error. |
|
||||
| | | | | | |
|
||||
| Write query e.g. `INSERT` | unset (any) | false | false | up | The query is attempted against the first available instance in the random loop. If the instance is a replica, the query fails and the client receives an error. |
|
||||
| Write query e.g. `INSERT` | unset (any) | false | false | up | The query is attempted against the first available instance in the round-robin loop. If the instance is a replica, the query fails and the client receives an error. |
|
||||
| Write query | unset (any) | true | false | up | The query is routed to the primary. |
|
||||
| Write query | unset (any) | true | true | up | The query is routed to the primary. |
|
||||
| Write query | primary | false | false | up | The query is routed to the primary. |
|
||||
@@ -458,7 +447,7 @@ Always good to have a base line.
|
||||
|
||||
```
|
||||
$ pgbench -t 1000 -c 16 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
|
||||
Password:
|
||||
Password:
|
||||
starting vacuum...end.
|
||||
transaction type: <builtin: select only>
|
||||
scaling factor: 1
|
||||
@@ -472,7 +461,7 @@ tps = 139443.955722 (including connections establishing)
|
||||
tps = 142314.859075 (excluding connections establishing)
|
||||
|
||||
$ pgbench -t 1000 -c 32 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
|
||||
Password:
|
||||
Password:
|
||||
starting vacuum...end.
|
||||
transaction type: <builtin: select only>
|
||||
scaling factor: 1
|
||||
@@ -486,7 +475,7 @@ tps = 150644.840891 (including connections establishing)
|
||||
tps = 152218.499430 (excluding connections establishing)
|
||||
|
||||
$ pgbench -t 1000 -c 64 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
|
||||
Password:
|
||||
Password:
|
||||
starting vacuum...end.
|
||||
transaction type: <builtin: select only>
|
||||
scaling factor: 1
|
||||
@@ -500,7 +489,7 @@ tps = 152517.663404 (including connections establishing)
|
||||
tps = 153319.188482 (excluding connections establishing)
|
||||
|
||||
$ pgbench -t 1000 -c 128 -j 2 -p 5432 -h 127.0.0.1 -S --protocol extended shard0
|
||||
Password:
|
||||
Password:
|
||||
starting vacuum...end.
|
||||
transaction type: <builtin: select only>
|
||||
scaling factor: 1
|
||||
|
||||
@@ -265,11 +265,11 @@ where
|
||||
for (_, pool) in get_all_pools() {
|
||||
let pool_config = pool.settings.clone();
|
||||
for shard in 0..pool.shards() {
|
||||
let database_name = &pool.address(shard, 0).database;
|
||||
let database_name = &pool_config.shards[&shard.to_string()].database;
|
||||
for server in 0..pool.servers(shard) {
|
||||
let address = pool.address(shard, server);
|
||||
let pool_state = pool.pool_state(shard, server);
|
||||
let banned = pool.is_banned(address, Some(address.role));
|
||||
let banned = pool.is_banned(address, shard, Some(address.role));
|
||||
|
||||
res.put(data_row(&vec![
|
||||
address.name(), // name
|
||||
|
||||
735
src/client.rs
735
src/client.rs
File diff suppressed because it is too large
Load Diff
@@ -57,35 +57,13 @@ impl PartialEq<Role> for Option<Role> {
|
||||
/// Address identifying a PostgreSQL server uniquely.
|
||||
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Debug)]
|
||||
pub struct Address {
|
||||
/// Unique ID per addressable Postgres server.
|
||||
pub id: usize,
|
||||
|
||||
/// Server host.
|
||||
pub host: String,
|
||||
|
||||
/// Server port.
|
||||
pub port: u16,
|
||||
|
||||
/// Shard number of this Postgres server.
|
||||
pub port: String,
|
||||
pub shard: usize,
|
||||
|
||||
/// The name of the Postgres database.
|
||||
pub database: String,
|
||||
|
||||
/// Server role: replica, primary.
|
||||
pub role: Role,
|
||||
|
||||
/// If it's a replica, number it for reference and failover.
|
||||
pub replica_number: usize,
|
||||
|
||||
/// Position of the server in the pool for failover.
|
||||
pub address_index: usize,
|
||||
|
||||
/// The name of the user configured to use this pool.
|
||||
pub username: String,
|
||||
|
||||
/// The name of this pool (i.e. database name visible to the client).
|
||||
pub pool_name: String,
|
||||
}
|
||||
|
||||
impl Default for Address {
|
||||
@@ -93,14 +71,11 @@ impl Default for Address {
|
||||
Address {
|
||||
id: 0,
|
||||
host: String::from("127.0.0.1"),
|
||||
port: 5432,
|
||||
port: String::from("5432"),
|
||||
shard: 0,
|
||||
address_index: 0,
|
||||
replica_number: 0,
|
||||
database: String::from("database"),
|
||||
role: Role::Replica,
|
||||
username: String::from("username"),
|
||||
pool_name: String::from("pool_name"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -109,11 +84,11 @@ impl Address {
|
||||
/// Address name (aka database) used in `SHOW STATS`, `SHOW DATABASES`, and `SHOW POOLS`.
|
||||
pub fn name(&self) -> String {
|
||||
match self.role {
|
||||
Role::Primary => format!("{}_shard_{}_primary", self.pool_name, self.shard),
|
||||
Role::Primary => format!("{}_shard_{}_primary", self.database, self.shard),
|
||||
|
||||
Role::Replica => format!(
|
||||
"{}_shard_{}_replica_{}",
|
||||
self.pool_name, self.shard, self.replica_number
|
||||
self.database, self.shard, self.replica_number
|
||||
),
|
||||
}
|
||||
}
|
||||
@@ -360,9 +335,9 @@ impl Config {
|
||||
|
||||
for (pool_name, pool_config) in &self.pools {
|
||||
// TODO: Make this output prettier (maybe a table?)
|
||||
info!("--- Settings for pool {} ---", pool_name);
|
||||
info!(
|
||||
"[pool: {}] Maximum user connections: {}",
|
||||
pool_name,
|
||||
"Pool size from all users: {}",
|
||||
pool_config
|
||||
.users
|
||||
.iter()
|
||||
@@ -370,39 +345,20 @@ impl Config {
|
||||
.sum::<u32>()
|
||||
.to_string()
|
||||
);
|
||||
info!("[pool: {}] Pool mode: {}", pool_name, pool_config.pool_mode);
|
||||
info!(
|
||||
"[pool: {}] Sharding function: {}",
|
||||
pool_name, pool_config.sharding_function
|
||||
);
|
||||
info!(
|
||||
"[pool: {}] Primary reads: {}",
|
||||
pool_name, pool_config.primary_reads_enabled
|
||||
);
|
||||
info!(
|
||||
"[pool: {}] Query router: {}",
|
||||
pool_name, pool_config.query_parser_enabled
|
||||
);
|
||||
info!(
|
||||
"[pool: {}] Number of shards: {}",
|
||||
pool_name,
|
||||
pool_config.shards.len()
|
||||
);
|
||||
info!(
|
||||
"[pool: {}] Number of users: {}",
|
||||
pool_name,
|
||||
pool_config.users.len()
|
||||
);
|
||||
info!("Pool mode: {}", pool_config.pool_mode);
|
||||
info!("Sharding function: {}", pool_config.sharding_function);
|
||||
info!("Primary reads: {}", pool_config.primary_reads_enabled);
|
||||
info!("Query router: {}", pool_config.query_parser_enabled);
|
||||
|
||||
// TODO: Make this prettier.
|
||||
info!("Number of shards: {}", pool_config.shards.len());
|
||||
info!("Number of users: {}", pool_config.users.len());
|
||||
|
||||
for user in &pool_config.users {
|
||||
info!(
|
||||
"[pool: {}][user: {}] Pool size: {}",
|
||||
pool_name, user.1.username, user.1.pool_size,
|
||||
"{} pool size: {}, statement timeout: {}",
|
||||
user.1.username, user.1.pool_size, user.1.statement_timeout
|
||||
);
|
||||
info!(
|
||||
"[pool: {}][user: {}] Statement timeout: {}",
|
||||
pool_name, user.1.username, user.1.statement_timeout
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -500,18 +456,6 @@ pub async fn parse(path: &str) -> Result<(), Error> {
|
||||
}
|
||||
};
|
||||
|
||||
match pool.pool_mode.as_ref() {
|
||||
"transaction" => (),
|
||||
"session" => (),
|
||||
other => {
|
||||
error!(
|
||||
"pool_mode can be 'session' or 'transaction', got: '{}'",
|
||||
other
|
||||
);
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
};
|
||||
|
||||
for shard in &pool.shards {
|
||||
// We use addresses as unique identifiers,
|
||||
// let's make sure they are unique in the config as well.
|
||||
|
||||
@@ -12,5 +12,4 @@ pub enum Error {
|
||||
ClientError,
|
||||
TlsError,
|
||||
StatementTimeout,
|
||||
ShuttingDown,
|
||||
}
|
||||
|
||||
303
src/main.rs
303
src/main.rs
@@ -24,7 +24,6 @@ extern crate async_trait;
|
||||
extern crate bb8;
|
||||
extern crate bytes;
|
||||
extern crate env_logger;
|
||||
extern crate exitcode;
|
||||
extern crate log;
|
||||
extern crate md5;
|
||||
extern crate num_cpus;
|
||||
@@ -67,20 +66,18 @@ mod stats;
|
||||
mod tls;
|
||||
|
||||
use crate::config::{get_config, reload_config, VERSION};
|
||||
use crate::errors::Error;
|
||||
use crate::pool::{ClientServerMap, ConnectionPool};
|
||||
use crate::prometheus::start_metric_server;
|
||||
use crate::stats::{Collector, Reporter, REPORTER};
|
||||
|
||||
#[tokio::main(worker_threads = 4)]
|
||||
async fn main() {
|
||||
env_logger::builder().format_timestamp_micros().init();
|
||||
|
||||
env_logger::init();
|
||||
info!("Welcome to PgCat! Meow. (Version {})", VERSION);
|
||||
|
||||
if !query_router::QueryRouter::setup() {
|
||||
error!("Could not setup query router");
|
||||
std::process::exit(exitcode::CONFIG);
|
||||
return;
|
||||
}
|
||||
|
||||
let args = std::env::args().collect::<Vec<String>>();
|
||||
@@ -95,7 +92,7 @@ async fn main() {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
error!("Config parse error: {:?}", err);
|
||||
std::process::exit(exitcode::CONFIG);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -110,7 +107,7 @@ async fn main() {
|
||||
Ok(addr) => addr,
|
||||
Err(err) => {
|
||||
error!("Invalid http address: {}", err);
|
||||
std::process::exit(exitcode::CONFIG);
|
||||
return;
|
||||
}
|
||||
};
|
||||
tokio::task::spawn(async move {
|
||||
@@ -124,7 +121,7 @@ async fn main() {
|
||||
Ok(sock) => sock,
|
||||
Err(err) => {
|
||||
error!("Listener socket error: {:?}", err);
|
||||
std::process::exit(exitcode::CONFIG);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -136,33 +133,125 @@ async fn main() {
|
||||
let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
// Statistics reporting.
|
||||
let (stats_tx, stats_rx) = mpsc::channel(100_000);
|
||||
REPORTER.store(Arc::new(Reporter::new(stats_tx.clone())));
|
||||
let (tx, rx) = mpsc::channel(100_000);
|
||||
REPORTER.store(Arc::new(Reporter::new(tx.clone())));
|
||||
|
||||
// Connection pool that allows to query all shards and replicas.
|
||||
match ConnectionPool::from_config(client_server_map.clone()).await {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
error!("Pool error: {:?}", err);
|
||||
std::process::exit(exitcode::CONFIG);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Statistics collector task.
|
||||
let collector_tx = tx.clone();
|
||||
|
||||
// Save these for reloading
|
||||
let reload_client_server_map = client_server_map.clone();
|
||||
let autoreload_client_server_map = client_server_map.clone();
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
let mut stats_collector = Collector::new(stats_rx, stats_tx.clone());
|
||||
let mut stats_collector = Collector::new(rx, collector_tx);
|
||||
stats_collector.collect().await;
|
||||
});
|
||||
|
||||
info!("Config autoreloader: {}", config.general.autoreload);
|
||||
info!("Waiting for clients");
|
||||
|
||||
let mut autoreload_interval = tokio::time::interval(tokio::time::Duration::from_millis(15_000));
|
||||
let autoreload_client_server_map = client_server_map.clone();
|
||||
let (shutdown_event_tx, mut shutdown_event_rx) = broadcast::channel::<()>(1);
|
||||
|
||||
let shutdown_event_tx_clone = shutdown_event_tx.clone();
|
||||
|
||||
// Client connection loop.
|
||||
tokio::task::spawn(async move {
|
||||
// Creates event subscriber for shutdown event, this is dropped when shutdown event is broadcast
|
||||
let mut listener_shutdown_event_rx = shutdown_event_tx_clone.subscribe();
|
||||
loop {
|
||||
autoreload_interval.tick().await;
|
||||
if config.general.autoreload {
|
||||
info!("Automatically reloading config");
|
||||
let client_server_map = client_server_map.clone();
|
||||
|
||||
// Listen for shutdown event and client connection at the same time
|
||||
let (socket, addr) = tokio::select! {
|
||||
_ = listener_shutdown_event_rx.recv() => {
|
||||
// Exits client connection loop which drops listener, listener_shutdown_event_rx and shutdown_event_tx_clone
|
||||
break;
|
||||
}
|
||||
|
||||
listener_response = listener.accept() => {
|
||||
match listener_response {
|
||||
Ok((socket, addr)) => (socket, addr),
|
||||
Err(err) => {
|
||||
error!("{:?}", err);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Used to signal shutdown
|
||||
let client_shutdown_handler_rx = shutdown_event_tx_clone.subscribe();
|
||||
|
||||
// Used to signal that the task has completed
|
||||
let dummy_tx = shutdown_event_tx_clone.clone();
|
||||
|
||||
// Handle client.
|
||||
tokio::task::spawn(async move {
|
||||
let start = chrono::offset::Utc::now().naive_utc();
|
||||
|
||||
match client::client_entrypoint(
|
||||
socket,
|
||||
client_server_map,
|
||||
client_shutdown_handler_rx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
let duration = chrono::offset::Utc::now().naive_utc() - start;
|
||||
|
||||
info!(
|
||||
"Client {:?} disconnected, session duration: {}",
|
||||
addr,
|
||||
format_duration(&duration)
|
||||
);
|
||||
}
|
||||
|
||||
Err(err) => {
|
||||
debug!("Client disconnected with error {:?}", err);
|
||||
}
|
||||
};
|
||||
// Drop this transmitter so receiver knows that the task is completed
|
||||
drop(dummy_tx);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// Reload config:
|
||||
// kill -SIGHUP $(pgrep pgcat)
|
||||
tokio::task::spawn(async move {
|
||||
let mut stream = unix_signal(SignalKind::hangup()).unwrap();
|
||||
|
||||
loop {
|
||||
stream.recv().await;
|
||||
|
||||
info!("Reloading config");
|
||||
|
||||
match reload_config(reload_client_server_map.clone()).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
get_config().show();
|
||||
}
|
||||
});
|
||||
|
||||
if config.general.autoreload {
|
||||
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(15_000));
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
info!("Config autoreloader started");
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
match reload_config(autoreload_client_server_map.clone()).await {
|
||||
Ok(changed) => {
|
||||
if changed {
|
||||
@@ -172,131 +261,43 @@ async fn main() {
|
||||
Err(_) => (),
|
||||
};
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
let mut term_signal = unix_signal(SignalKind::terminate()).unwrap();
|
||||
let mut interrupt_signal = unix_signal(SignalKind::interrupt()).unwrap();
|
||||
let mut sighup_signal = unix_signal(SignalKind::hangup()).unwrap();
|
||||
let (shutdown_tx, _) = broadcast::channel::<()>(1);
|
||||
let (drain_tx, mut drain_rx) = mpsc::channel::<i32>(2048);
|
||||
let (exit_tx, mut exit_rx) = mpsc::channel::<()>(1);
|
||||
|
||||
info!("Waiting for clients");
|
||||
tokio::select! {
|
||||
// Initiate graceful shutdown sequence on sig int
|
||||
_ = interrupt_signal.recv() => {
|
||||
info!("Got SIGINT, waiting for client connection drain now");
|
||||
|
||||
let mut admin_only = false;
|
||||
let mut total_clients = 0;
|
||||
// Broadcast that client tasks need to finish
|
||||
shutdown_event_tx.send(()).unwrap();
|
||||
// Closes transmitter
|
||||
drop(shutdown_event_tx);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// Reload config:
|
||||
// kill -SIGHUP $(pgrep pgcat)
|
||||
_ = sighup_signal.recv() => {
|
||||
info!("Reloading config");
|
||||
|
||||
match reload_config(client_server_map.clone()).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => (),
|
||||
};
|
||||
|
||||
get_config().show();
|
||||
},
|
||||
|
||||
// Initiate graceful shutdown sequence on sig int
|
||||
_ = interrupt_signal.recv() => {
|
||||
info!("Got SIGINT, waiting for client connection drain now");
|
||||
admin_only = true;
|
||||
|
||||
// Broadcast that client tasks need to finish
|
||||
let _ = shutdown_tx.send(());
|
||||
let exit_tx = exit_tx.clone();
|
||||
let _ = drain_tx.send(0).await;
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(config.general.shutdown_timeout));
|
||||
|
||||
// First tick fires immediately.
|
||||
interval.tick().await;
|
||||
|
||||
// Second one in the interval time.
|
||||
interval.tick().await;
|
||||
|
||||
// We're done waiting.
|
||||
error!("Graceful shutdown timed out. {} active clients being closed", total_clients);
|
||||
|
||||
let _ = exit_tx.send(()).await;
|
||||
});
|
||||
},
|
||||
|
||||
_ = term_signal.recv() => {
|
||||
info!("Got SIGTERM, closing with {} clients active", total_clients);
|
||||
break;
|
||||
},
|
||||
|
||||
new_client = listener.accept() => {
|
||||
let (socket, addr) = match new_client {
|
||||
Ok((socket, addr)) => (socket, addr),
|
||||
Err(err) => {
|
||||
error!("{:?}", err);
|
||||
continue;
|
||||
// This is in a loop because the first event that the receiver receives will be the shutdown event
|
||||
// This is not what we are waiting for instead, we want the receiver to send an error once all senders are closed which is reached after the shutdown event is received
|
||||
loop {
|
||||
match tokio::time::timeout(
|
||||
tokio::time::Duration::from_millis(config.general.shutdown_timeout),
|
||||
shutdown_event_rx.recv(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(res) => match res {
|
||||
Ok(_) => {}
|
||||
Err(_) => break,
|
||||
},
|
||||
Err(_) => {
|
||||
info!("Timed out while waiting for clients to shutdown");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
let shutdown_rx = shutdown_tx.subscribe();
|
||||
let drain_tx = drain_tx.clone();
|
||||
let client_server_map = client_server_map.clone();
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
let start = chrono::offset::Utc::now().naive_utc();
|
||||
|
||||
match client::client_entrypoint(
|
||||
socket,
|
||||
client_server_map,
|
||||
shutdown_rx,
|
||||
drain_tx,
|
||||
admin_only,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(()) => {
|
||||
|
||||
let duration = chrono::offset::Utc::now().naive_utc() - start;
|
||||
|
||||
info!(
|
||||
"Client {:?} disconnected, session duration: {}",
|
||||
addr,
|
||||
format_duration(&duration)
|
||||
);
|
||||
}
|
||||
|
||||
Err(err) => {
|
||||
match err {
|
||||
// Don't count the clients we rejected.
|
||||
Error::ShuttingDown => (),
|
||||
_ => {
|
||||
// drain_tx.send(-1).await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
debug!("Client disconnected with error {:?}", err);
|
||||
}
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
_ = exit_rx.recv() => {
|
||||
break;
|
||||
}
|
||||
|
||||
client_ping = drain_rx.recv() => {
|
||||
let client_ping = client_ping.unwrap();
|
||||
total_clients += client_ping;
|
||||
|
||||
if total_clients == 0 && admin_only {
|
||||
let _ = exit_tx.send(()).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
_ = term_signal.recv() => (),
|
||||
}
|
||||
|
||||
info!("Shutting down...");
|
||||
@@ -308,18 +309,34 @@ async fn main() {
|
||||
///
|
||||
/// * `duration` - A duration of time
|
||||
fn format_duration(duration: &chrono::Duration) -> String {
|
||||
let milliseconds = format!("{:0>3}", duration.num_milliseconds() % 1000);
|
||||
let seconds = {
|
||||
let seconds = duration.num_seconds() % 60;
|
||||
if seconds < 10 {
|
||||
format!("0{}", seconds)
|
||||
} else {
|
||||
format!("{}", seconds)
|
||||
}
|
||||
};
|
||||
|
||||
let seconds = format!("{:0>2}", duration.num_seconds() % 60);
|
||||
let minutes = {
|
||||
let minutes = duration.num_minutes() % 60;
|
||||
if minutes < 10 {
|
||||
format!("0{}", minutes)
|
||||
} else {
|
||||
format!("{}", minutes)
|
||||
}
|
||||
};
|
||||
|
||||
let minutes = format!("{:0>2}", duration.num_minutes() % 60);
|
||||
|
||||
let hours = format!("{:0>2}", duration.num_hours() % 24);
|
||||
let hours = {
|
||||
let hours = duration.num_hours() % 24;
|
||||
if hours < 10 {
|
||||
format!("0{}", hours)
|
||||
} else {
|
||||
format!("{}", hours)
|
||||
}
|
||||
};
|
||||
|
||||
let days = duration.num_days().to_string();
|
||||
|
||||
format!(
|
||||
"{}d {}:{}:{}.{}",
|
||||
days, hours, minutes, seconds, milliseconds
|
||||
)
|
||||
format!("{}d {}:{}:{}", days, hours, minutes, seconds)
|
||||
}
|
||||
|
||||
262
src/pool.rs
262
src/pool.rs
@@ -6,80 +6,44 @@ use chrono::naive::NaiveDateTime;
|
||||
use log::{debug, error, info, warn};
|
||||
use once_cell::sync::Lazy;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::config::{get_config, Address, Role, User};
|
||||
use crate::config::{get_config, Address, Role, Shard, User};
|
||||
use crate::errors::Error;
|
||||
|
||||
use crate::server::Server;
|
||||
use crate::sharding::ShardingFunction;
|
||||
use crate::stats::{get_reporter, Reporter};
|
||||
|
||||
pub type BanList = Arc<RwLock<Vec<HashMap<Address, NaiveDateTime>>>>;
|
||||
pub type ClientServerMap = Arc<Mutex<HashMap<(i32, i32), (i32, i32, String, u16)>>>;
|
||||
pub type ClientServerMap = Arc<Mutex<HashMap<(i32, i32), (i32, i32, String, String)>>>;
|
||||
pub type PoolMap = HashMap<(String, String), ConnectionPool>;
|
||||
/// The connection pool, globally available.
|
||||
/// This is atomic and safe and read-optimized.
|
||||
/// The pool is recreated dynamically when the config is reloaded.
|
||||
pub static POOLS: Lazy<ArcSwap<PoolMap>> = Lazy::new(|| ArcSwap::from_pointee(HashMap::default()));
|
||||
|
||||
/// Pool mode:
|
||||
/// - transaction: server serves one transaction,
|
||||
/// - session: server is attached to the client.
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
pub enum PoolMode {
|
||||
Session,
|
||||
Transaction,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for PoolMode {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
match *self {
|
||||
PoolMode::Session => write!(f, "session"),
|
||||
PoolMode::Transaction => write!(f, "transaction"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Pool settings.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct PoolSettings {
|
||||
/// Transaction or Session.
|
||||
pub pool_mode: PoolMode,
|
||||
|
||||
// Number of shards.
|
||||
pub shards: usize,
|
||||
|
||||
// Connecting user.
|
||||
pub pool_mode: String,
|
||||
pub shards: HashMap<String, Shard>,
|
||||
pub user: User,
|
||||
|
||||
// Default server role to connect to.
|
||||
pub default_role: Option<Role>,
|
||||
|
||||
// Enable/disable query parser.
|
||||
pub default_role: String,
|
||||
pub query_parser_enabled: bool,
|
||||
|
||||
// Read from the primary as well or not.
|
||||
pub primary_reads_enabled: bool,
|
||||
|
||||
// Sharding function.
|
||||
pub sharding_function: ShardingFunction,
|
||||
pub sharding_function: String,
|
||||
}
|
||||
|
||||
impl Default for PoolSettings {
|
||||
fn default() -> PoolSettings {
|
||||
PoolSettings {
|
||||
pool_mode: PoolMode::Transaction,
|
||||
shards: 1,
|
||||
pool_mode: String::from("transaction"),
|
||||
shards: HashMap::from([(String::from("1"), Shard::default())]),
|
||||
user: User::default(),
|
||||
default_role: None,
|
||||
default_role: String::from("any"),
|
||||
query_parser_enabled: false,
|
||||
primary_reads_enabled: true,
|
||||
sharding_function: ShardingFunction::PgBigintHash,
|
||||
sharding_function: "pg_bigint_hash".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -107,7 +71,6 @@ pub struct ConnectionPool {
|
||||
/// on pool creation and save the K messages here.
|
||||
server_info: BytesMut,
|
||||
|
||||
/// Pool configuration.
|
||||
pub settings: PoolSettings,
|
||||
}
|
||||
|
||||
@@ -115,13 +78,11 @@ impl ConnectionPool {
|
||||
/// Construct the connection pool from the configuration.
|
||||
pub async fn from_config(client_server_map: ClientServerMap) -> Result<(), Error> {
|
||||
let config = get_config();
|
||||
let mut new_pools = PoolMap::default();
|
||||
|
||||
let mut new_pools = HashMap::new();
|
||||
let mut address_id = 0;
|
||||
|
||||
for (pool_name, pool_config) in &config.pools {
|
||||
// There is one pool per database/user pair.
|
||||
for (_, user) in &pool_config.users {
|
||||
for (_user_index, user_info) in &pool_config.users {
|
||||
let mut shards = Vec::new();
|
||||
let mut addresses = Vec::new();
|
||||
let mut banlist = Vec::new();
|
||||
@@ -135,11 +96,10 @@ impl ConnectionPool {
|
||||
// Sort by shard number to ensure consistency.
|
||||
shard_ids.sort_by_key(|k| k.parse::<i64>().unwrap());
|
||||
|
||||
for shard_idx in &shard_ids {
|
||||
let shard = &pool_config.shards[shard_idx];
|
||||
for shard_idx in shard_ids {
|
||||
let shard = &pool_config.shards[&shard_idx];
|
||||
let mut pools = Vec::new();
|
||||
let mut servers = Vec::new();
|
||||
let mut address_index = 0;
|
||||
let mut replica_number = 0;
|
||||
|
||||
for server in shard.servers.iter() {
|
||||
@@ -154,19 +114,15 @@ impl ConnectionPool {
|
||||
|
||||
let address = Address {
|
||||
id: address_id,
|
||||
database: shard.database.clone(),
|
||||
database: pool_name.clone(),
|
||||
host: server.0.clone(),
|
||||
port: server.1 as u16,
|
||||
port: server.1.to_string(),
|
||||
role: role,
|
||||
address_index,
|
||||
replica_number,
|
||||
shard: shard_idx.parse::<usize>().unwrap(),
|
||||
username: user.username.clone(),
|
||||
pool_name: pool_name.clone(),
|
||||
};
|
||||
|
||||
address_id += 1;
|
||||
address_index += 1;
|
||||
|
||||
if role == Role::Replica {
|
||||
replica_number += 1;
|
||||
@@ -174,14 +130,14 @@ impl ConnectionPool {
|
||||
|
||||
let manager = ServerPool::new(
|
||||
address.clone(),
|
||||
user.clone(),
|
||||
user_info.clone(),
|
||||
&shard.database,
|
||||
client_server_map.clone(),
|
||||
get_reporter(),
|
||||
);
|
||||
|
||||
let pool = Pool::builder()
|
||||
.max_size(user.pool_size)
|
||||
.max_size(user_info.pool_size)
|
||||
.connection_timeout(std::time::Duration::from_millis(
|
||||
config.general.connect_timeout,
|
||||
))
|
||||
@@ -208,27 +164,13 @@ impl ConnectionPool {
|
||||
stats: get_reporter(),
|
||||
server_info: BytesMut::new(),
|
||||
settings: PoolSettings {
|
||||
pool_mode: match pool_config.pool_mode.as_str() {
|
||||
"transaction" => PoolMode::Transaction,
|
||||
"session" => PoolMode::Session,
|
||||
_ => unreachable!(),
|
||||
},
|
||||
// shards: pool_config.shards.clone(),
|
||||
shards: shard_ids.len(),
|
||||
user: user.clone(),
|
||||
default_role: match pool_config.default_role.as_str() {
|
||||
"any" => None,
|
||||
"replica" => Some(Role::Replica),
|
||||
"primary" => Some(Role::Primary),
|
||||
_ => unreachable!(),
|
||||
},
|
||||
pool_mode: pool_config.pool_mode.clone(),
|
||||
shards: pool_config.shards.clone(),
|
||||
user: user_info.clone(),
|
||||
default_role: pool_config.default_role.clone(),
|
||||
query_parser_enabled: pool_config.query_parser_enabled.clone(),
|
||||
primary_reads_enabled: pool_config.primary_reads_enabled,
|
||||
sharding_function: match pool_config.sharding_function.as_str() {
|
||||
"pg_bigint_hash" => ShardingFunction::PgBigintHash,
|
||||
"sha1" => ShardingFunction::Sha1,
|
||||
_ => unreachable!(),
|
||||
},
|
||||
sharding_function: pool_config.sharding_function.clone(),
|
||||
},
|
||||
};
|
||||
|
||||
@@ -241,9 +183,7 @@ impl ConnectionPool {
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
// There is one pool per database/user pair.
|
||||
new_pools.insert((pool_name.clone(), user.username.clone()), pool);
|
||||
new_pools.insert((pool_name.clone(), user_info.username.clone()), pool);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -259,9 +199,16 @@ impl ConnectionPool {
|
||||
/// the pooler starts up.
|
||||
async fn validate(&mut self) -> Result<(), Error> {
|
||||
let mut server_infos = Vec::new();
|
||||
let stats = self.stats.clone();
|
||||
|
||||
for shard in 0..self.shards() {
|
||||
for server in 0..self.servers(shard) {
|
||||
let connection = match self.databases[shard][server].get().await {
|
||||
let mut round_robin = 0;
|
||||
|
||||
for _ in 0..self.servers(shard) {
|
||||
// To keep stats consistent.
|
||||
let fake_process_id = 0;
|
||||
|
||||
let connection = match self.get(shard, None, fake_process_id, round_robin).await {
|
||||
Ok(conn) => conn,
|
||||
Err(err) => {
|
||||
error!("Shard {} down or misconfigured: {:?}", shard, err);
|
||||
@@ -269,21 +216,25 @@ impl ConnectionPool {
|
||||
}
|
||||
};
|
||||
|
||||
let proxy = connection;
|
||||
let proxy = connection.0;
|
||||
let address = connection.1;
|
||||
let server = &*proxy;
|
||||
let server_info = server.server_info();
|
||||
|
||||
stats.client_disconnecting(fake_process_id, address.id);
|
||||
|
||||
if server_infos.len() > 0 {
|
||||
// Compare against the last server checked.
|
||||
if server_info != server_infos[server_infos.len() - 1] {
|
||||
warn!(
|
||||
"{:?} has different server configuration than the last server",
|
||||
proxy.address()
|
||||
address
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
server_infos.push(server_info);
|
||||
round_robin += 1;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -293,8 +244,6 @@ impl ConnectionPool {
|
||||
return Err(Error::AllServersDown);
|
||||
}
|
||||
|
||||
// We're assuming all servers are identical.
|
||||
// TODO: not true.
|
||||
self.server_info = server_infos[0].clone();
|
||||
|
||||
Ok(())
|
||||
@@ -303,31 +252,58 @@ impl ConnectionPool {
|
||||
/// Get a connection from the pool.
|
||||
pub async fn get(
|
||||
&self,
|
||||
shard: usize, // shard number
|
||||
role: Option<Role>, // primary or replica
|
||||
process_id: i32, // client id
|
||||
shard: usize, // shard number
|
||||
role: Option<Role>, // primary or replica
|
||||
process_id: i32, // client id
|
||||
mut round_robin: usize, // round robin offset
|
||||
) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> {
|
||||
let now = Instant::now();
|
||||
let mut candidates: Vec<&Address> = self.addresses[shard]
|
||||
.iter()
|
||||
.filter(|address| address.role == role)
|
||||
.collect();
|
||||
let addresses = &self.addresses[shard];
|
||||
|
||||
// Random load balancing
|
||||
candidates.shuffle(&mut thread_rng());
|
||||
let mut allowed_attempts = match role {
|
||||
// Primary-specific queries get one attempt, if the primary is down,
|
||||
// nothing we should do about it I think. It's dangerous to retry
|
||||
// write queries.
|
||||
Some(Role::Primary) => 1,
|
||||
|
||||
// Replicas get to try as many times as there are replicas
|
||||
// and connections in the pool.
|
||||
_ => addresses.len(),
|
||||
};
|
||||
|
||||
debug!("Allowed attempts for {:?}: {}", role, allowed_attempts);
|
||||
|
||||
let exists = match role {
|
||||
Some(role) => addresses.iter().filter(|addr| addr.role == role).count() > 0,
|
||||
None => true,
|
||||
};
|
||||
|
||||
if !exists {
|
||||
error!("Requested role {:?}, but none are configured", role);
|
||||
return Err(Error::BadConfig);
|
||||
}
|
||||
|
||||
let healthcheck_timeout = get_config().general.healthcheck_timeout;
|
||||
let healthcheck_delay = get_config().general.healthcheck_delay as u128;
|
||||
|
||||
while !candidates.is_empty() {
|
||||
// Get the next candidate
|
||||
let address = match candidates.pop() {
|
||||
Some(address) => address,
|
||||
None => break,
|
||||
};
|
||||
while allowed_attempts > 0 {
|
||||
// Round-robin replicas.
|
||||
round_robin += 1;
|
||||
|
||||
if self.is_banned(&address, role) {
|
||||
debug!("Address {:?} is banned", address);
|
||||
let index = round_robin % addresses.len();
|
||||
let address = &addresses[index];
|
||||
|
||||
// Make sure you're getting a primary or a replica
|
||||
// as per request. If no specific role is requested, the first
|
||||
// available will be chosen.
|
||||
if address.role != role {
|
||||
continue;
|
||||
}
|
||||
|
||||
allowed_attempts -= 1;
|
||||
|
||||
// Don't attempt to connect to banned servers.
|
||||
if self.is_banned(address, shard, role) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -335,14 +311,12 @@ impl ConnectionPool {
|
||||
self.stats.client_waiting(process_id, address.id);
|
||||
|
||||
// Check if we can connect
|
||||
let mut conn = match self.databases[address.shard][address.address_index]
|
||||
.get()
|
||||
.await
|
||||
{
|
||||
let mut conn = match self.databases[shard][index].get().await {
|
||||
Ok(conn) => conn,
|
||||
Err(err) => {
|
||||
error!("Banning instance {:?}, error: {:?}", address, err);
|
||||
self.ban(&address, process_id);
|
||||
error!("Banning replica {}, error: {:?}", index, err);
|
||||
self.ban(address, shard, process_id);
|
||||
self.stats.client_disconnecting(process_id, address.id);
|
||||
self.stats
|
||||
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
||||
continue;
|
||||
@@ -356,13 +330,10 @@ impl ConnectionPool {
|
||||
let require_healthcheck =
|
||||
server.last_activity().elapsed().unwrap().as_millis() > healthcheck_delay;
|
||||
|
||||
// Do not issue a health check unless it's been a little while
|
||||
// since we last checked the server is ok.
|
||||
// Health checks are pretty expensive.
|
||||
if !require_healthcheck {
|
||||
self.stats
|
||||
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
||||
self.stats.server_active(conn.process_id(), address.id);
|
||||
self.stats.server_idle(conn.process_id(), address.id);
|
||||
return Ok((conn, address.clone()));
|
||||
}
|
||||
|
||||
@@ -372,7 +343,7 @@ impl ConnectionPool {
|
||||
|
||||
match tokio::time::timeout(
|
||||
tokio::time::Duration::from_millis(healthcheck_timeout),
|
||||
server.query(";"), // Cheap query (query parser not used in PG)
|
||||
server.query(";"),
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -381,72 +352,67 @@ impl ConnectionPool {
|
||||
Ok(_) => {
|
||||
self.stats
|
||||
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
||||
self.stats.server_active(conn.process_id(), address.id);
|
||||
self.stats.server_idle(conn.process_id(), address.id);
|
||||
return Ok((conn, address.clone()));
|
||||
}
|
||||
|
||||
// Health check failed.
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Banning instance {:?} because of failed health check, {:?}",
|
||||
address, err
|
||||
);
|
||||
Err(_) => {
|
||||
error!("Banning replica {} because of failed health check", index);
|
||||
|
||||
// Don't leave a bad connection in the pool.
|
||||
server.mark_bad();
|
||||
|
||||
self.ban(&address, process_id);
|
||||
self.ban(address, shard, process_id);
|
||||
continue;
|
||||
}
|
||||
},
|
||||
|
||||
// Health check timed out.
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Banning instance {:?} because of health check timeout, {:?}",
|
||||
address, err
|
||||
);
|
||||
Err(_) => {
|
||||
error!("Banning replica {} because of health check timeout", index);
|
||||
// Don't leave a bad connection in the pool.
|
||||
server.mark_bad();
|
||||
|
||||
self.ban(&address, process_id);
|
||||
self.ban(address, shard, process_id);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Err(Error::AllServersDown)
|
||||
return Err(Error::AllServersDown);
|
||||
}
|
||||
|
||||
/// Ban an address (i.e. replica). It no longer will serve
|
||||
/// traffic for any new transactions. Existing transactions on that replica
|
||||
/// will finish successfully or error out to the clients.
|
||||
pub fn ban(&self, address: &Address, process_id: i32) {
|
||||
pub fn ban(&self, address: &Address, shard: usize, process_id: i32) {
|
||||
self.stats.client_disconnecting(process_id, address.id);
|
||||
self.stats
|
||||
.checkout_time(Instant::now().elapsed().as_micros(), process_id, address.id);
|
||||
|
||||
error!("Banning {:?}", address);
|
||||
|
||||
let now = chrono::offset::Utc::now().naive_utc();
|
||||
let mut guard = self.banlist.write();
|
||||
guard[address.shard].insert(address.clone(), now);
|
||||
guard[shard].insert(address.clone(), now);
|
||||
}
|
||||
|
||||
/// Clear the replica to receive traffic again. Takes effect immediately
|
||||
/// for all new transactions.
|
||||
pub fn _unban(&self, address: &Address) {
|
||||
pub fn _unban(&self, address: &Address, shard: usize) {
|
||||
let mut guard = self.banlist.write();
|
||||
guard[address.shard].remove(address);
|
||||
guard[shard].remove(address);
|
||||
}
|
||||
|
||||
/// Check if a replica can serve traffic. If all replicas are banned,
|
||||
/// we unban all of them. Better to try then not to.
|
||||
pub fn is_banned(&self, address: &Address, role: Option<Role>) -> bool {
|
||||
pub fn is_banned(&self, address: &Address, shard: usize, role: Option<Role>) -> bool {
|
||||
let replicas_available = match role {
|
||||
Some(Role::Replica) => self.addresses[address.shard]
|
||||
Some(Role::Replica) => self.addresses[shard]
|
||||
.iter()
|
||||
.filter(|addr| addr.role == Role::Replica)
|
||||
.count(),
|
||||
None => self.addresses[address.shard].len(),
|
||||
None => self.addresses[shard].len(),
|
||||
Some(Role::Primary) => return false, // Primary cannot be banned.
|
||||
};
|
||||
|
||||
@@ -455,17 +421,17 @@ impl ConnectionPool {
|
||||
let guard = self.banlist.read();
|
||||
|
||||
// Everything is banned = nothing is banned.
|
||||
if guard[address.shard].len() == replicas_available {
|
||||
if guard[shard].len() == replicas_available {
|
||||
drop(guard);
|
||||
let mut guard = self.banlist.write();
|
||||
guard[address.shard].clear();
|
||||
guard[shard].clear();
|
||||
drop(guard);
|
||||
warn!("Unbanning all replicas.");
|
||||
return false;
|
||||
}
|
||||
|
||||
// I expect this to miss 99.9999% of the time.
|
||||
match guard[address.shard].get(address) {
|
||||
match guard[shard].get(address) {
|
||||
Some(timestamp) => {
|
||||
let now = chrono::offset::Utc::now().naive_utc();
|
||||
let config = get_config();
|
||||
@@ -475,7 +441,7 @@ impl ConnectionPool {
|
||||
drop(guard);
|
||||
warn!("Unbanning {:?}", address);
|
||||
let mut guard = self.banlist.write();
|
||||
guard[address.shard].remove(address);
|
||||
guard[shard].remove(address);
|
||||
false
|
||||
} else {
|
||||
debug!("{:?} is banned", address);
|
||||
@@ -559,7 +525,11 @@ impl ManageConnection for ServerPool {
|
||||
|
||||
/// Attempts to create a new connection.
|
||||
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
|
||||
info!("Creating a new server connection {:?}", self.address);
|
||||
info!(
|
||||
"Creating a new connection to {:?} using user {:?}",
|
||||
self.address.name(),
|
||||
self.user.username
|
||||
);
|
||||
|
||||
// Put a temporary process_id into the stats
|
||||
// for server login.
|
||||
@@ -608,7 +578,6 @@ pub fn get_pool(db: String, user: String) -> Option<ConnectionPool> {
|
||||
}
|
||||
}
|
||||
|
||||
/// How many total servers we have in the config.
|
||||
pub fn get_number_of_addresses() -> usize {
|
||||
get_all_pools()
|
||||
.iter()
|
||||
@@ -616,7 +585,6 @@ pub fn get_number_of_addresses() -> usize {
|
||||
.sum()
|
||||
}
|
||||
|
||||
/// Get a pointer to all configured pools.
|
||||
pub fn get_all_pools() -> HashMap<(String, String), ConnectionPool> {
|
||||
return (*(*POOLS.load())).clone();
|
||||
}
|
||||
|
||||
@@ -10,10 +10,10 @@ use sqlparser::parser::Parser;
|
||||
|
||||
use crate::config::Role;
|
||||
use crate::pool::PoolSettings;
|
||||
use crate::sharding::Sharder;
|
||||
use crate::sharding::{Sharder, ShardingFunction};
|
||||
|
||||
/// Regexes used to parse custom commands.
|
||||
const CUSTOM_SQL_REGEXES: [&str; 8] = [
|
||||
const CUSTOM_SQL_REGEXES: [&str; 7] = [
|
||||
r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$",
|
||||
r"(?i)^ *SET SHARD TO '?([0-9]+|ANY)'? *;? *$",
|
||||
r"(?i)^ *SHOW SHARD *;? *$",
|
||||
@@ -21,7 +21,6 @@ const CUSTOM_SQL_REGEXES: [&str; 8] = [
|
||||
r"(?i)^ *SHOW SERVER ROLE *;? *$",
|
||||
r"(?i)^ *SET PRIMARY READS TO '?(on|off|default)'? *;? *$",
|
||||
r"(?i)^ *SHOW PRIMARY READS *;? *$",
|
||||
r"(?i)^ *SHARDED_COPY '?([0-9]+)'? *;? *$",
|
||||
];
|
||||
|
||||
/// Custom commands.
|
||||
@@ -34,7 +33,6 @@ pub enum Command {
|
||||
ShowServerRole,
|
||||
SetPrimaryReads,
|
||||
ShowPrimaryReads,
|
||||
StartShardedCopy,
|
||||
}
|
||||
|
||||
/// Quickly test for match when a query is received.
|
||||
@@ -57,16 +55,11 @@ pub struct QueryRouter {
|
||||
/// Include the primary into the replica pool for reads.
|
||||
primary_reads_enabled: bool,
|
||||
|
||||
/// Pool configuration.
|
||||
pool_settings: PoolSettings,
|
||||
|
||||
// Sharding key column
|
||||
sharding_key_column: Option<usize>,
|
||||
}
|
||||
|
||||
impl QueryRouter {
|
||||
/// One-time initialization of regexes
|
||||
/// that parse our custom SQL protocol.
|
||||
/// One-time initialization of regexes.
|
||||
pub fn setup() -> bool {
|
||||
let set = match RegexSet::new(&CUSTOM_SQL_REGEXES) {
|
||||
Ok(rgx) => rgx,
|
||||
@@ -81,7 +74,10 @@ impl QueryRouter {
|
||||
.map(|rgx| Regex::new(rgx).unwrap())
|
||||
.collect();
|
||||
|
||||
assert_eq!(list.len(), set.len());
|
||||
// Impossible
|
||||
if list.len() != set.len() {
|
||||
return false;
|
||||
}
|
||||
|
||||
match CUSTOM_SQL_REGEX_LIST.set(list) {
|
||||
Ok(_) => true,
|
||||
@@ -94,8 +90,7 @@ impl QueryRouter {
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new instance of the query router.
|
||||
/// Each client gets its own.
|
||||
/// Create a new instance of the query router. Each client gets its own.
|
||||
pub fn new() -> QueryRouter {
|
||||
QueryRouter {
|
||||
active_shard: None,
|
||||
@@ -103,11 +98,9 @@ impl QueryRouter {
|
||||
query_parser_enabled: false,
|
||||
primary_reads_enabled: false,
|
||||
pool_settings: PoolSettings::default(),
|
||||
sharding_key_column: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Pool settings can change because of a config reload.
|
||||
pub fn update_pool_settings(&mut self, pool_settings: PoolSettings) {
|
||||
self.pool_settings = pool_settings;
|
||||
}
|
||||
@@ -143,6 +136,19 @@ impl QueryRouter {
|
||||
return None;
|
||||
}
|
||||
|
||||
let sharding_function = match self.pool_settings.sharding_function.as_ref() {
|
||||
"pg_bigint_hash" => ShardingFunction::PgBigintHash,
|
||||
"sha1" => ShardingFunction::Sha1,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
let default_server_role = match self.pool_settings.default_role.as_ref() {
|
||||
"any" => None,
|
||||
"primary" => Some(Role::Primary),
|
||||
"replica" => Some(Role::Replica),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
let command = match matches[0] {
|
||||
0 => Command::SetShardingKey,
|
||||
1 => Command::SetShard,
|
||||
@@ -151,7 +157,6 @@ impl QueryRouter {
|
||||
4 => Command::ShowServerRole,
|
||||
5 => Command::SetPrimaryReads,
|
||||
6 => Command::ShowPrimaryReads,
|
||||
7 => Command::StartShardedCopy,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
@@ -159,8 +164,7 @@ impl QueryRouter {
|
||||
Command::SetShardingKey
|
||||
| Command::SetShard
|
||||
| Command::SetServerRole
|
||||
| Command::SetPrimaryReads
|
||||
| Command::StartShardedCopy => {
|
||||
| Command::SetPrimaryReads => {
|
||||
// Capture value. I know this re-runs the regex engine, but I haven't
|
||||
// figured out a better way just yet. I think I can write a single Regex
|
||||
// that matches all 5 custom SQL patterns, but maybe that's not very legible?
|
||||
@@ -196,10 +200,7 @@ impl QueryRouter {
|
||||
|
||||
match command {
|
||||
Command::SetShardingKey => {
|
||||
let sharder = Sharder::new(
|
||||
self.pool_settings.shards,
|
||||
self.pool_settings.sharding_function,
|
||||
);
|
||||
let sharder = Sharder::new(self.pool_settings.shards.len(), sharding_function);
|
||||
let shard = sharder.shard(value.parse::<i64>().unwrap());
|
||||
self.active_shard = Some(shard);
|
||||
value = shard.to_string();
|
||||
@@ -207,18 +208,11 @@ impl QueryRouter {
|
||||
|
||||
Command::SetShard => {
|
||||
self.active_shard = match value.to_ascii_uppercase().as_ref() {
|
||||
"ANY" => Some(rand::random::<usize>() % self.pool_settings.shards),
|
||||
"ANY" => Some(rand::random::<usize>() % self.pool_settings.shards.len()),
|
||||
_ => Some(value.parse::<usize>().unwrap()),
|
||||
};
|
||||
}
|
||||
|
||||
Command::StartShardedCopy => {
|
||||
self.sharding_key_column = match value.parse::<usize>() {
|
||||
Ok(value) => Some(value),
|
||||
Err(_) => return None,
|
||||
}
|
||||
}
|
||||
|
||||
Command::SetServerRole => {
|
||||
self.active_role = match value.to_ascii_lowercase().as_ref() {
|
||||
"primary" => {
|
||||
@@ -242,7 +236,7 @@ impl QueryRouter {
|
||||
}
|
||||
|
||||
"default" => {
|
||||
self.active_role = self.pool_settings.default_role;
|
||||
self.active_role = default_server_role;
|
||||
self.query_parser_enabled = self.query_parser_enabled;
|
||||
self.active_role
|
||||
}
|
||||
@@ -373,10 +367,10 @@ impl QueryRouter {
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::collections::HashMap;
|
||||
|
||||
use super::*;
|
||||
use crate::messages::simple_query;
|
||||
use crate::pool::PoolMode;
|
||||
use crate::sharding::ShardingFunction;
|
||||
use bytes::BufMut;
|
||||
|
||||
#[test]
|
||||
@@ -639,13 +633,13 @@ mod test {
|
||||
QueryRouter::setup();
|
||||
|
||||
let pool_settings = PoolSettings {
|
||||
pool_mode: PoolMode::Transaction,
|
||||
shards: 0,
|
||||
pool_mode: "transaction".to_string(),
|
||||
shards: HashMap::default(),
|
||||
user: crate::config::User::default(),
|
||||
default_role: Some(Role::Replica),
|
||||
default_role: Role::Replica.to_string(),
|
||||
query_parser_enabled: true,
|
||||
primary_reads_enabled: false,
|
||||
sharding_function: ShardingFunction::PgBigintHash,
|
||||
sharding_function: "pg_bigint_hash".to_string(),
|
||||
};
|
||||
let mut qr = QueryRouter::new();
|
||||
assert_eq!(qr.active_role, None);
|
||||
@@ -667,6 +661,9 @@ mod test {
|
||||
|
||||
let q2 = simple_query("SET SERVER ROLE TO 'default'");
|
||||
assert!(qr.try_execute_command(q2) != None);
|
||||
assert_eq!(qr.active_role.unwrap(), pool_settings.clone().default_role);
|
||||
assert_eq!(
|
||||
qr.active_role.unwrap().to_string(),
|
||||
pool_settings.clone().default_role
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
/// Implementation of the PostgreSQL server (database) protocol.
|
||||
/// Here we are pretending to the a Postgres client.
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use std::io::Read;
|
||||
use log::{debug, error, info, trace};
|
||||
use std::time::SystemTime;
|
||||
use tokio::io::{AsyncReadExt, BufReader};
|
||||
use tokio::net::{
|
||||
@@ -49,9 +48,6 @@ pub struct Server {
|
||||
/// Is the server broken? We'll remote it from the pool if so.
|
||||
bad: bool,
|
||||
|
||||
/// If server connection requires a DISCARD ALL before checkin
|
||||
needs_cleanup: bool,
|
||||
|
||||
/// Mapping of clients and servers used for query cancellation.
|
||||
client_server_map: ClientServerMap,
|
||||
|
||||
@@ -79,7 +75,7 @@ impl Server {
|
||||
stats: Reporter,
|
||||
) -> Result<Server, Error> {
|
||||
let mut stream =
|
||||
match TcpStream::connect(&format!("{}:{}", &address.host, address.port)).await {
|
||||
match TcpStream::connect(&format!("{}:{}", &address.host, &address.port)).await {
|
||||
Ok(stream) => stream,
|
||||
Err(err) => {
|
||||
error!("Could not connect to server: {}", err);
|
||||
@@ -320,7 +316,6 @@ impl Server {
|
||||
in_transaction: false,
|
||||
data_available: false,
|
||||
bad: false,
|
||||
needs_cleanup: false,
|
||||
client_server_map: client_server_map,
|
||||
connected_at: chrono::offset::Utc::now().naive_utc(),
|
||||
stats: stats,
|
||||
@@ -347,7 +342,7 @@ impl Server {
|
||||
/// Uses a separate connection that's not part of the connection pool.
|
||||
pub async fn cancel(
|
||||
host: &str,
|
||||
port: u16,
|
||||
port: &str,
|
||||
process_id: i32,
|
||||
secret_key: i32,
|
||||
) -> Result<(), Error> {
|
||||
@@ -445,29 +440,6 @@ impl Server {
|
||||
break;
|
||||
}
|
||||
|
||||
// CommandComplete
|
||||
'C' => {
|
||||
let mut command_tag = String::new();
|
||||
match message.reader().read_to_string(&mut command_tag) {
|
||||
Ok(_) => {
|
||||
// Non-exhaustive list of commands that are likely to change session variables/resources
|
||||
// which can leak between clients. This is a best effort to block bad clients
|
||||
// from poisoning a transaction-mode pool by setting inappropriate session variables
|
||||
match command_tag.as_str() {
|
||||
"SET\0" | "PREPARE\0" => {
|
||||
debug!("Server connection marked for clean up");
|
||||
self.needs_cleanup = true;
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
|
||||
Err(err) => {
|
||||
warn!("Encountered an error while parsing CommandTag {}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// DataRow
|
||||
'D' => {
|
||||
// More data is available after this message, this is not the end of the reply.
|
||||
@@ -557,7 +529,7 @@ impl Server {
|
||||
self.process_id,
|
||||
self.secret_key,
|
||||
self.address.host.clone(),
|
||||
self.address.port,
|
||||
self.address.port.clone(),
|
||||
),
|
||||
);
|
||||
}
|
||||
@@ -581,43 +553,14 @@ impl Server {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Perform any necessary cleanup before putting the server
|
||||
/// connection back in the pool
|
||||
pub async fn checkin_cleanup(&mut self) -> Result<(), Error> {
|
||||
// Client disconnected with an open transaction on the server connection.
|
||||
// Pgbouncer behavior is to close the server connection but that can cause
|
||||
// server connection thrashing if clients repeatedly do this.
|
||||
// Instead, we ROLLBACK that transaction before putting the connection back in the pool
|
||||
if self.in_transaction() {
|
||||
self.query("ROLLBACK").await?;
|
||||
}
|
||||
|
||||
// Client disconnected but it perfromed session-altering operations such as
|
||||
// SET statement_timeout to 1 or create a prepared statement. We clear that
|
||||
// to avoid leaking state between clients. For performance reasons we only
|
||||
// send `DISCARD ALL` if we think the session is altered instead of just sending
|
||||
// it before each checkin.
|
||||
if self.needs_cleanup {
|
||||
self.query("DISCARD ALL").await?;
|
||||
self.needs_cleanup = false;
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
/// A shorthand for `SET application_name = $1`.
|
||||
#[allow(dead_code)]
|
||||
pub async fn set_name(&mut self, name: &str) -> Result<(), Error> {
|
||||
if self.application_name != name {
|
||||
self.application_name = name.to_string();
|
||||
// We don't want `SET application_name` to mark the server connection
|
||||
// as needing cleanup
|
||||
let needs_cleanup_before = self.needs_cleanup;
|
||||
|
||||
let result = Ok(self
|
||||
Ok(self
|
||||
.query(&format!("SET application_name = '{}'", name))
|
||||
.await?);
|
||||
self.needs_cleanup = needs_cleanup_before;
|
||||
return result;
|
||||
.await?)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
@@ -638,11 +581,6 @@ impl Server {
|
||||
pub fn last_activity(&self) -> SystemTime {
|
||||
self.last_activity
|
||||
}
|
||||
|
||||
// Marks a connection as needing DISCARD ALL at checkin
|
||||
pub fn mark_dirty(&mut self) {
|
||||
self.needs_cleanup = true;
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Server {
|
||||
@@ -669,8 +607,7 @@ impl Drop for Server {
|
||||
let duration = now - self.connected_at;
|
||||
|
||||
info!(
|
||||
"Server connection closed {:?}, session duration: {}",
|
||||
self.address,
|
||||
"Server connection closed, session duration: {}",
|
||||
crate::format_duration(&duration)
|
||||
);
|
||||
}
|
||||
|
||||
124
src/stats.rs
124
src/stats.rs
@@ -4,6 +4,7 @@ use log::{error, info, trace};
|
||||
use once_cell::sync::Lazy;
|
||||
use parking_lot::Mutex;
|
||||
use std::collections::HashMap;
|
||||
use std::time::SystemTime;
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
|
||||
@@ -42,6 +43,26 @@ enum EventName {
|
||||
UpdateAverages,
|
||||
}
|
||||
|
||||
/// Send an event via the channel and log
|
||||
/// an error if it fails.
|
||||
fn send(tx: &Sender<Event>, event: Event) {
|
||||
let name = event.name;
|
||||
let result = tx.try_send(event);
|
||||
|
||||
match result {
|
||||
Ok(_) => trace!(
|
||||
"{:?} event reported successfully, capacity: {}",
|
||||
name,
|
||||
tx.capacity()
|
||||
),
|
||||
|
||||
Err(err) => match err {
|
||||
TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name),
|
||||
TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/// Event data sent to the collector
|
||||
/// from clients and servers.
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -80,25 +101,6 @@ impl Reporter {
|
||||
Reporter { tx: tx }
|
||||
}
|
||||
|
||||
/// Send statistics to the task keeping track of stats.
|
||||
fn send(&self, event: Event) {
|
||||
let name = event.name;
|
||||
let result = self.tx.try_send(event);
|
||||
|
||||
match result {
|
||||
Ok(_) => trace!(
|
||||
"{:?} event reported successfully, capacity: {}",
|
||||
name,
|
||||
self.tx.capacity()
|
||||
),
|
||||
|
||||
Err(err) => match err {
|
||||
TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name),
|
||||
TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/// Report a query executed by a client against
|
||||
/// a server identified by the `address_id`.
|
||||
pub fn query(&self, process_id: i32, address_id: usize) {
|
||||
@@ -109,7 +111,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event);
|
||||
send(&self.tx, event);
|
||||
}
|
||||
|
||||
/// Report a transaction executed by a client against
|
||||
@@ -122,7 +124,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Report data sent to a server identified by `address_id`.
|
||||
@@ -135,7 +137,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Report data received from a server identified by `address_id`.
|
||||
@@ -148,7 +150,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Time spent waiting to get a healthy connection from the pool
|
||||
@@ -162,7 +164,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Reports a client identified by `process_id` waiting for a connection
|
||||
@@ -175,7 +177,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Reports a client identified by `process_id` is done waiting for a connection
|
||||
@@ -188,7 +190,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Reports a client identified by `process_id` is done querying the server
|
||||
@@ -201,7 +203,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Reports a client identified by `process_id` is disconecting from the pooler.
|
||||
@@ -214,7 +216,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Reports a server connection identified by `process_id` for
|
||||
@@ -228,7 +230,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Reports a server connection identified by `process_id` for
|
||||
@@ -242,7 +244,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Reports a server connection identified by `process_id` for
|
||||
@@ -256,7 +258,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Reports a server connection identified by `process_id` for
|
||||
@@ -270,7 +272,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Reports a server connection identified by `process_id` is disconecting from the pooler.
|
||||
@@ -283,7 +285,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -345,6 +347,9 @@ impl Collector {
|
||||
// Track which state the client and server are at any given time.
|
||||
let mut client_server_states: HashMap<usize, HashMap<i32, EventName>> = HashMap::new();
|
||||
|
||||
// Average update times
|
||||
let mut last_updated_avg: HashMap<usize, SystemTime> = HashMap::new();
|
||||
|
||||
// Flush stats to StatsD and calculate averages every 15 seconds.
|
||||
let tx = self.tx.clone();
|
||||
tokio::task::spawn(async move {
|
||||
@@ -354,12 +359,15 @@ impl Collector {
|
||||
interval.tick().await;
|
||||
let address_count = get_number_of_addresses();
|
||||
for address_id in 0..address_count {
|
||||
let _ = tx.try_send(Event {
|
||||
name: EventName::UpdateStats,
|
||||
value: 0,
|
||||
process_id: -1,
|
||||
address_id: address_id,
|
||||
});
|
||||
send(
|
||||
&tx,
|
||||
Event {
|
||||
name: EventName::UpdateStats,
|
||||
value: 0,
|
||||
process_id: -1,
|
||||
address_id: address_id,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -372,12 +380,15 @@ impl Collector {
|
||||
interval.tick().await;
|
||||
let address_count = get_number_of_addresses();
|
||||
for address_id in 0..address_count {
|
||||
let _ = tx.try_send(Event {
|
||||
name: EventName::UpdateAverages,
|
||||
value: 0,
|
||||
process_id: -1,
|
||||
address_id: address_id,
|
||||
});
|
||||
send(
|
||||
&tx,
|
||||
Event {
|
||||
name: EventName::UpdateAverages,
|
||||
value: 0,
|
||||
process_id: -1,
|
||||
address_id: address_id,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -399,6 +410,9 @@ impl Collector {
|
||||
.entry(stat.address_id)
|
||||
.or_insert(HashMap::new());
|
||||
let old_stats = old_stats.entry(stat.address_id).or_insert(HashMap::new());
|
||||
let last_updated_avg = last_updated_avg
|
||||
.entry(stat.address_id)
|
||||
.or_insert(SystemTime::now());
|
||||
|
||||
// Some are counters, some are gauges...
|
||||
match stat.name {
|
||||
@@ -524,6 +538,24 @@ impl Collector {
|
||||
}
|
||||
|
||||
EventName::UpdateAverages => {
|
||||
let elapsed = match last_updated_avg.elapsed() {
|
||||
Ok(elapsed) => elapsed.as_secs(),
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Could not get elapsed time, averages may be incorrect: {:?}",
|
||||
err
|
||||
);
|
||||
STAT_PERIOD / 1_000
|
||||
}
|
||||
} as i64;
|
||||
|
||||
*last_updated_avg = SystemTime::now();
|
||||
|
||||
// Tokio triggers the interval on first tick and then sleeps.
|
||||
if elapsed == 0 {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Calculate averages
|
||||
for stat in &[
|
||||
"avg_query_count",
|
||||
@@ -541,7 +573,7 @@ impl Collector {
|
||||
|
||||
let old_value = old_stats.entry(total_name.clone()).or_insert(0);
|
||||
let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned();
|
||||
let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second
|
||||
let avg = (new_value - *old_value) / elapsed; // Avg / second
|
||||
|
||||
stats.insert(stat, avg);
|
||||
*old_value = new_value;
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
FROM rust:bullseye
|
||||
|
||||
RUN apt-get update && apt-get install llvm-11 psmisc postgresql-contrib postgresql-client ruby ruby-dev libpq-dev python3 python3-pip lcov sudo curl -y
|
||||
RUN cargo install cargo-binutils rustfilt
|
||||
RUN rustup component add llvm-tools-preview
|
||||
@@ -1,47 +0,0 @@
|
||||
version: "3"
|
||||
services:
|
||||
pg1:
|
||||
image: postgres:14
|
||||
network_mode: "service:main"
|
||||
environment:
|
||||
POSTGRES_USER: postgres
|
||||
POSTGRES_DB: postgres
|
||||
POSTGRES_PASSWORD: postgres
|
||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "5432"]
|
||||
pg2:
|
||||
image: postgres:14
|
||||
network_mode: "service:main"
|
||||
environment:
|
||||
POSTGRES_USER: postgres
|
||||
POSTGRES_DB: postgres
|
||||
POSTGRES_PASSWORD: postgres
|
||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "7432"]
|
||||
pg3:
|
||||
image: postgres:14
|
||||
network_mode: "service:main"
|
||||
environment:
|
||||
POSTGRES_USER: postgres
|
||||
POSTGRES_DB: postgres
|
||||
POSTGRES_PASSWORD: postgres
|
||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "8432"]
|
||||
pg4:
|
||||
image: postgres:14
|
||||
network_mode: "service:main"
|
||||
environment:
|
||||
POSTGRES_USER: postgres
|
||||
POSTGRES_DB: postgres
|
||||
POSTGRES_PASSWORD: postgres
|
||||
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "9432"]
|
||||
main:
|
||||
build: .
|
||||
command: ["bash", "/app/tests/docker/run.sh"]
|
||||
environment:
|
||||
RUSTFLAGS: "-C instrument-coverage"
|
||||
LLVM_PROFILE_FILE: "pgcat-%m.profraw"
|
||||
volumes:
|
||||
- ../../:/app/
|
||||
- /app/target/
|
||||
@@ -1,21 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
rm /app/*.profraw || true
|
||||
rm /app/pgcat.profdata || true
|
||||
rm -rf /app/cov || true
|
||||
|
||||
cd /app/
|
||||
|
||||
cargo build
|
||||
cargo test --tests
|
||||
|
||||
bash .circleci/run_tests.sh
|
||||
|
||||
rust-profdata merge -sparse pgcat-*.profraw -o pgcat.profdata
|
||||
|
||||
rust-cov export -ignore-filename-regex="rustc|registry" -Xdemangler=rustfilt -instr-profile=pgcat.profdata --object ./target/debug/pgcat --format lcov > ./lcov.info
|
||||
|
||||
genhtml lcov.info --output-directory cov --prefix $(pwd)
|
||||
|
||||
rm /app/*.profraw
|
||||
rm /app/pgcat.profdata
|
||||
@@ -14,18 +14,12 @@ PGCAT_PORT = "6432"
|
||||
def pgcat_start():
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
|
||||
time.sleep(2)
|
||||
|
||||
|
||||
def pg_cat_send_signal(signal: signal.Signals):
|
||||
try:
|
||||
for proc in psutil.process_iter(["pid", "name"]):
|
||||
if "pgcat" == proc.name():
|
||||
os.kill(proc.pid, signal)
|
||||
except Exception as e:
|
||||
# The process can be gone when we send this signal
|
||||
print(e)
|
||||
|
||||
for proc in psutil.process_iter(["pid", "name"]):
|
||||
if "pgcat" == proc.name():
|
||||
os.kill(proc.pid, signal)
|
||||
if signal == signal.SIGTERM:
|
||||
# Returns 0 if pgcat process exists
|
||||
time.sleep(2)
|
||||
@@ -33,23 +27,11 @@ def pg_cat_send_signal(signal: signal.Signals):
|
||||
raise Exception("pgcat not closed after SIGTERM")
|
||||
|
||||
|
||||
def connect_db(
|
||||
autocommit: bool = True,
|
||||
admin: bool = False,
|
||||
def connect_normal_db(
|
||||
autocommit: bool = False,
|
||||
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
|
||||
|
||||
if admin:
|
||||
user = "admin_user"
|
||||
password = "admin_pass"
|
||||
db = "pgcat"
|
||||
else:
|
||||
user = "sharding_user"
|
||||
password = "sharding_user"
|
||||
db = "sharded_db"
|
||||
|
||||
conn = psycopg2.connect(
|
||||
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
|
||||
connect_timeout=2,
|
||||
f"postgres://sharding_user:sharding_user@{PGCAT_HOST}:{PGCAT_PORT}/sharded_db?application_name=testing_pgcat"
|
||||
)
|
||||
conn.autocommit = autocommit
|
||||
cur = conn.cursor()
|
||||
@@ -63,7 +45,7 @@ def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.
|
||||
|
||||
|
||||
def test_normal_db_access():
|
||||
conn, cur = connect_db(autocommit=False)
|
||||
conn, cur = connect_normal_db()
|
||||
cur.execute("SELECT 1")
|
||||
res = cur.fetchall()
|
||||
print(res)
|
||||
@@ -71,7 +53,11 @@ def test_normal_db_access():
|
||||
|
||||
|
||||
def test_admin_db_access():
|
||||
conn, cur = connect_db(admin=True)
|
||||
conn = psycopg2.connect(
|
||||
f"postgres://admin_user:admin_pass@{PGCAT_HOST}:{PGCAT_PORT}/pgcat"
|
||||
)
|
||||
conn.autocommit = True # BEGIN/COMMIT is not supported by admin db
|
||||
cur = conn.cursor()
|
||||
|
||||
cur.execute("SHOW POOLS")
|
||||
res = cur.fetchall()
|
||||
@@ -81,14 +67,15 @@ def test_admin_db_access():
|
||||
|
||||
def test_shutdown_logic():
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# NO ACTIVE QUERIES SIGINT HANDLING
|
||||
|
||||
##### NO ACTIVE QUERIES SIGINT HANDLING #####
|
||||
# Start pgcat
|
||||
pgcat_start()
|
||||
|
||||
# Wait for server to fully start up
|
||||
time.sleep(2)
|
||||
|
||||
# Create client connection and send query (not in transaction)
|
||||
conn, cur = connect_db()
|
||||
conn, cur = connect_normal_db(True)
|
||||
|
||||
cur.execute("BEGIN;")
|
||||
cur.execute("SELECT 1;")
|
||||
@@ -110,14 +97,17 @@ def test_shutdown_logic():
|
||||
cleanup_conn(conn, cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# HANDLE TRANSACTION WITH SIGINT
|
||||
##### END #####
|
||||
|
||||
##### HANDLE TRANSACTION WITH SIGINT #####
|
||||
# Start pgcat
|
||||
pgcat_start()
|
||||
|
||||
# Wait for server to fully start up
|
||||
time.sleep(2)
|
||||
|
||||
# Create client connection and begin transaction
|
||||
conn, cur = connect_db()
|
||||
conn, cur = connect_normal_db(True)
|
||||
|
||||
cur.execute("BEGIN;")
|
||||
cur.execute("SELECT 1;")
|
||||
@@ -136,97 +126,17 @@ def test_shutdown_logic():
|
||||
cleanup_conn(conn, cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN
|
||||
##### END #####
|
||||
|
||||
##### HANDLE SHUTDOWN TIMEOUT WITH SIGINT #####
|
||||
# Start pgcat
|
||||
pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction
|
||||
transaction_conn, transaction_cur = connect_db()
|
||||
|
||||
transaction_cur.execute("BEGIN;")
|
||||
transaction_cur.execute("SELECT 1;")
|
||||
|
||||
# Send sigint to pgcat while still in transaction
|
||||
pg_cat_send_signal(signal.SIGINT)
|
||||
time.sleep(1)
|
||||
|
||||
start = time.perf_counter()
|
||||
try:
|
||||
conn, cur = connect_db()
|
||||
cur.execute("SELECT 1;")
|
||||
cleanup_conn(conn, cur)
|
||||
except psycopg2.OperationalError as e:
|
||||
time_taken = time.perf_counter() - start
|
||||
if time_taken > 0.1:
|
||||
raise Exception(
|
||||
"Failed to reject connection within 0.1 seconds, got", time_taken, "seconds")
|
||||
pass
|
||||
else:
|
||||
raise Exception("Able connect to database during shutdown")
|
||||
|
||||
cleanup_conn(transaction_conn, transaction_cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN
|
||||
# Start pgcat
|
||||
pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction
|
||||
transaction_conn, transaction_cur = connect_db()
|
||||
|
||||
transaction_cur.execute("BEGIN;")
|
||||
transaction_cur.execute("SELECT 1;")
|
||||
|
||||
# Send sigint to pgcat while still in transaction
|
||||
pg_cat_send_signal(signal.SIGINT)
|
||||
time.sleep(1)
|
||||
|
||||
try:
|
||||
conn, cur = connect_db(admin=True)
|
||||
cur.execute("SHOW DATABASES;")
|
||||
cleanup_conn(conn, cur)
|
||||
except psycopg2.OperationalError as e:
|
||||
raise Exception(e)
|
||||
|
||||
cleanup_conn(transaction_conn, transaction_cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN
|
||||
# Start pgcat
|
||||
pgcat_start()
|
||||
|
||||
# Create client connection and begin transaction
|
||||
transaction_conn, transaction_cur = connect_db()
|
||||
transaction_cur.execute("BEGIN;")
|
||||
transaction_cur.execute("SELECT 1;")
|
||||
|
||||
admin_conn, admin_cur = connect_db(admin=True)
|
||||
admin_cur.execute("SHOW DATABASES;")
|
||||
|
||||
# Send sigint to pgcat while still in transaction
|
||||
pg_cat_send_signal(signal.SIGINT)
|
||||
time.sleep(1)
|
||||
|
||||
try:
|
||||
admin_cur.execute("SHOW DATABASES;")
|
||||
except psycopg2.OperationalError as e:
|
||||
raise Exception("Could not execute admin command:", e)
|
||||
|
||||
cleanup_conn(transaction_conn, transaction_cur)
|
||||
cleanup_conn(admin_conn, admin_cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
# HANDLE SHUTDOWN TIMEOUT WITH SIGINT
|
||||
|
||||
# Start pgcat
|
||||
pgcat_start()
|
||||
# Wait for server to fully start up
|
||||
time.sleep(3)
|
||||
|
||||
# Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached
|
||||
conn, cur = connect_db()
|
||||
conn, cur = connect_normal_db(True)
|
||||
|
||||
cur.execute("BEGIN;")
|
||||
cur.execute("SELECT 1;")
|
||||
@@ -249,7 +159,7 @@ def test_shutdown_logic():
|
||||
cleanup_conn(conn, cur)
|
||||
pg_cat_send_signal(signal.SIGTERM)
|
||||
|
||||
# - - - - - - - - - - - - - - - - - -
|
||||
##### END #####
|
||||
|
||||
|
||||
test_normal_db_access()
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
source "https://rubygems.org"
|
||||
|
||||
gem "pg"
|
||||
gem "toml"
|
||||
gem "rspec"
|
||||
gem "rubocop"
|
||||
gem "toxiproxy"
|
||||
gem "activerecord"
|
||||
gem "rubocop"
|
||||
gem "toml", "~> 0.3.0"
|
||||
|
||||
@@ -13,7 +13,6 @@ GEM
|
||||
tzinfo (~> 2.0)
|
||||
ast (2.4.2)
|
||||
concurrent-ruby (1.1.10)
|
||||
diff-lcs (1.5.0)
|
||||
i18n (1.11.0)
|
||||
concurrent-ruby (~> 1.0)
|
||||
minitest (5.16.2)
|
||||
@@ -25,19 +24,6 @@ GEM
|
||||
rainbow (3.1.1)
|
||||
regexp_parser (2.3.1)
|
||||
rexml (3.2.5)
|
||||
rspec (3.11.0)
|
||||
rspec-core (~> 3.11.0)
|
||||
rspec-expectations (~> 3.11.0)
|
||||
rspec-mocks (~> 3.11.0)
|
||||
rspec-core (3.11.0)
|
||||
rspec-support (~> 3.11.0)
|
||||
rspec-expectations (3.11.0)
|
||||
diff-lcs (>= 1.2.0, < 2.0)
|
||||
rspec-support (~> 3.11.0)
|
||||
rspec-mocks (3.11.1)
|
||||
diff-lcs (>= 1.2.0, < 2.0)
|
||||
rspec-support (~> 3.11.0)
|
||||
rspec-support (3.11.0)
|
||||
rubocop (1.29.0)
|
||||
parallel (~> 1.10)
|
||||
parser (>= 3.1.0.0)
|
||||
@@ -52,23 +38,19 @@ GEM
|
||||
ruby-progressbar (1.11.0)
|
||||
toml (0.3.0)
|
||||
parslet (>= 1.8.0, < 3.0.0)
|
||||
toxiproxy (2.0.1)
|
||||
tzinfo (2.0.4)
|
||||
concurrent-ruby (~> 1.0)
|
||||
unicode-display_width (2.1.0)
|
||||
|
||||
PLATFORMS
|
||||
aarch64-linux
|
||||
arm64-darwin-21
|
||||
x86_64-linux
|
||||
|
||||
DEPENDENCIES
|
||||
activerecord
|
||||
pg
|
||||
rspec
|
||||
rubocop
|
||||
toml
|
||||
toxiproxy
|
||||
toml (~> 0.3.0)
|
||||
|
||||
BUNDLED WITH
|
||||
2.3.21
|
||||
2.3.7
|
||||
|
||||
@@ -1,82 +0,0 @@
|
||||
require 'pg'
|
||||
require 'toxiproxy'
|
||||
|
||||
class PgInstance
|
||||
attr_reader :port
|
||||
attr_reader :username
|
||||
attr_reader :password
|
||||
attr_reader :database_name
|
||||
|
||||
def initialize(port, username, password, database_name)
|
||||
@original_port = port
|
||||
@toxiproxy_port = 10000 + port.to_i
|
||||
@port = @toxiproxy_port
|
||||
|
||||
@username = username
|
||||
@password = password
|
||||
@database_name = database_name
|
||||
@toxiproxy_name = "database_#{@original_port}"
|
||||
Toxiproxy.populate([{
|
||||
name: @toxiproxy_name,
|
||||
listen: "0.0.0.0:#{@toxiproxy_port}",
|
||||
upstream: "localhost:#{@original_port}",
|
||||
}])
|
||||
|
||||
# Toxiproxy server will outlive our PgInstance objects
|
||||
# so we want to destroy our proxies before exiting
|
||||
# Ruby finalizer is ideal for doing this
|
||||
ObjectSpace.define_finalizer(@toxiproxy_name, proc { Toxiproxy[@toxiproxy_name].destroy })
|
||||
end
|
||||
|
||||
def with_connection
|
||||
conn = PG.connect("postgres://#{@username}:#{@password}@localhost:#{port}/#{database_name}")
|
||||
yield conn
|
||||
ensure
|
||||
conn&.close
|
||||
end
|
||||
|
||||
def reset
|
||||
reset_toxics
|
||||
reset_stats
|
||||
end
|
||||
|
||||
def toxiproxy
|
||||
Toxiproxy[@toxiproxy_name]
|
||||
end
|
||||
|
||||
def take_down
|
||||
if block_given?
|
||||
Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).apply { yield }
|
||||
else
|
||||
Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).toxics.each(&:save)
|
||||
end
|
||||
end
|
||||
|
||||
def add_latency(latency)
|
||||
if block_given?
|
||||
Toxiproxy[@toxiproxy_name].toxic(:latency, latency: latency).apply { yield }
|
||||
else
|
||||
Toxiproxy[@toxiproxy_name].toxic(:latency, latency: latency).toxics.each(&:save)
|
||||
end
|
||||
end
|
||||
|
||||
def delete_proxy
|
||||
Toxiproxy[@toxiproxy_name].delete
|
||||
end
|
||||
|
||||
def reset_toxics
|
||||
Toxiproxy[@toxiproxy_name].toxics.each(&:destroy)
|
||||
end
|
||||
|
||||
def reset_stats
|
||||
with_connection { |c| c.async_exec("SELECT pg_stat_statements_reset()") }
|
||||
end
|
||||
|
||||
def count_query(query)
|
||||
with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query = '#{query}'")[0]["sum"].to_i }
|
||||
end
|
||||
|
||||
def count_select_1_plus_2
|
||||
with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query = 'SELECT $1 + $2'")[0]["sum"].to_i }
|
||||
end
|
||||
end
|
||||
@@ -1,100 +0,0 @@
|
||||
require 'json'
|
||||
require 'ostruct'
|
||||
require_relative 'pgcat_process'
|
||||
require_relative 'pg_instance'
|
||||
|
||||
module Helpers
|
||||
module Pgcat
|
||||
def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction")
|
||||
user = {
|
||||
"password" => "sharding_user",
|
||||
"pool_size" => pool_size,
|
||||
"statement_timeout" => 0,
|
||||
"username" => "sharding_user"
|
||||
}
|
||||
|
||||
pgcat = PgcatProcess.new("info")
|
||||
primary0 = PgInstance.new(5432, user["username"], user["password"], "shard0")
|
||||
primary1 = PgInstance.new(7432, user["username"], user["password"], "shard1")
|
||||
primary2 = PgInstance.new(8432, user["username"], user["password"], "shard2")
|
||||
|
||||
pgcat_cfg = pgcat.current_config
|
||||
pgcat_cfg["pools"] = {
|
||||
"#{pool_name}" => {
|
||||
"default_role" => "any",
|
||||
"pool_mode" => pool_mode,
|
||||
"primary_reads_enabled" => false,
|
||||
"query_parser_enabled" => false,
|
||||
"sharding_function" => "pg_bigint_hash",
|
||||
"shards" => {
|
||||
"0" => { "database" => "shard0", "servers" => [["localhost", primary0.port.to_s, "primary"]] },
|
||||
"1" => { "database" => "shard1", "servers" => [["localhost", primary1.port.to_s, "primary"]] },
|
||||
"2" => { "database" => "shard2", "servers" => [["localhost", primary2.port.to_s, "primary"]] },
|
||||
},
|
||||
"users" => { "0" => user }
|
||||
}
|
||||
}
|
||||
pgcat.update_config(pgcat_cfg)
|
||||
|
||||
pgcat.start
|
||||
pgcat.wait_until_ready
|
||||
|
||||
OpenStruct.new.tap do |struct|
|
||||
struct.pgcat = pgcat
|
||||
struct.shards = [primary0, primary1, primary2]
|
||||
struct.all_databases = [primary0, primary1, primary2]
|
||||
end
|
||||
end
|
||||
|
||||
def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction")
|
||||
user = {
|
||||
"password" => "sharding_user",
|
||||
"pool_size" => pool_size,
|
||||
"statement_timeout" => 0,
|
||||
"username" => "sharding_user"
|
||||
}
|
||||
|
||||
pgcat = PgcatProcess.new("info")
|
||||
pgcat_cfg = pgcat.current_config
|
||||
|
||||
primary = PgInstance.new(5432, user["username"], user["password"], "shard0")
|
||||
replica0 = PgInstance.new(7432, user["username"], user["password"], "shard0")
|
||||
replica1 = PgInstance.new(8432, user["username"], user["password"], "shard0")
|
||||
replica2 = PgInstance.new(9432, user["username"], user["password"], "shard0")
|
||||
|
||||
# Main proxy configs
|
||||
pgcat_cfg["pools"] = {
|
||||
"#{pool_name}" => {
|
||||
"default_role" => "any",
|
||||
"pool_mode" => pool_mode,
|
||||
"primary_reads_enabled" => false,
|
||||
"query_parser_enabled" => false,
|
||||
"sharding_function" => "pg_bigint_hash",
|
||||
"shards" => {
|
||||
"0" => {
|
||||
"database" => "shard0",
|
||||
"servers" => [
|
||||
["localhost", primary.port.to_s, "primary"],
|
||||
["localhost", replica0.port.to_s, "replica"],
|
||||
["localhost", replica1.port.to_s, "replica"],
|
||||
["localhost", replica2.port.to_s, "replica"]
|
||||
]
|
||||
},
|
||||
},
|
||||
"users" => { "0" => user }
|
||||
}
|
||||
}
|
||||
pgcat_cfg["general"]["port"] = pgcat.port
|
||||
pgcat.update_config(pgcat_cfg)
|
||||
pgcat.start
|
||||
pgcat.wait_until_ready
|
||||
|
||||
OpenStruct.new.tap do |struct|
|
||||
struct.pgcat = pgcat
|
||||
struct.primary = primary
|
||||
struct.replicas = [replica0, replica1, replica2]
|
||||
struct.all_databases = [primary, replica0, replica1, replica2]
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -1,116 +0,0 @@
|
||||
require 'pg'
|
||||
require 'toml'
|
||||
require 'fileutils'
|
||||
require 'securerandom'
|
||||
|
||||
class PgcatProcess
|
||||
attr_reader :port
|
||||
attr_reader :pid
|
||||
|
||||
def self.finalize(pid, log_filename, config_filename)
|
||||
`kill #{pid}`
|
||||
File.delete(config_filename) if File.exist?(config_filename)
|
||||
File.delete(log_filename) if File.exist?(log_filename)
|
||||
end
|
||||
|
||||
def initialize(log_level)
|
||||
@env = {"RUST_LOG" => log_level}
|
||||
@port = rand(20000..32760)
|
||||
@log_level = log_level
|
||||
@log_filename = "/tmp/pgcat_log_#{SecureRandom.urlsafe_base64}.log"
|
||||
@config_filename = "/tmp/pgcat_cfg_#{SecureRandom.urlsafe_base64}.toml"
|
||||
|
||||
@command = "../../target/debug/pgcat #{@config_filename}"
|
||||
|
||||
FileUtils.cp("../../pgcat.toml", @config_filename)
|
||||
cfg = current_config
|
||||
cfg["general"]["port"] = @port.to_i
|
||||
cfg["general"]["enable_prometheus_exporter"] = false
|
||||
|
||||
update_config(cfg)
|
||||
end
|
||||
|
||||
def logs
|
||||
File.read(@log_filename)
|
||||
end
|
||||
|
||||
def update_config(config_hash)
|
||||
@original_config = current_config
|
||||
output_to_write = TOML::Generator.new(config_hash).body
|
||||
output_to_write = output_to_write.gsub(/,\s*["|'](\d+)["|']\s*,/, ',\1,')
|
||||
File.write(@config_filename, output_to_write)
|
||||
end
|
||||
|
||||
def current_config
|
||||
old_cfg = File.read(@config_filename)
|
||||
loadable_string = old_cfg.gsub(/,\s*(\d+)\s*,/, ', "\1",')
|
||||
TOML.load(loadable_string)
|
||||
end
|
||||
|
||||
def reload_config
|
||||
`kill -s HUP #{@pid}`
|
||||
sleep 0.1
|
||||
end
|
||||
|
||||
def start
|
||||
raise StandardError, "Process is already started" unless @pid.nil?
|
||||
@pid = Process.spawn(@env, @command, err: @log_filename, out: @log_filename)
|
||||
ObjectSpace.define_finalizer(@log_filename, proc { PgcatProcess.finalize(@pid, @log_filename, @config_filename) })
|
||||
|
||||
return self
|
||||
end
|
||||
|
||||
def wait_until_ready
|
||||
exc = nil
|
||||
10.times do
|
||||
PG::connect(example_connection_string).close
|
||||
|
||||
return self
|
||||
rescue => e
|
||||
exc = e
|
||||
sleep(0.5)
|
||||
end
|
||||
puts exc
|
||||
raise StandardError, "Process #{@pid} never became ready. Logs #{logs}"
|
||||
end
|
||||
|
||||
def stop
|
||||
`kill #{@pid}`
|
||||
sleep 0.1
|
||||
end
|
||||
|
||||
def shutdown
|
||||
stop
|
||||
File.delete(@config_filename) if File.exist?(@config_filename)
|
||||
File.delete(@log_filename) if File.exist?(@log_filename)
|
||||
end
|
||||
|
||||
def admin_connection_string
|
||||
cfg = current_config
|
||||
username = cfg["general"]["admin_username"]
|
||||
password = cfg["general"]["admin_password"]
|
||||
|
||||
"postgresql://#{username}:#{password}@0.0.0.0:#{@port}/pgcat"
|
||||
end
|
||||
|
||||
def connection_string(pool_name, username)
|
||||
cfg = current_config
|
||||
|
||||
user_idx, user_obj = cfg["pools"][pool_name]["users"].detect { |k, user| user["username"] == username }
|
||||
password = user_obj["password"]
|
||||
|
||||
"postgresql://#{username}:#{password}@0.0.0.0:#{@port}/#{pool_name}"
|
||||
end
|
||||
|
||||
def example_connection_string
|
||||
cfg = current_config
|
||||
first_pool_name = cfg["pools"].keys[0]
|
||||
|
||||
db_name = first_pool_name
|
||||
|
||||
username = cfg["pools"][first_pool_name]["users"]["0"]["username"]
|
||||
password = cfg["pools"][first_pool_name]["users"]["0"]["password"]
|
||||
|
||||
"postgresql://#{username}:#{password}@0.0.0.0:#{@port}/#{db_name}"
|
||||
end
|
||||
end
|
||||
@@ -1,61 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
require_relative 'spec_helper'
|
||||
|
||||
describe "Load Balancing" do
|
||||
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }
|
||||
after do
|
||||
processes.all_databases.map(&:reset)
|
||||
processes.pgcat.shutdown
|
||||
end
|
||||
|
||||
context "under regular circumstances" do
|
||||
it "balances query volume between all instances" do
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
|
||||
query_count = QUERY_COUNT
|
||||
expected_share = query_count / processes.all_databases.count
|
||||
failed_count = 0
|
||||
|
||||
query_count.times do
|
||||
conn.async_exec("SELECT 1 + 2")
|
||||
rescue
|
||||
failed_count += 1
|
||||
end
|
||||
|
||||
expect(failed_count).to eq(0)
|
||||
processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
|
||||
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context "when some replicas are down" do
|
||||
it "balances query volume between working instances" do
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
|
||||
failed_count = 0
|
||||
|
||||
processes[:replicas][0].take_down do
|
||||
processes[:replicas][1].take_down do
|
||||
QUERY_COUNT.times do
|
||||
conn.async_exec("SELECT 1 + 2")
|
||||
rescue
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
failed_count += 1
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
expect(failed_count).to eq(2)
|
||||
processes.all_databases.each do |instance|
|
||||
queries_routed = instance.count_select_1_plus_2
|
||||
if processes.replicas[0..1].include?(instance)
|
||||
expect(queries_routed).to eq(0)
|
||||
else
|
||||
expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1,193 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
require_relative 'spec_helper'
|
||||
|
||||
describe "Miscellaneous" do
|
||||
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }
|
||||
after do
|
||||
processes.all_databases.map(&:reset)
|
||||
processes.pgcat.shutdown
|
||||
end
|
||||
|
||||
describe "Extended Protocol handling" do
|
||||
it "does not send packets that client does not expect during extended protocol sequence" do
|
||||
new_configs = processes.pgcat.current_config
|
||||
|
||||
new_configs["general"]["connect_timeout"] = 500
|
||||
new_configs["general"]["ban_time"] = 1
|
||||
new_configs["general"]["shutdown_timeout"] = 1
|
||||
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
|
||||
|
||||
processes.pgcat.update_config(new_configs)
|
||||
processes.pgcat.reload_config
|
||||
|
||||
25.times do
|
||||
Thread.new do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.async_exec("SELECT pg_sleep(5)") rescue PG::SystemError
|
||||
ensure
|
||||
conn&.close
|
||||
end
|
||||
end
|
||||
|
||||
sleep(0.5)
|
||||
conn_under_test = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
stdout, stderr = with_captured_stdout_stderr do
|
||||
15.times do |i|
|
||||
conn_under_test.async_exec("SELECT 1") rescue PG::SystemError
|
||||
conn_under_test.exec_params("SELECT #{i} + $1", [i]) rescue PG::SystemError
|
||||
sleep 1
|
||||
end
|
||||
end
|
||||
|
||||
raise StandardError, "Libpq got unexpected messages while idle" if stderr.include?("arrived from server while idle")
|
||||
end
|
||||
end
|
||||
|
||||
describe "Pool recycling after config reload" do
|
||||
let(:processes) { Helpers::Pgcat.three_shard_setup("sharded_db", 5) }
|
||||
|
||||
it "should update pools for new clients and clients that are no longer in transaction" do
|
||||
server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
server_conn.async_exec("BEGIN")
|
||||
|
||||
# No config change yet, client should set old configs
|
||||
current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
|
||||
expect(current_datebase_from_pg).to eq('shard0')
|
||||
|
||||
# Swap shards
|
||||
new_config = processes.pgcat.current_config
|
||||
shard0 = new_config["pools"]["sharded_db"]["shards"]["0"]
|
||||
shard1 = new_config["pools"]["sharded_db"]["shards"]["1"]
|
||||
new_config["pools"]["sharded_db"]["shards"]["0"] = shard1
|
||||
new_config["pools"]["sharded_db"]["shards"]["1"] = shard0
|
||||
|
||||
# Reload config
|
||||
processes.pgcat.update_config(new_config)
|
||||
processes.pgcat.reload_config
|
||||
sleep 0.5
|
||||
|
||||
# Config changed but transaction is in progress, client should set old configs
|
||||
current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
|
||||
expect(current_datebase_from_pg).to eq('shard0')
|
||||
server_conn.async_exec("COMMIT")
|
||||
|
||||
# Transaction finished, client should get new configs
|
||||
current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
|
||||
expect(current_datebase_from_pg).to eq('shard1')
|
||||
|
||||
# New connection should get new configs
|
||||
server_conn.close()
|
||||
server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
|
||||
expect(current_datebase_from_pg).to eq('shard1')
|
||||
end
|
||||
end
|
||||
|
||||
describe "Clients closing connection in the middle of transaction" do
|
||||
it "sends a rollback to the server" do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.async_exec("SET SERVER ROLE to 'primary'")
|
||||
conn.async_exec("BEGIN")
|
||||
conn.close
|
||||
|
||||
expect(processes.primary.count_query("ROLLBACK")).to eq(1)
|
||||
end
|
||||
end
|
||||
|
||||
describe "Server version reporting" do
|
||||
it "reports correct version for normal and admin databases" do
|
||||
server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
expect(server_conn.server_version).not_to eq(0)
|
||||
server_conn.close
|
||||
|
||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||
expect(admin_conn.server_version).not_to eq(0)
|
||||
admin_conn.close
|
||||
end
|
||||
end
|
||||
|
||||
describe "State clearance" do
|
||||
context "session mode" do
|
||||
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "session") }
|
||||
|
||||
it "Clears state before connection checkin" do
|
||||
# Both modes of operation should not raise
|
||||
# ERROR: prepared statement "prepared_q" already exists
|
||||
15.times do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.async_exec("PREPARE prepared_q (int) AS SELECT $1")
|
||||
conn.close
|
||||
end
|
||||
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
initial_value = conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]
|
||||
conn.async_exec("SET statement_timeout to 1000")
|
||||
current_value = conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]
|
||||
expect(conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]).to eq("1s")
|
||||
conn.close
|
||||
end
|
||||
|
||||
it "Does not send DISCARD ALL unless necessary" do
|
||||
10.times do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.async_exec("SET SERVER ROLE to 'primary'")
|
||||
conn.async_exec("SELECT 1")
|
||||
conn.close
|
||||
end
|
||||
|
||||
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
|
||||
|
||||
10.times do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.async_exec("SET SERVER ROLE to 'primary'")
|
||||
conn.async_exec("SELECT 1")
|
||||
conn.async_exec("SET statement_timeout to 5000")
|
||||
conn.close
|
||||
end
|
||||
|
||||
expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
|
||||
end
|
||||
end
|
||||
|
||||
context "transaction mode" do
|
||||
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction") }
|
||||
it "Clears state before connection checkin" do
|
||||
# Both modes of operation should not raise
|
||||
# ERROR: prepared statement "prepared_q" already exists
|
||||
15.times do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.async_exec("PREPARE prepared_q (int) AS SELECT $1")
|
||||
conn.close
|
||||
end
|
||||
|
||||
15.times do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.prepare("prepared_q", "SELECT $1")
|
||||
conn.close
|
||||
end
|
||||
end
|
||||
|
||||
it "Does not send DISCARD ALL unless necessary" do
|
||||
10.times do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.async_exec("SET SERVER ROLE to 'primary'")
|
||||
conn.async_exec("SELECT 1")
|
||||
conn.exec_params("SELECT $1", [1])
|
||||
conn.close
|
||||
end
|
||||
|
||||
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
|
||||
|
||||
10.times do
|
||||
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.async_exec("SET SERVER ROLE to 'primary'")
|
||||
conn.async_exec("SELECT 1")
|
||||
conn.async_exec("SET statement_timeout to 5000")
|
||||
conn.close
|
||||
end
|
||||
|
||||
expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -1,81 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
require_relative 'spec_helper'
|
||||
|
||||
|
||||
describe "Routing" do
|
||||
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }
|
||||
after do
|
||||
processes.all_databases.map(&:reset)
|
||||
processes.pgcat.shutdown
|
||||
end
|
||||
|
||||
describe "SET ROLE" do
|
||||
context "primary" do
|
||||
it "routes queries only to primary" do
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.async_exec("SET SERVER ROLE to 'primary'")
|
||||
|
||||
query_count = 30
|
||||
failed_count = 0
|
||||
|
||||
query_count.times do
|
||||
conn.async_exec("SELECT 1 + 2")
|
||||
rescue
|
||||
failed_count += 1
|
||||
end
|
||||
|
||||
expect(failed_count).to eq(0)
|
||||
processes.replicas.map(&:count_select_1_plus_2).each do |instance_share|
|
||||
expect(instance_share).to eq(0)
|
||||
end
|
||||
|
||||
expect(processes.primary.count_select_1_plus_2).to eq(query_count)
|
||||
end
|
||||
end
|
||||
context "replica" do
|
||||
it "routes queries only to replicas" do
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.async_exec("SET SERVER ROLE to 'replica'")
|
||||
|
||||
expected_share = QUERY_COUNT / processes.replicas.count
|
||||
failed_count = 0
|
||||
|
||||
QUERY_COUNT.times do
|
||||
conn.async_exec("SELECT 1 + 2")
|
||||
rescue
|
||||
failed_count += 1
|
||||
end
|
||||
|
||||
expect(failed_count).to eq(0)
|
||||
|
||||
processes.replicas.map(&:count_select_1_plus_2).each do |instance_share|
|
||||
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
||||
end
|
||||
|
||||
expect(processes.primary.count_select_1_plus_2).to eq(0)
|
||||
end
|
||||
end
|
||||
|
||||
context "any" do
|
||||
it "routes queries to all instances" do
|
||||
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
|
||||
conn.async_exec("SET SERVER ROLE to 'any'")
|
||||
|
||||
expected_share = QUERY_COUNT / processes.all_databases.count
|
||||
failed_count = 0
|
||||
|
||||
QUERY_COUNT.times do
|
||||
conn.async_exec("SELECT 1 + 2")
|
||||
rescue
|
||||
failed_count += 1
|
||||
end
|
||||
|
||||
expect(failed_count).to eq(0)
|
||||
|
||||
processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
|
||||
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -1,21 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
require 'pg'
|
||||
require_relative 'helpers/pgcat_helper'
|
||||
|
||||
QUERY_COUNT = 300
|
||||
MARGIN_OF_ERROR = 0.30
|
||||
|
||||
def with_captured_stdout_stderr
|
||||
sout = STDOUT.clone
|
||||
serr = STDERR.clone
|
||||
STDOUT.reopen("/tmp/out.txt", "w+")
|
||||
STDERR.reopen("/tmp/err.txt", "w+")
|
||||
STDOUT.sync = true
|
||||
STDERR.sync = true
|
||||
yield
|
||||
return File.read('/tmp/out.txt'), File.read('/tmp/err.txt')
|
||||
ensure
|
||||
STDOUT.reopen(sout)
|
||||
STDERR.reopen(serr)
|
||||
end
|
||||
@@ -1,6 +1,10 @@
|
||||
# frozen_string_literal: true
|
||||
require 'pg'
|
||||
|
||||
require 'active_record'
|
||||
require 'pg'
|
||||
require 'toml'
|
||||
|
||||
$stdout.sync = true
|
||||
|
||||
# Uncomment these two to see all queries.
|
||||
# ActiveRecord.verbose_query_logs = true
|
||||
@@ -111,3 +115,89 @@ begin
|
||||
rescue ActiveRecord::StatementInvalid
|
||||
puts 'OK'
|
||||
end
|
||||
|
||||
# Test evil clients
|
||||
def poorly_behaved_client
|
||||
conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat")
|
||||
conn.async_exec 'BEGIN'
|
||||
conn.async_exec 'SELECT 1'
|
||||
|
||||
conn.close
|
||||
puts 'Bad client ok'
|
||||
end
|
||||
|
||||
25.times do
|
||||
poorly_behaved_client
|
||||
end
|
||||
|
||||
|
||||
def test_server_parameters
|
||||
server_conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat")
|
||||
raise StandardError, "Bad server version" if server_conn.server_version == 0
|
||||
server_conn.close
|
||||
|
||||
admin_conn = PG::connect("postgres://admin_user:admin_pass@127.0.0.1:6432/pgcat")
|
||||
raise StandardError, "Bad server version" if admin_conn.server_version == 0
|
||||
admin_conn.close
|
||||
|
||||
puts 'Server parameters ok'
|
||||
end
|
||||
|
||||
|
||||
class ConfigEditor
|
||||
def initialize
|
||||
@original_config_text = File.read('../../.circleci/pgcat.toml')
|
||||
text_to_load = @original_config_text.gsub("5432", "\"5432\"")
|
||||
|
||||
@original_configs = TOML.load(text_to_load)
|
||||
end
|
||||
|
||||
def original_configs
|
||||
TOML.load(TOML::Generator.new(@original_configs).body)
|
||||
end
|
||||
|
||||
def with_modified_configs(new_configs)
|
||||
text_to_write = TOML::Generator.new(new_configs).body
|
||||
text_to_write = text_to_write.gsub("\"5432\"", "5432")
|
||||
File.write('../../.circleci/pgcat.toml', text_to_write)
|
||||
yield
|
||||
ensure
|
||||
File.write('../../.circleci/pgcat.toml', @original_config_text)
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
|
||||
def test_reload_pool_recycling
|
||||
admin_conn = PG::connect("postgres://admin_user:admin_pass@127.0.0.1:6432/pgcat")
|
||||
server_conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat")
|
||||
|
||||
server_conn.async_exec("BEGIN")
|
||||
conf_editor = ConfigEditor.new
|
||||
new_configs = conf_editor.original_configs
|
||||
|
||||
# swap shards
|
||||
new_configs["pools"]["sharded_db"]["shards"]["0"]["database"] = "shard1"
|
||||
new_configs["pools"]["sharded_db"]["shards"]["1"]["database"] = "shard0"
|
||||
|
||||
raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard0'
|
||||
conf_editor.with_modified_configs(new_configs) { admin_conn.async_exec("RELOAD") }
|
||||
raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard0'
|
||||
server_conn.async_exec("COMMIT;")
|
||||
|
||||
# Transaction finished, client should get new configs
|
||||
raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard1'
|
||||
server_conn.close()
|
||||
|
||||
# New connection should get new configs
|
||||
server_conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat")
|
||||
raise StandardError if server_conn.async_exec("SELECT current_database();")[0]["current_database"] != 'shard1'
|
||||
|
||||
ensure
|
||||
admin_conn.async_exec("RELOAD") # Go back to old state
|
||||
admin_conn.close
|
||||
server_conn.close
|
||||
puts "Pool Recycling okay!"
|
||||
end
|
||||
|
||||
test_reload_pool_recycling
|
||||
|
||||
@@ -70,35 +70,23 @@ GRANT CONNECT ON DATABASE shard2 TO other_user;
|
||||
GRANT CONNECT ON DATABASE some_db TO simple_user;
|
||||
|
||||
\c shard0
|
||||
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
|
||||
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO sharding_user;
|
||||
GRANT ALL ON SCHEMA public TO sharding_user;
|
||||
GRANT ALL ON TABLE data TO sharding_user;
|
||||
GRANT ALL ON SCHEMA public TO other_user;
|
||||
GRANT ALL ON TABLE data TO other_user;
|
||||
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO other_user;
|
||||
|
||||
\c shard1
|
||||
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
|
||||
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO sharding_user;
|
||||
GRANT ALL ON SCHEMA public TO sharding_user;
|
||||
GRANT ALL ON TABLE data TO sharding_user;
|
||||
GRANT ALL ON SCHEMA public TO other_user;
|
||||
GRANT ALL ON TABLE data TO other_user;
|
||||
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO other_user;
|
||||
|
||||
|
||||
\c shard2
|
||||
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
|
||||
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO sharding_user;
|
||||
GRANT ALL ON SCHEMA public TO sharding_user;
|
||||
GRANT ALL ON TABLE data TO sharding_user;
|
||||
GRANT ALL ON SCHEMA public TO other_user;
|
||||
GRANT ALL ON TABLE data TO other_user;
|
||||
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO other_user;
|
||||
|
||||
\c some_db
|
||||
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
|
||||
GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO simple_user;
|
||||
GRANT ALL ON SCHEMA public TO simple_user;
|
||||
GRANT ALL ON TABLE data TO simple_user;
|
||||
|
||||
Reference in New Issue
Block a user