Compare commits

...

9 Commits

Author SHA1 Message Date
Lev Kokotov
d6a13d047d Drop in-transaction connections for now 2022-08-18 13:31:15 -07:00
zainkabani
5948fef6cf Minor Refactoring of re-used code and server stat reporting (#129)
* Minor changes to stats reporting and reduce re-used code

* fmt
2022-08-18 05:12:38 -07:00
Mostafa Abdelraouf
790898c20e Add pool name and username to address object (#128)
* Add pool name and username to address object

* Fix address name

* fmt
2022-08-17 08:40:47 -07:00
Pradeep Chhetri
d64f6793c1 Minor cleanup in admin command (#126)
* Minor cleanup in admin command

* Typo correction

* fix when the admin query is ending with semicolon
2022-08-16 10:01:46 -07:00
Lev Kokotov
cea35db35c Fix lost statistics (#125)
* Lost events

* more logging
2022-08-15 23:54:49 -07:00
Mostafa Abdelraouf
a3aefabb47 Add cl_idle to SHOW POOLS (#124) 2022-08-15 20:51:37 -07:00
Lev Kokotov
3285006440 Statement timeout + replica imbalance fix (#122)
* Statement timeout

* send error message too

* Correct error messages

* Fix replica imbalance

* disable stmt timeout by default

* Redundant mark_bad

* revert healthcheck delay

* tests

* set it to 0

* reload config again
2022-08-13 13:45:58 -07:00
Pradeep Chhetri
52303cc808 Make prometheus port configurable (#121)
* Make prometheus port configurable

* Update circleci config
2022-08-13 10:25:14 -07:00
Lev Kokotov
be254cedd9 Fix debug log (#120) 2022-08-11 22:47:47 -07:00
14 changed files with 277 additions and 155 deletions

View File

@@ -11,9 +11,12 @@ host = "0.0.0.0"
# Port to run on, same as PgBouncer used in this example. # Port to run on, same as PgBouncer used in this example.
port = 6432 port = 6432
# enable prometheus exporter on port 9930 # Whether to enable prometheus exporter or not.
enable_prometheus_exporter = true enable_prometheus_exporter = true
# Port at which prometheus exporter listens on.
prometheus_exporter_port = 9930
# How long to wait before aborting a server connection (ms). # How long to wait before aborting a server connection (ms).
connect_timeout = 100 connect_timeout = 100
@@ -88,11 +91,13 @@ password = "sharding_user"
# The maximum number of connection from a single Pgcat process to any database in the cluster # The maximum number of connection from a single Pgcat process to any database in the cluster
# is the sum of pool_size across all users. # is the sum of pool_size across all users.
pool_size = 9 pool_size = 9
statement_timeout = 0
[pools.sharded_db.users.1] [pools.sharded_db.users.1]
username = "other_user" username = "other_user"
password = "other_user" password = "other_user"
pool_size = 21 pool_size = 21
statement_timeout = 30000
# Shard 0 # Shard 0
[pools.sharded_db.shards.0] [pools.sharded_db.shards.0]
@@ -130,6 +135,7 @@ sharding_function = "pg_bigint_hash"
username = "simple_user" username = "simple_user"
password = "simple_user" password = "simple_user"
pool_size = 5 pool_size = 5
statement_timeout = 30000
[pools.simple_db.shards.0] [pools.simple_db.shards.0]
servers = [ servers = [

View File

@@ -66,6 +66,18 @@ psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_te
# Replica/primary selection & more sharding tests # Replica/primary selection & more sharding tests
psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_primary_replica.sql > /dev/null psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_primary_replica.sql > /dev/null
# Statement timeout tests
sed -i 's/statement_timeout = 0/statement_timeout = 100/' .circleci/pgcat.toml
kill -SIGHUP $(pgrep pgcat) # Reload config
sleep 0.2
# This should timeout
(! psql -U sharding_user -e -h 127.0.0.1 -p 6432 -c 'select pg_sleep(0.5)')
# Disable statement timeout
sed -i 's/statement_timeout = 100/statement_timeout = 0/' .circleci/pgcat.toml
kill -SIGHUP $(pgrep pgcat) # Reload config again
# #
# ActiveRecord tests # ActiveRecord tests
# #

View File

@@ -38,30 +38,34 @@ psql -h 127.0.0.1 -p 6432 -c 'SELECT 1'
### Config ### Config
| **Name** | **Description** | **Examples** | | **Name** | **Description** | **Examples** |
|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------| |------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------|
| **`general`** | | | | **`general`** | | |
| `host` | The pooler will run on this host, 0.0.0.0 means accessible from everywhere. | `0.0.0.0` | | `host` | The pooler will run on this host, 0.0.0.0 means accessible from everywhere. | `0.0.0.0` |
| `port` | The pooler will run on this port. | `6432` | | `port` | The pooler will run on this port. | `6432` |
| `pool_size` | Maximum allowed server connections per pool. Pools are separated for each user/shard/server role. The connections are allocated as needed. | `15` | | `enable_prometheus_exporter` | Enable prometheus exporter which will export metrics in prometheus exposition format. | `true` |
| `pool_mode` | The pool mode to use, i.e. `session` or `transaction`. | `transaction` | | `prometheus_exporter_port` | Port at which prometheus exporter listens on. | `9930` |
| `connect_timeout` | Maximum time to establish a connection to a server (milliseconds). If reached, the server is banned and the next target is attempted. | `5000` | | `pool_size` | Maximum allowed server connections per pool. Pools are separated for each user/shard/server role. The connections are allocated as needed. | `15` |
| `healthcheck_timeout` | Maximum time to pass a health check (`SELECT 1`, milliseconds). If reached, the server is banned and the next target is attempted. | `1000` | | `pool_mode` | The pool mode to use, i.e. `session` or `transaction`. | `transaction` |
| `shutdown_timeout` | Maximum time to give clients during shutdown before forcibly killing client connections (ms). | `60000` | | `connect_timeout` | Maximum time to establish a connection to a server (milliseconds). If reached, the server is banned and the next target is attempted. | `5000` |
| `healthcheck_delay` | How long to keep connection available for immediate re-use, without running a healthcheck query on it | `30000` | | `healthcheck_timeout` | Maximum time to pass a health check (`SELECT 1`, milliseconds). If reached, the server is banned and the next target is attempted. | `1000` |
| `ban_time` | Ban time for a server (seconds). It won't be allowed to serve transactions until the ban expires; failover targets will be used instead. | `60` | | `shutdown_timeout` | Maximum time to give clients during shutdown before forcibly killing client connections (ms). | `60000` |
| | | | | `healthcheck_delay` | How long to keep connection available for immediate re-use, without running a healthcheck query on it | `30000` |
| **`user`** | | | | `ban_time` | Ban time for a server (seconds). It won't be allowed to serve transactions until the ban expires; failover targets will be used instead. | `60` |
| `name` | The user name. | `sharding_user` | | `autoreload` | Enable auto-reload of config after fixed time-interval. | `false` |
| `password` | The user password in plaintext. | `hunter2` | | | | |
| | | | | **`user`** | | |
| **`shards`** | Shards are numerically numbered starting from 0; the order in the config is preserved by the pooler to route queries accordingly. | `[shards.0]` | | `name` | The user name. | `sharding_user` |
| `servers` | List of servers to connect to and their roles. A server is: `[host, port, role]`, where `role` is either `primary` or `replica`. | `["127.0.0.1", 5432, "primary"]` | | `password` | The user password in plaintext. | `hunter2` |
| `database` | The name of the database to connect to. This is the same on all servers that are part of one shard. | | | | | |
| **`query_router`** | | | | **`shards`** | Shards are numerically numbered starting from 0; the order in the config is preserved by the pooler to route queries accordingly. | `[shards.0]` |
| `default_role` | Traffic is routed to this role by default (round-robin), unless the client specifies otherwise. Default is `any`, for any role available. | `any`, `primary`, `replica` | | `servers` | List of servers to connect to and their roles. A server is: `[host, port, role]`, where `role` is either `primary` or `replica`. | `["127.0.0.1", 5432, "primary"]` |
| `query_parser_enabled` | Enable the query parser which will inspect incoming queries and route them to a primary or replicas. | `false` | | `database` | The name of the database to connect to. This is the same on all servers that are part of one shard. | |
| `primary_reads_enabled` | Enable this to allow read queries on the primary; otherwise read queries are routed to the replicas. | `true` | | | | |
| **`query_router`** | | |
| `default_role` | Traffic is routed to this role by default (round-robin), unless the client specifies otherwise. Default is `any`, for any role available. | `any`, `primary`, `replica` |
| `query_parser_enabled` | Enable the query parser which will inspect incoming queries and route them to a primary or replicas. | `false` |
| `primary_reads_enabled` | Enable this to allow read queries on the primary; otherwise read queries are routed to the replicas. | `true` |
## Local development ## Local development

View File

@@ -1,7 +1,7 @@
version: "3" version: "3"
services: services:
postgres: postgres:
image: postgres:13 image: postgres:14
environment: environment:
POSTGRES_PASSWORD: postgres POSTGRES_PASSWORD: postgres
POSTGRES_HOST_AUTH_METHOD: md5 POSTGRES_HOST_AUTH_METHOD: md5

View File

@@ -11,9 +11,12 @@ host = "0.0.0.0"
# Port to run on, same as PgBouncer used in this example. # Port to run on, same as PgBouncer used in this example.
port = 6432 port = 6432
# enable prometheus exporter on port 9930 # Whether to enable prometheus exporter or not.
enable_prometheus_exporter = true enable_prometheus_exporter = true
# Port at which prometheus exporter listens on.
prometheus_exporter_port = 9930
# How long to wait before aborting a server connection (ms). # How long to wait before aborting a server connection (ms).
connect_timeout = 5000 connect_timeout = 5000
@@ -89,10 +92,14 @@ password = "postgres"
# is the sum of pool_size across all users. # is the sum of pool_size across all users.
pool_size = 9 pool_size = 9
# Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
statement_timeout = 0
[pools.sharded.users.1] [pools.sharded.users.1]
username = "postgres" username = "postgres"
password = "postgres" password = "postgres"
pool_size = 21 pool_size = 21
statement_timeout = 15000
# Shard 0 # Shard 0
[pools.sharded.shards.0] [pools.sharded.shards.0]
@@ -130,6 +137,7 @@ sharding_function = "pg_bigint_hash"
username = "postgres" username = "postgres"
password = "postgres" password = "postgres"
pool_size = 5 pool_size = 5
statement_timeout = 0
[pools.simple_db.shards.0] [pools.simple_db.shards.0]
servers = [ servers = [

View File

@@ -11,9 +11,12 @@ host = "0.0.0.0"
# Port to run on, same as PgBouncer used in this example. # Port to run on, same as PgBouncer used in this example.
port = 6432 port = 6432
# enable prometheus exporter on port 9930 # Whether to enable prometheus exporter or not.
enable_prometheus_exporter = true enable_prometheus_exporter = true
# Port at which prometheus exporter listens on.
prometheus_exporter_port = 9930
# How long to wait before aborting a server connection (ms). # How long to wait before aborting a server connection (ms).
connect_timeout = 5000 connect_timeout = 5000
@@ -89,10 +92,14 @@ password = "sharding_user"
# is the sum of pool_size across all users. # is the sum of pool_size across all users.
pool_size = 9 pool_size = 9
# Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
statement_timeout = 0
[pools.sharded_db.users.1] [pools.sharded_db.users.1]
username = "other_user" username = "other_user"
password = "other_user" password = "other_user"
pool_size = 21 pool_size = 21
statement_timeout = 15000
# Shard 0 # Shard 0
[pools.sharded_db.shards.0] [pools.sharded_db.shards.0]
@@ -130,6 +137,7 @@ sharding_function = "pg_bigint_hash"
username = "simple_user" username = "simple_user"
password = "simple_user" password = "simple_user"
pool_size = 5 pool_size = 5
statement_timeout = 0
[pools.simple_db.shards.0] [pools.simple_db.shards.0]
servers = [ servers = [

View File

@@ -44,32 +44,45 @@ where
trace!("Admin query: {}", query); trace!("Admin query: {}", query);
if query.starts_with("SHOW STATS") { let query_parts: Vec<&str> = query.trim_end_matches(';').split_whitespace().collect();
trace!("SHOW STATS");
show_stats(stream).await match query_parts[0] {
} else if query.starts_with("RELOAD") { "RELOAD" => {
trace!("RELOAD"); trace!("RELOAD");
reload(stream, client_server_map).await reload(stream, client_server_map).await
} else if query.starts_with("SHOW CONFIG") { }
trace!("SHOW CONFIG"); "SET" => {
show_config(stream).await trace!("SET");
} else if query.starts_with("SHOW DATABASES") { ignore_set(stream).await
trace!("SHOW DATABASES"); }
show_databases(stream).await "SHOW" => match query_parts[1] {
} else if query.starts_with("SHOW POOLS") { "CONFIG" => {
trace!("SHOW POOLS"); trace!("SHOW CONFIG");
show_pools(stream).await show_config(stream).await
} else if query.starts_with("SHOW LISTS") { }
trace!("SHOW LISTS"); "DATABASES" => {
show_lists(stream).await trace!("SHOW DATABASES");
} else if query.starts_with("SHOW VERSION") { show_databases(stream).await
trace!("SHOW VERSION"); }
show_version(stream).await "LISTS" => {
} else if query.starts_with("SET ") { trace!("SHOW LISTS");
trace!("SET"); show_lists(stream).await
ignore_set(stream).await }
} else { "POOLS" => {
error_response(stream, "Unsupported query against the admin database").await trace!("SHOW POOLS");
show_pools(stream).await
}
"STATS" => {
trace!("SHOW STATS");
show_stats(stream).await
}
"VERSION" => {
trace!("SHOW VERSION");
show_version(stream).await
}
_ => error_response(stream, "Unsupported SHOW query against the admin database").await,
},
_ => error_response(stream, "Unsupported query against the admin database").await,
} }
} }
@@ -174,6 +187,7 @@ where
let columns = vec![ let columns = vec![
("database", DataType::Text), ("database", DataType::Text),
("user", DataType::Text), ("user", DataType::Text),
("cl_idle", DataType::Numeric),
("cl_active", DataType::Numeric), ("cl_active", DataType::Numeric),
("cl_waiting", DataType::Numeric), ("cl_waiting", DataType::Numeric),
("cl_cancel_req", DataType::Numeric), ("cl_cancel_req", DataType::Numeric),

View File

@@ -499,7 +499,7 @@ where
// The query router determines where the query is going to go, // The query router determines where the query is going to go,
// e.g. primary, replica, which shard. // e.g. primary, replica, which shard.
let mut query_router = QueryRouter::new(); let mut query_router = QueryRouter::new();
let mut round_robin = 0; let mut round_robin = rand::random();
// Our custom protocol loop. // Our custom protocol loop.
// We expect the client to either start a transaction with regular queries // We expect the client to either start a transaction with regular queries
@@ -667,7 +667,6 @@ where
.client_disconnecting(self.process_id, last_address_id); .client_disconnecting(self.process_id, last_address_id);
} }
self.stats.client_active(self.process_id, address.id); self.stats.client_active(self.process_id, address.id);
self.stats.server_active(server.process_id(), address.id);
self.last_address_id = Some(address.id); self.last_address_id = Some(address.id);
self.last_server_id = Some(server.process_id()); self.last_server_id = Some(server.process_id());
@@ -731,44 +730,16 @@ where
'Q' => { 'Q' => {
debug!("Sending query to server"); debug!("Sending query to server");
self.send_server_message( self.send_and_receive_loop(
server, code,
original, original,
server,
&address, &address,
query_router.shard(), query_router.shard(),
&pool, &pool,
) )
.await?; .await?;
// Read all data the server has to offer, which can be multiple messages
// buffered in 8196 bytes chunks.
loop {
let response = self
.receive_server_message(
server,
&address,
query_router.shard(),
&pool,
)
.await?;
// Send server reply to the client.
match write_all_half(&mut self.write, response).await {
Ok(_) => (),
Err(err) => {
server.mark_bad();
return Err(err);
}
};
if !server.is_data_available() {
break;
}
}
// Report query executed statistics.
self.stats.query(self.process_id, address.id);
if !server.in_transaction() { if !server.in_transaction() {
// Report transaction executed statistics. // Report transaction executed statistics.
self.stats.transaction(self.process_id, address.id); self.stats.transaction(self.process_id, address.id);
@@ -776,7 +747,6 @@ where
// Release server back to the pool if we are in transaction mode. // Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects. // If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode { if self.transaction_mode {
self.stats.server_idle(server.process_id(), address.id);
break; break;
} }
} }
@@ -789,9 +759,13 @@ where
// Pgbouncer closes the connection which leads to // Pgbouncer closes the connection which leads to
// connection thrashing when clients misbehave. // connection thrashing when clients misbehave.
if server.in_transaction() { if server.in_transaction() {
server.query("ROLLBACK").await?; // server.query("ROLLBACK").await?;
server.query("DISCARD ALL").await?; // server.query("DISCARD ALL").await?;
server.set_name("pgcat").await?; // server.set_name("pgcat").await?;
// TODO: Figure out a clever way to ensure
// the server has no more messages for us.
server.mark_bad();
} }
self.release(); self.release();
@@ -830,9 +804,10 @@ where
self.buffer.put(&original[..]); self.buffer.put(&original[..]);
self.send_server_message( self.send_and_receive_loop(
server, code,
self.buffer.clone(), self.buffer.clone(),
server,
&address, &address,
query_router.shard(), query_router.shard(),
&pool, &pool,
@@ -841,41 +816,12 @@ where
self.buffer.clear(); self.buffer.clear();
// Read all data the server has to offer, which can be multiple messages
// buffered in 8196 bytes chunks.
loop {
let response = self
.receive_server_message(
server,
&address,
query_router.shard(),
&pool,
)
.await?;
match write_all_half(&mut self.write, response).await {
Ok(_) => (),
Err(err) => {
server.mark_bad();
return Err(err);
}
};
if !server.is_data_available() {
break;
}
}
// Report query executed statistics.
self.stats.query(self.process_id, address.id);
if !server.in_transaction() { if !server.in_transaction() {
self.stats.transaction(self.process_id, address.id); self.stats.transaction(self.process_id, address.id);
// Release server back to the pool if we are in transaction mode. // Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects. // If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode { if self.transaction_mode {
self.stats.server_idle(server.process_id(), address.id);
break; break;
} }
} }
@@ -925,7 +871,6 @@ where
// Release server back to the pool if we are in transaction mode. // Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects. // If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode { if self.transaction_mode {
self.stats.server_idle(server.process_id(), address.id);
break; break;
} }
} }
@@ -941,6 +886,7 @@ where
// The server is no longer bound to us, we can't cancel it's queries anymore. // The server is no longer bound to us, we can't cancel it's queries anymore.
debug!("Releasing server back into the pool"); debug!("Releasing server back into the pool");
self.stats.server_idle(server.process_id(), address.id);
self.release(); self.release();
self.stats.client_idle(self.process_id, address.id); self.stats.client_idle(self.process_id, address.id);
} }
@@ -952,6 +898,46 @@ where
guard.remove(&(self.process_id, self.secret_key)); guard.remove(&(self.process_id, self.secret_key));
} }
async fn send_and_receive_loop(
&mut self,
code: char,
message: BytesMut,
server: &mut Server,
address: &Address,
shard: usize,
pool: &ConnectionPool,
) -> Result<(), Error> {
debug!("Sending {} to server", code);
self.send_server_message(server, message, &address, shard, &pool)
.await?;
// Read all data the server has to offer, which can be multiple messages
// buffered in 8196 bytes chunks.
loop {
let response = self
.receive_server_message(server, &address, shard, &pool)
.await?;
match write_all_half(&mut self.write, response).await {
Ok(_) => (),
Err(err) => {
server.mark_bad();
return Err(err);
}
};
if !server.is_data_available() {
break;
}
}
// Report query executed statistics.
self.stats.query(self.process_id, address.id);
Ok(())
}
async fn send_server_message( async fn send_server_message(
&self, &self,
server: &mut Server, server: &mut Server,
@@ -970,17 +956,54 @@ where
} }
async fn receive_server_message( async fn receive_server_message(
&self, &mut self,
server: &mut Server, server: &mut Server,
address: &Address, address: &Address,
shard: usize, shard: usize,
pool: &ConnectionPool, pool: &ConnectionPool,
) -> Result<BytesMut, Error> { ) -> Result<BytesMut, Error> {
match server.recv().await { if pool.settings.user.statement_timeout > 0 {
Ok(message) => Ok(message), match tokio::time::timeout(
Err(err) => { tokio::time::Duration::from_millis(pool.settings.user.statement_timeout),
pool.ban(address, shard, self.process_id); server.recv(),
Err(err) )
.await
{
Ok(result) => match result {
Ok(message) => Ok(message),
Err(err) => {
pool.ban(address, shard, self.process_id);
error_response_terminal(
&mut self.write,
&format!("error receiving data from server: {:?}", err),
)
.await?;
Err(err)
}
},
Err(_) => {
error!(
"Statement timeout while talking to {:?} with user {}",
address, pool.settings.user.username
);
server.mark_bad();
pool.ban(address, shard, self.process_id);
error_response_terminal(&mut self.write, "pool statement timeout").await?;
Err(Error::StatementTimeout)
}
}
} else {
match server.recv().await {
Ok(message) => Ok(message),
Err(err) => {
pool.ban(address, shard, self.process_id);
error_response_terminal(
&mut self.write,
&format!("error receiving data from server: {:?}", err),
)
.await?;
Err(err)
}
} }
} }
} }

View File

@@ -64,6 +64,8 @@ pub struct Address {
pub database: String, pub database: String,
pub role: Role, pub role: Role,
pub replica_number: usize, pub replica_number: usize,
pub username: String,
pub poolname: String,
} }
impl Default for Address { impl Default for Address {
@@ -76,6 +78,8 @@ impl Default for Address {
replica_number: 0, replica_number: 0,
database: String::from("database"), database: String::from("database"),
role: Role::Replica, role: Role::Replica,
username: String::from("username"),
poolname: String::from("poolname"),
} }
} }
} }
@@ -84,11 +88,11 @@ impl Address {
/// Address name (aka database) used in `SHOW STATS`, `SHOW DATABASES`, and `SHOW POOLS`. /// Address name (aka database) used in `SHOW STATS`, `SHOW DATABASES`, and `SHOW POOLS`.
pub fn name(&self) -> String { pub fn name(&self) -> String {
match self.role { match self.role {
Role::Primary => format!("{}_shard_{}_primary", self.database, self.shard), Role::Primary => format!("{}_shard_{}_primary", self.poolname, self.shard),
Role::Replica => format!( Role::Replica => format!(
"{}_shard_{}_replica_{}", "{}_shard_{}_replica_{}",
self.database, self.shard, self.replica_number self.poolname, self.shard, self.replica_number
), ),
} }
} }
@@ -100,6 +104,7 @@ pub struct User {
pub username: String, pub username: String,
pub password: String, pub password: String,
pub pool_size: u32, pub pool_size: u32,
pub statement_timeout: u64,
} }
impl Default for User { impl Default for User {
@@ -108,6 +113,7 @@ impl Default for User {
username: String::from("postgres"), username: String::from("postgres"),
password: String::new(), password: String::new(),
pool_size: 15, pool_size: 15,
statement_timeout: 0,
} }
} }
} }
@@ -118,6 +124,7 @@ pub struct General {
pub host: String, pub host: String,
pub port: i16, pub port: i16,
pub enable_prometheus_exporter: Option<bool>, pub enable_prometheus_exporter: Option<bool>,
pub prometheus_exporter_port: i16,
pub connect_timeout: u64, pub connect_timeout: u64,
pub healthcheck_timeout: u64, pub healthcheck_timeout: u64,
pub shutdown_timeout: u64, pub shutdown_timeout: u64,
@@ -136,6 +143,7 @@ impl Default for General {
host: String::from("localhost"), host: String::from("localhost"),
port: 5432, port: 5432,
enable_prometheus_exporter: Some(false), enable_prometheus_exporter: Some(false),
prometheus_exporter_port: 9930,
connect_timeout: 5000, connect_timeout: 5000,
healthcheck_timeout: 1000, healthcheck_timeout: 1000,
shutdown_timeout: 60000, shutdown_timeout: 60000,
@@ -271,6 +279,10 @@ impl From<&Config> for std::collections::HashMap<String, String> {
let mut static_settings = vec![ let mut static_settings = vec![
("host".to_string(), config.general.host.to_string()), ("host".to_string(), config.general.host.to_string()),
("port".to_string(), config.general.port.to_string()), ("port".to_string(), config.general.port.to_string()),
(
"prometheus_exporter_port".to_string(),
config.general.prometheus_exporter_port.to_string(),
),
( (
"connect_timeout".to_string(), "connect_timeout".to_string(),
config.general.connect_timeout.to_string(), config.general.connect_timeout.to_string(),
@@ -326,6 +338,7 @@ impl Config {
}; };
for (pool_name, pool_config) in &self.pools { for (pool_name, pool_config) in &self.pools {
// TODO: Make this output prettier (maybe a table?)
info!("--- Settings for pool {} ---", pool_name); info!("--- Settings for pool {} ---", pool_name);
info!( info!(
"Pool size from all users: {}", "Pool size from all users: {}",
@@ -340,8 +353,17 @@ impl Config {
info!("Sharding function: {}", pool_config.sharding_function); info!("Sharding function: {}", pool_config.sharding_function);
info!("Primary reads: {}", pool_config.primary_reads_enabled); info!("Primary reads: {}", pool_config.primary_reads_enabled);
info!("Query router: {}", pool_config.query_parser_enabled); info!("Query router: {}", pool_config.query_parser_enabled);
// TODO: Make this prettier.
info!("Number of shards: {}", pool_config.shards.len()); info!("Number of shards: {}", pool_config.shards.len());
info!("Number of users: {}", pool_config.users.len()); info!("Number of users: {}", pool_config.users.len());
for user in &pool_config.users {
info!(
"{} pool size: {}, statement timeout: {}",
user.1.username, user.1.pool_size, user.1.statement_timeout
);
}
} }
} }
} }

View File

@@ -11,4 +11,5 @@ pub enum Error {
AllServersDown, AllServersDown,
ClientError, ClientError,
TlsError, TlsError,
StatementTimeout,
} }

View File

@@ -99,7 +99,10 @@ async fn main() {
let config = get_config(); let config = get_config();
if let Some(true) = config.general.enable_prometheus_exporter { if let Some(true) = config.general.enable_prometheus_exporter {
let http_addr_str = format!("{}:{}", config.general.host, crate::prometheus::HTTP_PORT); let http_addr_str = format!(
"{}:{}",
config.general.host, config.general.prometheus_exporter_port
);
let http_addr = match SocketAddr::from_str(&http_addr_str) { let http_addr = match SocketAddr::from_str(&http_addr_str) {
Ok(addr) => addr, Ok(addr) => addr,
Err(err) => { Err(err) => {
@@ -130,7 +133,7 @@ async fn main() {
let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new())); let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new()));
// Statistics reporting. // Statistics reporting.
let (tx, rx) = mpsc::channel(100); let (tx, rx) = mpsc::channel(100_000);
REPORTER.store(Arc::new(Reporter::new(tx.clone()))); REPORTER.store(Arc::new(Reporter::new(tx.clone())));
// Connection pool that allows to query all shards and replicas. // Connection pool that allows to query all shards and replicas.

View File

@@ -114,12 +114,14 @@ impl ConnectionPool {
let address = Address { let address = Address {
id: address_id, id: address_id,
database: pool_name.clone(), database: shard.database.clone(),
host: server.0.clone(), host: server.0.clone(),
port: server.1.to_string(), port: server.1.to_string(),
role: role, role: role,
replica_number, replica_number,
shard: shard_idx.parse::<usize>().unwrap(), shard: shard_idx.parse::<usize>().unwrap(),
username: user_info.username.clone(),
poolname: pool_name.clone(),
}; };
address_id += 1; address_id += 1;
@@ -333,11 +335,12 @@ impl ConnectionPool {
if !require_healthcheck { if !require_healthcheck {
self.stats self.stats
.checkout_time(now.elapsed().as_micros(), process_id, address.id); .checkout_time(now.elapsed().as_micros(), process_id, address.id);
self.stats.server_idle(conn.process_id(), address.id); self.stats.server_active(conn.process_id(), address.id);
return Ok((conn, address.clone())); return Ok((conn, address.clone()));
} }
debug!("Running health check for replica {}, {:?}", index, address); debug!("Running health check on server {:?}", address);
self.stats.server_tested(server.process_id(), address.id); self.stats.server_tested(server.process_id(), address.id);
match tokio::time::timeout( match tokio::time::timeout(
@@ -351,7 +354,7 @@ impl ConnectionPool {
Ok(_) => { Ok(_) => {
self.stats self.stats
.checkout_time(now.elapsed().as_micros(), process_id, address.id); .checkout_time(now.elapsed().as_micros(), process_id, address.id);
self.stats.server_idle(conn.process_id(), address.id); self.stats.server_active(conn.process_id(), address.id);
return Ok((conn, address.clone())); return Ok((conn, address.clone()));
} }

View File

@@ -10,8 +10,6 @@ use crate::config::Address;
use crate::pool::get_all_pools; use crate::pool::get_all_pools;
use crate::stats::get_stats; use crate::stats::get_stats;
pub const HTTP_PORT: usize = 9930;
struct MetricHelpType { struct MetricHelpType {
help: &'static str, help: &'static str,
ty: &'static str, ty: &'static str,

View File

@@ -1,9 +1,10 @@
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
/// Statistics and reporting. /// Statistics and reporting.
use log::info; use log::{error, info, trace};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use parking_lot::Mutex; use parking_lot::Mutex;
use std::collections::HashMap; use std::collections::HashMap;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::mpsc::{channel, Receiver, Sender};
use crate::pool::get_number_of_addresses; use crate::pool::get_number_of_addresses;
@@ -43,7 +44,7 @@ enum EventName {
/// Event data sent to the collector /// Event data sent to the collector
/// from clients and servers. /// from clients and servers.
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct Event { pub struct Event {
/// The name of the event being reported. /// The name of the event being reported.
name: EventName, name: EventName,
@@ -79,6 +80,25 @@ impl Reporter {
Reporter { tx: tx } Reporter { tx: tx }
} }
/// Send statistics to the task keeping track of stats.
fn send(&self, event: Event) {
let name = event.name;
let result = self.tx.try_send(event);
match result {
Ok(_) => trace!(
"{:?} event reported successfully, capacity: {}",
name,
self.tx.capacity()
),
Err(err) => match err {
TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name),
TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name),
},
};
}
/// Report a query executed by a client against /// Report a query executed by a client against
/// a server identified by the `address_id`. /// a server identified by the `address_id`.
pub fn query(&self, process_id: i32, address_id: usize) { pub fn query(&self, process_id: i32, address_id: usize) {
@@ -89,7 +109,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); self.send(event);
} }
/// Report a transaction executed by a client against /// Report a transaction executed by a client against
@@ -102,7 +122,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); self.send(event)
} }
/// Report data sent to a server identified by `address_id`. /// Report data sent to a server identified by `address_id`.
@@ -115,7 +135,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); self.send(event)
} }
/// Report data received from a server identified by `address_id`. /// Report data received from a server identified by `address_id`.
@@ -128,7 +148,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); self.send(event)
} }
/// Time spent waiting to get a healthy connection from the pool /// Time spent waiting to get a healthy connection from the pool
@@ -142,7 +162,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); self.send(event)
} }
/// Reports a client identified by `process_id` waiting for a connection /// Reports a client identified by `process_id` waiting for a connection
@@ -155,7 +175,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); self.send(event)
} }
/// Reports a client identified by `process_id` is done waiting for a connection /// Reports a client identified by `process_id` is done waiting for a connection
@@ -168,7 +188,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); self.send(event)
} }
/// Reports a client identified by `process_id` is done querying the server /// Reports a client identified by `process_id` is done querying the server
@@ -181,7 +201,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); self.send(event)
} }
/// Reports a client identified by `process_id` is disconecting from the pooler. /// Reports a client identified by `process_id` is disconecting from the pooler.
@@ -194,7 +214,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); self.send(event)
} }
/// Reports a server connection identified by `process_id` for /// Reports a server connection identified by `process_id` for
@@ -208,7 +228,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); self.send(event)
} }
/// Reports a server connection identified by `process_id` for /// Reports a server connection identified by `process_id` for
@@ -222,7 +242,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); self.send(event)
} }
/// Reports a server connection identified by `process_id` for /// Reports a server connection identified by `process_id` for
@@ -236,7 +256,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); self.send(event)
} }
/// Reports a server connection identified by `process_id` for /// Reports a server connection identified by `process_id` for
@@ -250,7 +270,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); self.send(event)
} }
/// Reports a server connection identified by `process_id` is disconecting from the pooler. /// Reports a server connection identified by `process_id` is disconecting from the pooler.
@@ -263,7 +283,7 @@ impl Reporter {
address_id: address_id, address_id: address_id,
}; };
let _ = self.tx.try_send(event); self.send(event)
} }
} }