Compare commits

..

11 Commits

Author SHA1 Message Date
Kevin Zimmerman
dc649aaee3 simplified write!; generic new functions 2023-07-26 19:53:32 -05:00
Kevin Zimmerman
b4ba3b378c fix formatting 2023-07-26 09:47:58 -05:00
Kevin Zimmerman
81536a0bad make AuthPassthrough generic 2023-07-26 09:44:31 -05:00
Kevin Zimmerman
6eb01e51a0 remove async/spawn in Collector::collect 2023-07-25 19:56:15 -05:00
Kevin Zimmerman
ae3241b634 use Result::map_err and ? in Tls::new 2023-07-25 19:44:04 -05:00
Kevin Zimmerman
33724ea670 simplify TableAccess::run 2023-07-25 19:34:37 -05:00
Kevin Zimmerman
1c26aa3547 simplify format! 2023-07-25 19:24:52 -05:00
Kevin Zimmerman
64eb417125 remove unnecessary allocation 2023-07-25 19:24:04 -05:00
Kevin Zimmerman
22d9d3c90a fix query_logger info! argument order 2023-07-25 15:59:01 -05:00
Kevin Zimmerman
3162d550fd simplify format_duration, reduce String allocs 2023-07-25 15:54:48 -05:00
Kevin Zimmerman
12522562ce fix clippy lints 2023-07-25 15:49:46 -05:00
32 changed files with 297 additions and 477 deletions

View File

@@ -34,7 +34,6 @@ jobs:
tags: |
type=sha,prefix=,format=long
type=schedule
type=ref,event=tag
type=ref,event=branch
type=ref,event=pr
type=raw,value=latest,enable={{ is_default_branch }}

View File

@@ -1,48 +0,0 @@
name: pgcat package (deb)
on:
workflow_dispatch:
inputs:
packageVersion:
default: "1.1.1"
jobs:
build:
strategy:
max-parallel: 1
fail-fast: false # Let the other job finish, or they can lock each other out
matrix:
os: ["buildjet-4vcpu-ubuntu-2204", "buildjet-4vcpu-ubuntu-2204-arm"]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v3
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
- name: Install dependencies
env:
DEBIAN_FRONTEND: noninteractive
TZ: Etc/UTC
run: |
curl -sLO https://github.com/deb-s3/deb-s3/releases/download/0.11.4/deb-s3-0.11.4.gem
sudo gem install deb-s3-0.11.4.gem
dpkg-deb --version
- name: Build and release package
env:
AWS_ACCESS_KEY_ID: ${{ vars.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
AWS_DEFAULT_REGION: ${{ vars.AWS_DEFAULT_REGION }}
run: |
if [[ $(arch) == "x86_64" ]]; then
export ARCH=amd64
else
export ARCH=arm64
fi
bash utilities/deb.sh ${{ inputs.packageVersion }}
deb-s3 upload \
--lock \
--bucket apt.postgresml.org \
pgcat-${{ inputs.packageVersion }}-ubuntu22.04-${ARCH}.deb \
--codename $(lsb_release -cs)

View File

@@ -230,7 +230,7 @@ default: "random"
Load balancing mode
`random` selects the server at random
`loc` selects the server with the least outstanding busy connections
`loc` selects the server with the least outstanding busy conncetions
### default_role
```

74
Cargo.lock generated
View File

@@ -353,6 +353,19 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "env_logger"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85cdab6a89accf66733ad5a1693a4dcced6aeff64602b634530dd73c1f3ee9f0"
dependencies = [
"humantime",
"is-terminal",
"log",
"regex",
"termcolor",
]
[[package]]
name = "equivalent"
version = "1.0.1"
@@ -620,6 +633,12 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
version = "0.14.27"
@@ -836,15 +855,6 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
[[package]]
name = "matchers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata 0.1.10",
]
[[package]]
name = "matches"
version = "0.1.10"
@@ -990,7 +1000,7 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
[[package]]
name = "pgcat"
version = "1.1.1"
version = "1.1.0"
dependencies = [
"arc-swap",
"async-trait",
@@ -1000,6 +1010,7 @@ dependencies = [
"bytes",
"chrono",
"clap",
"env_logger",
"exitcode",
"fallible-iterator",
"futures",
@@ -1207,17 +1218,8 @@ checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata 0.3.3",
"regex-syntax 0.7.4",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax 0.6.29",
"regex-automata",
"regex-syntax",
]
[[package]]
@@ -1228,15 +1230,9 @@ checksum = "39354c10dd07468c2e73926b23bb9c2caca74c5501e38a35da70406f1d923310"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax 0.7.4",
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.7.4"
@@ -1548,6 +1544,15 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "termcolor"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6"
dependencies = [
"winapi-util",
]
[[package]]
name = "thiserror"
version = "1.0.43"
@@ -1783,16 +1788,12 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"serde",
"serde_json",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
"tracing-serde",
@@ -2027,6 +2028,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"

View File

@@ -1,6 +1,6 @@
[package]
name = "pgcat"
version = "1.1.1"
version = "1.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -22,6 +22,7 @@ once_cell = "1"
sqlparser = {version = "0.34", features = ["visitor"] }
log = "0.4"
arc-swap = "1"
env_logger = "0.10"
parking_lot = "0.12.1"
hmac = "0.12"
sha2 = "0.10"
@@ -47,7 +48,7 @@ serde_json = "1"
itertools = "0.10"
clap = { version = "4.3.1", features = ["derive", "env"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter", "std"]}
tracing-subscriber = { version = "0.3.17", features = ["json"]}
[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.5.0"

View File

@@ -1,9 +0,0 @@
Package: pgcat
Version: ${PACKAGE_VERSION}
Section: database
Priority: optional
Architecture: ${ARCH}
Maintainer: PostgresML <team@postgresml.org>
Homepage: https://postgresml.org
Description: PgCat - NextGen PostgreSQL Pooler
PostgreSQL pooler and proxy (like PgBouncer) with support for sharding, load balancing, failover and mirroring.

View File

@@ -270,7 +270,7 @@ username = "sharding_user"
# if `server_password` is not set.
password = "sharding_user"
pool_mode = "transaction"
pool_mode = "session"
# PostgreSQL username used to connect to the server.
# server_username = "another_user"

View File

@@ -800,7 +800,7 @@ async fn pause<T>(stream: &mut T, query: &str) -> Result<(), Error>
where
T: tokio::io::AsyncWrite + std::marker::Unpin,
{
let parts: Vec<&str> = query.split(",").map(|part| part.trim()).collect();
let parts: Vec<&str> = query.split(',').map(|part| part.trim()).collect();
if parts.len() != 2 {
error_response(
@@ -847,7 +847,7 @@ async fn resume<T>(stream: &mut T, query: &str) -> Result<(), Error>
where
T: tokio::io::AsyncWrite + std::marker::Unpin,
{
let parts: Vec<&str> = query.split(",").map(|part| part.trim()).collect();
let parts: Vec<&str> = query.split(',').map(|part| part.trim()).collect();
if parts.len() != 2 {
error_response(

View File

@@ -12,7 +12,7 @@ pub struct AuthPassthrough {
impl AuthPassthrough {
/// Initializes an AuthPassthrough.
pub fn new(query: &str, user: &str, password: &str) -> Self {
pub fn new<S: ToString>(query: S, user: S, password: S) -> Self {
AuthPassthrough {
password: password.to_string(),
query: query.to_string(),

View File

@@ -123,7 +123,7 @@ pub async fn client_entrypoint(
// Client requested a TLS connection.
Ok((ClientConnectionType::Tls, _)) => {
// TLS settings are configured, will setup TLS now.
if tls_certificate != None {
if tls_certificate.is_some() {
debug!("Accepting TLS request");
let mut yes = BytesMut::new();
@@ -431,7 +431,7 @@ where
None => "pgcat",
};
let client_identifier = ClientIdentifier::new(&application_name, &username, &pool_name);
let client_identifier = ClientIdentifier::new(application_name, username, pool_name);
let admin = ["pgcat", "pgbouncer"]
.iter()
@@ -930,16 +930,12 @@ where
}
// Check on plugin results.
match plugin_output {
Some(PluginOutput::Deny(error)) => {
self.buffer.clear();
error_response(&mut self.write, &error).await?;
plugin_output = None;
continue;
}
_ => (),
};
if let Some(PluginOutput::Deny(error)) = plugin_output {
self.buffer.clear();
error_response(&mut self.write, &error).await?;
plugin_output = None;
continue;
}
// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
@@ -1213,7 +1209,7 @@ where
// Safe to unwrap because we know this message has a certain length and has the code
// This reads the first byte without advancing the internal pointer and mutating the bytes
let code = *message.get(0).unwrap() as char;
let code = *message.first().unwrap() as char;
trace!("Message: {}", code);
@@ -1331,14 +1327,11 @@ where
let close: Close = (&message).try_into()?;
if close.is_prepared_statement() && !close.anonymous() {
match self.prepared_statements.get(&close.name) {
Some(parse) => {
server.will_close(&parse.generated_name);
}
if let Some(parse) = self.prepared_statements.get(&close.name) {
server.will_close(&parse.generated_name);
} else {
// A prepared statement slipped through? Not impossible, since we don't support PREPARE yet.
None => (),
};
}
}
}
@@ -1376,7 +1369,7 @@ where
self.buffer.put(&message[..]);
let first_message_code = (*self.buffer.get(0).unwrap_or(&0)) as char;
let first_message_code = (*self.buffer.first().unwrap_or(&0)) as char;
// Almost certainly true
if first_message_code == 'P' && !prepared_statements_enabled {

View File

@@ -25,7 +25,7 @@ pub struct Args {
}
pub fn parse() -> Args {
return Args::parse();
Args::parse()
}
#[derive(ValueEnum, Clone, Debug)]

View File

@@ -217,19 +217,15 @@ impl Default for User {
impl User {
fn validate(&self) -> Result<(), Error> {
match self.min_pool_size {
Some(min_pool_size) => {
if min_pool_size > self.pool_size {
error!(
"min_pool_size of {} cannot be larger than pool_size of {}",
min_pool_size, self.pool_size
);
return Err(Error::BadConfig);
}
if let Some(min_pool_size) = self.min_pool_size {
if min_pool_size > self.pool_size {
error!(
"min_pool_size of {} cannot be larger than pool_size of {}",
min_pool_size, self.pool_size
);
return Err(Error::BadConfig);
}
None => (),
};
}
Ok(())
}
@@ -631,9 +627,9 @@ impl Pool {
Some(key) => {
// No quotes in the key so we don't have to compare quoted
// to unquoted idents.
let key = key.replace("\"", "");
let key = key.replace('\"', "");
if key.split(".").count() != 2 {
if key.split('.').count() != 2 {
error!(
"automatic_sharding_key '{}' must be fully qualified, e.g. t.{}`",
key, key
@@ -646,7 +642,7 @@ impl Pool {
None => None,
};
for (_, user) in &self.users {
for user in self.users.values() {
user.validate()?;
}
@@ -818,8 +814,8 @@ pub struct Query {
impl Query {
pub fn substitute(&mut self, db: &str, user: &str) {
for col in self.result.iter_mut() {
for i in 0..col.len() {
col[i] = col[i].replace("${USER}", user).replace("${DATABASE}", db);
for c in col {
*c = c.replace("${USER}", user).replace("${DATABASE}", db);
}
}
}
@@ -929,8 +925,8 @@ impl From<&Config> for std::collections::HashMap<String, String> {
(
format!("pools.{:?}.users", pool_name),
pool.users
.iter()
.map(|(_username, user)| &user.username)
.values()
.map(|user| &user.username)
.cloned()
.collect::<Vec<String>>()
.join(", "),
@@ -1015,13 +1011,9 @@ impl Config {
Some(tls_certificate) => {
info!("TLS certificate: {}", tls_certificate);
match self.general.tls_private_key.clone() {
Some(tls_private_key) => {
info!("TLS private key: {}", tls_private_key);
info!("TLS support is enabled");
}
None => (),
if let Some(tls_private_key) = self.general.tls_private_key.clone() {
info!("TLS private key: {}", tls_private_key);
info!("TLS support is enabled");
}
}
@@ -1056,8 +1048,8 @@ impl Config {
pool_name,
pool_config
.users
.iter()
.map(|(_, user_cfg)| user_cfg.pool_size)
.values()
.map(|user_cfg| user_cfg.pool_size)
.sum::<u32>()
.to_string()
);
@@ -1214,35 +1206,32 @@ impl Config {
}
// Validate TLS!
match self.general.tls_certificate.clone() {
Some(tls_certificate) => {
match load_certs(Path::new(&tls_certificate)) {
Ok(_) => {
// Cert is okay, but what about the private key?
match self.general.tls_private_key.clone() {
Some(tls_private_key) => match load_keys(Path::new(&tls_private_key)) {
Ok(_) => (),
Err(err) => {
error!("tls_private_key is incorrectly configured: {:?}", err);
return Err(Error::BadConfig);
}
},
None => {
error!("tls_certificate is set, but the tls_private_key is not");
if let Some(tls_certificate) = self.general.tls_certificate.clone() {
match load_certs(Path::new(&tls_certificate)) {
Ok(_) => {
// Cert is okay, but what about the private key?
match self.general.tls_private_key.clone() {
Some(tls_private_key) => match load_keys(Path::new(&tls_private_key)) {
Ok(_) => (),
Err(err) => {
error!("tls_private_key is incorrectly configured: {:?}", err);
return Err(Error::BadConfig);
}
};
}
},
Err(err) => {
error!("tls_certificate is incorrectly configured: {:?}", err);
return Err(Error::BadConfig);
}
None => {
error!("tls_certificate is set, but the tls_private_key is not");
return Err(Error::BadConfig);
}
};
}
Err(err) => {
error!("tls_certificate is incorrectly configured: {:?}", err);
return Err(Error::BadConfig);
}
}
None => (),
};
}
for pool in self.pools.values_mut() {
pool.validate()?;

View File

@@ -37,11 +37,11 @@ pub struct ClientIdentifier {
}
impl ClientIdentifier {
pub fn new(application_name: &str, username: &str, pool_name: &str) -> ClientIdentifier {
pub fn new<S: ToString>(application_name: S, username: S, pool_name: S) -> ClientIdentifier {
ClientIdentifier {
application_name: application_name.into(),
username: username.into(),
pool_name: pool_name.into(),
application_name: application_name.to_string(),
username: username.to_string(),
pool_name: pool_name.to_string(),
}
}
}
@@ -63,10 +63,10 @@ pub struct ServerIdentifier {
}
impl ServerIdentifier {
pub fn new(username: &str, database: &str) -> ServerIdentifier {
pub fn new<S: ToString>(username: S, database: S) -> ServerIdentifier {
ServerIdentifier {
username: username.into(),
database: database.into(),
username: username.to_string(),
database: database.to_string(),
}
}
}
@@ -84,41 +84,36 @@ impl std::fmt::Display for ServerIdentifier {
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match &self {
&Error::ClientSocketError(error, client_identifier) => write!(
f,
"Error reading {} from client {}",
error, client_identifier
),
&Error::ClientGeneralError(error, client_identifier) => {
write!(f, "{} {}", error, client_identifier)
Error::ClientSocketError(error, client_identifier) => {
write!(f, "Error reading {error} from client {client_identifier}",)
}
&Error::ClientAuthImpossible(username) => write!(
Error::ClientGeneralError(error, client_identifier) => {
write!(f, "{error} {client_identifier}")
}
Error::ClientAuthImpossible(username) => write!(
f,
"Client auth not possible, \
no cleartext password set for username: {} \
no cleartext password set for username: {username} \
in config and auth passthrough (query_auth) \
is not set up.",
username
is not set up."
),
&Error::ClientAuthPassthroughError(error, client_identifier) => write!(
Error::ClientAuthPassthroughError(error, client_identifier) => write!(
f,
"No cleartext password set, \
and no auth passthrough could not \
obtain the hash from server for {}, \
the error was: {}",
client_identifier, error
obtain the hash from server for {client_identifier}, \
the error was: {error}",
),
&Error::ServerStartupError(error, server_identifier) => write!(
Error::ServerStartupError(error, server_identifier) => write!(
f,
"Error reading {} on server startup {}",
error, server_identifier,
"Error reading {error} on server startup {server_identifier}",
),
&Error::ServerAuthError(error, server_identifier) => {
write!(f, "{} for {}", error, server_identifier,)
Error::ServerAuthError(error, server_identifier) => {
write!(f, "{error} for {server_identifier}")
}
// The rest can use Debug.
err => write!(f, "{:?}", err),
err => write!(f, "{err:?}"),
}
}
}

View File

@@ -25,18 +25,11 @@ pub mod tls;
///
/// * `duration` - A duration of time
pub fn format_duration(duration: &chrono::Duration) -> String {
let milliseconds = format!("{:0>3}", duration.num_milliseconds() % 1000);
let milliseconds = duration.num_milliseconds() % 1000;
let seconds = duration.num_seconds() % 60;
let minutes = duration.num_minutes() % 60;
let hours = duration.num_hours() % 24;
let days = duration.num_days();
let seconds = format!("{:0>2}", duration.num_seconds() % 60);
let minutes = format!("{:0>2}", duration.num_minutes() % 60);
let hours = format!("{:0>2}", duration.num_hours() % 24);
let days = duration.num_days().to_string();
format!(
"{}d {}:{}:{}.{}",
days, hours, minutes, seconds, milliseconds
)
format!("{days}d {hours:0>2}:{minutes:0>2}:{seconds:0>2}.{milliseconds:0>3}")
}

View File

@@ -1,14 +1,9 @@
use crate::cmd_args::{Args, LogFormat};
use tracing_subscriber;
use tracing_subscriber::EnvFilter;
pub fn init(args: &Args) {
// Iniitalize a default filter, and then override the builtin default "warning" with our
// commandline, (default: "info")
let filter = EnvFilter::from_default_env().add_directive(args.log_level.into());
let trace_sub = tracing_subscriber::fmt()
.with_env_filter(filter)
.with_max_level(args.log_level)
.with_ansi(!args.no_color);
match args.log_format {

View File

@@ -23,6 +23,7 @@ extern crate arc_swap;
extern crate async_trait;
extern crate bb8;
extern crate bytes;
extern crate env_logger;
extern crate exitcode;
extern crate log;
extern crate md5;
@@ -159,10 +160,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}
};
tokio::task::spawn(async move {
let mut stats_collector = Collector::default();
stats_collector.collect().await;
});
Collector::collect();
info!("Config autoreloader: {}", match config.general.autoreload {
Some(interval) => format!("{} ms", interval),

View File

@@ -156,12 +156,10 @@ where
match stream.write_all(&startup).await {
Ok(_) => Ok(()),
Err(err) => {
return Err(Error::SocketError(format!(
"Error writing startup to server socket - Error: {:?}",
err
)))
}
Err(err) => Err(Error::SocketError(format!(
"Error writing startup to server socket - Error: {:?}",
err
))),
}
}
@@ -237,8 +235,8 @@ pub fn md5_hash_password(user: &str, password: &str, salt: &[u8]) -> Vec<u8> {
let mut md5 = Md5::new();
// First pass
md5.update(&password.as_bytes());
md5.update(&user.as_bytes());
md5.update(password.as_bytes());
md5.update(user.as_bytes());
let output = md5.finalize_reset();
@@ -274,7 +272,7 @@ where
{
let password = md5_hash_password(user, password, salt);
let mut message = BytesMut::with_capacity(password.len() as usize + 5);
let mut message = BytesMut::with_capacity(password.len() + 5);
message.put_u8(b'p');
message.put_i32(password.len() as i32 + 4);
@@ -288,7 +286,7 @@ where
S: tokio::io::AsyncWrite + std::marker::Unpin,
{
let password = md5_hash_second_pass(hash, salt);
let mut message = BytesMut::with_capacity(password.len() as usize + 5);
let mut message = BytesMut::with_capacity(password.len() + 5);
message.put_u8(b'p');
message.put_i32(password.len() as i32 + 4);
@@ -509,7 +507,7 @@ pub fn data_row_nullable(row: &Vec<Option<String>>) -> BytesMut {
data_row.put_i32(column.len() as i32);
data_row.put_slice(column);
} else {
data_row.put_i32(-1 as i32);
data_row.put_i32(-1_i32);
}
}
@@ -564,12 +562,10 @@ where
{
match stream.write_all(&buf).await {
Ok(_) => Ok(()),
Err(err) => {
return Err(Error::SocketError(format!(
"Error writing to socket - Error: {:?}",
err
)))
}
Err(err) => Err(Error::SocketError(format!(
"Error writing to socket - Error: {:?}",
err
))),
}
}
@@ -580,12 +576,10 @@ where
{
match stream.write_all(buf).await {
Ok(_) => Ok(()),
Err(err) => {
return Err(Error::SocketError(format!(
"Error writing to socket - Error: {:?}",
err
)))
}
Err(err) => Err(Error::SocketError(format!(
"Error writing to socket - Error: {:?}",
err
))),
}
}
@@ -596,19 +590,15 @@ where
match stream.write_all(buf).await {
Ok(_) => match stream.flush().await {
Ok(_) => Ok(()),
Err(err) => {
return Err(Error::SocketError(format!(
"Error flushing socket - Error: {:?}",
err
)))
}
},
Err(err) => {
return Err(Error::SocketError(format!(
"Error writing to socket - Error: {:?}",
Err(err) => Err(Error::SocketError(format!(
"Error flushing socket - Error: {:?}",
err
)))
}
))),
},
Err(err) => Err(Error::SocketError(format!(
"Error writing to socket - Error: {:?}",
err
))),
}
}
@@ -723,7 +713,7 @@ impl BytesMutReader for Cursor<&BytesMut> {
let mut buf = vec![];
match self.read_until(b'\0', &mut buf) {
Ok(_) => Ok(String::from_utf8_lossy(&buf[..buf.len() - 1]).to_string()),
Err(err) => return Err(Error::ParseBytesError(err.to_string())),
Err(err) => Err(Error::ParseBytesError(err.to_string())),
}
}
}

View File

@@ -142,12 +142,12 @@ impl MirroringManager {
});
Self {
byte_senders: byte_senders,
byte_senders,
disconnect_senders: exit_senders,
}
}
pub fn send(self: &mut Self, bytes: &BytesMut) {
pub fn send(&mut self, bytes: &BytesMut) {
// We want to avoid performing an allocation if we won't be able to send the message
// There is a possibility of a race here where we check the capacity and then the channel is
// closed or the capacity is reduced to 0, but mirroring is best effort anyway
@@ -169,7 +169,7 @@ impl MirroringManager {
});
}
pub fn disconnect(self: &mut Self) {
pub fn disconnect(&mut self) {
self.disconnect_senders
.iter_mut()
.for_each(|sender| match sender.try_send(()) {

View File

@@ -92,7 +92,7 @@ impl<'a> Plugin for Intercept<'a> {
.map(|s| {
let s = s.as_str().to_string();
if s == "" {
if s.is_empty() {
None
} else {
Some(s)

View File

@@ -30,6 +30,7 @@ pub enum PluginOutput {
Intercept(BytesMut),
}
#[allow(clippy::ptr_arg)]
#[async_trait]
pub trait Plugin {
// Run before the query is sent to the server.

View File

@@ -20,7 +20,7 @@ impl<'a> Prewarmer<'a> {
self.server.address(),
query
);
self.server.query(&query).await?;
self.server.query(query).await?;
}
Ok(())

View File

@@ -31,7 +31,7 @@ impl<'a> Plugin for QueryLogger<'a> {
.map(|q| q.to_string())
.collect::<Vec<String>>()
.join("; ");
info!("[pool: {}][user: {}] {}", self.user, self.db, query);
info!("[pool: {}][user: {}] {}", self.db, self.user, query);
Ok(PluginOutput::Allow)
}

View File

@@ -30,27 +30,22 @@ impl<'a> Plugin for TableAccess<'a> {
return Ok(PluginOutput::Allow);
}
let mut found = None;
visit_relations(ast, |relation| {
let control_flow = visit_relations(ast, |relation| {
let relation = relation.to_string();
let parts = relation.split(".").collect::<Vec<&str>>();
let table_name = parts.last().unwrap();
let table_name = relation.split('.').last().unwrap().to_string();
if self.tables.contains(&table_name.to_string()) {
found = Some(table_name.to_string());
ControlFlow::<()>::Break(())
if self.tables.contains(&table_name) {
ControlFlow::Break(table_name)
} else {
ControlFlow::<()>::Continue(())
ControlFlow::Continue(())
}
});
if let Some(found) = found {
debug!("Blocking access to table \"{}\"", found);
if let ControlFlow::Break(found) = control_flow {
debug!("Blocking access to table \"{found}\"");
Ok(PluginOutput::Deny(format!(
"permission for table \"{}\" denied",
found
"permission for table \"{found}\" denied",
)))
} else {
Ok(PluginOutput::Allow)

View File

@@ -229,20 +229,17 @@ impl ConnectionPool {
let old_pool_ref = get_pool(pool_name, &user.username);
let identifier = PoolIdentifier::new(pool_name, &user.username);
match old_pool_ref {
Some(pool) => {
// If the pool hasn't changed, get existing reference and insert it into the new_pools.
// We replace all pools at the end, but if the reference is kept, the pool won't get re-created (bb8).
if pool.config_hash == new_pool_hash_value {
info!(
"[pool: {}][user: {}] has not changed",
pool_name, user.username
);
new_pools.insert(identifier.clone(), pool.clone());
continue;
}
if let Some(pool) = old_pool_ref {
// If the pool hasn't changed, get existing reference and insert it into the new_pools.
// We replace all pools at the end, but if the reference is kept, the pool won't get re-created (bb8).
if pool.config_hash == new_pool_hash_value {
info!(
"[pool: {}][user: {}] has not changed",
pool_name, user.username
);
new_pools.insert(identifier.clone(), pool.clone());
continue;
}
None => (),
}
info!(
@@ -628,7 +625,7 @@ impl ConnectionPool {
let mut force_healthcheck = false;
if self.is_banned(address) {
if self.try_unban(&address).await {
if self.try_unban(address).await {
force_healthcheck = true;
} else {
debug!("Address {:?} is banned", address);
@@ -748,8 +745,8 @@ impl ConnectionPool {
// Don't leave a bad connection in the pool.
server.mark_bad();
self.ban(&address, BanReason::FailedHealthCheck, Some(client_info));
return false;
self.ban(address, BanReason::FailedHealthCheck, Some(client_info));
false
}
/// Ban an address (i.e. replica). It no longer will serve
@@ -861,10 +858,10 @@ impl ConnectionPool {
let guard = self.banlist.read();
for banlist in guard.iter() {
for (address, (reason, timestamp)) in banlist.iter() {
bans.push((address.clone(), (reason.clone(), timestamp.clone())));
bans.push((address.clone(), (reason.clone(), *timestamp)));
}
}
return bans;
bans
}
/// Get the address from the host url
@@ -921,7 +918,7 @@ impl ConnectionPool {
}
let busy = provisioned - idle;
debug!("{:?} has {:?} busy connections", address, busy);
return busy;
busy
}
}

View File

@@ -67,6 +67,7 @@ static CUSTOM_SQL_REGEX_SET: OnceCell<RegexSet> = OnceCell::new();
static CUSTOM_SQL_REGEX_LIST: OnceCell<Vec<Regex>> = OnceCell::new();
/// The query router.
#[derive(Default)]
pub struct QueryRouter {
/// Which shard we should be talking to right now.
active_shard: Option<usize>,
@@ -91,7 +92,7 @@ impl QueryRouter {
/// One-time initialization of regexes
/// that parse our custom SQL protocol.
pub fn setup() -> bool {
let set = match RegexSet::new(&CUSTOM_SQL_REGEXES) {
let set = match RegexSet::new(CUSTOM_SQL_REGEXES) {
Ok(rgx) => rgx,
Err(err) => {
error!("QueryRouter::setup Could not compile regex set: {:?}", err);
@@ -116,15 +117,8 @@ impl QueryRouter {
/// Create a new instance of the query router.
/// Each client gets its own.
pub fn new() -> QueryRouter {
QueryRouter {
active_shard: None,
active_role: None,
query_parser_enabled: None,
primary_reads_enabled: None,
pool_settings: PoolSettings::default(),
placeholders: Vec::new(),
}
pub fn new() -> Self {
Self::default()
}
/// Pool settings can change because of a config reload.
@@ -132,7 +126,7 @@ impl QueryRouter {
self.pool_settings = pool_settings;
}
pub fn pool_settings<'a>(&'a self) -> &'a PoolSettings {
pub fn pool_settings(&self) -> &PoolSettings {
&self.pool_settings
}
@@ -143,7 +137,7 @@ impl QueryRouter {
let code = message_cursor.get_u8() as char;
// Check for any sharding regex matches in any queries
match code as char {
match code {
// For Parse and Query messages peek to see if they specify a shard_id as a comment early in the statement
'P' | 'Q' => {
if self.pool_settings.shard_id_regex.is_some()
@@ -397,14 +391,10 @@ impl QueryRouter {
// or discard shard selection. If they point to the same shard though,
// we can let them through as-is.
// This is basically building a database now :)
match self.infer_shard(query) {
Some(shard) => {
self.active_shard = Some(shard);
debug!("Automatically using shard: {:?}", self.active_shard);
}
None => (),
};
if let Some(shard) = self.infer_shard(query) {
self.active_shard = Some(shard);
debug!("Automatically using shard: {:?}", self.active_shard);
}
}
None => (),
@@ -576,8 +566,8 @@ impl QueryRouter {
.automatic_sharding_key
.as_ref()
.unwrap()
.split(".")
.map(|ident| Ident::new(ident))
.split('.')
.map(Ident::new)
.collect::<Vec<Ident>>();
// Sharding key must be always fully qualified
@@ -593,7 +583,7 @@ impl QueryRouter {
Expr::Identifier(ident) => {
// Only if we're dealing with only one table
// and there is no ambiguity
if &ident.value == &sharding_key[1].value {
if ident.value == sharding_key[1].value {
// Sharding key is unique enough, don't worry about
// table names.
if &sharding_key[0].value == "*" {
@@ -606,13 +596,13 @@ impl QueryRouter {
// SELECT * FROM t WHERE sharding_key = 5
// Make sure the table name from the sharding key matches
// the table name from the query.
found = &sharding_key[0].value == &table[0].value;
found = sharding_key[0].value == table[0].value;
} else if table.len() == 2 {
// Table name is fully qualified with the schema: e.g.
// SELECT * FROM public.t WHERE sharding_key = 5
// Ignore the schema (TODO: at some point, we want schema support)
// and use the table name only.
found = &sharding_key[0].value == &table[1].value;
found = sharding_key[0].value == table[1].value;
} else {
debug!("Got table name with more than two idents, which is not possible");
}
@@ -624,8 +614,8 @@ impl QueryRouter {
// The key is fully qualified in the query,
// it will exist or Postgres will throw an error.
if idents.len() == 2 {
found = &sharding_key[0].value == &idents[0].value
&& &sharding_key[1].value == &idents[1].value;
found = sharding_key[0].value == idents[0].value
&& sharding_key[1].value == idents[1].value;
}
// TODO: key can have schema as well, e.g. public.data.id (len == 3)
}
@@ -657,7 +647,7 @@ impl QueryRouter {
}
Expr::Value(Value::Placeholder(placeholder)) => {
match placeholder.replace("$", "").parse::<i16>() {
match placeholder.replace('$', "").parse::<i16>() {
Ok(placeholder) => result.push(ShardingKey::Placeholder(placeholder)),
Err(_) => {
debug!(
@@ -683,12 +673,9 @@ impl QueryRouter {
match &*query.body {
SetExpr::Query(query) => {
match self.infer_shard(&*query) {
Some(shard) => {
shards.insert(shard);
}
None => (),
};
if let Some(shard) = self.infer_shard(query) {
shards.insert(shard);
}
}
// SELECT * FROM ...
@@ -698,38 +685,22 @@ impl QueryRouter {
let mut table_names = Vec::new();
for table in select.from.iter() {
match &table.relation {
TableFactor::Table { name, .. } => {
table_names.push(name.0.clone());
}
_ => (),
};
if let TableFactor::Table { name, .. } = &table.relation {
table_names.push(name.0.clone());
}
// Get table names from all the joins.
for join in table.joins.iter() {
match &join.relation {
TableFactor::Table { name, .. } => {
table_names.push(name.0.clone());
}
_ => (),
};
if let TableFactor::Table { name, .. } = &join.relation {
table_names.push(name.0.clone());
}
// We can filter results based on join conditions, e.g.
// SELECT * FROM t INNER JOIN B ON B.sharding_key = 5;
match &join.join_operator {
JoinOperator::Inner(inner_join) => match &inner_join {
JoinConstraint::On(expr) => {
// Parse the selection criteria later.
exprs.push(expr.clone());
}
_ => (),
},
_ => (),
};
if let JoinOperator::Inner(JoinConstraint::On(expr)) = &join.join_operator {
// Parse the selection criteria later.
exprs.push(expr.clone());
}
}
}
@@ -803,16 +774,16 @@ impl QueryRouter {
db: &self.pool_settings.db,
};
let _ = query_logger.run(&self, ast).await;
let _ = query_logger.run(self, ast).await;
}
if let Some(ref intercept) = plugins.intercept {
let mut intercept = Intercept {
enabled: intercept.enabled,
config: &intercept,
config: intercept,
};
let result = intercept.run(&self, ast).await;
let result = intercept.run(self, ast).await;
if let Ok(PluginOutput::Intercept(output)) = result {
return Ok(PluginOutput::Intercept(output));
@@ -825,7 +796,7 @@ impl QueryRouter {
tables: &table_access.tables,
};
let result = table_access.run(&self, ast).await;
let result = table_access.run(self, ast).await;
if let Ok(PluginOutput::Deny(error)) = result {
return Ok(PluginOutput::Deny(error));
@@ -861,7 +832,7 @@ impl QueryRouter {
/// Should we attempt to parse queries?
pub fn query_parser_enabled(&self) -> bool {
let enabled = match self.query_parser_enabled {
match self.query_parser_enabled {
None => {
debug!(
"Using pool settings, query_parser_enabled: {}",
@@ -877,9 +848,7 @@ impl QueryRouter {
);
value
}
};
enabled
}
}
pub fn primary_reads_enabled(&self) -> bool {
@@ -910,10 +879,14 @@ mod test {
fn test_infer_replica() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
assert!(qr.try_execute_command(&simple_query("SET SERVER ROLE TO 'auto'")) != None);
assert!(qr
.try_execute_command(&simple_query("SET SERVER ROLE TO 'auto'"))
.is_some());
assert!(qr.query_parser_enabled());
assert!(qr.try_execute_command(&simple_query("SET PRIMARY READS TO off")) != None);
assert!(qr
.try_execute_command(&simple_query("SET PRIMARY READS TO off"))
.is_some());
let queries = vec![
simple_query("SELECT * FROM items WHERE id = 5"),
@@ -954,7 +927,9 @@ mod test {
QueryRouter::setup();
let mut qr = QueryRouter::new();
let query = simple_query("SELECT * FROM items WHERE id = 5");
assert!(qr.try_execute_command(&simple_query("SET PRIMARY READS TO on")) != None);
assert!(qr
.try_execute_command(&simple_query("SET PRIMARY READS TO on"))
.is_some());
assert!(qr.infer(&QueryRouter::parse(&query).unwrap()).is_ok());
assert_eq!(qr.role(), None);
@@ -965,7 +940,9 @@ mod test {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.try_execute_command(&simple_query("SET SERVER ROLE TO 'auto'"));
assert!(qr.try_execute_command(&simple_query("SET PRIMARY READS TO off")) != None);
assert!(qr
.try_execute_command(&simple_query("SET PRIMARY READS TO off"))
.is_some());
let prepared_stmt = BytesMut::from(
&b"WITH t AS (SELECT * FROM items WHERE name = $1) SELECT * FROM t WHERE id = $2\0"[..],
@@ -1133,9 +1110,11 @@ mod test {
QueryRouter::setup();
let mut qr = QueryRouter::new();
let query = simple_query("SET SERVER ROLE TO 'auto'");
assert!(qr.try_execute_command(&simple_query("SET PRIMARY READS TO off")) != None);
assert!(qr
.try_execute_command(&simple_query("SET PRIMARY READS TO off"))
.is_some());
assert!(qr.try_execute_command(&query) != None);
assert!(qr.try_execute_command(&query).is_some());
assert!(qr.query_parser_enabled());
assert_eq!(qr.role(), None);
@@ -1149,7 +1128,7 @@ mod test {
assert!(qr.query_parser_enabled());
let query = simple_query("SET SERVER ROLE TO 'default'");
assert!(qr.try_execute_command(&query) != None);
assert!(qr.try_execute_command(&query).is_some());
assert!(!qr.query_parser_enabled());
}
@@ -1194,11 +1173,11 @@ mod test {
assert!(!qr.primary_reads_enabled());
let q1 = simple_query("SET SERVER ROLE TO 'primary'");
assert!(qr.try_execute_command(&q1) != None);
assert!(qr.try_execute_command(&q1).is_some());
assert_eq!(qr.active_role.unwrap(), Role::Primary);
let q2 = simple_query("SET SERVER ROLE TO 'default'");
assert!(qr.try_execute_command(&q2) != None);
assert!(qr.try_execute_command(&q2).is_some());
assert_eq!(qr.active_role.unwrap(), pool_settings.default_role);
}
@@ -1263,17 +1242,17 @@ mod test {
// Make sure setting it works
let q1 = simple_query("/* shard_id: 1 */ select 1 from foo;");
assert!(qr.try_execute_command(&q1) == None);
assert!(qr.try_execute_command(&q1).is_none());
assert_eq!(qr.active_shard, Some(1));
// And make sure changing it works
let q2 = simple_query("/* shard_id: 0 */ select 1 from foo;");
assert!(qr.try_execute_command(&q2) == None);
assert!(qr.try_execute_command(&q2).is_none());
assert_eq!(qr.active_shard, Some(0));
// Validate setting by shard with expected shard copied from sharding.rs tests
let q2 = simple_query("/* sharding_key: 6 */ select 1 from foo;");
assert!(qr.try_execute_command(&q2) == None);
assert!(qr.try_execute_command(&q2).is_none());
assert_eq!(qr.active_shard, Some(2));
}
@@ -1411,9 +1390,11 @@ mod test {
};
QueryRouter::setup();
let mut pool_settings = PoolSettings::default();
pool_settings.query_parser_enabled = true;
pool_settings.plugins = Some(plugins);
let pool_settings = PoolSettings {
query_parser_enabled: true,
plugins: Some(plugins),
..Default::default()
};
let mut qr = QueryRouter::new();
qr.update_pool_settings(pool_settings);

View File

@@ -79,12 +79,12 @@ impl ScramSha256 {
let server_message = Message::parse(message)?;
if !server_message.nonce.starts_with(&self.nonce) {
return Err(Error::ProtocolSyncError(format!("SCRAM")));
return Err(Error::ProtocolSyncError("SCRAM".to_string()));
}
let salt = match general_purpose::STANDARD.decode(&server_message.salt) {
Ok(salt) => salt,
Err(_) => return Err(Error::ProtocolSyncError(format!("SCRAM"))),
Err(_) => return Err(Error::ProtocolSyncError("SCRAM".to_string())),
};
let salted_password = Self::hi(
@@ -166,9 +166,9 @@ impl ScramSha256 {
pub fn finish(&mut self, message: &BytesMut) -> Result<(), Error> {
let final_message = FinalMessage::parse(message)?;
let verifier = match general_purpose::STANDARD.decode(&final_message.value) {
let verifier = match general_purpose::STANDARD.decode(final_message.value) {
Ok(verifier) => verifier,
Err(_) => return Err(Error::ProtocolSyncError(format!("SCRAM"))),
Err(_) => return Err(Error::ProtocolSyncError("SCRAM".to_string())),
};
let mut hmac = match Hmac::<Sha256>::new_from_slice(&self.salted_password) {
@@ -230,14 +230,14 @@ impl Message {
.collect::<Vec<String>>();
if parts.len() != 3 {
return Err(Error::ProtocolSyncError(format!("SCRAM")));
return Err(Error::ProtocolSyncError("SCRAM".to_string()));
}
let nonce = str::replace(&parts[0], "r=", "");
let salt = str::replace(&parts[1], "s=", "");
let iterations = match str::replace(&parts[2], "i=", "").parse::<u32>() {
Ok(iterations) => iterations,
Err(_) => return Err(Error::ProtocolSyncError(format!("SCRAM"))),
Err(_) => return Err(Error::ProtocolSyncError("SCRAM".to_string())),
};
Ok(Message {
@@ -257,7 +257,7 @@ impl FinalMessage {
/// Parse the server final validation message.
pub fn parse(message: &BytesMut) -> Result<FinalMessage, Error> {
if !message.starts_with(b"v=") || message.len() < 4 {
return Err(Error::ProtocolSyncError(format!("SCRAM")));
return Err(Error::ProtocolSyncError("SCRAM".to_string()));
}
Ok(FinalMessage {

View File

@@ -316,10 +316,7 @@ impl Server {
// Something else?
m => {
return Err(Error::SocketError(format!(
"Unknown message: {}",
m as char
)));
return Err(Error::SocketError(format!("Unknown message: {}", { m })));
}
}
} else {
@@ -337,27 +334,18 @@ impl Server {
None => &user.username,
};
let password = match user.server_password {
Some(ref server_password) => Some(server_password),
None => match user.password {
Some(ref password) => Some(password),
None => None,
},
};
let password = user.server_password.as_ref();
startup(&mut stream, username, database).await?;
let mut server_info = BytesMut::new();
let mut process_id: i32 = 0;
let mut secret_key: i32 = 0;
let server_identifier = ServerIdentifier::new(username, &database);
let server_identifier = ServerIdentifier::new(username, database);
// We'll be handling multiple packets, but they will all be structured the same.
// We'll loop here until this exchange is complete.
let mut scram: Option<ScramSha256> = match password {
Some(password) => Some(ScramSha256::new(password)),
None => None,
};
let mut scram: Option<ScramSha256> = password.map(|password| ScramSha256::new(password));
loop {
let code = match stream.read_u8().await {
@@ -753,7 +741,7 @@ impl Server {
self.mirror_send(messages);
self.stats().data_sent(messages.len());
match write_all_flush(&mut self.stream, &messages).await {
match write_all_flush(&mut self.stream, messages).await {
Ok(_) => {
// Successfully sent to server
self.last_activity = SystemTime::now();
@@ -997,9 +985,7 @@ impl Server {
}
}
if !names.is_empty() {
self.deallocate(names).await?;
}
self.deallocate(names).await?;
Ok(())
}
@@ -1023,9 +1009,7 @@ impl Server {
self.send(&bytes).await?;
}
if !names.is_empty() {
self.send(&flush()).await?;
}
self.send(&flush()).await?;
// Read and discard CloseComplete (3)
for name in &names {
@@ -1140,7 +1124,7 @@ impl Server {
// server connection thrashing if clients repeatedly do this.
// Instead, we ROLLBACK that transaction before putting the connection back in the pool
if self.in_transaction() {
warn!(target: "pgcat::server::cleanup", "Server returned while still in transaction, rolling back transaction");
warn!("Server returned while still in transaction, rolling back transaction");
self.query("ROLLBACK").await?;
}
@@ -1150,14 +1134,14 @@ impl Server {
// send `DISCARD ALL` if we think the session is altered instead of just sending
// it before each checkin.
if self.cleanup_state.needs_cleanup() && self.cleanup_connections {
info!(target: "pgcat::server::cleanup", "Server returned with session state altered, discarding state ({}) for application {}", self.cleanup_state, self.application_name);
warn!("Server returned with session state altered, discarding state ({}) for application {}", self.cleanup_state, self.application_name);
self.query("DISCARD ALL").await?;
self.query("RESET ROLE").await?;
self.cleanup_state.reset();
}
if self.in_copy_mode() {
warn!(target: "pgcat::server::cleanup", "Server returned while still in copy-mode");
warn!("Server returned while still in copy-mode");
}
Ok(())
@@ -1203,16 +1187,14 @@ impl Server {
}
pub fn mirror_send(&mut self, bytes: &BytesMut) {
match self.mirror_manager.as_mut() {
Some(manager) => manager.send(bytes),
None => (),
if let Some(manager) = self.mirror_manager.as_mut() {
manager.send(bytes);
}
}
pub fn mirror_disconnect(&mut self) {
match self.mirror_manager.as_mut() {
Some(manager) => manager.disconnect(),
None => (),
if let Some(manager) = self.mirror_manager.as_mut() {
manager.disconnect();
}
}
@@ -1240,7 +1222,7 @@ impl Server {
server.send(&simple_query(query)).await?;
let mut message = server.recv().await?;
Ok(parse_query_message(&mut message).await?)
parse_query_message(&mut message).await
}
}

View File

@@ -64,7 +64,7 @@ impl Sharder {
fn sha1(&self, key: i64) -> usize {
let mut hasher = Sha1::new();
hasher.update(&key.to_string().as_bytes());
hasher.update(key.to_string().as_bytes());
let result = hasher.finalize();

View File

@@ -77,13 +77,12 @@ impl Reporter {
/// The statistics collector which used for calculating averages
/// There is only one collector (kind of like a singleton)
/// it updates averages every 15 seconds.
#[derive(Default)]
pub struct Collector {}
pub struct Collector;
impl Collector {
/// The statistics collection handler. It will collect statistics
/// for `address_id`s starting at 0 up to `addresses`.
pub async fn collect(&mut self) {
pub fn collect() {
info!("Events reporter started");
tokio::task::spawn(async move {

View File

@@ -86,11 +86,11 @@ impl PoolStats {
}
}
return map;
map
}
pub fn generate_header() -> Vec<(&'static str, DataType)> {
return vec![
vec![
("database", DataType::Text),
("user", DataType::Text),
("pool_mode", DataType::Text),
@@ -105,11 +105,11 @@ impl PoolStats {
("sv_login", DataType::Numeric),
("maxwait", DataType::Numeric),
("maxwait_us", DataType::Numeric),
];
]
}
pub fn generate_row(&self) -> Vec<String> {
return vec![
vec![
self.identifier.db.clone(),
self.identifier.user.clone(),
self.mode.to_string(),
@@ -124,7 +124,7 @@ impl PoolStats {
self.sv_login.to_string(),
(self.maxwait / 1_000_000).to_string(),
(self.maxwait % 1_000_000).to_string(),
];
]
}
}

View File

@@ -44,25 +44,17 @@ impl Tls {
pub fn new() -> Result<Self, Error> {
let config = get_config();
let certs = match load_certs(Path::new(&config.general.tls_certificate.unwrap())) {
Ok(certs) => certs,
Err(_) => return Err(Error::TlsError),
};
let certs = load_certs(Path::new(&config.general.tls_certificate.unwrap()))
.map_err(|_| Error::TlsError)?;
let key_der = load_keys(Path::new(&config.general.tls_private_key.unwrap()))
.map_err(|_| Error::TlsError)?
.remove(0);
let mut keys = match load_keys(Path::new(&config.general.tls_private_key.unwrap())) {
Ok(keys) => keys,
Err(_) => return Err(Error::TlsError),
};
let config = match rustls::ServerConfig::builder()
let config = rustls::ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_single_cert(certs, keys.remove(0))
.map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))
{
Ok(c) => c,
Err(_) => return Err(Error::TlsError),
};
.with_single_cert(certs, key_der)
.map_err(|_| Error::TlsError)?;
Ok(Tls {
acceptor: TlsAcceptor::from(Arc::new(config)),

View File

@@ -1,33 +0,0 @@
#!/bin/bash
#
# Build an Ubuntu deb.
#
script_dir=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
deb_dir="/tmp/pgcat-build"
export PACKAGE_VERSION=${1:-"1.1.1"}
if [[ $(arch) == "x86_64" ]]; then
export ARCH=amd64
else
export ARCH=arm64
fi
cd "$script_dir/.."
cargo build --release
rm -rf "$deb_dir"
mkdir -p "$deb_dir/DEBIAN"
mkdir -p "$deb_dir/usr/bin"
mkdir -p "$deb_dir/etc"
cp target/release/pgcat "$deb_dir/usr/bin/pgcat"
chmod +x "$deb_dir/usr/bin/pgcat"
cp pgcat.toml "$deb_dir/etc/pgcat.toml"
(cat control | envsubst) > "$deb_dir/DEBIAN/control"
dpkg-deb \
--root-owner-group \
-z1 \
--build "$deb_dir" \
pgcat-${PACKAGE_VERSION}-ubuntu22.04-${ARCH}.deb