Compare commits

..

4 Commits

Author SHA1 Message Date
Mostafa
d42f138b88 Rename a couple of variables 2024-10-23 06:22:46 -05:00
Javier Goday
186e72298f #829: read/write splitting on CTE mutable statements (#835) 2024-10-23 06:20:04 -05:00
Sebastian Serth
3935366d86 End Prometheus stats with a new line separator (#826)
End prometheus stats with a new line separator

According to the [OpenMetrics specification](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#overall-structure), each line MUST end with `\n`. Previously, the last line was not ending with `\n`, so that strict parsers had issues reading the Prometheus stats.
2024-09-22 17:14:04 -05:00
Sean McGivern
b575935b1d Improve documentation for connect_timeout and add min_pool_size (#822)
Currently, `connect_timeout` sounds like it should be for connections to
the Postgres server. It's actually used for obtaining a connection from
the pool.
2024-09-18 06:56:17 -05:00
3 changed files with 57 additions and 5 deletions

View File

@@ -36,10 +36,11 @@ Port at which prometheus exporter listens on.
### connect_timeout
```
path: general.connect_timeout
default: 5000 # milliseconds
default: 1000 # milliseconds
```
How long to wait before aborting a server connection (ms).
How long the client waits to obtain a server connection before aborting (ms).
This is similar to PgBouncer's `query_wait_timeout`.
### idle_timeout
```
@@ -462,10 +463,18 @@ path: pools.<pool_name>.users.<user_index>.pool_size
default: 9
```
Maximum number of server connections that can be established for this user
Maximum number of server connections that can be established for this user.
The maximum number of connection from a single Pgcat process to any database in the cluster
is the sum of pool_size across all users.
### min_pool_size
```
path: pools.<pool_name>.users.<user_index>.min_pool_size
default: 0
```
Minimum number of idle server connections to retain for this pool.
### statement_timeout
```
path: pools.<pool_name>.users.<user_index>.statement_timeout
@@ -475,6 +484,16 @@ default: 0
Maximum query duration. Dangerous, but protects against DBs that died in a non-obvious way.
0 means it is disabled.
### connect_timeout
```
path: pools.<pool_name>.users.<user_index>.connect_timeout
default: <UNSET> # milliseconds
```
How long the client waits to obtain a server connection before aborting (ms).
This is similar to PgBouncer's `query_wait_timeout`.
If unset, uses the `connect_timeout` defined globally.
## `pools.<pool_name>.shards.<shard_index>` Section
### servers
@@ -502,4 +521,3 @@ default: "shard0"
```
Database name (e.g. "postgres")

View File

@@ -309,6 +309,7 @@ async fn prometheus_stats(
push_pool_stats(&mut lines);
push_server_stats(&mut lines);
push_database_stats(&mut lines);
lines.push("".to_string()); // Ensure to end the stats with a line terminator as required by the specification.
Response::builder()
.header("content-type", "text/plain; version=0.0.4")

View File

@@ -386,6 +386,18 @@ impl QueryRouter {
}
}
/// Determines if a query is a mutation or not.
fn is_mutation_query(q: &sqlparser::ast::Query) -> bool {
use sqlparser::ast::*;
match q.body.as_ref() {
SetExpr::Insert(_) => true,
SetExpr::Update(_) => true,
SetExpr::Query(q) => Self::is_mutation_query(q),
_ => false,
}
}
/// Try to infer which server to connect to based on the contents of the query.
pub fn infer(&mut self, ast: &Vec<sqlparser::ast::Statement>) -> Result<(), Error> {
if !self.pool_settings.query_parser_read_write_splitting {
@@ -428,8 +440,9 @@ impl QueryRouter {
};
let has_locks = !query.locks.is_empty();
let has_mutation = Self::is_mutation_query(query);
if has_locks {
if has_locks || has_mutation {
self.active_role = Some(Role::Primary);
} else if !visited_write_statement {
// If we already visited a write statement, we should be going to the primary.
@@ -1113,6 +1126,26 @@ mod test {
assert_eq!(qr.role(), None);
}
#[test]
fn test_split_cte_queries() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.query_parser_read_write_splitting = true;
qr.pool_settings.query_parser_enabled = true;
let query = simple_query(
"WITH t AS (
SELECT id FROM users WHERE name ILIKE '%ja%'
)
UPDATE user_languages
SET settings = '{}'
FROM t WHERE t.id = user_id;",
);
let ast = qr.parse(&query).unwrap();
assert!(qr.infer(&ast).is_ok());
assert_eq!(qr.role(), Some(Role::Primary));
}
#[test]
fn test_infer_replica() {
QueryRouter::setup();