mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 01:16:30 +00:00
Compare commits
1 Commits
levkk-drop
...
levkk-fix-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
28172cc1d5 |
@@ -11,12 +11,9 @@ host = "0.0.0.0"
|
||||
# Port to run on, same as PgBouncer used in this example.
|
||||
port = 6432
|
||||
|
||||
# Whether to enable prometheus exporter or not.
|
||||
# enable prometheus exporter on port 9930
|
||||
enable_prometheus_exporter = true
|
||||
|
||||
# Port at which prometheus exporter listens on.
|
||||
prometheus_exporter_port = 9930
|
||||
|
||||
# How long to wait before aborting a server connection (ms).
|
||||
connect_timeout = 100
|
||||
|
||||
@@ -91,13 +88,11 @@ password = "sharding_user"
|
||||
# The maximum number of connections from a single Pgcat process to any database in the cluster
|
||||
# is the sum of pool_size across all users.
|
||||
pool_size = 9
|
||||
statement_timeout = 0
|
||||
|
||||
[pools.sharded_db.users.1]
|
||||
username = "other_user"
|
||||
password = "other_user"
|
||||
pool_size = 21
|
||||
statement_timeout = 30000
|
||||
|
||||
# Shard 0
|
||||
[pools.sharded_db.shards.0]
|
||||
@@ -135,7 +130,6 @@ sharding_function = "pg_bigint_hash"
|
||||
username = "simple_user"
|
||||
password = "simple_user"
|
||||
pool_size = 5
|
||||
statement_timeout = 30000
|
||||
|
||||
[pools.simple_db.shards.0]
|
||||
servers = [
|
||||
|
||||
@@ -66,18 +66,6 @@ psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_te
|
||||
# Replica/primary selection & more sharding tests
|
||||
psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_primary_replica.sql > /dev/null
|
||||
|
||||
# Statement timeout tests
|
||||
sed -i 's/statement_timeout = 0/statement_timeout = 100/' .circleci/pgcat.toml
|
||||
kill -SIGHUP $(pgrep pgcat) # Reload config
|
||||
sleep 0.2
|
||||
|
||||
# This should timeout
|
||||
(! psql -U sharding_user -e -h 127.0.0.1 -p 6432 -c 'select pg_sleep(0.5)')
|
||||
|
||||
# Disable statement timeout
|
||||
sed -i 's/statement_timeout = 100/statement_timeout = 0/' .circleci/pgcat.toml
|
||||
kill -SIGHUP $(pgrep pgcat) # Reload config again
|
||||
|
||||
#
|
||||
# ActiveRecord tests
|
||||
#
|
||||
|
||||
52
README.md
52
README.md
@@ -38,34 +38,30 @@ psql -h 127.0.0.1 -p 6432 -c 'SELECT 1'
|
||||
|
||||
### Config
|
||||
|
||||
| **Name** | **Description** | **Examples** |
|
||||
|------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------|
|
||||
| **`general`** | | |
|
||||
| `host` | The pooler will run on this host, 0.0.0.0 means accessible from everywhere. | `0.0.0.0` |
|
||||
| `port` | The pooler will run on this port. | `6432` |
|
||||
| `enable_prometheus_exporter` | Enable prometheus exporter which will export metrics in prometheus exposition format. | `true` |
|
||||
| `prometheus_exporter_port` | Port at which prometheus exporter listens on. | `9930` |
|
||||
| `pool_size` | Maximum allowed server connections per pool. Pools are separated for each user/shard/server role. The connections are allocated as needed. | `15` |
|
||||
| `pool_mode` | The pool mode to use, i.e. `session` or `transaction`. | `transaction` |
|
||||
| `connect_timeout` | Maximum time to establish a connection to a server (milliseconds). If reached, the server is banned and the next target is attempted. | `5000` |
|
||||
| `healthcheck_timeout` | Maximum time to pass a health check (`SELECT 1`, milliseconds). If reached, the server is banned and the next target is attempted. | `1000` |
|
||||
| `shutdown_timeout` | Maximum time to give clients during shutdown before forcibly killing client connections (ms). | `60000` |
|
||||
| `healthcheck_delay` | How long to keep connection available for immediate re-use, without running a healthcheck query on it | `30000` |
|
||||
| `ban_time` | Ban time for a server (seconds). It won't be allowed to serve transactions until the ban expires; failover targets will be used instead. | `60` |
|
||||
| `autoreload` | Enable auto-reload of config after fixed time-interval. | `false` |
|
||||
| | | |
|
||||
| **`user`** | | |
|
||||
| `name` | The user name. | `sharding_user` |
|
||||
| `password` | The user password in plaintext. | `hunter2` |
|
||||
| | | |
|
||||
| **`shards`** | Shards are numerically numbered starting from 0; the order in the config is preserved by the pooler to route queries accordingly. | `[shards.0]` |
|
||||
| `servers` | List of servers to connect to and their roles. A server is: `[host, port, role]`, where `role` is either `primary` or `replica`. | `["127.0.0.1", 5432, "primary"]` |
|
||||
| `database` | The name of the database to connect to. This is the same on all servers that are part of one shard. | |
|
||||
| | | |
|
||||
| **`query_router`** | | |
|
||||
| `default_role` | Traffic is routed to this role by default (round-robin), unless the client specifies otherwise. Default is `any`, for any role available. | `any`, `primary`, `replica` |
|
||||
| `query_parser_enabled` | Enable the query parser which will inspect incoming queries and route them to a primary or replicas. | `false` |
|
||||
| `primary_reads_enabled` | Enable this to allow read queries on the primary; otherwise read queries are routed to the replicas. | `true` |
|
||||
| **Name** | **Description** | **Examples** |
|
||||
|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------|
|
||||
| **`general`** | | |
|
||||
| `host` | The pooler will run on this host, 0.0.0.0 means accessible from everywhere. | `0.0.0.0` |
|
||||
| `port` | The pooler will run on this port. | `6432` |
|
||||
| `pool_size` | Maximum allowed server connections per pool. Pools are separated for each user/shard/server role. The connections are allocated as needed. | `15` |
|
||||
| `pool_mode` | The pool mode to use, i.e. `session` or `transaction`. | `transaction` |
|
||||
| `connect_timeout` | Maximum time to establish a connection to a server (milliseconds). If reached, the server is banned and the next target is attempted. | `5000` |
|
||||
| `healthcheck_timeout` | Maximum time to pass a health check (`SELECT 1`, milliseconds). If reached, the server is banned and the next target is attempted. | `1000` |
|
||||
| `shutdown_timeout` | Maximum time to give clients during shutdown before forcibly killing client connections (ms). | `60000` |
|
||||
| `healthcheck_delay` | How long to keep connection available for immediate re-use, without running a healthcheck query on it | `30000` |
|
||||
| `ban_time` | Ban time for a server (seconds). It won't be allowed to serve transactions until the ban expires; failover targets will be used instead. | `60` |
|
||||
| | | |
|
||||
| **`user`** | | |
|
||||
| `name` | The user name. | `sharding_user` |
|
||||
| `password` | The user password in plaintext. | `hunter2` |
|
||||
| | | |
|
||||
| **`shards`** | Shards are numerically numbered starting from 0; the order in the config is preserved by the pooler to route queries accordingly. | `[shards.0]` |
|
||||
| `servers` | List of servers to connect to and their roles. A server is: `[host, port, role]`, where `role` is either `primary` or `replica`. | `["127.0.0.1", 5432, "primary"]` |
|
||||
| `database` | The name of the database to connect to. This is the same on all servers that are part of one shard. | |
|
||||
| **`query_router`** | | |
|
||||
| `default_role` | Traffic is routed to this role by default (round-robin), unless the client specifies otherwise. Default is `any`, for any role available. | `any`, `primary`, `replica` |
|
||||
| `query_parser_enabled` | Enable the query parser which will inspect incoming queries and route them to a primary or replicas. | `false` |
|
||||
| `primary_reads_enabled` | Enable this to allow read queries on the primary; otherwise read queries are routed to the replicas. | `true` |
|
||||
|
||||
## Local development
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
version: "3"
|
||||
services:
|
||||
postgres:
|
||||
image: postgres:14
|
||||
image: postgres:13
|
||||
environment:
|
||||
POSTGRES_PASSWORD: postgres
|
||||
POSTGRES_HOST_AUTH_METHOD: md5
|
||||
|
||||
@@ -11,12 +11,9 @@ host = "0.0.0.0"
|
||||
# Port to run on, same as PgBouncer used in this example.
|
||||
port = 6432
|
||||
|
||||
# Whether to enable prometheus exporter or not.
|
||||
# enable prometheus exporter on port 9930
|
||||
enable_prometheus_exporter = true
|
||||
|
||||
# Port at which prometheus exporter listens on.
|
||||
prometheus_exporter_port = 9930
|
||||
|
||||
# How long to wait before aborting a server connection (ms).
|
||||
connect_timeout = 5000
|
||||
|
||||
@@ -92,14 +89,10 @@ password = "postgres"
|
||||
# is the sum of pool_size across all users.
|
||||
pool_size = 9
|
||||
|
||||
# Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
|
||||
statement_timeout = 0
|
||||
|
||||
[pools.sharded.users.1]
|
||||
username = "postgres"
|
||||
password = "postgres"
|
||||
pool_size = 21
|
||||
statement_timeout = 15000
|
||||
|
||||
# Shard 0
|
||||
[pools.sharded.shards.0]
|
||||
@@ -137,7 +130,6 @@ sharding_function = "pg_bigint_hash"
|
||||
username = "postgres"
|
||||
password = "postgres"
|
||||
pool_size = 5
|
||||
statement_timeout = 0
|
||||
|
||||
[pools.simple_db.shards.0]
|
||||
servers = [
|
||||
|
||||
10
pgcat.toml
10
pgcat.toml
@@ -11,12 +11,9 @@ host = "0.0.0.0"
|
||||
# Port to run on, same as PgBouncer used in this example.
|
||||
port = 6432
|
||||
|
||||
# Whether to enable prometheus exporter or not.
|
||||
# enable prometheus exporter on port 9930
|
||||
enable_prometheus_exporter = true
|
||||
|
||||
# Port at which prometheus exporter listens on.
|
||||
prometheus_exporter_port = 9930
|
||||
|
||||
# How long to wait before aborting a server connection (ms).
|
||||
connect_timeout = 5000
|
||||
|
||||
@@ -92,14 +89,10 @@ password = "sharding_user"
|
||||
# is the sum of pool_size across all users.
|
||||
pool_size = 9
|
||||
|
||||
# Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
|
||||
statement_timeout = 0
|
||||
|
||||
[pools.sharded_db.users.1]
|
||||
username = "other_user"
|
||||
password = "other_user"
|
||||
pool_size = 21
|
||||
statement_timeout = 15000
|
||||
|
||||
# Shard 0
|
||||
[pools.sharded_db.shards.0]
|
||||
@@ -137,7 +130,6 @@ sharding_function = "pg_bigint_hash"
|
||||
username = "simple_user"
|
||||
password = "simple_user"
|
||||
pool_size = 5
|
||||
statement_timeout = 0
|
||||
|
||||
[pools.simple_db.shards.0]
|
||||
servers = [
|
||||
|
||||
66
src/admin.rs
66
src/admin.rs
@@ -44,45 +44,32 @@ where
|
||||
|
||||
trace!("Admin query: {}", query);
|
||||
|
||||
let query_parts: Vec<&str> = query.trim_end_matches(';').split_whitespace().collect();
|
||||
|
||||
match query_parts[0] {
|
||||
"RELOAD" => {
|
||||
trace!("RELOAD");
|
||||
reload(stream, client_server_map).await
|
||||
}
|
||||
"SET" => {
|
||||
trace!("SET");
|
||||
ignore_set(stream).await
|
||||
}
|
||||
"SHOW" => match query_parts[1] {
|
||||
"CONFIG" => {
|
||||
trace!("SHOW CONFIG");
|
||||
show_config(stream).await
|
||||
}
|
||||
"DATABASES" => {
|
||||
trace!("SHOW DATABASES");
|
||||
show_databases(stream).await
|
||||
}
|
||||
"LISTS" => {
|
||||
trace!("SHOW LISTS");
|
||||
show_lists(stream).await
|
||||
}
|
||||
"POOLS" => {
|
||||
trace!("SHOW POOLS");
|
||||
show_pools(stream).await
|
||||
}
|
||||
"STATS" => {
|
||||
trace!("SHOW STATS");
|
||||
show_stats(stream).await
|
||||
}
|
||||
"VERSION" => {
|
||||
trace!("SHOW VERSION");
|
||||
show_version(stream).await
|
||||
}
|
||||
_ => error_response(stream, "Unsupported SHOW query against the admin database").await,
|
||||
},
|
||||
_ => error_response(stream, "Unsupported query against the admin database").await,
|
||||
if query.starts_with("SHOW STATS") {
|
||||
trace!("SHOW STATS");
|
||||
show_stats(stream).await
|
||||
} else if query.starts_with("RELOAD") {
|
||||
trace!("RELOAD");
|
||||
reload(stream, client_server_map).await
|
||||
} else if query.starts_with("SHOW CONFIG") {
|
||||
trace!("SHOW CONFIG");
|
||||
show_config(stream).await
|
||||
} else if query.starts_with("SHOW DATABASES") {
|
||||
trace!("SHOW DATABASES");
|
||||
show_databases(stream).await
|
||||
} else if query.starts_with("SHOW POOLS") {
|
||||
trace!("SHOW POOLS");
|
||||
show_pools(stream).await
|
||||
} else if query.starts_with("SHOW LISTS") {
|
||||
trace!("SHOW LISTS");
|
||||
show_lists(stream).await
|
||||
} else if query.starts_with("SHOW VERSION") {
|
||||
trace!("SHOW VERSION");
|
||||
show_version(stream).await
|
||||
} else if query.starts_with("SET ") {
|
||||
trace!("SET");
|
||||
ignore_set(stream).await
|
||||
} else {
|
||||
error_response(stream, "Unsupported query against the admin database").await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -187,7 +174,6 @@ where
|
||||
let columns = vec![
|
||||
("database", DataType::Text),
|
||||
("user", DataType::Text),
|
||||
("cl_idle", DataType::Numeric),
|
||||
("cl_active", DataType::Numeric),
|
||||
("cl_waiting", DataType::Numeric),
|
||||
("cl_cancel_req", DataType::Numeric),
|
||||
|
||||
173
src/client.rs
173
src/client.rs
@@ -499,7 +499,7 @@ where
|
||||
// The query router determines where the query is going to go,
|
||||
// e.g. primary, replica, which shard.
|
||||
let mut query_router = QueryRouter::new();
|
||||
let mut round_robin = rand::random();
|
||||
let mut round_robin = 0;
|
||||
|
||||
// Our custom protocol loop.
|
||||
// We expect the client to either start a transaction with regular queries
|
||||
@@ -667,6 +667,7 @@ where
|
||||
.client_disconnecting(self.process_id, last_address_id);
|
||||
}
|
||||
self.stats.client_active(self.process_id, address.id);
|
||||
self.stats.server_active(server.process_id(), address.id);
|
||||
|
||||
self.last_address_id = Some(address.id);
|
||||
self.last_server_id = Some(server.process_id());
|
||||
@@ -730,16 +731,44 @@ where
|
||||
'Q' => {
|
||||
debug!("Sending query to server");
|
||||
|
||||
self.send_and_receive_loop(
|
||||
code,
|
||||
original,
|
||||
self.send_server_message(
|
||||
server,
|
||||
original,
|
||||
&address,
|
||||
query_router.shard(),
|
||||
&pool,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Read all data the server has to offer, which can be multiple messages
|
||||
// buffered in 8196 bytes chunks.
|
||||
loop {
|
||||
let response = self
|
||||
.receive_server_message(
|
||||
server,
|
||||
&address,
|
||||
query_router.shard(),
|
||||
&pool,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Send server reply to the client.
|
||||
match write_all_half(&mut self.write, response).await {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
server.mark_bad();
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
if !server.is_data_available() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Report query executed statistics.
|
||||
self.stats.query(self.process_id, address.id);
|
||||
|
||||
if !server.in_transaction() {
|
||||
// Report transaction executed statistics.
|
||||
self.stats.transaction(self.process_id, address.id);
|
||||
@@ -747,6 +776,7 @@ where
|
||||
// Release server back to the pool if we are in transaction mode.
|
||||
// If we are in session mode, we keep the server until the client disconnects.
|
||||
if self.transaction_mode {
|
||||
self.stats.server_idle(server.process_id(), address.id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -759,13 +789,9 @@ where
|
||||
// Pgbouncer closes the connection which leads to
|
||||
// connection thrashing when clients misbehave.
|
||||
if server.in_transaction() {
|
||||
// server.query("ROLLBACK").await?;
|
||||
// server.query("DISCARD ALL").await?;
|
||||
// server.set_name("pgcat").await?;
|
||||
|
||||
// TODO: Figure out a clever way to ensure
|
||||
// the server has no more messages for us.
|
||||
server.mark_bad();
|
||||
server.query("ROLLBACK").await?;
|
||||
server.query("DISCARD ALL").await?;
|
||||
server.set_name("pgcat").await?;
|
||||
}
|
||||
|
||||
self.release();
|
||||
@@ -804,10 +830,9 @@ where
|
||||
|
||||
self.buffer.put(&original[..]);
|
||||
|
||||
self.send_and_receive_loop(
|
||||
code,
|
||||
self.buffer.clone(),
|
||||
self.send_server_message(
|
||||
server,
|
||||
self.buffer.clone(),
|
||||
&address,
|
||||
query_router.shard(),
|
||||
&pool,
|
||||
@@ -816,12 +841,41 @@ where
|
||||
|
||||
self.buffer.clear();
|
||||
|
||||
// Read all data the server has to offer, which can be multiple messages
|
||||
// buffered in 8196 bytes chunks.
|
||||
loop {
|
||||
let response = self
|
||||
.receive_server_message(
|
||||
server,
|
||||
&address,
|
||||
query_router.shard(),
|
||||
&pool,
|
||||
)
|
||||
.await?;
|
||||
|
||||
match write_all_half(&mut self.write, response).await {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
server.mark_bad();
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
if !server.is_data_available() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Report query executed statistics.
|
||||
self.stats.query(self.process_id, address.id);
|
||||
|
||||
if !server.in_transaction() {
|
||||
self.stats.transaction(self.process_id, address.id);
|
||||
|
||||
// Release server back to the pool if we are in transaction mode.
|
||||
// If we are in session mode, we keep the server until the client disconnects.
|
||||
if self.transaction_mode {
|
||||
self.stats.server_idle(server.process_id(), address.id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -871,6 +925,7 @@ where
|
||||
// Release server back to the pool if we are in transaction mode.
|
||||
// If we are in session mode, we keep the server until the client disconnects.
|
||||
if self.transaction_mode {
|
||||
self.stats.server_idle(server.process_id(), address.id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -886,7 +941,6 @@ where
|
||||
|
||||
// The server is no longer bound to us, we can't cancel its queries anymore.
|
||||
debug!("Releasing server back into the pool");
|
||||
self.stats.server_idle(server.process_id(), address.id);
|
||||
self.release();
|
||||
self.stats.client_idle(self.process_id, address.id);
|
||||
}
|
||||
@@ -898,46 +952,6 @@ where
|
||||
guard.remove(&(self.process_id, self.secret_key));
|
||||
}
|
||||
|
||||
async fn send_and_receive_loop(
|
||||
&mut self,
|
||||
code: char,
|
||||
message: BytesMut,
|
||||
server: &mut Server,
|
||||
address: &Address,
|
||||
shard: usize,
|
||||
pool: &ConnectionPool,
|
||||
) -> Result<(), Error> {
|
||||
debug!("Sending {} to server", code);
|
||||
|
||||
self.send_server_message(server, message, &address, shard, &pool)
|
||||
.await?;
|
||||
|
||||
// Read all data the server has to offer, which can be multiple messages
|
||||
// buffered in 8196 bytes chunks.
|
||||
loop {
|
||||
let response = self
|
||||
.receive_server_message(server, &address, shard, &pool)
|
||||
.await?;
|
||||
|
||||
match write_all_half(&mut self.write, response).await {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
server.mark_bad();
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
if !server.is_data_available() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Report query executed statistics.
|
||||
self.stats.query(self.process_id, address.id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_server_message(
|
||||
&self,
|
||||
server: &mut Server,
|
||||
@@ -956,54 +970,17 @@ where
|
||||
}
|
||||
|
||||
async fn receive_server_message(
|
||||
&mut self,
|
||||
&self,
|
||||
server: &mut Server,
|
||||
address: &Address,
|
||||
shard: usize,
|
||||
pool: &ConnectionPool,
|
||||
) -> Result<BytesMut, Error> {
|
||||
if pool.settings.user.statement_timeout > 0 {
|
||||
match tokio::time::timeout(
|
||||
tokio::time::Duration::from_millis(pool.settings.user.statement_timeout),
|
||||
server.recv(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(result) => match result {
|
||||
Ok(message) => Ok(message),
|
||||
Err(err) => {
|
||||
pool.ban(address, shard, self.process_id);
|
||||
error_response_terminal(
|
||||
&mut self.write,
|
||||
&format!("error receiving data from server: {:?}", err),
|
||||
)
|
||||
.await?;
|
||||
Err(err)
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
error!(
|
||||
"Statement timeout while talking to {:?} with user {}",
|
||||
address, pool.settings.user.username
|
||||
);
|
||||
server.mark_bad();
|
||||
pool.ban(address, shard, self.process_id);
|
||||
error_response_terminal(&mut self.write, "pool statement timeout").await?;
|
||||
Err(Error::StatementTimeout)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
match server.recv().await {
|
||||
Ok(message) => Ok(message),
|
||||
Err(err) => {
|
||||
pool.ban(address, shard, self.process_id);
|
||||
error_response_terminal(
|
||||
&mut self.write,
|
||||
&format!("error receiving data from server: {:?}", err),
|
||||
)
|
||||
.await?;
|
||||
Err(err)
|
||||
}
|
||||
match server.recv().await {
|
||||
Ok(message) => Ok(message),
|
||||
Err(err) => {
|
||||
pool.ban(address, shard, self.process_id);
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,8 +64,6 @@ pub struct Address {
|
||||
pub database: String,
|
||||
pub role: Role,
|
||||
pub replica_number: usize,
|
||||
pub username: String,
|
||||
pub poolname: String,
|
||||
}
|
||||
|
||||
impl Default for Address {
|
||||
@@ -78,8 +76,6 @@ impl Default for Address {
|
||||
replica_number: 0,
|
||||
database: String::from("database"),
|
||||
role: Role::Replica,
|
||||
username: String::from("username"),
|
||||
poolname: String::from("poolname"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -88,11 +84,11 @@ impl Address {
|
||||
/// Address name (aka database) used in `SHOW STATS`, `SHOW DATABASES`, and `SHOW POOLS`.
|
||||
pub fn name(&self) -> String {
|
||||
match self.role {
|
||||
Role::Primary => format!("{}_shard_{}_primary", self.poolname, self.shard),
|
||||
Role::Primary => format!("{}_shard_{}_primary", self.database, self.shard),
|
||||
|
||||
Role::Replica => format!(
|
||||
"{}_shard_{}_replica_{}",
|
||||
self.poolname, self.shard, self.replica_number
|
||||
self.database, self.shard, self.replica_number
|
||||
),
|
||||
}
|
||||
}
|
||||
@@ -104,7 +100,6 @@ pub struct User {
|
||||
pub username: String,
|
||||
pub password: String,
|
||||
pub pool_size: u32,
|
||||
pub statement_timeout: u64,
|
||||
}
|
||||
|
||||
impl Default for User {
|
||||
@@ -113,7 +108,6 @@ impl Default for User {
|
||||
username: String::from("postgres"),
|
||||
password: String::new(),
|
||||
pool_size: 15,
|
||||
statement_timeout: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -124,7 +118,6 @@ pub struct General {
|
||||
pub host: String,
|
||||
pub port: i16,
|
||||
pub enable_prometheus_exporter: Option<bool>,
|
||||
pub prometheus_exporter_port: i16,
|
||||
pub connect_timeout: u64,
|
||||
pub healthcheck_timeout: u64,
|
||||
pub shutdown_timeout: u64,
|
||||
@@ -143,7 +136,6 @@ impl Default for General {
|
||||
host: String::from("localhost"),
|
||||
port: 5432,
|
||||
enable_prometheus_exporter: Some(false),
|
||||
prometheus_exporter_port: 9930,
|
||||
connect_timeout: 5000,
|
||||
healthcheck_timeout: 1000,
|
||||
shutdown_timeout: 60000,
|
||||
@@ -279,10 +271,6 @@ impl From<&Config> for std::collections::HashMap<String, String> {
|
||||
let mut static_settings = vec![
|
||||
("host".to_string(), config.general.host.to_string()),
|
||||
("port".to_string(), config.general.port.to_string()),
|
||||
(
|
||||
"prometheus_exporter_port".to_string(),
|
||||
config.general.prometheus_exporter_port.to_string(),
|
||||
),
|
||||
(
|
||||
"connect_timeout".to_string(),
|
||||
config.general.connect_timeout.to_string(),
|
||||
@@ -338,7 +326,6 @@ impl Config {
|
||||
};
|
||||
|
||||
for (pool_name, pool_config) in &self.pools {
|
||||
// TODO: Make this output prettier (maybe a table?)
|
||||
info!("--- Settings for pool {} ---", pool_name);
|
||||
info!(
|
||||
"Pool size from all users: {}",
|
||||
@@ -353,17 +340,8 @@ impl Config {
|
||||
info!("Sharding function: {}", pool_config.sharding_function);
|
||||
info!("Primary reads: {}", pool_config.primary_reads_enabled);
|
||||
info!("Query router: {}", pool_config.query_parser_enabled);
|
||||
|
||||
// TODO: Make this prettier.
|
||||
info!("Number of shards: {}", pool_config.shards.len());
|
||||
info!("Number of users: {}", pool_config.users.len());
|
||||
|
||||
for user in &pool_config.users {
|
||||
info!(
|
||||
"{} pool size: {}, statement timeout: {}",
|
||||
user.1.username, user.1.pool_size, user.1.statement_timeout
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,5 +11,4 @@ pub enum Error {
|
||||
AllServersDown,
|
||||
ClientError,
|
||||
TlsError,
|
||||
StatementTimeout,
|
||||
}
|
||||
|
||||
@@ -99,10 +99,7 @@ async fn main() {
|
||||
let config = get_config();
|
||||
|
||||
if let Some(true) = config.general.enable_prometheus_exporter {
|
||||
let http_addr_str = format!(
|
||||
"{}:{}",
|
||||
config.general.host, config.general.prometheus_exporter_port
|
||||
);
|
||||
let http_addr_str = format!("{}:{}", config.general.host, crate::prometheus::HTTP_PORT);
|
||||
let http_addr = match SocketAddr::from_str(&http_addr_str) {
|
||||
Ok(addr) => addr,
|
||||
Err(err) => {
|
||||
@@ -133,7 +130,7 @@ async fn main() {
|
||||
let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
// Statistics reporting.
|
||||
let (tx, rx) = mpsc::channel(100_000);
|
||||
let (tx, rx) = mpsc::channel(100);
|
||||
REPORTER.store(Arc::new(Reporter::new(tx.clone())));
|
||||
|
||||
// Connection pool that allows to query all shards and replicas.
|
||||
|
||||
@@ -114,14 +114,12 @@ impl ConnectionPool {
|
||||
|
||||
let address = Address {
|
||||
id: address_id,
|
||||
database: shard.database.clone(),
|
||||
database: pool_name.clone(),
|
||||
host: server.0.clone(),
|
||||
port: server.1.to_string(),
|
||||
role: role,
|
||||
replica_number,
|
||||
shard: shard_idx.parse::<usize>().unwrap(),
|
||||
username: user_info.username.clone(),
|
||||
poolname: pool_name.clone(),
|
||||
};
|
||||
|
||||
address_id += 1;
|
||||
@@ -335,7 +333,7 @@ impl ConnectionPool {
|
||||
if !require_healthcheck {
|
||||
self.stats
|
||||
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
||||
self.stats.server_active(conn.process_id(), address.id);
|
||||
self.stats.server_idle(conn.process_id(), address.id);
|
||||
return Ok((conn, address.clone()));
|
||||
}
|
||||
|
||||
@@ -354,7 +352,7 @@ impl ConnectionPool {
|
||||
Ok(_) => {
|
||||
self.stats
|
||||
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
||||
self.stats.server_active(conn.process_id(), address.id);
|
||||
self.stats.server_idle(conn.process_id(), address.id);
|
||||
return Ok((conn, address.clone()));
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,8 @@ use crate::config::Address;
|
||||
use crate::pool::get_all_pools;
|
||||
use crate::stats::get_stats;
|
||||
|
||||
pub const HTTP_PORT: usize = 9930;
|
||||
|
||||
struct MetricHelpType {
|
||||
help: &'static str,
|
||||
ty: &'static str,
|
||||
|
||||
52
src/stats.rs
52
src/stats.rs
@@ -1,10 +1,9 @@
|
||||
use arc_swap::ArcSwap;
|
||||
/// Statistics and reporting.
|
||||
use log::{error, info, trace};
|
||||
use log::info;
|
||||
use once_cell::sync::Lazy;
|
||||
use parking_lot::Mutex;
|
||||
use std::collections::HashMap;
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
|
||||
use crate::pool::get_number_of_addresses;
|
||||
@@ -44,7 +43,7 @@ enum EventName {
|
||||
|
||||
/// Event data sent to the collector
|
||||
/// from clients and servers.
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug)]
|
||||
pub struct Event {
|
||||
/// The name of the event being reported.
|
||||
name: EventName,
|
||||
@@ -80,25 +79,6 @@ impl Reporter {
|
||||
Reporter { tx: tx }
|
||||
}
|
||||
|
||||
/// Send statistics to the task keeping track of stats.
|
||||
fn send(&self, event: Event) {
|
||||
let name = event.name;
|
||||
let result = self.tx.try_send(event);
|
||||
|
||||
match result {
|
||||
Ok(_) => trace!(
|
||||
"{:?} event reported successfully, capacity: {}",
|
||||
name,
|
||||
self.tx.capacity()
|
||||
),
|
||||
|
||||
Err(err) => match err {
|
||||
TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name),
|
||||
TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/// Report a query executed by a client against
|
||||
/// a server identified by the `address_id`.
|
||||
pub fn query(&self, process_id: i32, address_id: usize) {
|
||||
@@ -109,7 +89,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event);
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
/// Report a transaction executed by a client against
|
||||
@@ -122,7 +102,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
/// Report data sent to a server identified by `address_id`.
|
||||
@@ -135,7 +115,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
/// Report data received from a server identified by `address_id`.
|
||||
@@ -148,7 +128,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
/// Time spent waiting to get a healthy connection from the pool
|
||||
@@ -162,7 +142,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
/// Reports a client identified by `process_id` waiting for a connection
|
||||
@@ -175,7 +155,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
/// Reports a client identified by `process_id` is done waiting for a connection
|
||||
@@ -188,7 +168,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
/// Reports a client identified by `process_id` is done querying the server
|
||||
@@ -201,7 +181,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
/// Reports a client identified by `process_id` is disconnecting from the pooler.
|
||||
@@ -214,7 +194,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
/// Reports a server connection identified by `process_id` for
|
||||
@@ -228,7 +208,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
/// Reports a server connection identified by `process_id` for
|
||||
@@ -242,7 +222,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
/// Reports a server connection identified by `process_id` for
|
||||
@@ -256,7 +236,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
/// Reports a server connection identified by `process_id` for
|
||||
@@ -270,7 +250,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
|
||||
/// Reports a server connection identified by `process_id` is disconnecting from the pooler.
|
||||
@@ -283,7 +263,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
let _ = self.tx.try_send(event);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user