mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 17:36:28 +00:00
Compare commits
5 Commits
mostafa_sq
...
mostafa_fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3fc9e5dec1 | ||
|
|
f7c5c0faf9 | ||
|
|
982d03c374 | ||
|
|
686b7ca7c5 | ||
|
|
7c55bf78fe |
@@ -26,7 +26,6 @@ PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
|
|||||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
|
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
|
||||||
|
|
||||||
# Start Toxiproxy
|
# Start Toxiproxy
|
||||||
kill -9 $(pgrep toxiproxy) || true
|
|
||||||
LOG_LEVEL=error toxiproxy-server &
|
LOG_LEVEL=error toxiproxy-server &
|
||||||
sleep 1
|
sleep 1
|
||||||
|
|
||||||
@@ -107,7 +106,7 @@ cd ../..
|
|||||||
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
|
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
|
||||||
#
|
#
|
||||||
pip3 install -r tests/python/requirements.txt
|
pip3 install -r tests/python/requirements.txt
|
||||||
pytest || exit 1
|
python3 tests/python/tests.py || exit 1
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
@@ -178,6 +177,3 @@ killall pgcat -s SIGINT
|
|||||||
|
|
||||||
# Allow for graceful shutdown
|
# Allow for graceful shutdown
|
||||||
sleep 1
|
sleep 1
|
||||||
|
|
||||||
kill -9 $(pgrep toxiproxy)
|
|
||||||
sleep 1
|
|
||||||
|
|||||||
4
.github/workflows/chart-lint-test.yaml
vendored
4
.github/workflows/chart-lint-test.yaml
vendored
@@ -22,7 +22,7 @@ jobs:
|
|||||||
# Python is required because `ct lint` runs Yamale (https://github.com/23andMe/Yamale) and
|
# Python is required because `ct lint` runs Yamale (https://github.com/23andMe/Yamale) and
|
||||||
# yamllint (https://github.com/adrienverge/yamllint) which require Python
|
# yamllint (https://github.com/adrienverge/yamllint) which require Python
|
||||||
- name: Set up Python
|
- name: Set up Python
|
||||||
uses: actions/setup-python@v5.1.0
|
uses: actions/setup-python@v4.1.0
|
||||||
with:
|
with:
|
||||||
python-version: 3.7
|
python-version: 3.7
|
||||||
|
|
||||||
@@ -43,7 +43,7 @@ jobs:
|
|||||||
run: ct lint --config ct.yaml
|
run: ct lint --config ct.yaml
|
||||||
|
|
||||||
- name: Create kind cluster
|
- name: Create kind cluster
|
||||||
uses: helm/kind-action@v1.10.0
|
uses: helm/kind-action@v1.7.0
|
||||||
if: steps.list-changed.outputs.changed == 'true'
|
if: steps.list-changed.outputs.changed == 'true'
|
||||||
|
|
||||||
- name: Run chart-testing (install)
|
- name: Run chart-testing (install)
|
||||||
|
|||||||
15
.github/workflows/publish-deb-package.yml
vendored
15
.github/workflows/publish-deb-package.yml
vendored
@@ -1,9 +1,6 @@
|
|||||||
name: pgcat package (deb)
|
name: pgcat package (deb)
|
||||||
|
|
||||||
on:
|
on:
|
||||||
push:
|
|
||||||
tags:
|
|
||||||
- v*
|
|
||||||
workflow_dispatch:
|
workflow_dispatch:
|
||||||
inputs:
|
inputs:
|
||||||
packageVersion:
|
packageVersion:
|
||||||
@@ -19,14 +16,6 @@ jobs:
|
|||||||
runs-on: ${{ matrix.os }}
|
runs-on: ${{ matrix.os }}
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v3
|
||||||
- name: Set package version
|
|
||||||
if: github.event_name == 'push' # For push event
|
|
||||||
run: |
|
|
||||||
TAG=${{ github.ref_name }}
|
|
||||||
echo "packageVersion=${TAG#v}" >> "$GITHUB_ENV"
|
|
||||||
- name: Set package version (manual dispatch)
|
|
||||||
if: github.event_name == 'workflow_dispatch' # For manual dispatch
|
|
||||||
run: echo "packageVersion=${{ github.event.inputs.packageVersion }}" >> "$GITHUB_ENV"
|
|
||||||
- uses: actions-rs/toolchain@v1
|
- uses: actions-rs/toolchain@v1
|
||||||
with:
|
with:
|
||||||
toolchain: stable
|
toolchain: stable
|
||||||
@@ -50,10 +39,10 @@ jobs:
|
|||||||
export ARCH=arm64
|
export ARCH=arm64
|
||||||
fi
|
fi
|
||||||
|
|
||||||
bash utilities/deb.sh ${{ env.packageVersion }}
|
bash utilities/deb.sh ${{ inputs.packageVersion }}
|
||||||
|
|
||||||
deb-s3 upload \
|
deb-s3 upload \
|
||||||
--lock \
|
--lock \
|
||||||
--bucket apt.postgresml.org \
|
--bucket apt.postgresml.org \
|
||||||
pgcat-${{ env.packageVersion }}-ubuntu22.04-${ARCH}.deb \
|
pgcat-${{ inputs.packageVersion }}-ubuntu22.04-${ARCH}.deb \
|
||||||
--codename $(lsb_release -cs)
|
--codename $(lsb_release -cs)
|
||||||
|
|||||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -11,4 +11,3 @@ dev/.bash_history
|
|||||||
dev/cache
|
dev/cache
|
||||||
!dev/cache/.keepme
|
!dev/cache/.keepme
|
||||||
.venv
|
.venv
|
||||||
**/__pycache__
|
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ Within this test environment you can modify the file in your favorite IDE and re
|
|||||||
|
|
||||||
Once the environment is ready, you can run the tests by running
|
Once the environment is ready, you can run the tests by running
|
||||||
Ruby: `cd /app/tests/ruby && bundle exec ruby <test_name>.rb --format documentation`
|
Ruby: `cd /app/tests/ruby && bundle exec ruby <test_name>.rb --format documentation`
|
||||||
Python: `cd /app/ && pytest`
|
Python: `cd /app && python3 tests/python/tests.py`
|
||||||
Rust: `cd /app/tests/rust && cargo run`
|
Rust: `cd /app/tests/rust && cargo run`
|
||||||
Go: `cd /app/tests/go && /usr/local/go/bin/go test`
|
Go: `cd /app/tests/go && /usr/local/go/bin/go test`
|
||||||
|
|
||||||
|
|||||||
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -1526,9 +1526,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "sqlparser"
|
name = "sqlparser"
|
||||||
version = "0.51.0"
|
version = "0.41.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "5fe11944a61da0da3f592e19a45ebe5ab92dc14a779907ff1f08fbb797bfefc7"
|
checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"log",
|
"log",
|
||||||
"sqlparser_derive",
|
"sqlparser_derive",
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ serde_derive = "1"
|
|||||||
regex = "1"
|
regex = "1"
|
||||||
num_cpus = "1"
|
num_cpus = "1"
|
||||||
once_cell = "1"
|
once_cell = "1"
|
||||||
sqlparser = { version = "0.51", features = ["visitor"] }
|
sqlparser = { version = "0.41", features = ["visitor"] }
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
arc-swap = "1"
|
arc-swap = "1"
|
||||||
parking_lot = "0.12.1"
|
parking_lot = "0.12.1"
|
||||||
|
|||||||
4
postinst
4
postinst
@@ -7,7 +7,3 @@ systemctl enable pgcat
|
|||||||
if ! id pgcat 2> /dev/null; then
|
if ! id pgcat 2> /dev/null; then
|
||||||
useradd -s /usr/bin/false pgcat
|
useradd -s /usr/bin/false pgcat
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if [ -f /etc/pgcat.toml ]; then
|
|
||||||
systemctl start pgcat
|
|
||||||
fi
|
|
||||||
|
|||||||
@@ -1,4 +1,3 @@
|
|||||||
use crate::config::AuthType;
|
|
||||||
use crate::errors::Error;
|
use crate::errors::Error;
|
||||||
use crate::pool::ConnectionPool;
|
use crate::pool::ConnectionPool;
|
||||||
use crate::server::Server;
|
use crate::server::Server;
|
||||||
@@ -72,7 +71,6 @@ impl AuthPassthrough {
|
|||||||
pub async fn fetch_hash(&self, address: &crate::config::Address) -> Result<String, Error> {
|
pub async fn fetch_hash(&self, address: &crate::config::Address) -> Result<String, Error> {
|
||||||
let auth_user = crate::config::User {
|
let auth_user = crate::config::User {
|
||||||
username: self.user.clone(),
|
username: self.user.clone(),
|
||||||
auth_type: AuthType::MD5,
|
|
||||||
password: Some(self.password.clone()),
|
password: Some(self.password.clone()),
|
||||||
server_username: None,
|
server_username: None,
|
||||||
server_password: None,
|
server_password: None,
|
||||||
|
|||||||
332
src/client.rs
332
src/client.rs
@@ -14,9 +14,7 @@ use tokio::sync::mpsc::Sender;
|
|||||||
|
|
||||||
use crate::admin::{generate_server_parameters_for_admin, handle_admin};
|
use crate::admin::{generate_server_parameters_for_admin, handle_admin};
|
||||||
use crate::auth_passthrough::refetch_auth_hash;
|
use crate::auth_passthrough::refetch_auth_hash;
|
||||||
use crate::config::{
|
use crate::config::{get_config, get_idle_client_in_transaction_timeout, Address, PoolMode};
|
||||||
get_config, get_idle_client_in_transaction_timeout, Address, AuthType, PoolMode,
|
|
||||||
};
|
|
||||||
use crate::constants::*;
|
use crate::constants::*;
|
||||||
use crate::messages::*;
|
use crate::messages::*;
|
||||||
use crate::plugins::PluginOutput;
|
use crate::plugins::PluginOutput;
|
||||||
@@ -465,8 +463,8 @@ where
|
|||||||
.count()
|
.count()
|
||||||
== 1;
|
== 1;
|
||||||
|
|
||||||
|
// Kick any client that's not admin while we're in admin-only mode.
|
||||||
if !admin && admin_only {
|
if !admin && admin_only {
|
||||||
// Kick any client that's not admin while we're in admin-only mode.
|
|
||||||
debug!(
|
debug!(
|
||||||
"Rejecting non-admin connection to {} when in admin only mode",
|
"Rejecting non-admin connection to {} when in admin only mode",
|
||||||
pool_name
|
pool_name
|
||||||
@@ -483,76 +481,72 @@ where
|
|||||||
let process_id: i32 = rand::random();
|
let process_id: i32 = rand::random();
|
||||||
let secret_key: i32 = rand::random();
|
let secret_key: i32 = rand::random();
|
||||||
|
|
||||||
|
// Perform MD5 authentication.
|
||||||
|
// TODO: Add SASL support.
|
||||||
|
let salt = md5_challenge(&mut write).await?;
|
||||||
|
|
||||||
|
let code = match read.read_u8().await {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(_) => {
|
||||||
|
return Err(Error::ClientSocketError(
|
||||||
|
"password code".into(),
|
||||||
|
client_identifier,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// PasswordMessage
|
||||||
|
if code as char != 'p' {
|
||||||
|
return Err(Error::ProtocolSyncError(format!(
|
||||||
|
"Expected p, got {}",
|
||||||
|
code as char
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
let len = match read.read_i32().await {
|
||||||
|
Ok(len) => len,
|
||||||
|
Err(_) => {
|
||||||
|
return Err(Error::ClientSocketError(
|
||||||
|
"password message length".into(),
|
||||||
|
client_identifier,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut password_response = vec![0u8; (len - 4) as usize];
|
||||||
|
|
||||||
|
match read.read_exact(&mut password_response).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(_) => {
|
||||||
|
return Err(Error::ClientSocketError(
|
||||||
|
"password message".into(),
|
||||||
|
client_identifier,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let mut prepared_statements_enabled = false;
|
let mut prepared_statements_enabled = false;
|
||||||
|
|
||||||
// Authenticate admin user.
|
// Authenticate admin user.
|
||||||
let (transaction_mode, mut server_parameters) = if admin {
|
let (transaction_mode, mut server_parameters) = if admin {
|
||||||
let config = get_config();
|
let config = get_config();
|
||||||
// TODO: Add SASL support.
|
|
||||||
// Perform MD5 authentication.
|
|
||||||
match config.general.admin_auth_type {
|
|
||||||
AuthType::Trust => (),
|
|
||||||
AuthType::MD5 => {
|
|
||||||
let salt = md5_challenge(&mut write).await?;
|
|
||||||
|
|
||||||
let code = match read.read_u8().await {
|
// Compare server and client hashes.
|
||||||
Ok(p) => p,
|
let password_hash = md5_hash_password(
|
||||||
Err(_) => {
|
&config.general.admin_username,
|
||||||
return Err(Error::ClientSocketError(
|
&config.general.admin_password,
|
||||||
"password code".into(),
|
&salt,
|
||||||
client_identifier,
|
);
|
||||||
))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// PasswordMessage
|
if password_hash != password_response {
|
||||||
if code as char != 'p' {
|
let error = Error::ClientGeneralError("Invalid password".into(), client_identifier);
|
||||||
return Err(Error::ProtocolSyncError(format!(
|
|
||||||
"Expected p, got {}",
|
|
||||||
code as char
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
|
|
||||||
let len = match read.read_i32().await {
|
warn!("{}", error);
|
||||||
Ok(len) => len,
|
wrong_password(&mut write, username).await?;
|
||||||
Err(_) => {
|
|
||||||
return Err(Error::ClientSocketError(
|
|
||||||
"password message length".into(),
|
|
||||||
client_identifier,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut password_response = vec![0u8; (len - 4) as usize];
|
return Err(error);
|
||||||
|
|
||||||
match read.read_exact(&mut password_response).await {
|
|
||||||
Ok(_) => (),
|
|
||||||
Err(_) => {
|
|
||||||
return Err(Error::ClientSocketError(
|
|
||||||
"password message".into(),
|
|
||||||
client_identifier,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Compare server and client hashes.
|
|
||||||
let password_hash = md5_hash_password(
|
|
||||||
&config.general.admin_username,
|
|
||||||
&config.general.admin_password,
|
|
||||||
&salt,
|
|
||||||
);
|
|
||||||
|
|
||||||
if password_hash != password_response {
|
|
||||||
let error =
|
|
||||||
Error::ClientGeneralError("Invalid password".into(), client_identifier);
|
|
||||||
|
|
||||||
warn!("{}", error);
|
|
||||||
wrong_password(&mut write, username).await?;
|
|
||||||
|
|
||||||
return Err(error);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
(false, generate_server_parameters_for_admin())
|
(false, generate_server_parameters_for_admin())
|
||||||
}
|
}
|
||||||
// Authenticate normal user.
|
// Authenticate normal user.
|
||||||
@@ -579,143 +573,92 @@ where
|
|||||||
// Obtain the hash to compare, we give preference to that written in cleartext in config
|
// Obtain the hash to compare, we give preference to that written in cleartext in config
|
||||||
// if there is nothing set in cleartext and auth passthrough (auth_query) is configured, we use the hash obtained
|
// if there is nothing set in cleartext and auth passthrough (auth_query) is configured, we use the hash obtained
|
||||||
// when the pool was created. If there is no hash there, we try to fetch it one more time.
|
// when the pool was created. If there is no hash there, we try to fetch it one more time.
|
||||||
match pool.settings.user.auth_type {
|
let password_hash = if let Some(password) = &pool.settings.user.password {
|
||||||
AuthType::Trust => (),
|
Some(md5_hash_password(username, password, &salt))
|
||||||
AuthType::MD5 => {
|
} else {
|
||||||
// Perform MD5 authentication.
|
if !get_config().is_auth_query_configured() {
|
||||||
// TODO: Add SASL support.
|
wrong_password(&mut write, username).await?;
|
||||||
let salt = md5_challenge(&mut write).await?;
|
return Err(Error::ClientAuthImpossible(username.into()));
|
||||||
|
}
|
||||||
|
|
||||||
let code = match read.read_u8().await {
|
let mut hash = (*pool.auth_hash.read()).clone();
|
||||||
Ok(p) => p,
|
|
||||||
Err(_) => {
|
|
||||||
return Err(Error::ClientSocketError(
|
|
||||||
"password code".into(),
|
|
||||||
client_identifier,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// PasswordMessage
|
if hash.is_none() {
|
||||||
if code as char != 'p' {
|
warn!(
|
||||||
return Err(Error::ProtocolSyncError(format!(
|
"Query auth configured \
|
||||||
"Expected p, got {}",
|
but no hash password found \
|
||||||
code as char
|
for pool {}. Will try to refetch it.",
|
||||||
)));
|
pool_name
|
||||||
}
|
);
|
||||||
|
|
||||||
let len = match read.read_i32().await {
|
match refetch_auth_hash(&pool).await {
|
||||||
Ok(len) => len,
|
Ok(fetched_hash) => {
|
||||||
Err(_) => {
|
warn!("Password for {}, obtained. Updating.", client_identifier);
|
||||||
return Err(Error::ClientSocketError(
|
|
||||||
"password message length".into(),
|
|
||||||
client_identifier,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut password_response = vec![0u8; (len - 4) as usize];
|
|
||||||
|
|
||||||
match read.read_exact(&mut password_response).await {
|
|
||||||
Ok(_) => (),
|
|
||||||
Err(_) => {
|
|
||||||
return Err(Error::ClientSocketError(
|
|
||||||
"password message".into(),
|
|
||||||
client_identifier,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let password_hash = if let Some(password) = &pool.settings.user.password {
|
|
||||||
Some(md5_hash_password(username, password, &salt))
|
|
||||||
} else {
|
|
||||||
if !get_config().is_auth_query_configured() {
|
|
||||||
wrong_password(&mut write, username).await?;
|
|
||||||
return Err(Error::ClientAuthImpossible(username.into()));
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut hash = (*pool.auth_hash.read()).clone();
|
|
||||||
|
|
||||||
if hash.is_none() {
|
|
||||||
warn!(
|
|
||||||
"Query auth configured \
|
|
||||||
but no hash password found \
|
|
||||||
for pool {}. Will try to refetch it.",
|
|
||||||
pool_name
|
|
||||||
);
|
|
||||||
|
|
||||||
match refetch_auth_hash(&pool).await {
|
|
||||||
Ok(fetched_hash) => {
|
|
||||||
warn!(
|
|
||||||
"Password for {}, obtained. Updating.",
|
|
||||||
client_identifier
|
|
||||||
);
|
|
||||||
|
|
||||||
{
|
|
||||||
let mut pool_auth_hash = pool.auth_hash.write();
|
|
||||||
*pool_auth_hash = Some(fetched_hash.clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
hash = Some(fetched_hash);
|
|
||||||
}
|
|
||||||
|
|
||||||
Err(err) => {
|
|
||||||
wrong_password(&mut write, username).await?;
|
|
||||||
|
|
||||||
return Err(Error::ClientAuthPassthroughError(
|
|
||||||
err.to_string(),
|
|
||||||
client_identifier,
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
Some(md5_hash_second_pass(&hash.unwrap(), &salt))
|
|
||||||
};
|
|
||||||
|
|
||||||
// Once we have the resulting hash, we compare with what the client gave us.
|
|
||||||
// If they do not match and auth query is set up, we try to refetch the hash one more time
|
|
||||||
// to see if the password has changed since the pool was created.
|
|
||||||
//
|
|
||||||
// @TODO: we could end up fetching again the same password twice (see above).
|
|
||||||
if password_hash.unwrap() != password_response {
|
|
||||||
warn!(
|
|
||||||
"Invalid password {}, will try to refetch it.",
|
|
||||||
client_identifier
|
|
||||||
);
|
|
||||||
|
|
||||||
let fetched_hash = match refetch_auth_hash(&pool).await {
|
|
||||||
Ok(fetched_hash) => fetched_hash,
|
|
||||||
Err(err) => {
|
|
||||||
wrong_password(&mut write, username).await?;
|
|
||||||
|
|
||||||
return Err(err);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let new_password_hash = md5_hash_second_pass(&fetched_hash, &salt);
|
|
||||||
|
|
||||||
// Ok password changed in server an auth is possible.
|
|
||||||
if new_password_hash == password_response {
|
|
||||||
warn!(
|
|
||||||
"Password for {}, changed in server. Updating.",
|
|
||||||
client_identifier
|
|
||||||
);
|
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut pool_auth_hash = pool.auth_hash.write();
|
let mut pool_auth_hash = pool.auth_hash.write();
|
||||||
*pool_auth_hash = Some(fetched_hash);
|
*pool_auth_hash = Some(fetched_hash.clone());
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
|
hash = Some(fetched_hash);
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(err) => {
|
||||||
wrong_password(&mut write, username).await?;
|
wrong_password(&mut write, username).await?;
|
||||||
return Err(Error::ClientGeneralError(
|
|
||||||
"Invalid password".into(),
|
return Err(Error::ClientAuthPassthroughError(
|
||||||
|
err.to_string(),
|
||||||
client_identifier,
|
client_identifier,
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Some(md5_hash_second_pass(&hash.unwrap(), &salt))
|
||||||
|
};
|
||||||
|
|
||||||
|
// Once we have the resulting hash, we compare with what the client gave us.
|
||||||
|
// If they do not match and auth query is set up, we try to refetch the hash one more time
|
||||||
|
// to see if the password has changed since the pool was created.
|
||||||
|
//
|
||||||
|
// @TODO: we could end up fetching again the same password twice (see above).
|
||||||
|
if password_hash.unwrap() != password_response {
|
||||||
|
warn!(
|
||||||
|
"Invalid password {}, will try to refetch it.",
|
||||||
|
client_identifier
|
||||||
|
);
|
||||||
|
|
||||||
|
let fetched_hash = match refetch_auth_hash(&pool).await {
|
||||||
|
Ok(fetched_hash) => fetched_hash,
|
||||||
|
Err(err) => {
|
||||||
|
wrong_password(&mut write, username).await?;
|
||||||
|
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let new_password_hash = md5_hash_second_pass(&fetched_hash, &salt);
|
||||||
|
|
||||||
|
// Ok password changed in server an auth is possible.
|
||||||
|
if new_password_hash == password_response {
|
||||||
|
warn!(
|
||||||
|
"Password for {}, changed in server. Updating.",
|
||||||
|
client_identifier
|
||||||
|
);
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut pool_auth_hash = pool.auth_hash.write();
|
||||||
|
*pool_auth_hash = Some(fetched_hash);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
wrong_password(&mut write, username).await?;
|
||||||
|
return Err(Error::ClientGeneralError(
|
||||||
|
"Invalid password".into(),
|
||||||
|
client_identifier,
|
||||||
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let transaction_mode = pool.settings.pool_mode == PoolMode::Transaction;
|
let transaction_mode = pool.settings.pool_mode == PoolMode::Transaction;
|
||||||
prepared_statements_enabled =
|
prepared_statements_enabled =
|
||||||
transaction_mode && pool.prepared_statement_cache.is_some();
|
transaction_mode && pool.prepared_statement_cache.is_some();
|
||||||
@@ -1786,14 +1729,13 @@ where
|
|||||||
/// and also the pool's statement cache. Add it to extended protocol data.
|
/// and also the pool's statement cache. Add it to extended protocol data.
|
||||||
fn buffer_parse(&mut self, message: BytesMut, pool: &ConnectionPool) -> Result<(), Error> {
|
fn buffer_parse(&mut self, message: BytesMut, pool: &ConnectionPool) -> Result<(), Error> {
|
||||||
// Avoid parsing if prepared statements not enabled
|
// Avoid parsing if prepared statements not enabled
|
||||||
if !self.prepared_statements_enabled {
|
let client_given_name = Parse::get_name(&message)?;
|
||||||
|
if !self.prepared_statements_enabled || client_given_name.is_empty() {
|
||||||
debug!("Anonymous parse message");
|
debug!("Anonymous parse message");
|
||||||
self.extended_protocol_data_buffer
|
self.extended_protocol_data_buffer
|
||||||
.push_back(ExtendedProtocolData::create_new_parse(message, None));
|
.push_back(ExtendedProtocolData::create_new_parse(message, None));
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
let client_given_name = Parse::get_name(&message)?;
|
|
||||||
let parse: Parse = (&message).try_into()?;
|
let parse: Parse = (&message).try_into()?;
|
||||||
|
|
||||||
// Compute the hash of the parse statement
|
// Compute the hash of the parse statement
|
||||||
@@ -1831,15 +1773,14 @@ where
|
|||||||
/// saved in the client cache.
|
/// saved in the client cache.
|
||||||
async fn buffer_bind(&mut self, message: BytesMut) -> Result<(), Error> {
|
async fn buffer_bind(&mut self, message: BytesMut) -> Result<(), Error> {
|
||||||
// Avoid parsing if prepared statements not enabled
|
// Avoid parsing if prepared statements not enabled
|
||||||
if !self.prepared_statements_enabled {
|
let client_given_name = Bind::get_name(&message)?;
|
||||||
|
if !self.prepared_statements_enabled || client_given_name.is_empty() {
|
||||||
debug!("Anonymous bind message");
|
debug!("Anonymous bind message");
|
||||||
self.extended_protocol_data_buffer
|
self.extended_protocol_data_buffer
|
||||||
.push_back(ExtendedProtocolData::create_new_bind(message, None));
|
.push_back(ExtendedProtocolData::create_new_bind(message, None));
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
let client_given_name = Bind::get_name(&message)?;
|
|
||||||
|
|
||||||
match self.prepared_statements.get(&client_given_name) {
|
match self.prepared_statements.get(&client_given_name) {
|
||||||
Some((rewritten_parse, _)) => {
|
Some((rewritten_parse, _)) => {
|
||||||
let message = Bind::rename(message, &rewritten_parse.name)?;
|
let message = Bind::rename(message, &rewritten_parse.name)?;
|
||||||
@@ -1891,7 +1832,8 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
let describe: Describe = (&message).try_into()?;
|
let describe: Describe = (&message).try_into()?;
|
||||||
if describe.target == 'P' {
|
let client_given_name = describe.statement_name.clone();
|
||||||
|
if describe.target == 'P' || client_given_name.is_empty() {
|
||||||
debug!("Portal describe message");
|
debug!("Portal describe message");
|
||||||
self.extended_protocol_data_buffer
|
self.extended_protocol_data_buffer
|
||||||
.push_back(ExtendedProtocolData::create_new_describe(message, None));
|
.push_back(ExtendedProtocolData::create_new_describe(message, None));
|
||||||
@@ -1899,8 +1841,6 @@ where
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
let client_given_name = describe.statement_name.clone();
|
|
||||||
|
|
||||||
match self.prepared_statements.get(&client_given_name) {
|
match self.prepared_statements.get(&client_given_name) {
|
||||||
Some((rewritten_parse, _)) => {
|
Some((rewritten_parse, _)) => {
|
||||||
let describe = describe.rename(&rewritten_parse.name);
|
let describe = describe.rename(&rewritten_parse.name);
|
||||||
|
|||||||
@@ -208,9 +208,6 @@ impl Address {
|
|||||||
pub struct User {
|
pub struct User {
|
||||||
pub username: String,
|
pub username: String,
|
||||||
pub password: Option<String>,
|
pub password: Option<String>,
|
||||||
|
|
||||||
#[serde(default = "User::default_auth_type")]
|
|
||||||
pub auth_type: AuthType,
|
|
||||||
pub server_username: Option<String>,
|
pub server_username: Option<String>,
|
||||||
pub server_password: Option<String>,
|
pub server_password: Option<String>,
|
||||||
pub pool_size: u32,
|
pub pool_size: u32,
|
||||||
@@ -228,7 +225,6 @@ impl Default for User {
|
|||||||
User {
|
User {
|
||||||
username: String::from("postgres"),
|
username: String::from("postgres"),
|
||||||
password: None,
|
password: None,
|
||||||
auth_type: AuthType::MD5,
|
|
||||||
server_username: None,
|
server_username: None,
|
||||||
server_password: None,
|
server_password: None,
|
||||||
pool_size: 15,
|
pool_size: 15,
|
||||||
@@ -243,10 +239,6 @@ impl Default for User {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl User {
|
impl User {
|
||||||
pub fn default_auth_type() -> AuthType {
|
|
||||||
AuthType::MD5
|
|
||||||
}
|
|
||||||
|
|
||||||
fn validate(&self) -> Result<(), Error> {
|
fn validate(&self) -> Result<(), Error> {
|
||||||
if let Some(min_pool_size) = self.min_pool_size {
|
if let Some(min_pool_size) = self.min_pool_size {
|
||||||
if min_pool_size > self.pool_size {
|
if min_pool_size > self.pool_size {
|
||||||
@@ -342,9 +334,6 @@ pub struct General {
|
|||||||
pub admin_username: String,
|
pub admin_username: String,
|
||||||
pub admin_password: String,
|
pub admin_password: String,
|
||||||
|
|
||||||
#[serde(default = "General::default_admin_auth_type")]
|
|
||||||
pub admin_auth_type: AuthType,
|
|
||||||
|
|
||||||
#[serde(default = "General::default_validate_config")]
|
#[serde(default = "General::default_validate_config")]
|
||||||
pub validate_config: bool,
|
pub validate_config: bool,
|
||||||
|
|
||||||
@@ -359,10 +348,6 @@ impl General {
|
|||||||
"0.0.0.0".into()
|
"0.0.0.0".into()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn default_admin_auth_type() -> AuthType {
|
|
||||||
AuthType::MD5
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn default_port() -> u16 {
|
pub fn default_port() -> u16 {
|
||||||
5432
|
5432
|
||||||
}
|
}
|
||||||
@@ -471,7 +456,6 @@ impl Default for General {
|
|||||||
verify_server_certificate: false,
|
verify_server_certificate: false,
|
||||||
admin_username: String::from("admin"),
|
admin_username: String::from("admin"),
|
||||||
admin_password: String::from("admin"),
|
admin_password: String::from("admin"),
|
||||||
admin_auth_type: AuthType::MD5,
|
|
||||||
validate_config: true,
|
validate_config: true,
|
||||||
auth_query: None,
|
auth_query: None,
|
||||||
auth_query_user: None,
|
auth_query_user: None,
|
||||||
@@ -492,15 +476,6 @@ pub enum PoolMode {
|
|||||||
Session,
|
Session,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Copy, Hash)]
|
|
||||||
pub enum AuthType {
|
|
||||||
#[serde(alias = "trust", alias = "Trust")]
|
|
||||||
Trust,
|
|
||||||
|
|
||||||
#[serde(alias = "md5", alias = "MD5")]
|
|
||||||
MD5,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::fmt::Display for PoolMode {
|
impl std::fmt::Display for PoolMode {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
|
|||||||
@@ -821,10 +821,10 @@ impl ExtendedProtocolData {
|
|||||||
pub struct Parse {
|
pub struct Parse {
|
||||||
code: char,
|
code: char,
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
len: i32,
|
len: u32,
|
||||||
pub name: String,
|
pub name: String,
|
||||||
query: String,
|
query: String,
|
||||||
num_params: i16,
|
num_params: u16,
|
||||||
param_types: Vec<i32>,
|
param_types: Vec<i32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -834,12 +834,11 @@ impl TryFrom<&BytesMut> for Parse {
|
|||||||
fn try_from(buf: &BytesMut) -> Result<Parse, Error> {
|
fn try_from(buf: &BytesMut) -> Result<Parse, Error> {
|
||||||
let mut cursor = Cursor::new(buf);
|
let mut cursor = Cursor::new(buf);
|
||||||
let code = cursor.get_u8() as char;
|
let code = cursor.get_u8() as char;
|
||||||
let len = cursor.get_i32();
|
let len = cursor.get_u32();
|
||||||
let name = cursor.read_string()?;
|
let name = cursor.read_string()?;
|
||||||
let query = cursor.read_string()?;
|
let query = cursor.read_string()?;
|
||||||
let num_params = cursor.get_i16();
|
let num_params = cursor.get_u16();
|
||||||
let mut param_types = Vec::new();
|
let mut param_types = Vec::new();
|
||||||
|
|
||||||
for _ in 0..num_params {
|
for _ in 0..num_params {
|
||||||
param_types.push(cursor.get_i32());
|
param_types.push(cursor.get_i32());
|
||||||
}
|
}
|
||||||
@@ -875,10 +874,10 @@ impl TryFrom<Parse> for BytesMut {
|
|||||||
+ 4 * parse.num_params as usize;
|
+ 4 * parse.num_params as usize;
|
||||||
|
|
||||||
bytes.put_u8(parse.code as u8);
|
bytes.put_u8(parse.code as u8);
|
||||||
bytes.put_i32(len as i32);
|
bytes.put_u32(len as u32);
|
||||||
bytes.put_slice(name);
|
bytes.put_slice(name);
|
||||||
bytes.put_slice(query);
|
bytes.put_slice(query);
|
||||||
bytes.put_i16(parse.num_params);
|
bytes.put_u16(parse.num_params);
|
||||||
for param in parse.param_types {
|
for param in parse.param_types {
|
||||||
bytes.put_i32(param);
|
bytes.put_i32(param);
|
||||||
}
|
}
|
||||||
@@ -945,14 +944,14 @@ impl Parse {
|
|||||||
pub struct Bind {
|
pub struct Bind {
|
||||||
code: char,
|
code: char,
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
len: i64,
|
len: u64,
|
||||||
portal: String,
|
portal: String,
|
||||||
pub prepared_statement: String,
|
pub prepared_statement: String,
|
||||||
num_param_format_codes: i16,
|
num_param_format_codes: u16,
|
||||||
param_format_codes: Vec<i16>,
|
param_format_codes: Vec<i16>,
|
||||||
num_param_values: i16,
|
num_param_values: u16,
|
||||||
param_values: Vec<(i32, BytesMut)>,
|
param_values: Vec<(i32, BytesMut)>,
|
||||||
num_result_column_format_codes: i16,
|
num_result_column_format_codes: u16,
|
||||||
result_columns_format_codes: Vec<i16>,
|
result_columns_format_codes: Vec<i16>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -962,17 +961,17 @@ impl TryFrom<&BytesMut> for Bind {
|
|||||||
fn try_from(buf: &BytesMut) -> Result<Bind, Error> {
|
fn try_from(buf: &BytesMut) -> Result<Bind, Error> {
|
||||||
let mut cursor = Cursor::new(buf);
|
let mut cursor = Cursor::new(buf);
|
||||||
let code = cursor.get_u8() as char;
|
let code = cursor.get_u8() as char;
|
||||||
let len = cursor.get_i32();
|
let len = cursor.get_u32();
|
||||||
let portal = cursor.read_string()?;
|
let portal = cursor.read_string()?;
|
||||||
let prepared_statement = cursor.read_string()?;
|
let prepared_statement = cursor.read_string()?;
|
||||||
let num_param_format_codes = cursor.get_i16();
|
let num_param_format_codes = cursor.get_u16();
|
||||||
let mut param_format_codes = Vec::new();
|
let mut param_format_codes = Vec::new();
|
||||||
|
|
||||||
for _ in 0..num_param_format_codes {
|
for _ in 0..num_param_format_codes {
|
||||||
param_format_codes.push(cursor.get_i16());
|
param_format_codes.push(cursor.get_i16());
|
||||||
}
|
}
|
||||||
|
|
||||||
let num_param_values = cursor.get_i16();
|
let num_param_values = cursor.get_u16();
|
||||||
let mut param_values = Vec::new();
|
let mut param_values = Vec::new();
|
||||||
|
|
||||||
for _ in 0..num_param_values {
|
for _ in 0..num_param_values {
|
||||||
@@ -994,7 +993,7 @@ impl TryFrom<&BytesMut> for Bind {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let num_result_column_format_codes = cursor.get_i16();
|
let num_result_column_format_codes = cursor.get_u16();
|
||||||
let mut result_columns_format_codes = Vec::new();
|
let mut result_columns_format_codes = Vec::new();
|
||||||
|
|
||||||
for _ in 0..num_result_column_format_codes {
|
for _ in 0..num_result_column_format_codes {
|
||||||
@@ -1003,7 +1002,7 @@ impl TryFrom<&BytesMut> for Bind {
|
|||||||
|
|
||||||
Ok(Bind {
|
Ok(Bind {
|
||||||
code,
|
code,
|
||||||
len: len as i64,
|
len: len as u64,
|
||||||
portal,
|
portal,
|
||||||
prepared_statement,
|
prepared_statement,
|
||||||
num_param_format_codes,
|
num_param_format_codes,
|
||||||
@@ -1042,19 +1041,19 @@ impl TryFrom<Bind> for BytesMut {
|
|||||||
len += 2 * bind.num_result_column_format_codes as usize;
|
len += 2 * bind.num_result_column_format_codes as usize;
|
||||||
|
|
||||||
bytes.put_u8(bind.code as u8);
|
bytes.put_u8(bind.code as u8);
|
||||||
bytes.put_i32(len as i32);
|
bytes.put_u32(len as u32);
|
||||||
bytes.put_slice(portal);
|
bytes.put_slice(portal);
|
||||||
bytes.put_slice(prepared_statement);
|
bytes.put_slice(prepared_statement);
|
||||||
bytes.put_i16(bind.num_param_format_codes);
|
bytes.put_u16(bind.num_param_format_codes);
|
||||||
for param_format_code in bind.param_format_codes {
|
for param_format_code in bind.param_format_codes {
|
||||||
bytes.put_i16(param_format_code);
|
bytes.put_i16(param_format_code);
|
||||||
}
|
}
|
||||||
bytes.put_i16(bind.num_param_values);
|
bytes.put_u16(bind.num_param_values);
|
||||||
for (param_len, param) in bind.param_values {
|
for (param_len, param) in bind.param_values {
|
||||||
bytes.put_i32(param_len);
|
bytes.put_i32(param_len);
|
||||||
bytes.put_slice(¶m);
|
bytes.put_slice(¶m);
|
||||||
}
|
}
|
||||||
bytes.put_i16(bind.num_result_column_format_codes);
|
bytes.put_u16(bind.num_result_column_format_codes);
|
||||||
for result_column_format_code in bind.result_columns_format_codes {
|
for result_column_format_code in bind.result_columns_format_codes {
|
||||||
bytes.put_i16(result_column_format_code);
|
bytes.put_i16(result_column_format_code);
|
||||||
}
|
}
|
||||||
@@ -1068,7 +1067,7 @@ impl Bind {
|
|||||||
pub fn get_name(buf: &BytesMut) -> Result<String, Error> {
|
pub fn get_name(buf: &BytesMut) -> Result<String, Error> {
|
||||||
let mut cursor = Cursor::new(buf);
|
let mut cursor = Cursor::new(buf);
|
||||||
// Skip the code and length
|
// Skip the code and length
|
||||||
cursor.advance(mem::size_of::<u8>() + mem::size_of::<i32>());
|
cursor.advance(mem::size_of::<u8>() + mem::size_of::<u32>());
|
||||||
cursor.read_string()?;
|
cursor.read_string()?;
|
||||||
cursor.read_string()
|
cursor.read_string()
|
||||||
}
|
}
|
||||||
@@ -1078,17 +1077,17 @@ impl Bind {
|
|||||||
let mut cursor = Cursor::new(&buf);
|
let mut cursor = Cursor::new(&buf);
|
||||||
// Read basic data from the cursor
|
// Read basic data from the cursor
|
||||||
let code = cursor.get_u8();
|
let code = cursor.get_u8();
|
||||||
let current_len = cursor.get_i32();
|
let current_len = cursor.get_u32();
|
||||||
let portal = cursor.read_string()?;
|
let portal = cursor.read_string()?;
|
||||||
let prepared_statement = cursor.read_string()?;
|
let prepared_statement = cursor.read_string()?;
|
||||||
|
|
||||||
// Calculate new length
|
// Calculate new length
|
||||||
let new_len = current_len + new_name.len() as i32 - prepared_statement.len() as i32;
|
let new_len = current_len + new_name.len() as u32 - prepared_statement.len() as u32;
|
||||||
|
|
||||||
// Begin building the response buffer
|
// Begin building the response buffer
|
||||||
let mut response_buf = BytesMut::with_capacity(new_len as usize + 1);
|
let mut response_buf = BytesMut::with_capacity(new_len as usize + 1);
|
||||||
response_buf.put_u8(code);
|
response_buf.put_u8(code);
|
||||||
response_buf.put_i32(new_len);
|
response_buf.put_u32(new_len);
|
||||||
|
|
||||||
// Put the portal and new name into the buffer
|
// Put the portal and new name into the buffer
|
||||||
// Note: panic if the provided string contains null byte
|
// Note: panic if the provided string contains null byte
|
||||||
@@ -1112,7 +1111,7 @@ pub struct Describe {
|
|||||||
code: char,
|
code: char,
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
len: i32,
|
len: u32,
|
||||||
pub target: char,
|
pub target: char,
|
||||||
pub statement_name: String,
|
pub statement_name: String,
|
||||||
}
|
}
|
||||||
@@ -1123,7 +1122,7 @@ impl TryFrom<&BytesMut> for Describe {
|
|||||||
fn try_from(bytes: &BytesMut) -> Result<Describe, Error> {
|
fn try_from(bytes: &BytesMut) -> Result<Describe, Error> {
|
||||||
let mut cursor = Cursor::new(bytes);
|
let mut cursor = Cursor::new(bytes);
|
||||||
let code = cursor.get_u8() as char;
|
let code = cursor.get_u8() as char;
|
||||||
let len = cursor.get_i32();
|
let len = cursor.get_u32();
|
||||||
let target = cursor.get_u8() as char;
|
let target = cursor.get_u8() as char;
|
||||||
let statement_name = cursor.read_string()?;
|
let statement_name = cursor.read_string()?;
|
||||||
|
|
||||||
@@ -1146,7 +1145,7 @@ impl TryFrom<Describe> for BytesMut {
|
|||||||
let len = 4 + 1 + statement_name.len();
|
let len = 4 + 1 + statement_name.len();
|
||||||
|
|
||||||
bytes.put_u8(describe.code as u8);
|
bytes.put_u8(describe.code as u8);
|
||||||
bytes.put_i32(len as i32);
|
bytes.put_u32(len as u32);
|
||||||
bytes.put_u8(describe.target as u8);
|
bytes.put_u8(describe.target as u8);
|
||||||
bytes.put_slice(statement_name);
|
bytes.put_slice(statement_name);
|
||||||
|
|
||||||
|
|||||||
@@ -200,17 +200,18 @@ struct PrometheusMetric<Value: fmt::Display> {
|
|||||||
|
|
||||||
impl<Value: fmt::Display> fmt::Display for PrometheusMetric<Value> {
|
impl<Value: fmt::Display> fmt::Display for PrometheusMetric<Value> {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
let mut sorted_labels: Vec<_> = self.labels.iter().collect();
|
let formatted_labels = self
|
||||||
sorted_labels.sort_by_key(|&(key, _)| key);
|
.labels
|
||||||
let formatted_labels = sorted_labels
|
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(key, value)| format!("{}=\"{}\"", key, value))
|
.map(|(key, value)| format!("{}=\"{}\"", key, value))
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
.join(",");
|
.join(",");
|
||||||
write!(
|
write!(
|
||||||
f,
|
f,
|
||||||
"{name}{{{formatted_labels}}} {value}",
|
"# HELP {name} {help}\n# TYPE {name} {ty}\n{name}{{{formatted_labels}}} {value}\n",
|
||||||
name = format_args!("pgcat_{}", self.name),
|
name = format_args!("pgcat_{}", self.name),
|
||||||
|
help = self.help,
|
||||||
|
ty = self.ty,
|
||||||
formatted_labels = formatted_labels,
|
formatted_labels = formatted_labels,
|
||||||
value = self.value
|
value = self.value
|
||||||
)
|
)
|
||||||
@@ -246,7 +247,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
|||||||
labels.insert("pool", address.pool_name.clone());
|
labels.insert("pool", address.pool_name.clone());
|
||||||
labels.insert("index", address.address_index.to_string());
|
labels.insert("index", address.address_index.to_string());
|
||||||
labels.insert("database", address.database.to_string());
|
labels.insert("database", address.database.to_string());
|
||||||
labels.insert("username", address.username.clone());
|
labels.insert("user", address.username.clone());
|
||||||
|
|
||||||
Self::from_name(&format!("databases_{}", name), value, labels)
|
Self::from_name(&format!("databases_{}", name), value, labels)
|
||||||
}
|
}
|
||||||
@@ -263,8 +264,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
|||||||
labels.insert("pool", address.pool_name.clone());
|
labels.insert("pool", address.pool_name.clone());
|
||||||
labels.insert("index", address.address_index.to_string());
|
labels.insert("index", address.address_index.to_string());
|
||||||
labels.insert("database", address.database.to_string());
|
labels.insert("database", address.database.to_string());
|
||||||
labels.insert("username", address.username.clone());
|
labels.insert("user", address.username.clone());
|
||||||
|
|
||||||
Self::from_name(&format!("servers_{}", name), value, labels)
|
Self::from_name(&format!("servers_{}", name), value, labels)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -276,7 +276,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
|||||||
labels.insert("role", address.role.to_string());
|
labels.insert("role", address.role.to_string());
|
||||||
labels.insert("index", address.address_index.to_string());
|
labels.insert("index", address.address_index.to_string());
|
||||||
labels.insert("database", address.database.to_string());
|
labels.insert("database", address.database.to_string());
|
||||||
labels.insert("username", address.username.clone());
|
labels.insert("user", address.username.clone());
|
||||||
|
|
||||||
Self::from_name(&format!("stats_{}", name), value, labels)
|
Self::from_name(&format!("stats_{}", name), value, labels)
|
||||||
}
|
}
|
||||||
@@ -288,15 +288,6 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
|||||||
|
|
||||||
Self::from_name(&format!("pools_{}", name), value, labels)
|
Self::from_name(&format!("pools_{}", name), value, labels)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_header(&self) -> String {
|
|
||||||
format!(
|
|
||||||
"\n# HELP {name} {help}\n# TYPE {name} {ty}",
|
|
||||||
name = format_args!("pgcat_{}", self.name),
|
|
||||||
help = self.help,
|
|
||||||
ty = self.ty,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn prometheus_stats(
|
async fn prometheus_stats(
|
||||||
@@ -322,7 +313,6 @@ async fn prometheus_stats(
|
|||||||
|
|
||||||
// Adds metrics shown in a SHOW STATS admin command.
|
// Adds metrics shown in a SHOW STATS admin command.
|
||||||
fn push_address_stats(lines: &mut Vec<String>) {
|
fn push_address_stats(lines: &mut Vec<String>) {
|
||||||
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
|
|
||||||
for (_, pool) in get_all_pools() {
|
for (_, pool) in get_all_pools() {
|
||||||
for shard in 0..pool.shards() {
|
for shard in 0..pool.shards() {
|
||||||
for server in 0..pool.servers(shard) {
|
for server in 0..pool.servers(shard) {
|
||||||
@@ -332,10 +322,7 @@ fn push_address_stats(lines: &mut Vec<String>) {
|
|||||||
if let Some(prometheus_metric) =
|
if let Some(prometheus_metric) =
|
||||||
PrometheusMetric::<u64>::from_address(address, &key, value)
|
PrometheusMetric::<u64>::from_address(address, &key, value)
|
||||||
{
|
{
|
||||||
grouped_metrics
|
lines.push(prometheus_metric.to_string());
|
||||||
.entry(key)
|
|
||||||
.or_default()
|
|
||||||
.push(prometheus_metric);
|
|
||||||
} else {
|
} else {
|
||||||
debug!("Metric {} not implemented for {}", key, address.name());
|
debug!("Metric {} not implemented for {}", key, address.name());
|
||||||
}
|
}
|
||||||
@@ -343,53 +330,33 @@ fn push_address_stats(lines: &mut Vec<String>) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (_key, metrics) in grouped_metrics {
|
|
||||||
if !metrics.is_empty() {
|
|
||||||
lines.push(metrics[0].get_header());
|
|
||||||
for metric in metrics {
|
|
||||||
lines.push(metric.to_string());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Adds relevant metrics shown in a SHOW POOLS admin command.
|
// Adds relevant metrics shown in a SHOW POOLS admin command.
|
||||||
fn push_pool_stats(lines: &mut Vec<String>) {
|
fn push_pool_stats(lines: &mut Vec<String>) {
|
||||||
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
|
|
||||||
let pool_stats = PoolStats::construct_pool_lookup();
|
let pool_stats = PoolStats::construct_pool_lookup();
|
||||||
for (pool_id, stats) in pool_stats.iter() {
|
for (pool_id, stats) in pool_stats.iter() {
|
||||||
for (name, value) in stats.clone() {
|
for (name, value) in stats.clone() {
|
||||||
if let Some(prometheus_metric) =
|
if let Some(prometheus_metric) =
|
||||||
PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value)
|
PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value)
|
||||||
{
|
{
|
||||||
grouped_metrics
|
lines.push(prometheus_metric.to_string());
|
||||||
.entry(name)
|
|
||||||
.or_default()
|
|
||||||
.push(prometheus_metric);
|
|
||||||
} else {
|
} else {
|
||||||
debug!("Metric {} not implemented for ({})", name, *pool_id);
|
debug!("Metric {} not implemented for ({})", name, *pool_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (_key, metrics) in grouped_metrics {
|
|
||||||
if !metrics.is_empty() {
|
|
||||||
lines.push(metrics[0].get_header());
|
|
||||||
for metric in metrics {
|
|
||||||
lines.push(metric.to_string());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Adds relevant metrics shown in a SHOW DATABASES admin command.
|
// Adds relevant metrics shown in a SHOW DATABASES admin command.
|
||||||
fn push_database_stats(lines: &mut Vec<String>) {
|
fn push_database_stats(lines: &mut Vec<String>) {
|
||||||
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u32>>> = HashMap::new();
|
|
||||||
for (_, pool) in get_all_pools() {
|
for (_, pool) in get_all_pools() {
|
||||||
let pool_config = pool.settings.clone();
|
let pool_config = pool.settings.clone();
|
||||||
for shard in 0..pool.shards() {
|
for shard in 0..pool.shards() {
|
||||||
for server in 0..pool.servers(shard) {
|
for server in 0..pool.servers(shard) {
|
||||||
let address = pool.address(shard, server);
|
let address = pool.address(shard, server);
|
||||||
let pool_state = pool.pool_state(shard, server);
|
let pool_state = pool.pool_state(shard, server);
|
||||||
|
|
||||||
let metrics = vec![
|
let metrics = vec![
|
||||||
("pool_size", pool_config.user.pool_size),
|
("pool_size", pool_config.user.pool_size),
|
||||||
("current_connections", pool_state.connections),
|
("current_connections", pool_state.connections),
|
||||||
@@ -398,10 +365,7 @@ fn push_database_stats(lines: &mut Vec<String>) {
|
|||||||
if let Some(prometheus_metric) =
|
if let Some(prometheus_metric) =
|
||||||
PrometheusMetric::<u32>::from_database_info(address, key, value)
|
PrometheusMetric::<u32>::from_database_info(address, key, value)
|
||||||
{
|
{
|
||||||
grouped_metrics
|
lines.push(prometheus_metric.to_string());
|
||||||
.entry(key.to_string())
|
|
||||||
.or_default()
|
|
||||||
.push(prometheus_metric);
|
|
||||||
} else {
|
} else {
|
||||||
debug!("Metric {} not implemented for {}", key, address.name());
|
debug!("Metric {} not implemented for {}", key, address.name());
|
||||||
}
|
}
|
||||||
@@ -409,14 +373,6 @@ fn push_database_stats(lines: &mut Vec<String>) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (_key, metrics) in grouped_metrics {
|
|
||||||
if !metrics.is_empty() {
|
|
||||||
lines.push(metrics[0].get_header());
|
|
||||||
for metric in metrics {
|
|
||||||
lines.push(metric.to_string());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Adds relevant metrics shown in a SHOW SERVERS admin command.
|
// Adds relevant metrics shown in a SHOW SERVERS admin command.
|
||||||
@@ -449,7 +405,7 @@ fn push_server_stats(lines: &mut Vec<String>) {
|
|||||||
crate::stats::ServerState::Idle => entry.idle_count += 1,
|
crate::stats::ServerState::Idle => entry.idle_count += 1,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
|
|
||||||
for (_, pool) in get_all_pools() {
|
for (_, pool) in get_all_pools() {
|
||||||
for shard in 0..pool.shards() {
|
for shard in 0..pool.shards() {
|
||||||
for server in 0..pool.servers(shard) {
|
for server in 0..pool.servers(shard) {
|
||||||
@@ -472,10 +428,7 @@ fn push_server_stats(lines: &mut Vec<String>) {
|
|||||||
if let Some(prometheus_metric) =
|
if let Some(prometheus_metric) =
|
||||||
PrometheusMetric::<u64>::from_server_info(address, key, value)
|
PrometheusMetric::<u64>::from_server_info(address, key, value)
|
||||||
{
|
{
|
||||||
grouped_metrics
|
lines.push(prometheus_metric.to_string());
|
||||||
.entry(key.to_string())
|
|
||||||
.or_default()
|
|
||||||
.push(prometheus_metric);
|
|
||||||
} else {
|
} else {
|
||||||
debug!("Metric {} not implemented for {}", key, address.name());
|
debug!("Metric {} not implemented for {}", key, address.name());
|
||||||
}
|
}
|
||||||
@@ -484,14 +437,6 @@ fn push_server_stats(lines: &mut Vec<String>) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (_key, metrics) in grouped_metrics {
|
|
||||||
if !metrics.is_empty() {
|
|
||||||
lines.push(metrics[0].get_header());
|
|
||||||
for metric in metrics {
|
|
||||||
lines.push(metric.to_string());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn start_metric_server(http_addr: SocketAddr) {
|
pub async fn start_metric_server(http_addr: SocketAddr) {
|
||||||
|
|||||||
@@ -698,7 +698,6 @@ impl Server {
|
|||||||
))
|
))
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
trace!("Error: {}", error_code);
|
trace!("Error: {}", error_code);
|
||||||
|
|
||||||
match error_code {
|
match error_code {
|
||||||
@@ -1013,6 +1012,12 @@ impl Server {
|
|||||||
// which can leak between clients. This is a best effort to block bad clients
|
// which can leak between clients. This is a best effort to block bad clients
|
||||||
// from poisoning a transaction-mode pool by setting inappropriate session variables
|
// from poisoning a transaction-mode pool by setting inappropriate session variables
|
||||||
match command.as_str() {
|
match command.as_str() {
|
||||||
|
"DISCARD ALL" => {
|
||||||
|
self.clear_prepared_statement_cache();
|
||||||
|
}
|
||||||
|
"DEALLOCATE ALL" => {
|
||||||
|
self.clear_prepared_statement_cache();
|
||||||
|
}
|
||||||
"SET" => {
|
"SET" => {
|
||||||
// We don't detect set statements in transactions
|
// We don't detect set statements in transactions
|
||||||
// No great way to differentiate between set and set local
|
// No great way to differentiate between set and set local
|
||||||
@@ -1132,6 +1137,12 @@ impl Server {
|
|||||||
has_it
|
has_it
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn clear_prepared_statement_cache(&mut self) {
|
||||||
|
if let Some(cache) = &mut self.prepared_statement_cache {
|
||||||
|
cache.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn add_prepared_statement_to_cache(&mut self, name: &str) -> Option<String> {
|
fn add_prepared_statement_to_cache(&mut self, name: &str) -> Option<String> {
|
||||||
let cache = match &mut self.prepared_statement_cache {
|
let cache = match &mut self.prepared_statement_cache {
|
||||||
Some(cache) => cache,
|
Some(cache) => cache,
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ docker compose exec --workdir /app/tests/python main pip3 install -r requirement
|
|||||||
echo "Interactive test environment ready"
|
echo "Interactive test environment ready"
|
||||||
echo "To run integration tests, you can use the following commands:"
|
echo "To run integration tests, you can use the following commands:"
|
||||||
echo -e " ${BLUE}Ruby: ${RED}cd /app/tests/ruby && bundle exec ruby tests.rb --format documentation${RESET}"
|
echo -e " ${BLUE}Ruby: ${RED}cd /app/tests/ruby && bundle exec ruby tests.rb --format documentation${RESET}"
|
||||||
echo -e " ${BLUE}Python: ${RED}cd /app/ && pytest ${RESET}"
|
echo -e " ${BLUE}Python: ${RED}cd /app && python3 tests/python/tests.py${RESET}"
|
||||||
echo -e " ${BLUE}Rust: ${RED}cd /app/tests/rust && cargo run ${RESET}"
|
echo -e " ${BLUE}Rust: ${RED}cd /app/tests/rust && cargo run ${RESET}"
|
||||||
echo -e " ${BLUE}Go: ${RED}cd /app/tests/go && /usr/local/go/bin/go test${RESET}"
|
echo -e " ${BLUE}Go: ${RED}cd /app/tests/go && /usr/local/go/bin/go test${RESET}"
|
||||||
echo "the source code for tests are directly linked to the source code in the container so you can modify the code and run the tests again"
|
echo "the source code for tests are directly linked to the source code in the container so you can modify the code and run the tests again"
|
||||||
|
|||||||
@@ -1,3 +1,2 @@
|
|||||||
pytest
|
|
||||||
psycopg2==2.9.3
|
psycopg2==2.9.3
|
||||||
psutil==5.9.1
|
psutil==5.9.1
|
||||||
@@ -1,71 +0,0 @@
|
|||||||
import utils
|
|
||||||
import signal
|
|
||||||
|
|
||||||
class TestTrustAuth:
|
|
||||||
@classmethod
|
|
||||||
def setup_method(cls):
|
|
||||||
config= """
|
|
||||||
[general]
|
|
||||||
host = "0.0.0.0"
|
|
||||||
port = 6432
|
|
||||||
admin_username = "admin_user"
|
|
||||||
admin_password = ""
|
|
||||||
admin_auth_type = "trust"
|
|
||||||
|
|
||||||
[pools.sharded_db.users.0]
|
|
||||||
username = "sharding_user"
|
|
||||||
password = "sharding_user"
|
|
||||||
auth_type = "trust"
|
|
||||||
pool_size = 10
|
|
||||||
min_pool_size = 1
|
|
||||||
pool_mode = "transaction"
|
|
||||||
|
|
||||||
[pools.sharded_db.shards.0]
|
|
||||||
servers = [
|
|
||||||
[ "127.0.0.1", 5432, "primary" ],
|
|
||||||
]
|
|
||||||
database = "shard0"
|
|
||||||
"""
|
|
||||||
utils.pgcat_generic_start(config)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def teardown_method(self):
|
|
||||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
|
||||||
|
|
||||||
def test_admin_trust_auth(self):
|
|
||||||
conn, cur = utils.connect_db_trust(admin=True)
|
|
||||||
cur.execute("SHOW POOLS")
|
|
||||||
res = cur.fetchall()
|
|
||||||
print(res)
|
|
||||||
utils.cleanup_conn(conn, cur)
|
|
||||||
|
|
||||||
def test_normal_trust_auth(self):
|
|
||||||
conn, cur = utils.connect_db_trust(autocommit=False)
|
|
||||||
cur.execute("SELECT 1")
|
|
||||||
res = cur.fetchall()
|
|
||||||
print(res)
|
|
||||||
utils.cleanup_conn(conn, cur)
|
|
||||||
|
|
||||||
class TestMD5Auth:
|
|
||||||
@classmethod
|
|
||||||
def setup_method(cls):
|
|
||||||
utils.pgcat_start()
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def teardown_method(self):
|
|
||||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
|
||||||
|
|
||||||
def test_normal_db_access(self):
|
|
||||||
conn, cur = utils.connect_db(autocommit=False)
|
|
||||||
cur.execute("SELECT 1")
|
|
||||||
res = cur.fetchall()
|
|
||||||
print(res)
|
|
||||||
utils.cleanup_conn(conn, cur)
|
|
||||||
|
|
||||||
def test_admin_db_access(self):
|
|
||||||
conn, cur = utils.connect_db(admin=True)
|
|
||||||
|
|
||||||
cur.execute("SHOW POOLS")
|
|
||||||
res = cur.fetchall()
|
|
||||||
print(res)
|
|
||||||
utils.cleanup_conn(conn, cur)
|
|
||||||
@@ -1,12 +1,84 @@
|
|||||||
|
from typing import Tuple
|
||||||
|
import psycopg2
|
||||||
|
import psutil
|
||||||
|
import os
|
||||||
import signal
|
import signal
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import psycopg2
|
|
||||||
import utils
|
|
||||||
|
|
||||||
SHUTDOWN_TIMEOUT = 5
|
SHUTDOWN_TIMEOUT = 5
|
||||||
|
|
||||||
|
PGCAT_HOST = "127.0.0.1"
|
||||||
|
PGCAT_PORT = "6432"
|
||||||
|
|
||||||
|
|
||||||
|
def pgcat_start():
|
||||||
|
pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
os.system("./target/debug/pgcat .circleci/pgcat.toml &")
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
|
||||||
|
def pg_cat_send_signal(signal: signal.Signals):
|
||||||
|
try:
|
||||||
|
for proc in psutil.process_iter(["pid", "name"]):
|
||||||
|
if "pgcat" == proc.name():
|
||||||
|
os.kill(proc.pid, signal)
|
||||||
|
except Exception as e:
|
||||||
|
# The process can be gone when we send this signal
|
||||||
|
print(e)
|
||||||
|
|
||||||
|
if signal == signal.SIGTERM:
|
||||||
|
# Returns 0 if pgcat process exists
|
||||||
|
time.sleep(2)
|
||||||
|
if not os.system('pgrep pgcat'):
|
||||||
|
raise Exception("pgcat not closed after SIGTERM")
|
||||||
|
|
||||||
|
|
||||||
|
def connect_db(
|
||||||
|
autocommit: bool = True,
|
||||||
|
admin: bool = False,
|
||||||
|
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
|
||||||
|
|
||||||
|
if admin:
|
||||||
|
user = "admin_user"
|
||||||
|
password = "admin_pass"
|
||||||
|
db = "pgcat"
|
||||||
|
else:
|
||||||
|
user = "sharding_user"
|
||||||
|
password = "sharding_user"
|
||||||
|
db = "sharded_db"
|
||||||
|
|
||||||
|
conn = psycopg2.connect(
|
||||||
|
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
|
||||||
|
connect_timeout=2,
|
||||||
|
)
|
||||||
|
conn.autocommit = autocommit
|
||||||
|
cur = conn.cursor()
|
||||||
|
|
||||||
|
return (conn, cur)
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
|
||||||
|
cur.close()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
def test_normal_db_access():
|
||||||
|
pgcat_start()
|
||||||
|
conn, cur = connect_db(autocommit=False)
|
||||||
|
cur.execute("SELECT 1")
|
||||||
|
res = cur.fetchall()
|
||||||
|
print(res)
|
||||||
|
cleanup_conn(conn, cur)
|
||||||
|
|
||||||
|
|
||||||
|
def test_admin_db_access():
|
||||||
|
conn, cur = connect_db(admin=True)
|
||||||
|
|
||||||
|
cur.execute("SHOW POOLS")
|
||||||
|
res = cur.fetchall()
|
||||||
|
print(res)
|
||||||
|
cleanup_conn(conn, cur)
|
||||||
|
|
||||||
|
|
||||||
def test_shutdown_logic():
|
def test_shutdown_logic():
|
||||||
|
|
||||||
@@ -14,17 +86,17 @@ def test_shutdown_logic():
|
|||||||
# NO ACTIVE QUERIES SIGINT HANDLING
|
# NO ACTIVE QUERIES SIGINT HANDLING
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
utils.pgcat_start()
|
pgcat_start()
|
||||||
|
|
||||||
# Create client connection and send query (not in transaction)
|
# Create client connection and send query (not in transaction)
|
||||||
conn, cur = utils.connect_db()
|
conn, cur = connect_db()
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
cur.execute("COMMIT;")
|
cur.execute("COMMIT;")
|
||||||
|
|
||||||
# Send sigint to pgcat
|
# Send sigint to pgcat
|
||||||
utils.pg_cat_send_signal(signal.SIGINT)
|
pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
# Check that any new queries fail after sigint since server should close with no active transactions
|
# Check that any new queries fail after sigint since server should close with no active transactions
|
||||||
@@ -36,18 +108,18 @@ def test_shutdown_logic():
|
|||||||
# Fail if query execution succeeded
|
# Fail if query execution succeeded
|
||||||
raise Exception("Server not closed after sigint")
|
raise Exception("Server not closed after sigint")
|
||||||
|
|
||||||
utils.cleanup_conn(conn, cur)
|
cleanup_conn(conn, cur)
|
||||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# NO ACTIVE QUERIES ADMIN SHUTDOWN COMMAND
|
# NO ACTIVE QUERIES ADMIN SHUTDOWN COMMAND
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
utils.pgcat_start()
|
pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
conn, cur = utils.connect_db()
|
conn, cur = connect_db()
|
||||||
admin_conn, admin_cur = utils.connect_db(admin=True)
|
admin_conn, admin_cur = connect_db(admin=True)
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
@@ -66,24 +138,24 @@ def test_shutdown_logic():
|
|||||||
# Fail if query execution succeeded
|
# Fail if query execution succeeded
|
||||||
raise Exception("Server not closed after sigint")
|
raise Exception("Server not closed after sigint")
|
||||||
|
|
||||||
utils.cleanup_conn(conn, cur)
|
cleanup_conn(conn, cur)
|
||||||
utils.cleanup_conn(admin_conn, admin_cur)
|
cleanup_conn(admin_conn, admin_cur)
|
||||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# HANDLE TRANSACTION WITH SIGINT
|
# HANDLE TRANSACTION WITH SIGINT
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
utils.pgcat_start()
|
pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
conn, cur = utils.connect_db()
|
conn, cur = connect_db()
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
utils.pg_cat_send_signal(signal.SIGINT)
|
pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
# Check that any new queries succeed after sigint since server should still allow transaction to complete
|
# Check that any new queries succeed after sigint since server should still allow transaction to complete
|
||||||
@@ -93,18 +165,18 @@ def test_shutdown_logic():
|
|||||||
# Fail if query fails since server closed
|
# Fail if query fails since server closed
|
||||||
raise Exception("Server closed while in transaction", e.pgerror)
|
raise Exception("Server closed while in transaction", e.pgerror)
|
||||||
|
|
||||||
utils.cleanup_conn(conn, cur)
|
cleanup_conn(conn, cur)
|
||||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# HANDLE TRANSACTION WITH ADMIN SHUTDOWN COMMAND
|
# HANDLE TRANSACTION WITH ADMIN SHUTDOWN COMMAND
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
utils.pgcat_start()
|
pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
conn, cur = utils.connect_db()
|
conn, cur = connect_db()
|
||||||
admin_conn, admin_cur = utils.connect_db(admin=True)
|
admin_conn, admin_cur = connect_db(admin=True)
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
@@ -122,30 +194,30 @@ def test_shutdown_logic():
|
|||||||
# Fail if query fails since server closed
|
# Fail if query fails since server closed
|
||||||
raise Exception("Server closed while in transaction", e.pgerror)
|
raise Exception("Server closed while in transaction", e.pgerror)
|
||||||
|
|
||||||
utils.cleanup_conn(conn, cur)
|
cleanup_conn(conn, cur)
|
||||||
utils.cleanup_conn(admin_conn, admin_cur)
|
cleanup_conn(admin_conn, admin_cur)
|
||||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN
|
# NO NEW NON-ADMIN CONNECTIONS DURING SHUTDOWN
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
utils.pgcat_start()
|
pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
transaction_conn, transaction_cur = utils.connect_db()
|
transaction_conn, transaction_cur = connect_db()
|
||||||
|
|
||||||
transaction_cur.execute("BEGIN;")
|
transaction_cur.execute("BEGIN;")
|
||||||
transaction_cur.execute("SELECT 1;")
|
transaction_cur.execute("SELECT 1;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
utils.pg_cat_send_signal(signal.SIGINT)
|
pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
start = time.perf_counter()
|
start = time.perf_counter()
|
||||||
try:
|
try:
|
||||||
conn, cur = utils.connect_db()
|
conn, cur = connect_db()
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
utils.cleanup_conn(conn, cur)
|
cleanup_conn(conn, cur)
|
||||||
except psycopg2.OperationalError as e:
|
except psycopg2.OperationalError as e:
|
||||||
time_taken = time.perf_counter() - start
|
time_taken = time.perf_counter() - start
|
||||||
if time_taken > 0.1:
|
if time_taken > 0.1:
|
||||||
@@ -155,49 +227,49 @@ def test_shutdown_logic():
|
|||||||
else:
|
else:
|
||||||
raise Exception("Able connect to database during shutdown")
|
raise Exception("Able connect to database during shutdown")
|
||||||
|
|
||||||
utils.cleanup_conn(transaction_conn, transaction_cur)
|
cleanup_conn(transaction_conn, transaction_cur)
|
||||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN
|
# ALLOW NEW ADMIN CONNECTIONS DURING SHUTDOWN
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
utils.pgcat_start()
|
pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
transaction_conn, transaction_cur = utils.connect_db()
|
transaction_conn, transaction_cur = connect_db()
|
||||||
|
|
||||||
transaction_cur.execute("BEGIN;")
|
transaction_cur.execute("BEGIN;")
|
||||||
transaction_cur.execute("SELECT 1;")
|
transaction_cur.execute("SELECT 1;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
utils.pg_cat_send_signal(signal.SIGINT)
|
pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
conn, cur = utils.connect_db(admin=True)
|
conn, cur = connect_db(admin=True)
|
||||||
cur.execute("SHOW DATABASES;")
|
cur.execute("SHOW DATABASES;")
|
||||||
utils.cleanup_conn(conn, cur)
|
cleanup_conn(conn, cur)
|
||||||
except psycopg2.OperationalError as e:
|
except psycopg2.OperationalError as e:
|
||||||
raise Exception(e)
|
raise Exception(e)
|
||||||
|
|
||||||
utils.cleanup_conn(transaction_conn, transaction_cur)
|
cleanup_conn(transaction_conn, transaction_cur)
|
||||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN
|
# ADMIN CONNECTIONS CONTINUING TO WORK AFTER SHUTDOWN
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
utils.pgcat_start()
|
pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction
|
# Create client connection and begin transaction
|
||||||
transaction_conn, transaction_cur = utils.connect_db()
|
transaction_conn, transaction_cur = connect_db()
|
||||||
transaction_cur.execute("BEGIN;")
|
transaction_cur.execute("BEGIN;")
|
||||||
transaction_cur.execute("SELECT 1;")
|
transaction_cur.execute("SELECT 1;")
|
||||||
|
|
||||||
admin_conn, admin_cur = utils.connect_db(admin=True)
|
admin_conn, admin_cur = connect_db(admin=True)
|
||||||
admin_cur.execute("SHOW DATABASES;")
|
admin_cur.execute("SHOW DATABASES;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
utils.pg_cat_send_signal(signal.SIGINT)
|
pg_cat_send_signal(signal.SIGINT)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -205,24 +277,24 @@ def test_shutdown_logic():
|
|||||||
except psycopg2.OperationalError as e:
|
except psycopg2.OperationalError as e:
|
||||||
raise Exception("Could not execute admin command:", e)
|
raise Exception("Could not execute admin command:", e)
|
||||||
|
|
||||||
utils.cleanup_conn(transaction_conn, transaction_cur)
|
cleanup_conn(transaction_conn, transaction_cur)
|
||||||
utils.cleanup_conn(admin_conn, admin_cur)
|
cleanup_conn(admin_conn, admin_cur)
|
||||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
# HANDLE SHUTDOWN TIMEOUT WITH SIGINT
|
# HANDLE SHUTDOWN TIMEOUT WITH SIGINT
|
||||||
|
|
||||||
# Start pgcat
|
# Start pgcat
|
||||||
utils.pgcat_start()
|
pgcat_start()
|
||||||
|
|
||||||
# Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached
|
# Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached
|
||||||
conn, cur = utils.connect_db()
|
conn, cur = connect_db()
|
||||||
|
|
||||||
cur.execute("BEGIN;")
|
cur.execute("BEGIN;")
|
||||||
cur.execute("SELECT 1;")
|
cur.execute("SELECT 1;")
|
||||||
|
|
||||||
# Send sigint to pgcat while still in transaction
|
# Send sigint to pgcat while still in transaction
|
||||||
utils.pg_cat_send_signal(signal.SIGINT)
|
pg_cat_send_signal(signal.SIGINT)
|
||||||
|
|
||||||
# pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep for SHUTDOWN_TIMEOUT + 1 seconds
|
# pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep for SHUTDOWN_TIMEOUT + 1 seconds
|
||||||
time.sleep(SHUTDOWN_TIMEOUT + 1)
|
time.sleep(SHUTDOWN_TIMEOUT + 1)
|
||||||
@@ -236,7 +308,12 @@ def test_shutdown_logic():
|
|||||||
# Fail if query execution succeeded
|
# Fail if query execution succeeded
|
||||||
raise Exception("Server not closed after sigint and expected timeout")
|
raise Exception("Server not closed after sigint and expected timeout")
|
||||||
|
|
||||||
utils.cleanup_conn(conn, cur)
|
cleanup_conn(conn, cur)
|
||||||
utils.pg_cat_send_signal(signal.SIGTERM)
|
pg_cat_send_signal(signal.SIGTERM)
|
||||||
|
|
||||||
# - - - - - - - - - - - - - - - - - -
|
# - - - - - - - - - - - - - - - - - -
|
||||||
|
|
||||||
|
|
||||||
|
test_normal_db_access()
|
||||||
|
test_admin_db_access()
|
||||||
|
test_shutdown_logic()
|
||||||
@@ -1,110 +0,0 @@
|
|||||||
import os
|
|
||||||
import signal
|
|
||||||
import time
|
|
||||||
from typing import Tuple
|
|
||||||
import tempfile
|
|
||||||
|
|
||||||
import psutil
|
|
||||||
import psycopg2
|
|
||||||
|
|
||||||
PGCAT_HOST = "127.0.0.1"
|
|
||||||
PGCAT_PORT = "6432"
|
|
||||||
|
|
||||||
|
|
||||||
def _pgcat_start(config_path: str):
|
|
||||||
pg_cat_send_signal(signal.SIGTERM)
|
|
||||||
os.system(f"./target/debug/pgcat {config_path} &")
|
|
||||||
time.sleep(2)
|
|
||||||
|
|
||||||
|
|
||||||
def pgcat_start():
|
|
||||||
_pgcat_start(config_path='.circleci/pgcat.toml')
|
|
||||||
|
|
||||||
|
|
||||||
def pgcat_generic_start(config: str):
|
|
||||||
tmp = tempfile.NamedTemporaryFile()
|
|
||||||
with open(tmp.name, 'w') as f:
|
|
||||||
f.write(config)
|
|
||||||
_pgcat_start(config_path=tmp.name)
|
|
||||||
|
|
||||||
|
|
||||||
def glauth_send_signal(signal: signal.Signals):
|
|
||||||
try:
|
|
||||||
for proc in psutil.process_iter(["pid", "name"]):
|
|
||||||
if proc.name() == "glauth":
|
|
||||||
os.kill(proc.pid, signal)
|
|
||||||
except Exception as e:
|
|
||||||
# The process can be gone when we send this signal
|
|
||||||
print(e)
|
|
||||||
|
|
||||||
if signal == signal.SIGTERM:
|
|
||||||
# Returns 0 if pgcat process exists
|
|
||||||
time.sleep(2)
|
|
||||||
if not os.system('pgrep glauth'):
|
|
||||||
raise Exception("glauth not closed after SIGTERM")
|
|
||||||
|
|
||||||
|
|
||||||
def pg_cat_send_signal(signal: signal.Signals):
|
|
||||||
try:
|
|
||||||
for proc in psutil.process_iter(["pid", "name"]):
|
|
||||||
if "pgcat" == proc.name():
|
|
||||||
os.kill(proc.pid, signal)
|
|
||||||
except Exception as e:
|
|
||||||
# The process can be gone when we send this signal
|
|
||||||
print(e)
|
|
||||||
|
|
||||||
if signal == signal.SIGTERM:
|
|
||||||
# Returns 0 if pgcat process exists
|
|
||||||
time.sleep(2)
|
|
||||||
if not os.system('pgrep pgcat'):
|
|
||||||
raise Exception("pgcat not closed after SIGTERM")
|
|
||||||
|
|
||||||
|
|
||||||
def connect_db(
|
|
||||||
autocommit: bool = True,
|
|
||||||
admin: bool = False,
|
|
||||||
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
|
|
||||||
|
|
||||||
if admin:
|
|
||||||
user = "admin_user"
|
|
||||||
password = "admin_pass"
|
|
||||||
db = "pgcat"
|
|
||||||
else:
|
|
||||||
user = "sharding_user"
|
|
||||||
password = "sharding_user"
|
|
||||||
db = "sharded_db"
|
|
||||||
|
|
||||||
conn = psycopg2.connect(
|
|
||||||
f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
|
|
||||||
connect_timeout=2,
|
|
||||||
)
|
|
||||||
conn.autocommit = autocommit
|
|
||||||
cur = conn.cursor()
|
|
||||||
|
|
||||||
return (conn, cur)
|
|
||||||
|
|
||||||
def connect_db_trust(
|
|
||||||
autocommit: bool = True,
|
|
||||||
admin: bool = False,
|
|
||||||
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
|
|
||||||
|
|
||||||
if admin:
|
|
||||||
user = "admin_user"
|
|
||||||
db = "pgcat"
|
|
||||||
else:
|
|
||||||
user = "sharding_user"
|
|
||||||
db = "sharded_db"
|
|
||||||
|
|
||||||
conn = psycopg2.connect(
|
|
||||||
f"postgres://{user}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
|
|
||||||
connect_timeout=2,
|
|
||||||
)
|
|
||||||
conn.autocommit = autocommit
|
|
||||||
cur = conn.cursor()
|
|
||||||
|
|
||||||
return (conn, cur)
|
|
||||||
|
|
||||||
|
|
||||||
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
|
|
||||||
cur.close()
|
|
||||||
conn.close()
|
|
||||||
@@ -24,8 +24,7 @@ GEM
|
|||||||
pg (1.3.2)
|
pg (1.3.2)
|
||||||
rainbow (3.1.1)
|
rainbow (3.1.1)
|
||||||
regexp_parser (2.3.1)
|
regexp_parser (2.3.1)
|
||||||
rexml (3.3.6)
|
rexml (3.2.5)
|
||||||
strscan
|
|
||||||
rspec (3.11.0)
|
rspec (3.11.0)
|
||||||
rspec-core (~> 3.11.0)
|
rspec-core (~> 3.11.0)
|
||||||
rspec-expectations (~> 3.11.0)
|
rspec-expectations (~> 3.11.0)
|
||||||
@@ -51,7 +50,6 @@ GEM
|
|||||||
rubocop-ast (1.17.0)
|
rubocop-ast (1.17.0)
|
||||||
parser (>= 3.1.1.0)
|
parser (>= 3.1.1.0)
|
||||||
ruby-progressbar (1.11.0)
|
ruby-progressbar (1.11.0)
|
||||||
strscan (3.1.0)
|
|
||||||
toml (0.3.0)
|
toml (0.3.0)
|
||||||
parslet (>= 1.8.0, < 3.0.0)
|
parslet (>= 1.8.0, < 3.0.0)
|
||||||
toxiproxy (2.0.1)
|
toxiproxy (2.0.1)
|
||||||
|
|||||||
145
tests/ruby/helpers/frontend_messages.rb
Normal file
145
tests/ruby/helpers/frontend_messages.rb
Normal file
@@ -0,0 +1,145 @@
|
|||||||
|
|
||||||
|
class PostgresMessage
|
||||||
|
# Base class for common functionality
|
||||||
|
|
||||||
|
def encode_string(str)
|
||||||
|
"#{str}\0" # Encode a string with a null terminator
|
||||||
|
end
|
||||||
|
|
||||||
|
def encode_int16(value)
|
||||||
|
[value].pack('n') # Encode an Int16
|
||||||
|
end
|
||||||
|
|
||||||
|
def encode_int32(value)
|
||||||
|
[value].pack('N') # Encode an Int32
|
||||||
|
end
|
||||||
|
|
||||||
|
def message_prefix(type, length)
|
||||||
|
"#{type}#{encode_int32(length)}" # Message type and length prefix
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
class SimpleQueryMessage < PostgresMessage
|
||||||
|
attr_accessor :query
|
||||||
|
|
||||||
|
def initialize(query = "")
|
||||||
|
@query = query
|
||||||
|
end
|
||||||
|
|
||||||
|
def to_bytes
|
||||||
|
query_bytes = encode_string(@query)
|
||||||
|
length = 4 + query_bytes.size # Length includes 4 bytes for length itself
|
||||||
|
message_prefix('Q', length) + query_bytes
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
class ParseMessage < PostgresMessage
|
||||||
|
attr_accessor :statement_name, :query, :parameter_types
|
||||||
|
|
||||||
|
def initialize(statement_name = "", query = "", parameter_types = [])
|
||||||
|
@statement_name = statement_name
|
||||||
|
@query = query
|
||||||
|
@parameter_types = parameter_types
|
||||||
|
end
|
||||||
|
|
||||||
|
def to_bytes
|
||||||
|
statement_name_bytes = encode_string(@statement_name)
|
||||||
|
query_bytes = encode_string(@query)
|
||||||
|
parameter_types_bytes = @parameter_types.pack('N*')
|
||||||
|
|
||||||
|
length = 4 + statement_name_bytes.size + query_bytes.size + 2 + parameter_types_bytes.size
|
||||||
|
message_prefix('P', length) + statement_name_bytes + query_bytes + encode_int16(@parameter_types.size) + parameter_types_bytes
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
class BindMessage < PostgresMessage
|
||||||
|
attr_accessor :portal_name, :statement_name, :parameter_format_codes, :parameters, :result_column_format_codes
|
||||||
|
|
||||||
|
def initialize(portal_name = "", statement_name = "", parameter_format_codes = [], parameters = [], result_column_format_codes = [])
|
||||||
|
@portal_name = portal_name
|
||||||
|
@statement_name = statement_name
|
||||||
|
@parameter_format_codes = parameter_format_codes
|
||||||
|
@parameters = parameters
|
||||||
|
@result_column_format_codes = result_column_format_codes
|
||||||
|
end
|
||||||
|
|
||||||
|
def to_bytes
|
||||||
|
portal_name_bytes = encode_string(@portal_name)
|
||||||
|
statement_name_bytes = encode_string(@statement_name)
|
||||||
|
parameter_format_codes_bytes = @parameter_format_codes.pack('n*')
|
||||||
|
|
||||||
|
parameters_bytes = @parameters.map do |param|
|
||||||
|
if param.nil?
|
||||||
|
encode_int32(-1)
|
||||||
|
else
|
||||||
|
encode_int32(param.bytesize) + param
|
||||||
|
end
|
||||||
|
end.join
|
||||||
|
|
||||||
|
result_column_format_codes_bytes = @result_column_format_codes.pack('n*')
|
||||||
|
|
||||||
|
length = 4 + portal_name_bytes.size + statement_name_bytes.size + 2 + parameter_format_codes_bytes.size + 2 + parameters_bytes.size + 2 + result_column_format_codes_bytes.size
|
||||||
|
message_prefix('B', length) + portal_name_bytes + statement_name_bytes + encode_int16(@parameter_format_codes.size) + parameter_format_codes_bytes + encode_int16(@parameters.size) + parameters_bytes + encode_int16(@result_column_format_codes.size) + result_column_format_codes_bytes
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
class DescribeMessage < PostgresMessage
|
||||||
|
attr_accessor :type, :name
|
||||||
|
|
||||||
|
def initialize(type = 'S', name = "")
|
||||||
|
@type = type
|
||||||
|
@name = name
|
||||||
|
end
|
||||||
|
|
||||||
|
def to_bytes
|
||||||
|
name_bytes = encode_string(@name)
|
||||||
|
length = 4 + 1 + name_bytes.size
|
||||||
|
message_prefix('D', length) + @type + name_bytes
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
|
class ExecuteMessage < PostgresMessage
|
||||||
|
attr_accessor :portal_name, :max_rows
|
||||||
|
|
||||||
|
def initialize(portal_name = "", max_rows = 0)
|
||||||
|
@portal_name = portal_name
|
||||||
|
@max_rows = max_rows
|
||||||
|
end
|
||||||
|
|
||||||
|
def to_bytes
|
||||||
|
portal_name_bytes = encode_string(@portal_name)
|
||||||
|
length = 4 + portal_name_bytes.size + 4
|
||||||
|
message_prefix('E', length) + portal_name_bytes + encode_int32(@max_rows)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
class FlushMessage < PostgresMessage
|
||||||
|
def to_bytes
|
||||||
|
length = 4
|
||||||
|
message_prefix('H', length)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
class SyncMessage < PostgresMessage
|
||||||
|
def to_bytes
|
||||||
|
length = 4
|
||||||
|
message_prefix('S', length)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
class CloseMessage < PostgresMessage
|
||||||
|
attr_accessor :type, :name
|
||||||
|
|
||||||
|
def initialize(type = 'S', name = "")
|
||||||
|
@type = type
|
||||||
|
@name = name
|
||||||
|
end
|
||||||
|
|
||||||
|
def to_bytes
|
||||||
|
name_bytes = encode_string(@name)
|
||||||
|
length = 4 + 1 + name_bytes.size
|
||||||
|
message_prefix('C', length) + @type + name_bytes
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
@@ -1,5 +1,6 @@
|
|||||||
require 'socket'
|
require 'socket'
|
||||||
require 'digest/md5'
|
require 'digest/md5'
|
||||||
|
require_relative 'frontend_messages'
|
||||||
|
|
||||||
BACKEND_MESSAGE_CODES = {
|
BACKEND_MESSAGE_CODES = {
|
||||||
'Z' => "ReadyForQuery",
|
'Z' => "ReadyForQuery",
|
||||||
@@ -18,7 +19,11 @@ class PostgresSocket
|
|||||||
@host = host
|
@host = host
|
||||||
@socket = TCPSocket.new @host, @port
|
@socket = TCPSocket.new @host, @port
|
||||||
@parameters = {}
|
@parameters = {}
|
||||||
@verbose = true
|
@verbose = false
|
||||||
|
end
|
||||||
|
|
||||||
|
def send_message(message)
|
||||||
|
@socket.write(message.to_bytes)
|
||||||
end
|
end
|
||||||
|
|
||||||
def send_md5_password_message(username, password, salt)
|
def send_md5_password_message(username, password, salt)
|
||||||
@@ -113,107 +118,6 @@ class PostgresSocket
|
|||||||
log "[F] Sent CancelRequest message"
|
log "[F] Sent CancelRequest message"
|
||||||
end
|
end
|
||||||
|
|
||||||
def send_query_message(query)
|
|
||||||
query_size = query.length
|
|
||||||
message_size = 1 + 4 + query_size
|
|
||||||
message = []
|
|
||||||
message << "Q".ord
|
|
||||||
message << [message_size].pack('l>').unpack('CCCC') # 4
|
|
||||||
message << query.split('').map(&:ord) # 2, 11
|
|
||||||
message << 0 # 1, 12
|
|
||||||
message.flatten!
|
|
||||||
@socket.write(message.flatten.pack('C*'))
|
|
||||||
log "[F] Sent Q message (#{query})"
|
|
||||||
end
|
|
||||||
|
|
||||||
def send_parse_message(query)
|
|
||||||
query_size = query.length
|
|
||||||
message_size = 2 + 2 + 4 + query_size
|
|
||||||
message = []
|
|
||||||
message << "P".ord
|
|
||||||
message << [message_size].pack('l>').unpack('CCCC') # 4
|
|
||||||
message << 0 # unnamed statement
|
|
||||||
message << query.split('').map(&:ord) # 2, 11
|
|
||||||
message << 0 # 1, 12
|
|
||||||
message << [0, 0]
|
|
||||||
message.flatten!
|
|
||||||
@socket.write(message.flatten.pack('C*'))
|
|
||||||
log "[F] Sent P message (#{query})"
|
|
||||||
end
|
|
||||||
|
|
||||||
def send_bind_message
|
|
||||||
message = []
|
|
||||||
message << "B".ord
|
|
||||||
message << [12].pack('l>').unpack('CCCC') # 4
|
|
||||||
message << 0 # unnamed statement
|
|
||||||
message << 0 # unnamed statement
|
|
||||||
message << [0, 0] # 2
|
|
||||||
message << [0, 0] # 2
|
|
||||||
message << [0, 0] # 2
|
|
||||||
message.flatten!
|
|
||||||
@socket.write(message.flatten.pack('C*'))
|
|
||||||
log "[F] Sent B message"
|
|
||||||
end
|
|
||||||
|
|
||||||
def send_describe_message(mode)
|
|
||||||
message = []
|
|
||||||
message << "D".ord
|
|
||||||
message << [6].pack('l>').unpack('CCCC') # 4
|
|
||||||
message << mode.ord
|
|
||||||
message << 0 # unnamed statement
|
|
||||||
message.flatten!
|
|
||||||
@socket.write(message.flatten.pack('C*'))
|
|
||||||
log "[F] Sent D message"
|
|
||||||
end
|
|
||||||
|
|
||||||
def send_execute_message(limit=0)
|
|
||||||
message = []
|
|
||||||
message << "E".ord
|
|
||||||
message << [9].pack('l>').unpack('CCCC') # 4
|
|
||||||
message << 0 # unnamed statement
|
|
||||||
message << [limit].pack('l>').unpack('CCCC') # 4
|
|
||||||
message.flatten!
|
|
||||||
@socket.write(message.flatten.pack('C*'))
|
|
||||||
log "[F] Sent E message"
|
|
||||||
end
|
|
||||||
|
|
||||||
def send_sync_message
|
|
||||||
message = []
|
|
||||||
message << "S".ord
|
|
||||||
message << [4].pack('l>').unpack('CCCC') # 4
|
|
||||||
message.flatten!
|
|
||||||
@socket.write(message.flatten.pack('C*'))
|
|
||||||
log "[F] Sent S message"
|
|
||||||
end
|
|
||||||
|
|
||||||
def send_copydone_message
|
|
||||||
message = []
|
|
||||||
message << "c".ord
|
|
||||||
message << [4].pack('l>').unpack('CCCC') # 4
|
|
||||||
message.flatten!
|
|
||||||
@socket.write(message.flatten.pack('C*'))
|
|
||||||
log "[F] Sent c message"
|
|
||||||
end
|
|
||||||
|
|
||||||
def send_copyfail_message
|
|
||||||
message = []
|
|
||||||
message << "f".ord
|
|
||||||
message << [5].pack('l>').unpack('CCCC') # 4
|
|
||||||
message << 0
|
|
||||||
message.flatten!
|
|
||||||
@socket.write(message.flatten.pack('C*'))
|
|
||||||
log "[F] Sent f message"
|
|
||||||
end
|
|
||||||
|
|
||||||
def send_flush_message
|
|
||||||
message = []
|
|
||||||
message << "H".ord
|
|
||||||
message << [4].pack('l>').unpack('CCCC') # 4
|
|
||||||
message.flatten!
|
|
||||||
@socket.write(message.flatten.pack('C*'))
|
|
||||||
log "[F] Sent H message"
|
|
||||||
end
|
|
||||||
|
|
||||||
def read_from_server()
|
def read_from_server()
|
||||||
output_messages = []
|
output_messages = []
|
||||||
retry_count = 0
|
retry_count = 0
|
||||||
|
|||||||
@@ -16,10 +16,14 @@ describe "Portocol handling" do
|
|||||||
end
|
end
|
||||||
|
|
||||||
def run_comparison(sequence, socket_a, socket_b)
|
def run_comparison(sequence, socket_a, socket_b)
|
||||||
sequence.each do |msg, *args|
|
sequence.each do |msg|
|
||||||
socket_a.send(msg, *args)
|
if msg.is_a?(Symbol)
|
||||||
socket_b.send(msg, *args)
|
socket_a.send(msg)
|
||||||
|
socket_b.send(msg)
|
||||||
|
else
|
||||||
|
socket_a.send_message(msg)
|
||||||
|
socket_b.send_message(msg)
|
||||||
|
end
|
||||||
compare_messages(
|
compare_messages(
|
||||||
socket_a.read_from_server,
|
socket_a.read_from_server,
|
||||||
socket_b.read_from_server
|
socket_b.read_from_server
|
||||||
@@ -84,8 +88,8 @@ describe "Portocol handling" do
|
|||||||
context "Cancel Query" do
|
context "Cancel Query" do
|
||||||
let(:sequence) {
|
let(:sequence) {
|
||||||
[
|
[
|
||||||
[:send_query_message, "SELECT pg_sleep(5)"],
|
SimpleQueryMessage.new("SELECT pg_sleep(5)"),
|
||||||
[:cancel_query]
|
:cancel_query
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -95,12 +99,12 @@ describe "Portocol handling" do
|
|||||||
xcontext "Simple query after parse" do
|
xcontext "Simple query after parse" do
|
||||||
let(:sequence) {
|
let(:sequence) {
|
||||||
[
|
[
|
||||||
[:send_parse_message, "SELECT 5"],
|
ParseMessage.new("", "SELECT 5", []),
|
||||||
[:send_query_message, "SELECT 1"],
|
SimpleQueryMessage.new("SELECT 1"),
|
||||||
[:send_bind_message],
|
BindMessage.new("", "", [], [], [0]),
|
||||||
[:send_describe_message, "P"],
|
DescribeMessage.new("P", ""),
|
||||||
[:send_execute_message],
|
ExecuteMessage.new("", 1),
|
||||||
[:send_sync_message],
|
SyncMessage.new
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -111,8 +115,8 @@ describe "Portocol handling" do
|
|||||||
xcontext "Flush message" do
|
xcontext "Flush message" do
|
||||||
let(:sequence) {
|
let(:sequence) {
|
||||||
[
|
[
|
||||||
[:send_parse_message, "SELECT 1"],
|
ParseMessage.new("", "SELECT 1", []),
|
||||||
[:send_flush_message]
|
FlushMessage.new
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -122,9 +126,7 @@ describe "Portocol handling" do
|
|||||||
|
|
||||||
xcontext "Bind without parse" do
|
xcontext "Bind without parse" do
|
||||||
let(:sequence) {
|
let(:sequence) {
|
||||||
[
|
[BindMessage.new("", "", [], [], [0])]
|
||||||
[:send_bind_message]
|
|
||||||
]
|
|
||||||
}
|
}
|
||||||
# This is known to fail.
|
# This is known to fail.
|
||||||
# Server responds immediately, Proxy buffers the message
|
# Server responds immediately, Proxy buffers the message
|
||||||
@@ -133,23 +135,155 @@ describe "Portocol handling" do
|
|||||||
|
|
||||||
context "Simple message" do
|
context "Simple message" do
|
||||||
let(:sequence) {
|
let(:sequence) {
|
||||||
[[:send_query_message, "SELECT 1"]]
|
[SimpleQueryMessage.new("SELECT 1")]
|
||||||
}
|
}
|
||||||
|
|
||||||
it_behaves_like "at parity with database"
|
it_behaves_like "at parity with database"
|
||||||
end
|
end
|
||||||
|
|
||||||
context "Extended protocol" do
|
10.times do |i|
|
||||||
let(:sequence) {
|
context "Extended protocol" do
|
||||||
[
|
let(:sequence) {
|
||||||
[:send_parse_message, "SELECT 1"],
|
[
|
||||||
[:send_bind_message],
|
ParseMessage.new("", "SELECT 1", []),
|
||||||
[:send_describe_message, "P"],
|
BindMessage.new("", "", [], [], [0]),
|
||||||
[:send_execute_message],
|
DescribeMessage.new("S", ""),
|
||||||
[:send_sync_message],
|
ExecuteMessage.new("", 1),
|
||||||
]
|
SyncMessage.new
|
||||||
}
|
]
|
||||||
|
}
|
||||||
|
|
||||||
it_behaves_like "at parity with database"
|
it_behaves_like "at parity with database"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "Protocol-level prepared statements" do
|
||||||
|
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 1, "transaction") }
|
||||||
|
before do
|
||||||
|
q_sock = PostgresSocket.new('localhost', processes.pgcat.port)
|
||||||
|
q_sock.send_startup_message("sharding_user", "sharded_db", "sharding_user")
|
||||||
|
table_query = "CREATE TABLE IF NOT EXISTS employees (employee_id SERIAL PRIMARY KEY, salary NUMERIC(10, 2) CHECK (salary > 0));"
|
||||||
|
q_sock.send_message(SimpleQueryMessage.new(table_query))
|
||||||
|
q_sock.close
|
||||||
|
|
||||||
|
current_configs = processes.pgcat.current_config
|
||||||
|
current_configs["pools"]["sharded_db"]["prepared_statements_cache_size"] = 500
|
||||||
|
processes.pgcat.update_config(current_configs)
|
||||||
|
processes.pgcat.reload_config
|
||||||
|
end
|
||||||
|
after do
|
||||||
|
q_sock = PostgresSocket.new('localhost', processes.pgcat.port)
|
||||||
|
q_sock.send_startup_message("sharding_user", "sharded_db", "sharding_user")
|
||||||
|
table_query = "DROP TABLE IF EXISTS employees;"
|
||||||
|
q_sock.send_message(SimpleQueryMessage.new(table_query))
|
||||||
|
q_sock.close
|
||||||
|
end
|
||||||
|
|
||||||
|
context "When unnamed prepared statements are used" do
|
||||||
|
it "does not cache them" do
|
||||||
|
socket = PostgresSocket.new('localhost', processes.pgcat.port)
|
||||||
|
socket.send_startup_message("sharding_user", "sharded_db", "sharding_user")
|
||||||
|
|
||||||
|
socket.send_message(SimpleQueryMessage.new("DISCARD ALL"))
|
||||||
|
socket.read_from_server
|
||||||
|
|
||||||
|
10.times do |i|
|
||||||
|
socket.send_message(ParseMessage.new("", "SELECT #{i}", []))
|
||||||
|
socket.send_message(BindMessage.new("", "", [], [], [0]))
|
||||||
|
socket.send_message(DescribeMessage.new("S", ""))
|
||||||
|
socket.send_message(ExecuteMessage.new("", 1))
|
||||||
|
socket.send_message(SyncMessage.new)
|
||||||
|
socket.read_from_server
|
||||||
|
end
|
||||||
|
|
||||||
|
socket.send_message(SimpleQueryMessage.new("SELECT name, statement, prepare_time, parameter_types FROM pg_prepared_statements"))
|
||||||
|
result = socket.read_from_server
|
||||||
|
number_of_saved_statements = result.count { |m| m[:code] == 'D' }
|
||||||
|
expect(number_of_saved_statements).to eq(0)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context "When named prepared statements are used" do
|
||||||
|
it "caches them" do
|
||||||
|
socket = PostgresSocket.new('localhost', processes.pgcat.port)
|
||||||
|
socket.send_startup_message("sharding_user", "sharded_db", "sharding_user")
|
||||||
|
|
||||||
|
socket.send_message(SimpleQueryMessage.new("DISCARD ALL"))
|
||||||
|
socket.read_from_server
|
||||||
|
|
||||||
|
3.times do
|
||||||
|
socket.send_message(ParseMessage.new("my_query", "SELECT * FROM employees WHERE employee_id in ($1,$2,$3)", [0,0,0]))
|
||||||
|
socket.send_message(BindMessage.new("", "my_query", [0,0,0], [0,0,0].map(&:to_s), [0,0,0,0,0,0]))
|
||||||
|
socket.send_message(SyncMessage.new)
|
||||||
|
socket.read_from_server
|
||||||
|
end
|
||||||
|
|
||||||
|
3.times do
|
||||||
|
socket.send_message(ParseMessage.new("my_other_query", "SELECT * FROM employees WHERE salary in ($1,$2,$3)", [0,0,0]))
|
||||||
|
socket.send_message(BindMessage.new("", "my_other_query", [0,0,0], [0,0,0].map(&:to_s), [0,0,0,0,0,0]))
|
||||||
|
socket.send_message(SyncMessage.new)
|
||||||
|
socket.read_from_server
|
||||||
|
end
|
||||||
|
|
||||||
|
socket.send_message(SimpleQueryMessage.new("SELECT name, statement, prepare_time, parameter_types FROM pg_prepared_statements"))
|
||||||
|
result = socket.read_from_server
|
||||||
|
number_of_saved_statements = result.count { |m| m[:code] == 'D' }
|
||||||
|
expect(number_of_saved_statements).to eq(2)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context "When DISCARD ALL/DEALLOCATE ALL are called" do
|
||||||
|
it "resets server and client caches" do
|
||||||
|
socket = PostgresSocket.new('localhost', processes.pgcat.port)
|
||||||
|
socket.send_startup_message("sharding_user", "sharded_db", "sharding_user")
|
||||||
|
|
||||||
|
20.times do |i|
|
||||||
|
socket.send_message(ParseMessage.new("my_query_#{i}", "SELECT * FROM employees WHERE employee_id in ($1,$2,$3)", [0,0,0]))
|
||||||
|
end
|
||||||
|
|
||||||
|
20.times do |i|
|
||||||
|
socket.send_message(BindMessage.new("", "my_query_#{i}", [0,0,0], [0,0,0].map(&:to_s), [0,0]))
|
||||||
|
end
|
||||||
|
|
||||||
|
socket.send_message(SyncMessage.new)
|
||||||
|
socket.read_from_server
|
||||||
|
|
||||||
|
socket.send_message(SimpleQueryMessage.new("DISCARD ALL"))
|
||||||
|
socket.read_from_server
|
||||||
|
responses = []
|
||||||
|
4.times do |i|
|
||||||
|
socket.send_message(ParseMessage.new("my_query_#{i}", "SELECT * FROM employees WHERE employee_id in ($1,$2,$3)", [0,0,0]))
|
||||||
|
socket.send_message(BindMessage.new("", "my_query_#{i}", [0,0,0], [0,0,0].map(&:to_s), [0,0]))
|
||||||
|
socket.send_message(SyncMessage.new)
|
||||||
|
|
||||||
|
responses += socket.read_from_server
|
||||||
|
end
|
||||||
|
|
||||||
|
errors = responses.select { |message| message[:code] == 'E' }
|
||||||
|
error_message = errors.map { |message| message[:bytes].map(&:chr).join("") }.join("\n")
|
||||||
|
raise StandardError, "Encountered the following errors: #{error_message}" if errors.length > 0
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context "Maximum number of bound paramters" do
|
||||||
|
it "does not crash" do
|
||||||
|
test_socket = PostgresSocket.new('localhost', processes.pgcat.port)
|
||||||
|
test_socket.send_startup_message("sharding_user", "sharded_db", "sharding_user")
|
||||||
|
|
||||||
|
types = Array.new(65_535) { |i| 0 }
|
||||||
|
|
||||||
|
params = Array.new(65_535) { |i| "$#{i+1}" }.join(",")
|
||||||
|
test_socket.send_message(ParseMessage.new("my_query", "SELECT * FROM employees WHERE employee_id in (#{params})", types))
|
||||||
|
|
||||||
|
test_socket.send_message(BindMessage.new("my_query", "my_query", types, types.map(&:to_s), types))
|
||||||
|
|
||||||
|
test_socket.send_message(SyncMessage.new)
|
||||||
|
|
||||||
|
# If the proxy crashes, this will raise an error
|
||||||
|
expect { test_socket.read_from_server }.to_not raise_error
|
||||||
|
|
||||||
|
test_socket.close
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
Reference in New Issue
Block a user