Compare commits

...

4 Commits

Author SHA1 Message Date
Mostafa
d1ff1fc409 Enabled prepared statements in test pgcat 2024-08-31 17:04:04 -05:00
Mostafa
75878c2628 Fix broken integration test #740 2024-08-31 16:43:41 -05:00
Mostafa Abdelraouf
2def40ea6a Add test case for issue 776 (#786)
I am adding a tiny test that uses the SQL statement that was reported to break an older version of SQL parser library

#776
2024-08-31 10:52:33 -05:00
Mostafa Abdelraouf
c05129018d Improve Prometheus stats + Add Grafana dashboard (#785)
We were missing some labels on metrics generated by the Prometheus exporter so I fixed that. There are still some gaps that I want to address with respect to the metrics we track but this seems like a good start.

I also created a Grafana Dashboard and exported it to JSON. It is designed with the same metric names the Prometheus exporter uses.
2024-08-31 08:18:57 -05:00
7 changed files with 2653 additions and 289 deletions

View File

@@ -59,6 +59,7 @@ admin_password = "admin_pass"
# session: one server connection per connected client
# transaction: one server connection per client transaction
pool_mode = "transaction"
prepared_statements_cache_size = 500
# If the client doesn't specify, route traffic to
# this role by default.
@@ -141,6 +142,7 @@ query_parser_enabled = true
query_parser_read_write_splitting = true
primary_reads_enabled = true
sharding_function = "pg_bigint_hash"
prepared_statements_cache_size = 500
[pools.simple_db.users.0]
username = "simple_user"

View File

@@ -268,6 +268,8 @@ psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES'
Additionally, Prometheus statistics are available at `/metrics` via HTTP.
We also have a [basic Grafana dashboard](https://github.com/postgresml/pgcat/blob/main/grafana_dashboard.json) based on Prometheus metrics that you can import into Grafana and build on it or use it for monitoring.
### Live configuration reloading
The config can be reloaded by sending a `kill -s SIGHUP` to the process or by querying `RELOAD` to the admin database. All settings except the `host` and `port` can be reloaded without restarting the pooler, including sharding and replicas configurations.

2124
grafana_dashboard.json Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -12,19 +12,30 @@ use std::collections::HashMap;
use std::fmt;
use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::net::TcpListener;
use crate::config::Address;
use crate::pool::{get_all_pools, PoolIdentifier};
use crate::stats::get_server_stats;
use crate::stats::pool::PoolStats;
use crate::stats::{get_server_stats, ServerStats};
struct MetricHelpType {
help: &'static str,
ty: &'static str,
}
struct ServerPrometheusStats {
bytes_received: u64,
bytes_sent: u64,
transaction_count: u64,
query_count: u64,
error_count: u64,
active_count: u64,
idle_count: u64,
login_count: u64,
tested_count: u64,
}
// reference for metric types: https://prometheus.io/docs/concepts/metric_types/
// counters only increase
// gauges can arbitrarily increase or decrease
@@ -127,22 +138,46 @@ static METRIC_HELP_AND_TYPES_LOOKUP: phf::Map<&'static str, MetricHelpType> = ph
},
"servers_bytes_received" => MetricHelpType {
help: "Volume in bytes of network traffic received by server",
ty: "gauge",
ty: "counter",
},
"servers_bytes_sent" => MetricHelpType {
help: "Volume in bytes of network traffic sent by server",
ty: "gauge",
ty: "counter",
},
"servers_transaction_count" => MetricHelpType {
help: "Number of transactions executed by server",
ty: "gauge",
ty: "counter",
},
"servers_query_count" => MetricHelpType {
help: "Number of queries executed by server",
ty: "gauge",
ty: "counter",
},
"servers_error_count" => MetricHelpType {
help: "Number of errors",
ty: "counter",
},
"servers_idle_count" => MetricHelpType {
help: "Number of server connection in idle state",
ty: "gauge",
},
"servers_active_count" => MetricHelpType {
help: "Number of server connection in active state",
ty: "gauge",
},
"servers_tested_count" => MetricHelpType {
help: "Number of server connection in tested state",
ty: "gauge",
},
"servers_login_count" => MetricHelpType {
help: "Number of server connection in login state",
ty: "gauge",
},
"servers_is_banned" => MetricHelpType {
help: "0 if server is not banned, 1 if server is banned",
ty: "gauge",
},
"servers_is_paused" => MetricHelpType {
help: "0 if server is not paused, 1 if server is paused",
ty: "gauge",
},
"databases_pool_size" => MetricHelpType {
@@ -210,7 +245,9 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("shard", address.shard.to_string());
labels.insert("role", address.role.to_string());
labels.insert("pool", address.pool_name.clone());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone());
Self::from_name(&format!("databases_{}", name), value, labels)
}
@@ -225,8 +262,9 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("shard", address.shard.to_string());
labels.insert("role", address.role.to_string());
labels.insert("pool", address.pool_name.clone());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone());
Self::from_name(&format!("servers_{}", name), value, labels)
}
@@ -236,7 +274,9 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("shard", address.shard.to_string());
labels.insert("pool", address.pool_name.clone());
labels.insert("role", address.role.to_string());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone());
Self::from_name(&format!("stats_{}", name), value, labels)
}
@@ -338,34 +378,51 @@ fn push_database_stats(lines: &mut Vec<String>) {
// Adds relevant metrics shown in a SHOW SERVERS admin command.
fn push_server_stats(lines: &mut Vec<String>) {
let server_stats = get_server_stats();
let mut server_stats_by_addresses = HashMap::<String, Arc<ServerStats>>::new();
let mut prom_stats = HashMap::<String, ServerPrometheusStats>::new();
for (_, stats) in server_stats {
server_stats_by_addresses.insert(stats.address_name(), stats);
let entry = prom_stats
.entry(stats.address_name())
.or_insert(ServerPrometheusStats {
bytes_received: 0,
bytes_sent: 0,
transaction_count: 0,
query_count: 0,
error_count: 0,
active_count: 0,
idle_count: 0,
login_count: 0,
tested_count: 0,
});
entry.bytes_received += stats.bytes_received.load(Ordering::Relaxed);
entry.bytes_sent += stats.bytes_sent.load(Ordering::Relaxed);
entry.transaction_count += stats.transaction_count.load(Ordering::Relaxed);
entry.query_count += stats.query_count.load(Ordering::Relaxed);
entry.error_count += stats.error_count.load(Ordering::Relaxed);
match stats.state.load(Ordering::Relaxed) {
crate::stats::ServerState::Login => entry.login_count += 1,
crate::stats::ServerState::Active => entry.active_count += 1,
crate::stats::ServerState::Tested => entry.tested_count += 1,
crate::stats::ServerState::Idle => entry.idle_count += 1,
}
}
for (_, pool) in get_all_pools() {
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
let address = pool.address(shard, server);
if let Some(server_info) = server_stats_by_addresses.get(&address.name()) {
if let Some(server_info) = prom_stats.get(&address.name()) {
let metrics = [
(
"bytes_received",
server_info.bytes_received.load(Ordering::Relaxed),
),
("bytes_sent", server_info.bytes_sent.load(Ordering::Relaxed)),
(
"transaction_count",
server_info.transaction_count.load(Ordering::Relaxed),
),
(
"query_count",
server_info.query_count.load(Ordering::Relaxed),
),
(
"error_count",
server_info.error_count.load(Ordering::Relaxed),
),
("bytes_received", server_info.bytes_received),
("bytes_sent", server_info.bytes_sent),
("transaction_count", server_info.transaction_count),
("query_count", server_info.query_count),
("error_count", server_info.error_count),
("idle_count", server_info.idle_count),
("active_count", server_info.active_count),
("login_count", server_info.login_count),
("tested_count", server_info.tested_count),
("is_banned", if pool.is_banned(address) { 1 } else { 0 }),
("is_paused", if pool.paused() { 1 } else { 0 }),
];
for (key, value) in metrics {
if let Some(prometheus_metric) =

View File

@@ -1399,6 +1399,19 @@ mod test {
assert!(!qr.query_parser_enabled());
}
#[test]
fn test_query_parser() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.query_parser_read_write_splitting = true;
let query = simple_query("SELECT req_tab_0.* FROM validation req_tab_0 WHERE array['http://www.w3.org/ns/shacl#ValidationResult'] && req_tab_0.type::text[] AND ( ( (req_tab_0.focusnode = 'DataSource_Credilogic_DataSourceAddress_144959227') ) )");
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
let query = simple_query("WITH EmployeeSalaries AS (SELECT Department, Salary FROM Employees) SELECT Department, AVG(Salary) AS AverageSalary FROM EmployeeSalaries GROUP BY Department;");
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
}
#[test]
fn test_update_from_pool_settings() {
QueryRouter::setup();

682
tests/rust/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -15,13 +15,11 @@ async fn test_prepared_statements() {
for _ in 0..5 {
let pool = pool.clone();
let handle = tokio::task::spawn(async move {
for _ in 0..1000 {
match sqlx::query("SELECT one").fetch_all(&pool).await {
for i in 0..1000 {
match sqlx::query(&format!("SELECT {:?}", i % 5)).fetch_all(&pool).await {
Ok(_) => (),
Err(err) => {
if err.to_string().contains("prepared statement") {
panic!("prepared statement error: {}", err);
}
panic!("prepared statement error: {}", err);
}
}
}