mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-26 02:16:30 +00:00
Compare commits
11 Commits
levkk-fix-
...
levkk-log-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
61b9756ded | ||
|
|
2cd9e15849 | ||
|
|
fd57fae280 | ||
|
|
a460a645f5 | ||
|
|
f7d33fba7a | ||
|
|
d64f6793c1 | ||
|
|
cea35db35c | ||
|
|
a3aefabb47 | ||
|
|
3285006440 | ||
|
|
52303cc808 | ||
|
|
be254cedd9 |
@@ -11,9 +11,12 @@ host = "0.0.0.0"
|
|||||||
# Port to run on, same as PgBouncer used in this example.
|
# Port to run on, same as PgBouncer used in this example.
|
||||||
port = 6432
|
port = 6432
|
||||||
|
|
||||||
# enable prometheus exporter on port 9930
|
# Whether to enable prometheus exporter or not.
|
||||||
enable_prometheus_exporter = true
|
enable_prometheus_exporter = true
|
||||||
|
|
||||||
|
# Port on which the prometheus exporter listens.
|
||||||
|
prometheus_exporter_port = 9930
|
||||||
|
|
||||||
# How long to wait before aborting a server connection (ms).
|
# How long to wait before aborting a server connection (ms).
|
||||||
connect_timeout = 100
|
connect_timeout = 100
|
||||||
|
|
||||||
@@ -88,11 +91,13 @@ password = "sharding_user"
|
|||||||
# The maximum number of connections from a single Pgcat process to any database in the cluster
|
# The maximum number of connections from a single Pgcat process to any database in the cluster
|
||||||
# is the sum of pool_size across all users.
|
# is the sum of pool_size across all users.
|
||||||
pool_size = 9
|
pool_size = 9
|
||||||
|
statement_timeout = 0
|
||||||
|
|
||||||
[pools.sharded_db.users.1]
|
[pools.sharded_db.users.1]
|
||||||
username = "other_user"
|
username = "other_user"
|
||||||
password = "other_user"
|
password = "other_user"
|
||||||
pool_size = 21
|
pool_size = 21
|
||||||
|
statement_timeout = 30000
|
||||||
|
|
||||||
# Shard 0
|
# Shard 0
|
||||||
[pools.sharded_db.shards.0]
|
[pools.sharded_db.shards.0]
|
||||||
@@ -130,6 +135,7 @@ sharding_function = "pg_bigint_hash"
|
|||||||
username = "simple_user"
|
username = "simple_user"
|
||||||
password = "simple_user"
|
password = "simple_user"
|
||||||
pool_size = 5
|
pool_size = 5
|
||||||
|
statement_timeout = 30000
|
||||||
|
|
||||||
[pools.simple_db.shards.0]
|
[pools.simple_db.shards.0]
|
||||||
servers = [
|
servers = [
|
||||||
|
|||||||
@@ -66,6 +66,18 @@ psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_te
|
|||||||
# Replica/primary selection & more sharding tests
|
# Replica/primary selection & more sharding tests
|
||||||
psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_primary_replica.sql > /dev/null
|
psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_primary_replica.sql > /dev/null
|
||||||
|
|
||||||
|
# Statement timeout tests
|
||||||
|
sed -i 's/statement_timeout = 0/statement_timeout = 100/' .circleci/pgcat.toml
|
||||||
|
kill -SIGHUP $(pgrep pgcat) # Reload config
|
||||||
|
sleep 0.2
|
||||||
|
|
||||||
|
# This should timeout
|
||||||
|
(! psql -U sharding_user -e -h 127.0.0.1 -p 6432 -c 'select pg_sleep(0.5)')
|
||||||
|
|
||||||
|
# Disable statement timeout
|
||||||
|
sed -i 's/statement_timeout = 100/statement_timeout = 0/' .circleci/pgcat.toml
|
||||||
|
kill -SIGHUP $(pgrep pgcat) # Reload config again
|
||||||
|
|
||||||
#
|
#
|
||||||
# ActiveRecord tests
|
# ActiveRecord tests
|
||||||
#
|
#
|
||||||
|
|||||||
52
README.md
52
README.md
@@ -38,30 +38,34 @@ psql -h 127.0.0.1 -p 6432 -c 'SELECT 1'
|
|||||||
|
|
||||||
### Config
|
### Config
|
||||||
|
|
||||||
| **Name** | **Description** | **Examples** |
|
| **Name** | **Description** | **Examples** |
|
||||||
|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------|
|
|------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------|
|
||||||
| **`general`** | | |
|
| **`general`** | | |
|
||||||
| `host` | The pooler will run on this host, 0.0.0.0 means accessible from everywhere. | `0.0.0.0` |
|
| `host` | The pooler will run on this host, 0.0.0.0 means accessible from everywhere. | `0.0.0.0` |
|
||||||
| `port` | The pooler will run on this port. | `6432` |
|
| `port` | The pooler will run on this port. | `6432` |
|
||||||
| `pool_size` | Maximum allowed server connections per pool. Pools are separated for each user/shard/server role. The connections are allocated as needed. | `15` |
|
| `enable_prometheus_exporter` | Enable prometheus exporter which will export metrics in prometheus exposition format. | `true` |
|
||||||
| `pool_mode` | The pool mode to use, i.e. `session` or `transaction`. | `transaction` |
|
| `prometheus_exporter_port` | Port at which prometheus exporter listens on. | `9930` |
|
||||||
| `connect_timeout` | Maximum time to establish a connection to a server (milliseconds). If reached, the server is banned and the next target is attempted. | `5000` |
|
| `pool_size` | Maximum allowed server connections per pool. Pools are separated for each user/shard/server role. The connections are allocated as needed. | `15` |
|
||||||
| `healthcheck_timeout` | Maximum time to pass a health check (`SELECT 1`, milliseconds). If reached, the server is banned and the next target is attempted. | `1000` |
|
| `pool_mode` | The pool mode to use, i.e. `session` or `transaction`. | `transaction` |
|
||||||
| `shutdown_timeout` | Maximum time to give clients during shutdown before forcibly killing client connections (ms). | `60000` |
|
| `connect_timeout` | Maximum time to establish a connection to a server (milliseconds). If reached, the server is banned and the next target is attempted. | `5000` |
|
||||||
| `healthcheck_delay` | How long to keep connection available for immediate re-use, without running a healthcheck query on it | `30000` |
|
| `healthcheck_timeout` | Maximum time to pass a health check (`SELECT 1`, milliseconds). If reached, the server is banned and the next target is attempted. | `1000` |
|
||||||
| `ban_time` | Ban time for a server (seconds). It won't be allowed to serve transactions until the ban expires; failover targets will be used instead. | `60` |
|
| `shutdown_timeout` | Maximum time to give clients during shutdown before forcibly killing client connections (ms). | `60000` |
|
||||||
| | | |
|
| `healthcheck_delay` | How long to keep connection available for immediate re-use, without running a healthcheck query on it | `30000` |
|
||||||
| **`user`** | | |
|
| `ban_time` | Ban time for a server (seconds). It won't be allowed to serve transactions until the ban expires; failover targets will be used instead. | `60` |
|
||||||
| `name` | The user name. | `sharding_user` |
|
| `autoreload` | Enable auto-reload of config after fixed time-interval. | `false` |
|
||||||
| `password` | The user password in plaintext. | `hunter2` |
|
| | | |
|
||||||
| | | |
|
| **`user`** | | |
|
||||||
| **`shards`** | Shards are numerically numbered starting from 0; the order in the config is preserved by the pooler to route queries accordingly. | `[shards.0]` |
|
| `name` | The user name. | `sharding_user` |
|
||||||
| `servers` | List of servers to connect to and their roles. A server is: `[host, port, role]`, where `role` is either `primary` or `replica`. | `["127.0.0.1", 5432, "primary"]` |
|
| `password` | The user password in plaintext. | `hunter2` |
|
||||||
| `database` | The name of the database to connect to. This is the same on all servers that are part of one shard. | |
|
| | | |
|
||||||
| **`query_router`** | | |
|
| **`shards`** | Shards are numerically numbered starting from 0; the order in the config is preserved by the pooler to route queries accordingly. | `[shards.0]` |
|
||||||
| `default_role` | Traffic is routed to this role by default (round-robin), unless the client specifies otherwise. Default is `any`, for any role available. | `any`, `primary`, `replica` |
|
| `servers` | List of servers to connect to and their roles. A server is: `[host, port, role]`, where `role` is either `primary` or `replica`. | `["127.0.0.1", 5432, "primary"]` |
|
||||||
| `query_parser_enabled` | Enable the query parser which will inspect incoming queries and route them to a primary or replicas. | `false` |
|
| `database` | The name of the database to connect to. This is the same on all servers that are part of one shard. | |
|
||||||
| `primary_reads_enabled` | Enable this to allow read queries on the primary; otherwise read queries are routed to the replicas. | `true` |
|
| | | |
|
||||||
|
| **`query_router`** | | |
|
||||||
|
| `default_role` | Traffic is routed to this role by default (round-robin), unless the client specifies otherwise. Default is `any`, for any role available. | `any`, `primary`, `replica` |
|
||||||
|
| `query_parser_enabled` | Enable the query parser which will inspect incoming queries and route them to a primary or replicas. | `false` |
|
||||||
|
| `primary_reads_enabled` | Enable this to allow read queries on the primary; otherwise read queries are routed to the replicas. | `true` |
|
||||||
|
|
||||||
## Local development
|
## Local development
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
version: "3"
|
version: "3"
|
||||||
services:
|
services:
|
||||||
postgres:
|
postgres:
|
||||||
image: postgres:13
|
image: postgres:14
|
||||||
environment:
|
environment:
|
||||||
POSTGRES_PASSWORD: postgres
|
POSTGRES_PASSWORD: postgres
|
||||||
POSTGRES_HOST_AUTH_METHOD: md5
|
POSTGRES_HOST_AUTH_METHOD: md5
|
||||||
|
|||||||
@@ -11,9 +11,12 @@ host = "0.0.0.0"
|
|||||||
# Port to run on, same as PgBouncer used in this example.
|
# Port to run on, same as PgBouncer used in this example.
|
||||||
port = 6432
|
port = 6432
|
||||||
|
|
||||||
# enable prometheus exporter on port 9930
|
# Whether to enable prometheus exporter or not.
|
||||||
enable_prometheus_exporter = true
|
enable_prometheus_exporter = true
|
||||||
|
|
||||||
|
# Port on which the prometheus exporter listens.
|
||||||
|
prometheus_exporter_port = 9930
|
||||||
|
|
||||||
# How long to wait before aborting a server connection (ms).
|
# How long to wait before aborting a server connection (ms).
|
||||||
connect_timeout = 5000
|
connect_timeout = 5000
|
||||||
|
|
||||||
@@ -89,10 +92,14 @@ password = "postgres"
|
|||||||
# is the sum of pool_size across all users.
|
# is the sum of pool_size across all users.
|
||||||
pool_size = 9
|
pool_size = 9
|
||||||
|
|
||||||
|
# Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
|
||||||
|
statement_timeout = 0
|
||||||
|
|
||||||
[pools.sharded.users.1]
|
[pools.sharded.users.1]
|
||||||
username = "postgres"
|
username = "postgres"
|
||||||
password = "postgres"
|
password = "postgres"
|
||||||
pool_size = 21
|
pool_size = 21
|
||||||
|
statement_timeout = 15000
|
||||||
|
|
||||||
# Shard 0
|
# Shard 0
|
||||||
[pools.sharded.shards.0]
|
[pools.sharded.shards.0]
|
||||||
@@ -130,6 +137,7 @@ sharding_function = "pg_bigint_hash"
|
|||||||
username = "postgres"
|
username = "postgres"
|
||||||
password = "postgres"
|
password = "postgres"
|
||||||
pool_size = 5
|
pool_size = 5
|
||||||
|
statement_timeout = 0
|
||||||
|
|
||||||
[pools.simple_db.shards.0]
|
[pools.simple_db.shards.0]
|
||||||
servers = [
|
servers = [
|
||||||
|
|||||||
10
pgcat.toml
10
pgcat.toml
@@ -11,9 +11,12 @@ host = "0.0.0.0"
|
|||||||
# Port to run on, same as PgBouncer used in this example.
|
# Port to run on, same as PgBouncer used in this example.
|
||||||
port = 6432
|
port = 6432
|
||||||
|
|
||||||
# enable prometheus exporter on port 9930
|
# Whether to enable prometheus exporter or not.
|
||||||
enable_prometheus_exporter = true
|
enable_prometheus_exporter = true
|
||||||
|
|
||||||
|
# Port on which the prometheus exporter listens.
|
||||||
|
prometheus_exporter_port = 9930
|
||||||
|
|
||||||
# How long to wait before aborting a server connection (ms).
|
# How long to wait before aborting a server connection (ms).
|
||||||
connect_timeout = 5000
|
connect_timeout = 5000
|
||||||
|
|
||||||
@@ -89,10 +92,14 @@ password = "sharding_user"
|
|||||||
# is the sum of pool_size across all users.
|
# is the sum of pool_size across all users.
|
||||||
pool_size = 9
|
pool_size = 9
|
||||||
|
|
||||||
|
# Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
|
||||||
|
statement_timeout = 0
|
||||||
|
|
||||||
[pools.sharded_db.users.1]
|
[pools.sharded_db.users.1]
|
||||||
username = "other_user"
|
username = "other_user"
|
||||||
password = "other_user"
|
password = "other_user"
|
||||||
pool_size = 21
|
pool_size = 21
|
||||||
|
statement_timeout = 15000
|
||||||
|
|
||||||
# Shard 0
|
# Shard 0
|
||||||
[pools.sharded_db.shards.0]
|
[pools.sharded_db.shards.0]
|
||||||
@@ -130,6 +137,7 @@ sharding_function = "pg_bigint_hash"
|
|||||||
username = "simple_user"
|
username = "simple_user"
|
||||||
password = "simple_user"
|
password = "simple_user"
|
||||||
pool_size = 5
|
pool_size = 5
|
||||||
|
statement_timeout = 0
|
||||||
|
|
||||||
[pools.simple_db.shards.0]
|
[pools.simple_db.shards.0]
|
||||||
servers = [
|
servers = [
|
||||||
|
|||||||
66
src/admin.rs
66
src/admin.rs
@@ -44,32 +44,45 @@ where
|
|||||||
|
|
||||||
trace!("Admin query: {}", query);
|
trace!("Admin query: {}", query);
|
||||||
|
|
||||||
if query.starts_with("SHOW STATS") {
|
let query_parts: Vec<&str> = query.trim_end_matches(';').split_whitespace().collect();
|
||||||
trace!("SHOW STATS");
|
|
||||||
show_stats(stream).await
|
match query_parts[0] {
|
||||||
} else if query.starts_with("RELOAD") {
|
"RELOAD" => {
|
||||||
trace!("RELOAD");
|
trace!("RELOAD");
|
||||||
reload(stream, client_server_map).await
|
reload(stream, client_server_map).await
|
||||||
} else if query.starts_with("SHOW CONFIG") {
|
}
|
||||||
trace!("SHOW CONFIG");
|
"SET" => {
|
||||||
show_config(stream).await
|
trace!("SET");
|
||||||
} else if query.starts_with("SHOW DATABASES") {
|
ignore_set(stream).await
|
||||||
trace!("SHOW DATABASES");
|
}
|
||||||
show_databases(stream).await
|
"SHOW" => match query_parts[1] {
|
||||||
} else if query.starts_with("SHOW POOLS") {
|
"CONFIG" => {
|
||||||
trace!("SHOW POOLS");
|
trace!("SHOW CONFIG");
|
||||||
show_pools(stream).await
|
show_config(stream).await
|
||||||
} else if query.starts_with("SHOW LISTS") {
|
}
|
||||||
trace!("SHOW LISTS");
|
"DATABASES" => {
|
||||||
show_lists(stream).await
|
trace!("SHOW DATABASES");
|
||||||
} else if query.starts_with("SHOW VERSION") {
|
show_databases(stream).await
|
||||||
trace!("SHOW VERSION");
|
}
|
||||||
show_version(stream).await
|
"LISTS" => {
|
||||||
} else if query.starts_with("SET ") {
|
trace!("SHOW LISTS");
|
||||||
trace!("SET");
|
show_lists(stream).await
|
||||||
ignore_set(stream).await
|
}
|
||||||
} else {
|
"POOLS" => {
|
||||||
error_response(stream, "Unsupported query against the admin database").await
|
trace!("SHOW POOLS");
|
||||||
|
show_pools(stream).await
|
||||||
|
}
|
||||||
|
"STATS" => {
|
||||||
|
trace!("SHOW STATS");
|
||||||
|
show_stats(stream).await
|
||||||
|
}
|
||||||
|
"VERSION" => {
|
||||||
|
trace!("SHOW VERSION");
|
||||||
|
show_version(stream).await
|
||||||
|
}
|
||||||
|
_ => error_response(stream, "Unsupported SHOW query against the admin database").await,
|
||||||
|
},
|
||||||
|
_ => error_response(stream, "Unsupported query against the admin database").await,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -174,6 +187,7 @@ where
|
|||||||
let columns = vec![
|
let columns = vec![
|
||||||
("database", DataType::Text),
|
("database", DataType::Text),
|
||||||
("user", DataType::Text),
|
("user", DataType::Text),
|
||||||
|
("cl_idle", DataType::Numeric),
|
||||||
("cl_active", DataType::Numeric),
|
("cl_active", DataType::Numeric),
|
||||||
("cl_waiting", DataType::Numeric),
|
("cl_waiting", DataType::Numeric),
|
||||||
("cl_cancel_req", DataType::Numeric),
|
("cl_cancel_req", DataType::Numeric),
|
||||||
|
|||||||
@@ -499,7 +499,7 @@ where
|
|||||||
// The query router determines where the query is going to go,
|
// The query router determines where the query is going to go,
|
||||||
// e.g. primary, replica, which shard.
|
// e.g. primary, replica, which shard.
|
||||||
let mut query_router = QueryRouter::new();
|
let mut query_router = QueryRouter::new();
|
||||||
let mut round_robin = 0;
|
let mut round_robin = rand::random();
|
||||||
|
|
||||||
// Our custom protocol loop.
|
// Our custom protocol loop.
|
||||||
// We expect the client to either start a transaction with regular queries
|
// We expect the client to either start a transaction with regular queries
|
||||||
@@ -970,17 +970,54 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn receive_server_message(
|
async fn receive_server_message(
|
||||||
&self,
|
&mut self,
|
||||||
server: &mut Server,
|
server: &mut Server,
|
||||||
address: &Address,
|
address: &Address,
|
||||||
shard: usize,
|
shard: usize,
|
||||||
pool: &ConnectionPool,
|
pool: &ConnectionPool,
|
||||||
) -> Result<BytesMut, Error> {
|
) -> Result<BytesMut, Error> {
|
||||||
match server.recv().await {
|
if pool.settings.user.statement_timeout > 0 {
|
||||||
Ok(message) => Ok(message),
|
match tokio::time::timeout(
|
||||||
Err(err) => {
|
tokio::time::Duration::from_millis(pool.settings.user.statement_timeout),
|
||||||
pool.ban(address, shard, self.process_id);
|
server.recv(),
|
||||||
Err(err)
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(result) => match result {
|
||||||
|
Ok(message) => Ok(message),
|
||||||
|
Err(err) => {
|
||||||
|
pool.ban(address, shard, self.process_id);
|
||||||
|
error_response_terminal(
|
||||||
|
&mut self.write,
|
||||||
|
&format!("error receiving data from server: {:?}", err),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
Err(err)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(_) => {
|
||||||
|
error!(
|
||||||
|
"Statement timeout while talking to {:?} with user {}",
|
||||||
|
address, pool.settings.user.username
|
||||||
|
);
|
||||||
|
server.mark_bad();
|
||||||
|
pool.ban(address, shard, self.process_id);
|
||||||
|
error_response_terminal(&mut self.write, "pool statement timeout").await?;
|
||||||
|
Err(Error::StatementTimeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
match server.recv().await {
|
||||||
|
Ok(message) => Ok(message),
|
||||||
|
Err(err) => {
|
||||||
|
pool.ban(address, shard, self.process_id);
|
||||||
|
error_response_terminal(
|
||||||
|
&mut self.write,
|
||||||
|
&format!("error receiving data from server: {:?}", err),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
Err(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -100,6 +100,7 @@ pub struct User {
|
|||||||
pub username: String,
|
pub username: String,
|
||||||
pub password: String,
|
pub password: String,
|
||||||
pub pool_size: u32,
|
pub pool_size: u32,
|
||||||
|
pub statement_timeout: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for User {
|
impl Default for User {
|
||||||
@@ -108,6 +109,7 @@ impl Default for User {
|
|||||||
username: String::from("postgres"),
|
username: String::from("postgres"),
|
||||||
password: String::new(),
|
password: String::new(),
|
||||||
pool_size: 15,
|
pool_size: 15,
|
||||||
|
statement_timeout: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -118,6 +120,7 @@ pub struct General {
|
|||||||
pub host: String,
|
pub host: String,
|
||||||
pub port: i16,
|
pub port: i16,
|
||||||
pub enable_prometheus_exporter: Option<bool>,
|
pub enable_prometheus_exporter: Option<bool>,
|
||||||
|
pub prometheus_exporter_port: i16,
|
||||||
pub connect_timeout: u64,
|
pub connect_timeout: u64,
|
||||||
pub healthcheck_timeout: u64,
|
pub healthcheck_timeout: u64,
|
||||||
pub shutdown_timeout: u64,
|
pub shutdown_timeout: u64,
|
||||||
@@ -136,6 +139,7 @@ impl Default for General {
|
|||||||
host: String::from("localhost"),
|
host: String::from("localhost"),
|
||||||
port: 5432,
|
port: 5432,
|
||||||
enable_prometheus_exporter: Some(false),
|
enable_prometheus_exporter: Some(false),
|
||||||
|
prometheus_exporter_port: 9930,
|
||||||
connect_timeout: 5000,
|
connect_timeout: 5000,
|
||||||
healthcheck_timeout: 1000,
|
healthcheck_timeout: 1000,
|
||||||
shutdown_timeout: 60000,
|
shutdown_timeout: 60000,
|
||||||
@@ -271,6 +275,10 @@ impl From<&Config> for std::collections::HashMap<String, String> {
|
|||||||
let mut static_settings = vec![
|
let mut static_settings = vec![
|
||||||
("host".to_string(), config.general.host.to_string()),
|
("host".to_string(), config.general.host.to_string()),
|
||||||
("port".to_string(), config.general.port.to_string()),
|
("port".to_string(), config.general.port.to_string()),
|
||||||
|
(
|
||||||
|
"prometheus_exporter_port".to_string(),
|
||||||
|
config.general.prometheus_exporter_port.to_string(),
|
||||||
|
),
|
||||||
(
|
(
|
||||||
"connect_timeout".to_string(),
|
"connect_timeout".to_string(),
|
||||||
config.general.connect_timeout.to_string(),
|
config.general.connect_timeout.to_string(),
|
||||||
@@ -326,6 +334,7 @@ impl Config {
|
|||||||
};
|
};
|
||||||
|
|
||||||
for (pool_name, pool_config) in &self.pools {
|
for (pool_name, pool_config) in &self.pools {
|
||||||
|
// TODO: Make this output prettier (maybe a table?)
|
||||||
info!("--- Settings for pool {} ---", pool_name);
|
info!("--- Settings for pool {} ---", pool_name);
|
||||||
info!(
|
info!(
|
||||||
"Pool size from all users: {}",
|
"Pool size from all users: {}",
|
||||||
@@ -340,8 +349,17 @@ impl Config {
|
|||||||
info!("Sharding function: {}", pool_config.sharding_function);
|
info!("Sharding function: {}", pool_config.sharding_function);
|
||||||
info!("Primary reads: {}", pool_config.primary_reads_enabled);
|
info!("Primary reads: {}", pool_config.primary_reads_enabled);
|
||||||
info!("Query router: {}", pool_config.query_parser_enabled);
|
info!("Query router: {}", pool_config.query_parser_enabled);
|
||||||
|
|
||||||
|
// TODO: Make this prettier.
|
||||||
info!("Number of shards: {}", pool_config.shards.len());
|
info!("Number of shards: {}", pool_config.shards.len());
|
||||||
info!("Number of users: {}", pool_config.users.len());
|
info!("Number of users: {}", pool_config.users.len());
|
||||||
|
|
||||||
|
for user in &pool_config.users {
|
||||||
|
info!(
|
||||||
|
"{} pool size: {}, statement timeout: {}",
|
||||||
|
user.1.username, user.1.pool_size, user.1.statement_timeout
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,4 +11,5 @@ pub enum Error {
|
|||||||
AllServersDown,
|
AllServersDown,
|
||||||
ClientError,
|
ClientError,
|
||||||
TlsError,
|
TlsError,
|
||||||
|
StatementTimeout,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -99,7 +99,10 @@ async fn main() {
|
|||||||
let config = get_config();
|
let config = get_config();
|
||||||
|
|
||||||
if let Some(true) = config.general.enable_prometheus_exporter {
|
if let Some(true) = config.general.enable_prometheus_exporter {
|
||||||
let http_addr_str = format!("{}:{}", config.general.host, crate::prometheus::HTTP_PORT);
|
let http_addr_str = format!(
|
||||||
|
"{}:{}",
|
||||||
|
config.general.host, config.general.prometheus_exporter_port
|
||||||
|
);
|
||||||
let http_addr = match SocketAddr::from_str(&http_addr_str) {
|
let http_addr = match SocketAddr::from_str(&http_addr_str) {
|
||||||
Ok(addr) => addr,
|
Ok(addr) => addr,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
@@ -130,7 +133,7 @@ async fn main() {
|
|||||||
let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new()));
|
let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new()));
|
||||||
|
|
||||||
// Statistics reporting.
|
// Statistics reporting.
|
||||||
let (tx, rx) = mpsc::channel(100);
|
let (tx, rx) = mpsc::channel(100_000);
|
||||||
REPORTER.store(Arc::new(Reporter::new(tx.clone())));
|
REPORTER.store(Arc::new(Reporter::new(tx.clone())));
|
||||||
|
|
||||||
// Connection pool that allows to query all shards and replicas.
|
// Connection pool that allows to query all shards and replicas.
|
||||||
|
|||||||
@@ -337,7 +337,8 @@ impl ConnectionPool {
|
|||||||
return Ok((conn, address.clone()));
|
return Ok((conn, address.clone()));
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("Running health check for replica {}, {:?}", index, address);
|
debug!("Running health check on server {:?}", address);
|
||||||
|
|
||||||
self.stats.server_tested(server.process_id(), address.id);
|
self.stats.server_tested(server.process_id(), address.id);
|
||||||
|
|
||||||
match tokio::time::timeout(
|
match tokio::time::timeout(
|
||||||
|
|||||||
@@ -10,8 +10,6 @@ use crate::config::Address;
|
|||||||
use crate::pool::get_all_pools;
|
use crate::pool::get_all_pools;
|
||||||
use crate::stats::get_stats;
|
use crate::stats::get_stats;
|
||||||
|
|
||||||
pub const HTTP_PORT: usize = 9930;
|
|
||||||
|
|
||||||
struct MetricHelpType {
|
struct MetricHelpType {
|
||||||
help: &'static str,
|
help: &'static str,
|
||||||
ty: &'static str,
|
ty: &'static str,
|
||||||
|
|||||||
110
src/stats.rs
110
src/stats.rs
@@ -1,9 +1,11 @@
|
|||||||
use arc_swap::ArcSwap;
|
use arc_swap::ArcSwap;
|
||||||
/// Statistics and reporting.
|
/// Statistics and reporting.
|
||||||
use log::info;
|
use log::{error, info, trace};
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use std::time::SystemTime;
|
||||||
|
use tokio::sync::mpsc::error::TrySendError;
|
||||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||||
|
|
||||||
use crate::pool::get_number_of_addresses;
|
use crate::pool::get_number_of_addresses;
|
||||||
@@ -41,9 +43,29 @@ enum EventName {
|
|||||||
UpdateAverages,
|
UpdateAverages,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Send an event via the channel and log
|
||||||
|
/// an error if it fails.
|
||||||
|
fn send(tx: &Sender<Event>, event: Event) {
|
||||||
|
let name = event.name;
|
||||||
|
let result = tx.try_send(event);
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(_) => trace!(
|
||||||
|
"{:?} event reported successfully, capacity: {}",
|
||||||
|
name,
|
||||||
|
tx.capacity()
|
||||||
|
),
|
||||||
|
|
||||||
|
Err(err) => match err {
|
||||||
|
TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name),
|
||||||
|
TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
/// Event data sent to the collector
|
/// Event data sent to the collector
|
||||||
/// from clients and servers.
|
/// from clients and servers.
|
||||||
#[derive(Debug)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Event {
|
pub struct Event {
|
||||||
/// The name of the event being reported.
|
/// The name of the event being reported.
|
||||||
name: EventName,
|
name: EventName,
|
||||||
@@ -89,7 +111,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
send(&self.tx, event);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report a transaction executed by a client against
|
/// Report a transaction executed by a client against
|
||||||
@@ -102,7 +124,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report data sent to a server identified by `address_id`.
|
/// Report data sent to a server identified by `address_id`.
|
||||||
@@ -115,7 +137,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report data received from a server identified by `address_id`.
|
/// Report data received from a server identified by `address_id`.
|
||||||
@@ -128,7 +150,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Time spent waiting to get a healthy connection from the pool
|
/// Time spent waiting to get a healthy connection from the pool
|
||||||
@@ -142,7 +164,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a client identified by `process_id` waiting for a connection
|
/// Reports a client identified by `process_id` waiting for a connection
|
||||||
@@ -155,7 +177,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a client identified by `process_id` is done waiting for a connection
|
/// Reports a client identified by `process_id` is done waiting for a connection
|
||||||
@@ -168,7 +190,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a client identified by `process_id` is done querying the server
|
/// Reports a client identified by `process_id` is done querying the server
|
||||||
@@ -181,7 +203,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a client identified by `process_id` is disconnecting from the pooler.
|
/// Reports a client identified by `process_id` is disconnecting from the pooler.
|
||||||
@@ -194,7 +216,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` for
|
/// Reports a server connection identified by `process_id` for
|
||||||
@@ -208,7 +230,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` for
|
/// Reports a server connection identified by `process_id` for
|
||||||
@@ -222,7 +244,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` for
|
/// Reports a server connection identified by `process_id` for
|
||||||
@@ -236,7 +258,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` for
|
/// Reports a server connection identified by `process_id` for
|
||||||
@@ -250,7 +272,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` is disconnecting from the pooler.
|
/// Reports a server connection identified by `process_id` is disconnecting from the pooler.
|
||||||
@@ -263,7 +285,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -325,6 +347,9 @@ impl Collector {
|
|||||||
// Track which state the client and server are at any given time.
|
// Track which state the client and server are at any given time.
|
||||||
let mut client_server_states: HashMap<usize, HashMap<i32, EventName>> = HashMap::new();
|
let mut client_server_states: HashMap<usize, HashMap<i32, EventName>> = HashMap::new();
|
||||||
|
|
||||||
|
// Average update times
|
||||||
|
let mut last_updated_avg: HashMap<usize, SystemTime> = HashMap::new();
|
||||||
|
|
||||||
// Flush stats to StatsD and calculate averages every 15 seconds.
|
// Flush stats to StatsD and calculate averages every 15 seconds.
|
||||||
let tx = self.tx.clone();
|
let tx = self.tx.clone();
|
||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
@@ -334,12 +359,15 @@ impl Collector {
|
|||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
let address_count = get_number_of_addresses();
|
let address_count = get_number_of_addresses();
|
||||||
for address_id in 0..address_count {
|
for address_id in 0..address_count {
|
||||||
let _ = tx.try_send(Event {
|
send(
|
||||||
name: EventName::UpdateStats,
|
&tx,
|
||||||
value: 0,
|
Event {
|
||||||
process_id: -1,
|
name: EventName::UpdateStats,
|
||||||
address_id: address_id,
|
value: 0,
|
||||||
});
|
process_id: -1,
|
||||||
|
address_id: address_id,
|
||||||
|
},
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -352,12 +380,15 @@ impl Collector {
|
|||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
let address_count = get_number_of_addresses();
|
let address_count = get_number_of_addresses();
|
||||||
for address_id in 0..address_count {
|
for address_id in 0..address_count {
|
||||||
let _ = tx.try_send(Event {
|
send(
|
||||||
name: EventName::UpdateAverages,
|
&tx,
|
||||||
value: 0,
|
Event {
|
||||||
process_id: -1,
|
name: EventName::UpdateAverages,
|
||||||
address_id: address_id,
|
value: 0,
|
||||||
});
|
process_id: -1,
|
||||||
|
address_id: address_id,
|
||||||
|
},
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -379,6 +410,9 @@ impl Collector {
|
|||||||
.entry(stat.address_id)
|
.entry(stat.address_id)
|
||||||
.or_insert(HashMap::new());
|
.or_insert(HashMap::new());
|
||||||
let old_stats = old_stats.entry(stat.address_id).or_insert(HashMap::new());
|
let old_stats = old_stats.entry(stat.address_id).or_insert(HashMap::new());
|
||||||
|
let last_updated_avg = last_updated_avg
|
||||||
|
.entry(stat.address_id)
|
||||||
|
.or_insert(SystemTime::now());
|
||||||
|
|
||||||
// Some are counters, some are gauges...
|
// Some are counters, some are gauges...
|
||||||
match stat.name {
|
match stat.name {
|
||||||
@@ -504,6 +538,24 @@ impl Collector {
|
|||||||
}
|
}
|
||||||
|
|
||||||
EventName::UpdateAverages => {
|
EventName::UpdateAverages => {
|
||||||
|
let elapsed = match last_updated_avg.elapsed() {
|
||||||
|
Ok(elapsed) => elapsed.as_secs(),
|
||||||
|
Err(err) => {
|
||||||
|
error!(
|
||||||
|
"Could not get elapsed time, averages may be incorrect: {:?}",
|
||||||
|
err
|
||||||
|
);
|
||||||
|
STAT_PERIOD / 1_000
|
||||||
|
}
|
||||||
|
} as i64;
|
||||||
|
|
||||||
|
*last_updated_avg = SystemTime::now();
|
||||||
|
|
||||||
|
// Tokio triggers the interval on first tick and then sleeps.
|
||||||
|
if elapsed == 0 {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// Calculate averages
|
// Calculate averages
|
||||||
for stat in &[
|
for stat in &[
|
||||||
"avg_query_count",
|
"avg_query_count",
|
||||||
@@ -521,7 +573,7 @@ impl Collector {
|
|||||||
|
|
||||||
let old_value = old_stats.entry(total_name.clone()).or_insert(0);
|
let old_value = old_stats.entry(total_name.clone()).or_insert(0);
|
||||||
let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned();
|
let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned();
|
||||||
let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second
|
let avg = (new_value - *old_value) / elapsed; // Avg / second
|
||||||
|
|
||||||
stats.insert(stat, avg);
|
stats.insert(stat, avg);
|
||||||
*old_value = new_value;
|
*old_value = new_value;
|
||||||
|
|||||||
Reference in New Issue
Block a user