Compare commits

...

17 Commits

Author SHA1 Message Date
Mostafa Abdelraouf
c5257634c3 more lint changes 2024-08-30 08:04:51 -05:00
Mostafa
056f7e2908 fix lint issues 2024-08-30 06:35:36 -05:00
Mostafa
250281cf80 make linter happy 2024-08-30 05:59:36 -05:00
Mostafa
ddcee8a303 Cut 1.2.0 release 2024-08-30 05:57:17 -05:00
Mostafa Abdelraouf
29a476e190 QueryRouter: route to primary when locks exists (select for update) (#782)
Authored-by: Javier Goday <jgoday@gmail.com>
2024-08-30 04:26:36 -05:00
KwongTN
81933b918d Add linux/arm64 docker image build support (#774) 2024-08-29 13:50:38 -05:00
Saraj Munjal
7cbc9178d8 Bump the hyper crate to v1.4.1 and rework prometheus server handling (#778)
Bump hyper to v1.4.1 and rework prometheus server handling
2024-08-29 09:47:58 -05:00
Mostafa Abdelraouf
2c8b2f0776 Fix CI image build step (#780)
The docker CI build image is failing due to this error

249.5     Finished release [optimized] target(s) in 2m 49s
249.5   Installing /home/circleci/.cargo/bin/rustfilt
249.5    Installed package `rustfilt v0.2.1` (executable `rustfilt`)
249.5 error: failed to compile `cargo-binutils v0.3.6`, intermediate artifacts can be found at `/tmp/cargo-installrWENQG`
249.5 
249.5 Caused by:
249.5   package `cargo-platform v0.1.8` cannot be built because it requires rustc 1.73 or newer, while the currently active rustc version is 1.67.1
249.5   Try re-running cargo install with `--locked`
249.5      Summary Successfully installed rustfilt! Failed to install cargo-binutils (see error(s) above).
249.5 error: some crates failed to install

So I am bumping the version up
2024-08-29 08:37:13 -05:00
Mostafa Abdelraouf
8f9a2b8e6f Fix a Panic in admin commands (#779)
We have a panic when we send SHOW or ;;;;;;;;;;;;;;;;; to admin database.

This PR fixes these panics and adds a couple of tests
2024-08-28 21:29:40 -05:00
brandonpike
cbf4d58144 Fix lint warnings for rust-1.79 (#769)
2 things that are recommended by rust-lang - implementing `std::fmt::Display` rather than ToString (1) and using clone_from (2).

[1] https://rust-lang.github.io/rust-clippy/master/index.html#/to_string_trait_impl
[2] https://rust-lang.github.io/rust-clippy/master/index.html#assigning_clones

Signed-off-by: Brandon Pike <pikebrandon@att.net>
2024-07-15 20:30:26 -07:00
Олег Дулецкий
731aa047ba Add ExecReload option to pgcat.service for configuration reloads (#760) 2024-06-24 08:57:58 -07:00
Adrian Garcia Badaracco
88dbcc21d1 update rust version in docker image (#762) 2024-06-24 08:51:38 -07:00
Adrian Garcia Badaracco
c34b15bddc Add STOPSIGNAL to Dockerfile (#758) 2024-06-20 23:23:41 -07:00
Andrey Stikheev
0b034a6831 Add TCP_NODELAY option to improve performance for large response queries (#749)
This commit adds the TCP_NODELAY option to the socket configuration in
`configure_socket` function. Without this option, we observed significant
performance issues when executing SELECT queries with large responses.

Before the fix:
postgres=> SELECT repeat('a', 1); SELECT repeat('a', 8153);
Time: 1.368 ms
Time: 41.364 ms

After the fix:
postgres=> SELECT repeat('a', 1); SELECT repeat('a', 8153);
Time: 1.332 ms
Time: 1.528 ms

By setting TCP_NODELAY, we eliminate the Nagle's algorithm delay, which
results in a substantial improvement in response times for large queries.

This problem was discussed in https://github.com/postgresml/pgcat/issues/616.
2024-05-26 14:47:21 -07:00
Mostafa Abdelraouf
966b8e093c Report checkout error when all servers are down (#736)
We shouldn't report checkout_success when we are going to return Error.
2024-05-08 12:18:27 -05:00
Horacio
c9270a47d4 Use rust:bullseye as base image (#725)
Use rust:bullseye base image

With the original rust:1.70-bullseye image, the container cannot be
built:

17.06   Installing /usr/local/cargo/bin/rustfilt
17.06    Installed package `rustfilt v0.2.1` (executable `rustfilt`)
17.06 error: failed to compile `cargo-binutils v0.3.6`, intermediate artifacts can be found at `/tmp/cargo-installrc6mPb`
17.06
17.06 Caused by:
17.06   package `cargo-platform v0.1.8` cannot be built because it requires rustc 1.73 or newer, while the currently active rustc version is 1.70.0
17.06   Try re-running cargo install with `--locked`
17.06      Summary Successfully installed rustfilt! Failed to install cargo-binutils (see error(s) above).
17.06 error: some crates failed to install

This is the same base image used on tests/docker/Dockerfile
2024-04-19 09:12:57 -07:00
Toby Hede
0d94d0b90a Update sqlparser to 0.41 (#666) 2024-04-12 22:12:37 -07:00
17 changed files with 236 additions and 105 deletions

View File

@@ -23,14 +23,17 @@ jobs:
steps:
- name: Checkout Repository
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
uses: docker/setup-buildx-action@v3
- name: Determine tags
id: metadata
uses: docker/metadata-action@v4
uses: docker/metadata-action@v5
with:
images: ${{ env.registry }}/${{ env.image-name }}
tags: |
@@ -42,15 +45,18 @@ jobs:
type=raw,value=latest,enable={{ is_default_branch }}
- name: Log in to the Container registry
uses: docker/login-action@v2.1.0
uses: docker/login-action@v3
with:
registry: ${{ env.registry }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push ${{ env.image-name }}
uses: docker/build-push-action@v3
uses: docker/build-push-action@v6
with:
context: .
platforms: linux/amd64,linux/arm64
provenance: false
push: true
tags: ${{ steps.metadata.outputs.tags }}
labels: ${{ steps.metadata.outputs.labels }}

104
Cargo.lock generated
View File

@@ -146,6 +146,12 @@ dependencies = [
"syn 2.0.26",
]
[[package]]
name = "atomic-waker"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]]
name = "atomic_enum"
version = "0.2.0"
@@ -542,29 +548,23 @@ checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e"
[[package]]
name = "h2"
version = "0.3.20"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97ec8491ebaf99c8eaa73058b045fe58073cd6be7f596ac993ced0b0a0c01049"
checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205"
dependencies = [
"atomic-waker",
"bytes",
"fnv",
"futures-core",
"futures-sink",
"futures-util",
"http",
"indexmap 1.9.3",
"indexmap",
"slab",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "hashbrown"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "hashbrown"
version = "0.14.0"
@@ -609,9 +609,9 @@ dependencies = [
[[package]]
name = "http"
version = "0.2.9"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482"
checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258"
dependencies = [
"bytes",
"fnv",
@@ -620,12 +620,24 @@ dependencies = [
[[package]]
name = "http-body"
version = "0.4.5"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184"
dependencies = [
"bytes",
"http",
]
[[package]]
name = "http-body-util"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f"
dependencies = [
"bytes",
"futures-util",
"http",
"http-body",
"pin-project-lite",
]
@@ -643,13 +655,12 @@ checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
[[package]]
name = "hyper"
version = "0.14.27"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468"
checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05"
dependencies = [
"bytes",
"futures-channel",
"futures-core",
"futures-util",
"h2",
"http",
@@ -658,13 +669,26 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"socket2 0.4.9",
"smallvec",
"tokio",
"tower-service",
"tracing",
"want",
]
[[package]]
name = "hyper-util"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9"
dependencies = [
"bytes",
"futures-util",
"http",
"http-body",
"hyper",
"pin-project-lite",
"tokio",
]
[[package]]
name = "iana-time-zone"
version = "0.1.57"
@@ -709,16 +733,6 @@ dependencies = [
"unicode-normalization",
]
[[package]]
name = "indexmap"
version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
dependencies = [
"autocfg",
"hashbrown 0.12.3",
]
[[package]]
name = "indexmap"
version = "2.0.0"
@@ -726,7 +740,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d"
dependencies = [
"equivalent",
"hashbrown 0.14.0",
"hashbrown",
]
[[package]]
@@ -848,7 +862,7 @@ version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1efa59af2ddfad1854ae27d75009d538d0998b4b2fd47083e743ac1a10e46c60"
dependencies = [
"hashbrown 0.14.0",
"hashbrown",
]
[[package]]
@@ -1020,7 +1034,7 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
[[package]]
name = "pgcat"
version = "1.1.2-dev4"
version = "1.2.0"
dependencies = [
"arc-swap",
"async-trait",
@@ -1034,7 +1048,9 @@ dependencies = [
"fallible-iterator",
"futures",
"hmac",
"http-body-util",
"hyper",
"hyper-util",
"itertools",
"jemallocator",
"log",
@@ -1478,9 +1494,9 @@ dependencies = [
[[package]]
name = "smallvec"
version = "1.11.0"
version = "1.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9"
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
[[package]]
name = "socket2"
@@ -1510,9 +1526,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "sqlparser"
version = "0.34.0"
version = "0.41.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37d3706eefb17039056234df6b566b0014f303f867f2656108334a55b8096f59"
checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964"
dependencies = [
"log",
"sqlparser_derive",
@@ -1520,13 +1536,13 @@ dependencies = [
[[package]]
name = "sqlparser_derive"
version = "0.1.1"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55fe75cb4a364c7f7ae06c7dbbc8d84bddd85d6cdf9975963c3935bc1991761e"
checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
"syn 2.0.26",
]
[[package]]
@@ -1741,19 +1757,13 @@ version = "0.19.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a"
dependencies = [
"indexmap 2.0.0",
"indexmap",
"serde",
"serde_spanned",
"toml_datetime",
"winnow",
]
[[package]]
name = "tower-service"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
[[package]]
name = "tracing"
version = "0.1.37"

View File

@@ -1,6 +1,6 @@
[package]
name = "pgcat"
version = "1.1.2-dev4"
version = "1.2.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -19,7 +19,7 @@ serde_derive = "1"
regex = "1"
num_cpus = "1"
once_cell = "1"
sqlparser = {version = "0.34", features = ["visitor"] }
sqlparser = { version = "0.41", features = ["visitor"] }
log = "0.4"
arc-swap = "1"
parking_lot = "0.12.1"
@@ -29,7 +29,9 @@ base64 = "0.21"
stringprep = "0.1"
tokio-rustls = "0.24"
rustls-pemfile = "1"
hyper = { version = "0.14", features = ["full"] }
http-body-util = "0.1.2"
hyper = { version = "1.4.1", features = ["full"] }
hyper-util = { version = "0.1.7", features = ["tokio"] }
phf = { version = "0.11.1", features = ["macros"] }
exitcode = "1.1.2"
futures = "0.3"
@@ -47,9 +49,12 @@ serde_json = "1"
itertools = "0.10"
clap = { version = "4.3.1", features = ["derive", "env"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter", "std"]}
tracing-subscriber = { version = "0.3.17", features = [
"json",
"env-filter",
"std",
] }
lru = "0.12.0"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.5.0"

View File

@@ -1,4 +1,4 @@
FROM rust:1-slim-bookworm AS builder
FROM rust:1.79.0-slim-bookworm AS builder
RUN apt-get update && \
apt-get install -y build-essential
@@ -19,3 +19,4 @@ COPY --from=builder /app/pgcat.toml /etc/pgcat/pgcat.toml
WORKDIR /etc/pgcat
ENV RUST_LOG=info
CMD ["pgcat"]
STOPSIGNAL SIGINT

View File

@@ -1,4 +1,4 @@
FROM cimg/rust:1.67.1
FROM cimg/rust:1.79.0
COPY --from=sclevine/yj /bin/yj /bin/yj
RUN /bin/yj -h
RUN sudo apt-get update && \

View File

@@ -4,5 +4,5 @@ description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBounce
maintainers:
- name: Wildcard
email: support@w6d.io
appVersion: "1.1.1"
version: 0.1.0
appVersion: "1.2.0"
version: 0.2.0

View File

@@ -170,13 +170,13 @@ configuration:
connect_timeout: 5000
# How long an idle connection with a server is left open (ms).
idle_timeout: 30000 # milliseconds
idle_timeout: 30000 # milliseconds
# Max connection lifetime before it's closed, even if actively used.
server_lifetime: 86400000 # 24 hours
server_lifetime: 86400000 # 24 hours
# How long a client is allowed to be idle while in a transaction (ms).
idle_client_in_transaction_timeout: 0 # milliseconds
idle_client_in_transaction_timeout: 0 # milliseconds
# @param configuration.general.healthcheck_timeout How much time to give `SELECT 1` health check query to return with a result (ms).
healthcheck_timeout: 1000
@@ -240,7 +240,15 @@ configuration:
## the pool_name is what clients use as database name when connecting
## For the example below a client can connect using "postgres://sharding_user:sharding_user@pgcat_host:pgcat_port/sharded"
## @param [object]
pools: []
pools:
[{
name: "simple", pool_mode: "transaction",
users: [{username: "user", password: "pass", pool_size: 5, statement_timeout: 0}],
shards: [{
servers: [{host: "postgres", port: 5432, role: "primary"}],
database: "postgres"
}]
}]
# - ## default values
# ##
# ##

View File

@@ -1,4 +1,4 @@
FROM rust:1.70-bullseye
FROM rust:bullseye
# Dependencies
COPY --from=sclevine/yj /bin/yj /bin/yj

View File

@@ -11,6 +11,7 @@ RestartSec=1
Environment=RUST_LOG=info
LimitNOFILE=65536
ExecStart=/usr/bin/pgcat /etc/pgcat.toml
ExecReload=/bin/kill -SIGHUP $MAINPID
[Install]
WantedBy=multi-user.target

View File

@@ -55,7 +55,12 @@ where
let query_parts: Vec<&str> = query.trim_end_matches(';').split_whitespace().collect();
match query_parts[0].to_ascii_uppercase().as_str() {
match query_parts
.first()
.unwrap_or(&"")
.to_ascii_uppercase()
.as_str()
{
"BAN" => {
trace!("BAN");
ban(stream, query_parts).await
@@ -84,7 +89,12 @@ where
trace!("SHUTDOWN");
shutdown(stream).await
}
"SHOW" => match query_parts[1].to_ascii_uppercase().as_str() {
"SHOW" => match query_parts
.get(1)
.unwrap_or(&"")
.to_ascii_uppercase()
.as_str()
{
"HELP" => {
trace!("SHOW HELP");
show_help(stream).await

View File

@@ -38,12 +38,12 @@ pub enum Role {
Mirror,
}
impl ToString for Role {
fn to_string(&self) -> String {
match *self {
Role::Primary => "primary".to_string(),
Role::Replica => "replica".to_string(),
Role::Mirror => "mirror".to_string(),
impl std::fmt::Display for Role {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Role::Primary => write!(f, "primary"),
Role::Replica => write!(f, "replica"),
Role::Mirror => write!(f, "mirror"),
}
}
}
@@ -476,11 +476,11 @@ pub enum PoolMode {
Session,
}
impl ToString for PoolMode {
fn to_string(&self) -> String {
match *self {
PoolMode::Transaction => "transaction".to_string(),
PoolMode::Session => "session".to_string(),
impl std::fmt::Display for PoolMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PoolMode::Transaction => write!(f, "transaction"),
PoolMode::Session => write!(f, "session"),
}
}
}
@@ -493,12 +493,13 @@ pub enum LoadBalancingMode {
#[serde(alias = "loc", alias = "LOC", alias = "least_outstanding_connections")]
LeastOutstandingConnections,
}
impl ToString for LoadBalancingMode {
fn to_string(&self) -> String {
match *self {
LoadBalancingMode::Random => "random".to_string(),
impl std::fmt::Display for LoadBalancingMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
LoadBalancingMode::Random => write!(f, "random"),
LoadBalancingMode::LeastOutstandingConnections => {
"least_outstanding_connections".to_string()
write!(f, "least_outstanding_connections")
}
}
}
@@ -999,15 +1000,17 @@ impl Config {
pub fn fill_up_auth_query_config(&mut self) {
for (_name, pool) in self.pools.iter_mut() {
if pool.auth_query.is_none() {
pool.auth_query = self.general.auth_query.clone();
pool.auth_query.clone_from(&self.general.auth_query);
}
if pool.auth_query_user.is_none() {
pool.auth_query_user = self.general.auth_query_user.clone();
pool.auth_query_user
.clone_from(&self.general.auth_query_user);
}
if pool.auth_query_password.is_none() {
pool.auth_query_password = self.general.auth_query_password.clone();
pool.auth_query_password
.clone_from(&self.general.auth_query_password);
}
}
}
@@ -1155,7 +1158,7 @@ impl Config {
"Default max server lifetime: {}ms",
self.general.server_lifetime
);
info!("Sever round robin: {}", self.general.server_round_robin);
info!("Server round robin: {}", self.general.server_round_robin);
match self.general.tls_certificate.clone() {
Some(tls_certificate) => {
info!("TLS certificate: {}", tls_certificate);

View File

@@ -733,6 +733,10 @@ pub fn configure_socket(stream: &TcpStream) {
}
Err(err) => error!("Could not configure socket: {}", err),
}
match sock_ref.set_nodelay(true) {
Ok(_) => (),
Err(err) => error!("Could not configure TCP_NODELAY for socket: {}", err),
}
}
pub trait BytesMutReader {

View File

@@ -813,7 +813,7 @@ impl ConnectionPool {
}
}
client_stats.checkout_success();
client_stats.checkout_error();
Err(Error::AllServersDown)
}

View File

@@ -1,5 +1,11 @@
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use http_body_util::Full;
use hyper::body;
use hyper::body::Bytes;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Method, Request, Response, StatusCode};
use hyper_util::rt::TokioIo;
use log::{debug, error, info};
use phf::phf_map;
use std::collections::HashMap;
@@ -7,6 +13,7 @@ use std::fmt;
use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::net::TcpListener;
use crate::config::Address;
use crate::pool::{get_all_pools, PoolIdentifier};
@@ -243,7 +250,9 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
}
}
async fn prometheus_stats(request: Request<Body>) -> Result<Response<Body>, hyper::http::Error> {
async fn prometheus_stats(
request: Request<body::Incoming>,
) -> Result<Response<Full<Bytes>>, hyper::http::Error> {
match (request.method(), request.uri().path()) {
(&Method::GET, "/metrics") => {
let mut lines = Vec::new();
@@ -374,14 +383,35 @@ fn push_server_stats(lines: &mut Vec<String>) {
}
pub async fn start_metric_server(http_addr: SocketAddr) {
let http_service_factory =
make_service_fn(|_conn| async { Ok::<_, hyper::Error>(service_fn(prometheus_stats)) });
let server = Server::bind(&http_addr).serve(http_service_factory);
let listener = TcpListener::bind(http_addr);
let listener = match listener.await {
Ok(listener) => listener,
Err(e) => {
error!("Failed to bind prometheus server to HTTP address: {}.", e);
return;
}
};
info!(
"Exposing prometheus metrics on http://{}/metrics.",
http_addr
);
if let Err(e) = server.await {
error!("Failed to run HTTP server: {}.", e);
loop {
let stream = match listener.accept().await {
Ok((stream, _)) => stream,
Err(e) => {
error!("Error accepting connection: {}", e);
continue;
}
};
let io = TokioIo::new(stream);
tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new()
.serve_connection(io, service_fn(prometheus_stats))
.await
{
eprintln!("Error serving HTTP connection for metrics: {:?}", err);
}
});
}
}

View File

@@ -427,8 +427,12 @@ impl QueryRouter {
None => (),
};
// If we already visited a write statement, we should be going to the primary.
if !visited_write_statement {
let has_locks = !query.locks.is_empty();
if has_locks {
self.active_role = Some(Role::Primary);
} else if !visited_write_statement {
// If we already visited a write statement, we should be going to the primary.
self.active_role = match self.primary_reads_enabled() {
false => Some(Role::Replica), // If primary should not be receiving reads, use a replica.
true => None, // Any server role is fine in this case.
@@ -499,6 +503,7 @@ impl QueryRouter {
table: _,
on: _,
returning: _,
ignore: _,
} => {
// Not supported in postgres.
assert!(or.is_none());
@@ -506,7 +511,9 @@ impl QueryRouter {
assert!(after_columns.is_empty());
Self::process_table(table_name, &mut table_names);
Self::process_query(source, &mut exprs, &mut table_names, &Some(columns));
if let Some(source) = source {
Self::process_query(source, &mut exprs, &mut table_names, &Some(columns));
}
}
Delete {
tables,
@@ -514,6 +521,8 @@ impl QueryRouter {
using,
selection,
returning: _,
order_by: _,
limit: _,
} => {
if let Some(expr) = selection {
exprs.push(expr.clone());
@@ -1153,6 +1162,29 @@ mod test {
}
}
#[test]
fn test_select_for_update() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.query_parser_read_write_splitting = true;
let queries_in_primary_role = vec![
simple_query("BEGIN"), // Transaction start
simple_query("SELECT * FROM items WHERE id = 5 FOR UPDATE"),
simple_query("UPDATE items SET name = 'pumpkin' WHERE id = 5"),
];
for query in queries_in_primary_role {
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
assert_eq!(qr.role(), Some(Role::Primary));
}
// query without lock do not change role
let query = simple_query("SELECT * FROM items WHERE id = 5");
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
assert_eq!(qr.role(), None);
}
#[test]
fn test_infer_primary_reads_enabled() {
QueryRouter::setup();

View File

@@ -14,11 +14,11 @@ pub enum ShardingFunction {
Sha1,
}
impl ToString for ShardingFunction {
fn to_string(&self) -> String {
match *self {
ShardingFunction::PgBigintHash => "pg_bigint_hash".to_string(),
ShardingFunction::Sha1 => "sha1".to_string(),
impl std::fmt::Display for ShardingFunction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ShardingFunction::PgBigintHash => write!(f, "pg_bigint_hash"),
ShardingFunction::Sha1 => write!(f, "sha1"),
}
}
}

View File

@@ -91,6 +91,27 @@ describe "Admin" do
end
end
[
"SHOW ME THE MONEY",
"SHOW ME THE WAY",
"SHOW UP",
"SHOWTIME",
"HAMMER TIME",
"SHOWN TO BE TRUE",
"SHOW ",
"SHOW ",
"SHOW 1",
";;;;;"
].each do |cmd|
describe "Bad command #{cmd}" do
it "does not panic and responds with PG::SystemError" do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
expect { admin_conn.async_exec(cmd) }.to raise_error(PG::SystemError).with_message(/Unsupported/)
admin_conn.close
end
end
end
describe "PAUSE" do
it "pauses all pools" do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)