mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-25 18:06:29 +00:00
Compare commits
7 Commits
v0.1.0-bet
...
v0.2.0-bet
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d412238f47 | ||
|
|
7782933f59 | ||
|
|
bac4e1f52c | ||
|
|
37e3a86881 | ||
|
|
61db13f614 | ||
|
|
fe32b5ef17 | ||
|
|
54699222f8 |
@@ -12,13 +12,15 @@ jobs:
|
|||||||
- image: cimg/rust:1.58.1
|
- image: cimg/rust:1.58.1
|
||||||
environment:
|
environment:
|
||||||
RUST_LOG: info
|
RUST_LOG: info
|
||||||
- image: cimg/postgres:14.0
|
- image: postgres:14
|
||||||
auth:
|
# auth:
|
||||||
username: mydockerhub-user
|
# username: mydockerhub-user
|
||||||
password: $DOCKERHUB_PASSWORD
|
# password: $DOCKERHUB_PASSWORD
|
||||||
environment:
|
environment:
|
||||||
POSTGRES_USER: postgres
|
POSTGRES_USER: postgres
|
||||||
POSTGRES_DB: postgres
|
POSTGRES_DB: postgres
|
||||||
|
POSTGRES_PASSWORD: postgres
|
||||||
|
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
|
||||||
# Add steps to the job
|
# Add steps to the job
|
||||||
# See: https://circleci.com/docs/2.0/configuration-reference/#steps
|
# See: https://circleci.com/docs/2.0/configuration-reference/#steps
|
||||||
steps:
|
steps:
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ function start_pgcat() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
# Setup the database with shards and user
|
# Setup the database with shards and user
|
||||||
psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql
|
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql
|
||||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard0 -i
|
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard0 -i
|
||||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
|
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
|
||||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
|
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
|
||||||
@@ -72,7 +72,7 @@ psql -h 127.0.0.1 -p 6432 -d pgbouncer -c "SET client_encoding TO 'utf8'" > /dev
|
|||||||
(! psql -e -h 127.0.0.1 -p 6432 -d random_db -c 'SHOW STATS' > /dev/null)
|
(! psql -e -h 127.0.0.1 -p 6432 -d random_db -c 'SHOW STATS' > /dev/null)
|
||||||
|
|
||||||
# Start PgCat in debug to demonstrate failover better
|
# Start PgCat in debug to demonstrate failover better
|
||||||
start_pgcat "debug"
|
start_pgcat "trace"
|
||||||
|
|
||||||
# Add latency to the replica at port 5433 slightly above the healthcheck timeout
|
# Add latency to the replica at port 5433 slightly above the healthcheck timeout
|
||||||
toxiproxy-cli toxic add -t latency -a latency=300 postgres_replica
|
toxiproxy-cli toxic add -t latency -a latency=300 postgres_replica
|
||||||
|
|||||||
91
Cargo.lock
generated
91
Cargo.lock
generated
@@ -45,6 +45,12 @@ version = "1.0.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
|
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "base64"
|
||||||
|
version = "0.13.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "bb8"
|
name = "bb8"
|
||||||
version = "0.7.1"
|
version = "0.7.1"
|
||||||
@@ -109,22 +115,23 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "crypto-common"
|
name = "crypto-common"
|
||||||
version = "0.1.1"
|
version = "0.1.3"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "683d6b536309245c849479fba3da410962a43ed8e51c26b729208ec0ac2798d0"
|
checksum = "57952ca27b5e3606ff4dd79b0020231aaf9d6aa76dc05fd30137538c50bd3ce8"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"generic-array",
|
"generic-array",
|
||||||
|
"typenum",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "digest"
|
name = "digest"
|
||||||
version = "0.10.1"
|
version = "0.10.3"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "b697d66081d42af4fba142d56918a3cb21dc8eb63372c6b85d14f44fb9c5979b"
|
checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"block-buffer",
|
"block-buffer",
|
||||||
"crypto-common",
|
"crypto-common",
|
||||||
"generic-array",
|
"subtle",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -205,6 +212,15 @@ dependencies = [
|
|||||||
"libc",
|
"libc",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "hmac"
|
||||||
|
version = "0.12.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
|
||||||
|
dependencies = [
|
||||||
|
"digest",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "humantime"
|
name = "humantime"
|
||||||
version = "2.1.0"
|
version = "2.1.0"
|
||||||
@@ -356,10 +372,12 @@ version = "0.1.0-beta2"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"arc-swap",
|
"arc-swap",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
"base64",
|
||||||
"bb8",
|
"bb8",
|
||||||
"bytes",
|
"bytes",
|
||||||
"chrono",
|
"chrono",
|
||||||
"env_logger",
|
"env_logger",
|
||||||
|
"hmac",
|
||||||
"log",
|
"log",
|
||||||
"md-5",
|
"md-5",
|
||||||
"num_cpus",
|
"num_cpus",
|
||||||
@@ -370,7 +388,9 @@ dependencies = [
|
|||||||
"serde",
|
"serde",
|
||||||
"serde_derive",
|
"serde_derive",
|
||||||
"sha-1",
|
"sha-1",
|
||||||
|
"sha2",
|
||||||
"sqlparser",
|
"sqlparser",
|
||||||
|
"stringprep",
|
||||||
"tokio",
|
"tokio",
|
||||||
"toml",
|
"toml",
|
||||||
]
|
]
|
||||||
@@ -462,9 +482,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "regex"
|
name = "regex"
|
||||||
version = "1.5.4"
|
version = "1.5.5"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461"
|
checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"aho-corasick",
|
"aho-corasick",
|
||||||
"memchr",
|
"memchr",
|
||||||
@@ -511,6 +531,17 @@ dependencies = [
|
|||||||
"digest",
|
"digest",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "sha2"
|
||||||
|
version = "0.10.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "55deaec60f81eefe3cce0dc50bda92d6d8e88f2a27df7c5033b42afeb1ed2676"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"cpufeatures",
|
||||||
|
"digest",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "signal-hook-registry"
|
name = "signal-hook-registry"
|
||||||
version = "1.4.0"
|
version = "1.4.0"
|
||||||
@@ -541,6 +572,22 @@ dependencies = [
|
|||||||
"log",
|
"log",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "stringprep"
|
||||||
|
version = "0.1.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1"
|
||||||
|
dependencies = [
|
||||||
|
"unicode-bidi",
|
||||||
|
"unicode-normalization",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "subtle"
|
||||||
|
version = "2.4.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "syn"
|
name = "syn"
|
||||||
version = "1.0.86"
|
version = "1.0.86"
|
||||||
@@ -572,6 +619,21 @@ dependencies = [
|
|||||||
"winapi",
|
"winapi",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tinyvec"
|
||||||
|
version = "1.6.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50"
|
||||||
|
dependencies = [
|
||||||
|
"tinyvec_macros",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tinyvec_macros"
|
||||||
|
version = "0.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio"
|
name = "tokio"
|
||||||
version = "1.16.1"
|
version = "1.16.1"
|
||||||
@@ -617,6 +679,21 @@ version = "1.15.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987"
|
checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "unicode-bidi"
|
||||||
|
version = "0.3.8"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "unicode-normalization"
|
||||||
|
version = "0.1.19"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d54590932941a9e9266f0832deed84ebe1bf2e4c9e4a3554d393d18f5e854bf9"
|
||||||
|
dependencies = [
|
||||||
|
"tinyvec",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "unicode-xid"
|
name = "unicode-xid"
|
||||||
version = "0.2.2"
|
version = "0.2.2"
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "pgcat"
|
name = "pgcat"
|
||||||
version = "0.1.0-beta2"
|
version = "0.2.0-beta1"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
@@ -25,3 +25,7 @@ log = "0.4"
|
|||||||
arc-swap = "1"
|
arc-swap = "1"
|
||||||
env_logger = "0.9"
|
env_logger = "0.9"
|
||||||
parking_lot = "0.11"
|
parking_lot = "0.11"
|
||||||
|
hmac = "0.12"
|
||||||
|
sha2 = "0.10"
|
||||||
|
base64 = "0.13"
|
||||||
|
stringprep = "0.1"
|
||||||
@@ -342,6 +342,14 @@ impl Client {
|
|||||||
server.address()
|
server.address()
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Set application_name if any.
|
||||||
|
// TODO: investigate other parameters and set them too.
|
||||||
|
if self.parameters.contains_key("application_name") {
|
||||||
|
server
|
||||||
|
.set_name(&self.parameters["application_name"])
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
// Transaction loop. Multiple queries can be issued by the client here.
|
// Transaction loop. Multiple queries can be issued by the client here.
|
||||||
// The connection belongs to the client until the transaction is over,
|
// The connection belongs to the client until the transaction is over,
|
||||||
// or until the client disconnects if we are in session mode.
|
// or until the client disconnects if we are in session mode.
|
||||||
@@ -361,6 +369,7 @@ impl Client {
|
|||||||
if server.in_transaction() {
|
if server.in_transaction() {
|
||||||
server.query("ROLLBACK").await?;
|
server.query("ROLLBACK").await?;
|
||||||
server.query("DISCARD ALL").await?;
|
server.query("DISCARD ALL").await?;
|
||||||
|
server.set_name("pgcat").await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
return Err(err);
|
return Err(err);
|
||||||
@@ -432,8 +441,11 @@ impl Client {
|
|||||||
if server.in_transaction() {
|
if server.in_transaction() {
|
||||||
server.query("ROLLBACK").await?;
|
server.query("ROLLBACK").await?;
|
||||||
server.query("DISCARD ALL").await?;
|
server.query("DISCARD ALL").await?;
|
||||||
|
server.set_name("pgcat").await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
self.release();
|
||||||
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -14,6 +14,13 @@ pub const CANCEL_REQUEST_CODE: i32 = 80877102;
|
|||||||
// AuthenticationMD5Password
|
// AuthenticationMD5Password
|
||||||
pub const MD5_ENCRYPTED_PASSWORD: i32 = 5;
|
pub const MD5_ENCRYPTED_PASSWORD: i32 = 5;
|
||||||
|
|
||||||
|
// SASL
|
||||||
|
pub const SASL: i32 = 10;
|
||||||
|
pub const SASL_CONTINUE: i32 = 11;
|
||||||
|
pub const SASL_FINAL: i32 = 12;
|
||||||
|
pub const SCRAM_SHA_256: &str = "SCRAM-SHA-256";
|
||||||
|
pub const NONCE_LENGTH: usize = 24;
|
||||||
|
|
||||||
// AuthenticationOk
|
// AuthenticationOk
|
||||||
pub const AUTHENTICATION_SUCCESSFUL: i32 = 0;
|
pub const AUTHENTICATION_SUCCESSFUL: i32 = 0;
|
||||||
|
|
||||||
|
|||||||
@@ -54,6 +54,7 @@ mod errors;
|
|||||||
mod messages;
|
mod messages;
|
||||||
mod pool;
|
mod pool;
|
||||||
mod query_router;
|
mod query_router;
|
||||||
|
mod scram;
|
||||||
mod server;
|
mod server;
|
||||||
mod sharding;
|
mod sharding;
|
||||||
mod stats;
|
mod stats;
|
||||||
|
|||||||
@@ -209,8 +209,6 @@ impl ConnectionPool {
|
|||||||
let index = self.round_robin % addresses.len();
|
let index = self.round_robin % addresses.len();
|
||||||
let address = &addresses[index];
|
let address = &addresses[index];
|
||||||
|
|
||||||
self.stats.client_waiting(process_id, address.id);
|
|
||||||
|
|
||||||
// Make sure you're getting a primary or a replica
|
// Make sure you're getting a primary or a replica
|
||||||
// as per request. If no specific role is requested, the first
|
// as per request. If no specific role is requested, the first
|
||||||
// available will be chosen.
|
// available will be chosen.
|
||||||
@@ -224,6 +222,9 @@ impl ConnectionPool {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Indicate we're waiting on a server connection from a pool.
|
||||||
|
self.stats.client_waiting(process_id, address.id);
|
||||||
|
|
||||||
// Check if we can connect
|
// Check if we can connect
|
||||||
let mut conn = match self.databases[shard][index].get().await {
|
let mut conn = match self.databases[shard][index].get().await {
|
||||||
Ok(conn) => conn,
|
Ok(conn) => conn,
|
||||||
|
|||||||
311
src/scram.rs
Normal file
311
src/scram.rs
Normal file
@@ -0,0 +1,311 @@
|
|||||||
|
// SCRAM authentication...largely copy/pasted from
|
||||||
|
// https://github.com/sfackler/rust-postgres/.
|
||||||
|
|
||||||
|
use bytes::BytesMut;
|
||||||
|
use hmac::{Hmac, Mac};
|
||||||
|
use rand::{self, Rng};
|
||||||
|
use sha2::digest::FixedOutput;
|
||||||
|
use sha2::{Digest, Sha256};
|
||||||
|
|
||||||
|
use std::fmt::Write;
|
||||||
|
|
||||||
|
use crate::constants::*;
|
||||||
|
use crate::errors::Error;
|
||||||
|
|
||||||
|
fn normalize(pass: &[u8]) -> Vec<u8> {
|
||||||
|
let pass = match std::str::from_utf8(pass) {
|
||||||
|
Ok(pass) => pass,
|
||||||
|
Err(_) => return pass.to_vec(),
|
||||||
|
};
|
||||||
|
|
||||||
|
match stringprep::saslprep(pass) {
|
||||||
|
Ok(pass) => pass.into_owned().into_bytes(),
|
||||||
|
Err(_) => pass.as_bytes().to_vec(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ScramSha256 {
|
||||||
|
password: String,
|
||||||
|
salted_password: [u8; 32],
|
||||||
|
auth_message: String,
|
||||||
|
message: BytesMut,
|
||||||
|
nonce: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ScramSha256 {
|
||||||
|
pub fn new(password: &str) -> ScramSha256 {
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
let nonce = (0..NONCE_LENGTH)
|
||||||
|
.map(|_| {
|
||||||
|
let mut v = rng.gen_range(0x21u8..0x7e);
|
||||||
|
if v == 0x2c {
|
||||||
|
v = 0x7e
|
||||||
|
}
|
||||||
|
v as char
|
||||||
|
})
|
||||||
|
.collect::<String>();
|
||||||
|
|
||||||
|
Self::from_nonce(password, &nonce)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn from_nonce(password: &str, nonce: &str) -> ScramSha256 {
|
||||||
|
let message = BytesMut::from(&format!("{}n=,r={}", "n,,", nonce).as_bytes()[..]);
|
||||||
|
|
||||||
|
ScramSha256 {
|
||||||
|
password: password.to_string(),
|
||||||
|
nonce: String::from(nonce),
|
||||||
|
message,
|
||||||
|
salted_password: [0u8; 32],
|
||||||
|
auth_message: String::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn message(&mut self) -> BytesMut {
|
||||||
|
self.message.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update(&mut self, message: &BytesMut) -> Result<BytesMut, Error> {
|
||||||
|
let server_message = Message::parse(message)?;
|
||||||
|
|
||||||
|
if !server_message.nonce.starts_with(&self.nonce) {
|
||||||
|
// trace!("Bad server nonce");
|
||||||
|
return Err(Error::ProtocolSyncError);
|
||||||
|
}
|
||||||
|
|
||||||
|
let salt = match base64::decode(&server_message.salt) {
|
||||||
|
Ok(salt) => salt,
|
||||||
|
Err(_) => return Err(Error::ProtocolSyncError),
|
||||||
|
};
|
||||||
|
|
||||||
|
let salted_password = Self::hi(
|
||||||
|
&normalize(&self.password.as_bytes()[..]),
|
||||||
|
&salt,
|
||||||
|
server_message.iterations,
|
||||||
|
);
|
||||||
|
self.salted_password = salted_password;
|
||||||
|
|
||||||
|
let mut hmac = Hmac::<Sha256>::new_from_slice(&salted_password)
|
||||||
|
.expect("HMAC is able to accept all key sizes");
|
||||||
|
hmac.update(b"Client Key");
|
||||||
|
let client_key = hmac.finalize().into_bytes();
|
||||||
|
|
||||||
|
let mut hash = Sha256::default();
|
||||||
|
hash.update(client_key.as_slice());
|
||||||
|
let stored_key = hash.finalize_fixed();
|
||||||
|
|
||||||
|
let mut cbind_input = vec![];
|
||||||
|
cbind_input.extend("n,,".as_bytes());
|
||||||
|
let cbind_input = base64::encode(&cbind_input);
|
||||||
|
|
||||||
|
self.message.clear();
|
||||||
|
write!(
|
||||||
|
&mut self.message,
|
||||||
|
"c={},r={}",
|
||||||
|
cbind_input, server_message.nonce
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let auth_message = format!(
|
||||||
|
"n=,r={},{},{}",
|
||||||
|
self.nonce,
|
||||||
|
String::from_utf8_lossy(&message[..]),
|
||||||
|
String::from_utf8_lossy(&self.message[..])
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut hmac = Hmac::<Sha256>::new_from_slice(&stored_key)
|
||||||
|
.expect("HMAC is able to accept all key sizes");
|
||||||
|
hmac.update(auth_message.as_bytes());
|
||||||
|
let client_signature = hmac.finalize().into_bytes();
|
||||||
|
|
||||||
|
let mut client_proof = client_key;
|
||||||
|
for (proof, signature) in client_proof.iter_mut().zip(client_signature) {
|
||||||
|
*proof ^= signature;
|
||||||
|
}
|
||||||
|
|
||||||
|
write!(&mut self.message, ",p={}", base64::encode(&*client_proof)).unwrap();
|
||||||
|
|
||||||
|
self.auth_message = auth_message;
|
||||||
|
|
||||||
|
Ok(self.message.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn finish(&mut self, message: &BytesMut) -> Result<(), Error> {
|
||||||
|
let final_message = FinalMessage::parse(message)?;
|
||||||
|
|
||||||
|
let verifier = match base64::decode(&final_message.value) {
|
||||||
|
Ok(verifier) => verifier,
|
||||||
|
Err(_) => return Err(Error::ProtocolSyncError),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut hmac = Hmac::<Sha256>::new_from_slice(&self.salted_password)
|
||||||
|
.expect("HMAC is able to accept all key sizes");
|
||||||
|
hmac.update(b"Server Key");
|
||||||
|
let server_key = hmac.finalize().into_bytes();
|
||||||
|
|
||||||
|
let mut hmac = Hmac::<Sha256>::new_from_slice(&server_key)
|
||||||
|
.expect("HMAC is able to accept all key sizes");
|
||||||
|
hmac.update(self.auth_message.as_bytes());
|
||||||
|
|
||||||
|
match hmac.verify_slice(&verifier) {
|
||||||
|
Ok(_) => Ok(()),
|
||||||
|
Err(_) => return Err(Error::ServerError),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// https://github.com/sfackler/rust-postgres/blob/c3a029e60c1c0bd0be947049859b8fa5bd5ac220/postgres-protocol/src/authentication/sasl.rs#L35
|
||||||
|
fn hi(str: &[u8], salt: &[u8], i: u32) -> [u8; 32] {
|
||||||
|
let mut hmac =
|
||||||
|
Hmac::<Sha256>::new_from_slice(str).expect("HMAC is able to accept all key sizes");
|
||||||
|
hmac.update(salt);
|
||||||
|
hmac.update(&[0, 0, 0, 1]);
|
||||||
|
let mut prev = hmac.finalize().into_bytes();
|
||||||
|
|
||||||
|
let mut hi = prev;
|
||||||
|
|
||||||
|
for _ in 1..i {
|
||||||
|
let mut hmac = Hmac::<Sha256>::new_from_slice(str).expect("already checked above");
|
||||||
|
hmac.update(&prev);
|
||||||
|
prev = hmac.finalize().into_bytes();
|
||||||
|
|
||||||
|
for (hi, prev) in hi.iter_mut().zip(prev) {
|
||||||
|
*hi ^= prev;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
hi.into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default, Debug)]
|
||||||
|
struct Message {
|
||||||
|
nonce: String,
|
||||||
|
salt: String,
|
||||||
|
iterations: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Message {
|
||||||
|
fn parse(message: &BytesMut) -> Result<Message, Error> {
|
||||||
|
if !message.starts_with(b"r=") {
|
||||||
|
return Err(Error::ProtocolSyncError);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut i = 2;
|
||||||
|
|
||||||
|
while message[i] != b',' && i < message.len() {
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
let nonce = String::from_utf8_lossy(&message[2..i]).to_string();
|
||||||
|
|
||||||
|
// Skip the ,
|
||||||
|
i += 1;
|
||||||
|
|
||||||
|
if !&message[i..].starts_with(b"s=") {
|
||||||
|
return Err(Error::ProtocolSyncError);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip the s=
|
||||||
|
i += 2;
|
||||||
|
|
||||||
|
let s = i;
|
||||||
|
while message[i] != b',' && i < message.len() {
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
let salt = String::from_utf8_lossy(&message[s..i]).to_string();
|
||||||
|
|
||||||
|
// Skip the ,
|
||||||
|
i += 1;
|
||||||
|
|
||||||
|
if !&message[i..].starts_with(b"i=") {
|
||||||
|
return Err(Error::ProtocolSyncError);
|
||||||
|
}
|
||||||
|
|
||||||
|
i += 2;
|
||||||
|
|
||||||
|
let iterations = match String::from_utf8_lossy(&message[i..]).parse::<u32>() {
|
||||||
|
Ok(it) => it,
|
||||||
|
Err(_) => return Err(Error::ProtocolSyncError),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Message {
|
||||||
|
nonce,
|
||||||
|
salt,
|
||||||
|
iterations,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct FinalMessage {
|
||||||
|
value: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FinalMessage {
|
||||||
|
pub fn parse(message: &BytesMut) -> Result<FinalMessage, Error> {
|
||||||
|
if !message.starts_with(b"v=") {
|
||||||
|
return Err(Error::ProtocolSyncError);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(FinalMessage {
|
||||||
|
value: String::from_utf8_lossy(&message[2..]).to_string(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parse_server_first_message() {
|
||||||
|
let message = BytesMut::from(
|
||||||
|
&"r=fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j,s=QSXCR+Q6sek8bf92,i=4096".as_bytes()[..],
|
||||||
|
);
|
||||||
|
let message = Message::parse(&message).unwrap();
|
||||||
|
assert_eq!(message.nonce, "fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j");
|
||||||
|
assert_eq!(message.salt, "QSXCR+Q6sek8bf92");
|
||||||
|
assert_eq!(message.iterations, 4096);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parse_server_last_message() {
|
||||||
|
let f = FinalMessage::parse(&BytesMut::from(
|
||||||
|
&"v=U+ppxD5XUKtradnv8e2MkeupiA8FU87Sg8CXzXHDAzw".as_bytes()[..],
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
f.value,
|
||||||
|
"U+ppxD5XUKtradnv8e2MkeupiA8FU87Sg8CXzXHDAzw".to_string()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// recorded auth exchange from psql
|
||||||
|
#[test]
|
||||||
|
fn exchange() {
|
||||||
|
let password = "foobar";
|
||||||
|
let nonce = "9IZ2O01zb9IgiIZ1WJ/zgpJB";
|
||||||
|
|
||||||
|
let client_first = "n,,n=,r=9IZ2O01zb9IgiIZ1WJ/zgpJB";
|
||||||
|
let server_first =
|
||||||
|
"r=9IZ2O01zb9IgiIZ1WJ/zgpJBjx/oIRLs02gGSHcw1KEty3eY,s=fs3IXBy7U7+IvVjZ,i\
|
||||||
|
=4096";
|
||||||
|
let client_final =
|
||||||
|
"c=biws,r=9IZ2O01zb9IgiIZ1WJ/zgpJBjx/oIRLs02gGSHcw1KEty3eY,p=AmNKosjJzS3\
|
||||||
|
1NTlQYNs5BTeQjdHdk7lOflDo5re2an8=";
|
||||||
|
let server_final = "v=U+ppxD5XUKtradnv8e2MkeupiA8FU87Sg8CXzXHDAzw=";
|
||||||
|
|
||||||
|
let mut scram = ScramSha256::from_nonce(password, nonce);
|
||||||
|
|
||||||
|
let message = scram.message();
|
||||||
|
assert_eq!(std::str::from_utf8(&message).unwrap(), client_first);
|
||||||
|
|
||||||
|
let result = scram
|
||||||
|
.update(&BytesMut::from(&server_first.as_bytes()[..]))
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(std::str::from_utf8(&result).unwrap(), client_final);
|
||||||
|
|
||||||
|
scram
|
||||||
|
.finish(&BytesMut::from(&server_final.as_bytes()[..]))
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
103
src/server.rs
103
src/server.rs
@@ -12,6 +12,7 @@ use crate::config::{Address, User};
|
|||||||
use crate::constants::*;
|
use crate::constants::*;
|
||||||
use crate::errors::Error;
|
use crate::errors::Error;
|
||||||
use crate::messages::*;
|
use crate::messages::*;
|
||||||
|
use crate::scram::ScramSha256;
|
||||||
use crate::stats::Reporter;
|
use crate::stats::Reporter;
|
||||||
use crate::ClientServerMap;
|
use crate::ClientServerMap;
|
||||||
|
|
||||||
@@ -54,6 +55,9 @@ pub struct Server {
|
|||||||
|
|
||||||
/// Reports various metrics, e.g. data sent & received.
|
/// Reports various metrics, e.g. data sent & received.
|
||||||
stats: Reporter,
|
stats: Reporter,
|
||||||
|
|
||||||
|
/// Application name using the server at the moment.
|
||||||
|
application_name: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Server {
|
impl Server {
|
||||||
@@ -86,6 +90,8 @@ impl Server {
|
|||||||
|
|
||||||
// We'll be handling multiple packets, but they will all be structured the same.
|
// We'll be handling multiple packets, but they will all be structured the same.
|
||||||
// We'll loop here until this exchange is complete.
|
// We'll loop here until this exchange is complete.
|
||||||
|
let mut scram = ScramSha256::new(&user.password);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let code = match stream.read_u8().await {
|
let code = match stream.read_u8().await {
|
||||||
Ok(code) => code as char,
|
Ok(code) => code as char,
|
||||||
@@ -127,6 +133,83 @@ impl Server {
|
|||||||
|
|
||||||
AUTHENTICATION_SUCCESSFUL => (),
|
AUTHENTICATION_SUCCESSFUL => (),
|
||||||
|
|
||||||
|
SASL => {
|
||||||
|
debug!("Starting SASL authentication");
|
||||||
|
let sasl_len = (len - 8) as usize;
|
||||||
|
let mut sasl_auth = vec![0u8; sasl_len];
|
||||||
|
match stream.read_exact(&mut sasl_auth).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(_) => return Err(Error::SocketError),
|
||||||
|
};
|
||||||
|
|
||||||
|
let sasl_type = String::from_utf8_lossy(&sasl_auth[..sasl_len - 2]);
|
||||||
|
|
||||||
|
if sasl_type == SCRAM_SHA_256 {
|
||||||
|
debug!("Using {}", SCRAM_SHA_256);
|
||||||
|
|
||||||
|
// Send client message
|
||||||
|
let sasl_response = scram.message();
|
||||||
|
let mut res = BytesMut::new();
|
||||||
|
res.put_u8(b'p');
|
||||||
|
res.put_i32(
|
||||||
|
4 + SCRAM_SHA_256.len() as i32
|
||||||
|
+ 1
|
||||||
|
+ sasl_response.len() as i32
|
||||||
|
+ 4,
|
||||||
|
);
|
||||||
|
res.put_slice(&format!("{}\0", SCRAM_SHA_256).as_bytes()[..]);
|
||||||
|
res.put_i32(sasl_response.len() as i32);
|
||||||
|
res.put(sasl_response);
|
||||||
|
|
||||||
|
write_all(&mut stream, res).await?;
|
||||||
|
} else {
|
||||||
|
error!("Unsupported SCRAM version: {}", sasl_type);
|
||||||
|
return Err(Error::ServerError);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SASL_CONTINUE => {
|
||||||
|
trace!("Continuing SASL");
|
||||||
|
|
||||||
|
let mut sasl_data = vec![0u8; (len - 8) as usize];
|
||||||
|
|
||||||
|
match stream.read_exact(&mut sasl_data).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(_) => return Err(Error::SocketError),
|
||||||
|
};
|
||||||
|
|
||||||
|
let msg = BytesMut::from(&sasl_data[..]);
|
||||||
|
let sasl_response = scram.update(&msg)?;
|
||||||
|
|
||||||
|
let mut res = BytesMut::new();
|
||||||
|
res.put_u8(b'p');
|
||||||
|
res.put_i32(4 + sasl_response.len() as i32);
|
||||||
|
res.put(sasl_response);
|
||||||
|
|
||||||
|
write_all(&mut stream, res).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
SASL_FINAL => {
|
||||||
|
trace!("Final SASL");
|
||||||
|
|
||||||
|
let mut sasl_final = vec![0u8; len as usize - 8];
|
||||||
|
match stream.read_exact(&mut sasl_final).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(_) => return Err(Error::SocketError),
|
||||||
|
};
|
||||||
|
|
||||||
|
match scram.finish(&BytesMut::from(&sasl_final[..])) {
|
||||||
|
Ok(_) => {
|
||||||
|
debug!("SASL authentication successful");
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(err) => {
|
||||||
|
debug!("SASL authentication failed");
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
_ => {
|
_ => {
|
||||||
error!("Unsupported authentication mechanism: {}", auth_code);
|
error!("Unsupported authentication mechanism: {}", auth_code);
|
||||||
return Err(Error::ServerError);
|
return Err(Error::ServerError);
|
||||||
@@ -210,7 +293,7 @@ impl Server {
|
|||||||
|
|
||||||
let (read, write) = stream.into_split();
|
let (read, write) = stream.into_split();
|
||||||
|
|
||||||
return Ok(Server {
|
let mut server = Server {
|
||||||
address: address.clone(),
|
address: address.clone(),
|
||||||
read: BufReader::new(read),
|
read: BufReader::new(read),
|
||||||
write: write,
|
write: write,
|
||||||
@@ -224,7 +307,12 @@ impl Server {
|
|||||||
client_server_map: client_server_map,
|
client_server_map: client_server_map,
|
||||||
connected_at: chrono::offset::Utc::now().naive_utc(),
|
connected_at: chrono::offset::Utc::now().naive_utc(),
|
||||||
stats: stats,
|
stats: stats,
|
||||||
});
|
application_name: String::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
server.set_name("pgcat").await?;
|
||||||
|
|
||||||
|
return Ok(server);
|
||||||
}
|
}
|
||||||
|
|
||||||
// We have an unexpected message from the server during this exchange.
|
// We have an unexpected message from the server during this exchange.
|
||||||
@@ -448,9 +536,14 @@ impl Server {
|
|||||||
/// A shorthand for `SET application_name = $1`.
|
/// A shorthand for `SET application_name = $1`.
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub async fn set_name(&mut self, name: &str) -> Result<(), Error> {
|
pub async fn set_name(&mut self, name: &str) -> Result<(), Error> {
|
||||||
Ok(self
|
if self.application_name != name {
|
||||||
.query(&format!("SET application_name = '{}'", name))
|
self.application_name = name.to_string();
|
||||||
.await?)
|
Ok(self
|
||||||
|
.query(&format!("SET application_name = '{}'", name))
|
||||||
|
.await?)
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the servers address.
|
/// Get the servers address.
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ ActiveRecord::Base.establish_connection(
|
|||||||
username: 'sharding_user',
|
username: 'sharding_user',
|
||||||
password: 'sharding_user',
|
password: 'sharding_user',
|
||||||
database: 'rails_dev',
|
database: 'rails_dev',
|
||||||
|
application_name: 'testing_pgcat',
|
||||||
prepared_statements: false, # Transaction mode
|
prepared_statements: false, # Transaction mode
|
||||||
advisory_locks: false # Same
|
advisory_locks: false # Same
|
||||||
)
|
)
|
||||||
@@ -116,7 +117,7 @@ end
|
|||||||
|
|
||||||
# Test evil clients
|
# Test evil clients
|
||||||
def poorly_behaved_client
|
def poorly_behaved_client
|
||||||
conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/rails_dev")
|
conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/rails_dev?application_name=testing_pgcat")
|
||||||
conn.async_exec 'BEGIN'
|
conn.async_exec 'BEGIN'
|
||||||
conn.async_exec 'SELECT 1'
|
conn.async_exec 'SELECT 1'
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user