mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-26 18:36:28 +00:00
Compare commits
1 Commits
v0.1.0-bet
...
sven_md5_a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5f745859c0 |
@@ -13,9 +13,6 @@ function start_pgcat() {
|
|||||||
|
|
||||||
# Setup the database with shards and user
|
# Setup the database with shards and user
|
||||||
psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql
|
psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql
|
||||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard0 -i
|
|
||||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
|
|
||||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
|
|
||||||
|
|
||||||
# Install Toxiproxy to simulate a downed/slow database
|
# Install Toxiproxy to simulate a downed/slow database
|
||||||
wget -O toxiproxy-2.1.4.deb https://github.com/Shopify/toxiproxy/releases/download/v2.1.4/toxiproxy_2.1.4_amd64.deb
|
wget -O toxiproxy-2.1.4.deb https://github.com/Shopify/toxiproxy/releases/download/v2.1.4/toxiproxy_2.1.4_amd64.deb
|
||||||
@@ -31,9 +28,9 @@ toxiproxy-cli create -l 127.0.0.1:5433 -u 127.0.0.1:5432 postgres_replica
|
|||||||
start_pgcat "info"
|
start_pgcat "info"
|
||||||
|
|
||||||
# pgbench test
|
# pgbench test
|
||||||
pgbench -i -h 127.0.0.1 -p 6432
|
pgbench -i -h 127.0.0.1 -p 6432 && \
|
||||||
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple -f tests/pgbench/simple.sql
|
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple && \
|
||||||
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended
|
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended
|
||||||
|
|
||||||
# COPY TO STDOUT test
|
# COPY TO STDOUT test
|
||||||
psql -h 127.0.0.1 -p 6432 -c 'COPY (SELECT * FROM pgbench_accounts LIMIT 15) TO STDOUT;' > /dev/null
|
psql -h 127.0.0.1 -p 6432 -c 'COPY (SELECT * FROM pgbench_accounts LIMIT 15) TO STDOUT;' > /dev/null
|
||||||
|
|||||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -1,2 +1,5 @@
|
|||||||
/target
|
/target
|
||||||
*.deb
|
*.deb
|
||||||
|
.idea/*
|
||||||
|
tests/ruby/.bundle/*
|
||||||
|
tests/ruby/vendor/*
|
||||||
34
Cargo.lock
generated
34
Cargo.lock
generated
@@ -220,6 +220,12 @@ dependencies = [
|
|||||||
"cfg-if",
|
"cfg-if",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "itoa"
|
||||||
|
version = "1.0.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "libc"
|
name = "libc"
|
||||||
version = "0.2.117"
|
version = "0.2.117"
|
||||||
@@ -369,8 +375,10 @@ dependencies = [
|
|||||||
"regex",
|
"regex",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_derive",
|
"serde_derive",
|
||||||
|
"serde_json",
|
||||||
"sha-1",
|
"sha-1",
|
||||||
"sqlparser",
|
"sqlparser",
|
||||||
|
"statsd",
|
||||||
"tokio",
|
"tokio",
|
||||||
"toml",
|
"toml",
|
||||||
]
|
]
|
||||||
@@ -477,6 +485,12 @@ version = "0.6.25"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
|
checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "ryu"
|
||||||
|
version = "1.0.9"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "scopeguard"
|
name = "scopeguard"
|
||||||
version = "1.1.0"
|
version = "1.1.0"
|
||||||
@@ -500,6 +514,17 @@ dependencies = [
|
|||||||
"syn",
|
"syn",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "serde_json"
|
||||||
|
version = "1.0.79"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8e8d9fa5c3b304765ce1fd9c4c8a3de2c8db365a5b91be52f186efc675681d95"
|
||||||
|
dependencies = [
|
||||||
|
"itoa",
|
||||||
|
"ryu",
|
||||||
|
"serde",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "sha-1"
|
name = "sha-1"
|
||||||
version = "0.10.0"
|
version = "0.10.0"
|
||||||
@@ -541,6 +566,15 @@ dependencies = [
|
|||||||
"log",
|
"log",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "statsd"
|
||||||
|
version = "0.15.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "df1efceb4bf2c0b5ebec94354285a43bbbed1375605bdf2ebe4132299434a330"
|
||||||
|
dependencies = [
|
||||||
|
"rand",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "syn"
|
name = "syn"
|
||||||
version = "1.0.86"
|
version = "1.0.86"
|
||||||
|
|||||||
@@ -17,9 +17,11 @@ sha-1 = "0.10"
|
|||||||
toml = "0.5"
|
toml = "0.5"
|
||||||
serde = "1"
|
serde = "1"
|
||||||
serde_derive = "1"
|
serde_derive = "1"
|
||||||
|
serde_json = "1"
|
||||||
regex = "1"
|
regex = "1"
|
||||||
num_cpus = "1"
|
num_cpus = "1"
|
||||||
once_cell = "1"
|
once_cell = "1"
|
||||||
|
statsd = "0.15"
|
||||||
sqlparser = "0.14"
|
sqlparser = "0.14"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
arc-swap = "1"
|
arc-swap = "1"
|
||||||
|
|||||||
12
pgcat.toml
12
pgcat.toml
@@ -48,8 +48,8 @@ password = "sharding_user"
|
|||||||
|
|
||||||
# [ host, port, role ]
|
# [ host, port, role ]
|
||||||
servers = [
|
servers = [
|
||||||
[ "127.0.0.1", 5432, "primary" ],
|
["127.0.0.1", 5432, "primary"],
|
||||||
[ "localhost", 5432, "replica" ],
|
["localhost", 5432, "replica"],
|
||||||
# [ "127.0.1.1", 5432, "replica" ],
|
# [ "127.0.1.1", 5432, "replica" ],
|
||||||
]
|
]
|
||||||
# Database name (e.g. "postgres")
|
# Database name (e.g. "postgres")
|
||||||
@@ -58,8 +58,8 @@ database = "shard0"
|
|||||||
[shards.1]
|
[shards.1]
|
||||||
# [ host, port, role ]
|
# [ host, port, role ]
|
||||||
servers = [
|
servers = [
|
||||||
[ "127.0.0.1", 5432, "primary" ],
|
["127.0.0.1", 5432, "primary"],
|
||||||
[ "localhost", 5432, "replica" ],
|
["localhost", 5432, "replica"],
|
||||||
# [ "127.0.1.1", 5432, "replica" ],
|
# [ "127.0.1.1", 5432, "replica" ],
|
||||||
]
|
]
|
||||||
database = "shard1"
|
database = "shard1"
|
||||||
@@ -67,8 +67,8 @@ database = "shard1"
|
|||||||
[shards.2]
|
[shards.2]
|
||||||
# [ host, port, role ]
|
# [ host, port, role ]
|
||||||
servers = [
|
servers = [
|
||||||
[ "127.0.0.1", 5432, "primary" ],
|
["127.0.0.1", 5432, "primary"],
|
||||||
[ "localhost", 5432, "replica" ],
|
["localhost", 5432, "replica"],
|
||||||
# [ "127.0.1.1", 5432, "replica" ],
|
# [ "127.0.1.1", 5432, "replica" ],
|
||||||
]
|
]
|
||||||
database = "shard2"
|
database = "shard2"
|
||||||
|
|||||||
75
src/admin.rs
75
src/admin.rs
@@ -31,7 +31,7 @@ pub async fn handle_admin(
|
|||||||
|
|
||||||
if query.starts_with("SHOW STATS") {
|
if query.starts_with("SHOW STATS") {
|
||||||
trace!("SHOW STATS");
|
trace!("SHOW STATS");
|
||||||
show_stats(stream, &pool).await
|
show_stats(stream).await
|
||||||
} else if query.starts_with("RELOAD") {
|
} else if query.starts_with("RELOAD") {
|
||||||
trace!("RELOAD");
|
trace!("RELOAD");
|
||||||
reload(stream).await
|
reload(stream).await
|
||||||
@@ -77,19 +77,11 @@ async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul
|
|||||||
])); // but admin tools that work with pgbouncer want this
|
])); // but admin tools that work with pgbouncer want this
|
||||||
res.put(data_row(&vec![
|
res.put(data_row(&vec![
|
||||||
"free_clients".to_string(),
|
"free_clients".to_string(),
|
||||||
stats
|
stats["cl_idle"].to_string(),
|
||||||
.keys()
|
|
||||||
.map(|address_id| stats[&address_id]["cl_idle"])
|
|
||||||
.sum::<i64>()
|
|
||||||
.to_string(),
|
|
||||||
]));
|
]));
|
||||||
res.put(data_row(&vec![
|
res.put(data_row(&vec![
|
||||||
"used_clients".to_string(),
|
"used_clients".to_string(),
|
||||||
stats
|
stats["cl_active"].to_string(),
|
||||||
.keys()
|
|
||||||
.map(|address_id| stats[&address_id]["cl_active"])
|
|
||||||
.sum::<i64>()
|
|
||||||
.to_string(),
|
|
||||||
]));
|
]));
|
||||||
res.put(data_row(&vec![
|
res.put(data_row(&vec![
|
||||||
"login_clients".to_string(),
|
"login_clients".to_string(),
|
||||||
@@ -97,19 +89,11 @@ async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul
|
|||||||
]));
|
]));
|
||||||
res.put(data_row(&vec![
|
res.put(data_row(&vec![
|
||||||
"free_servers".to_string(),
|
"free_servers".to_string(),
|
||||||
stats
|
stats["sv_idle"].to_string(),
|
||||||
.keys()
|
|
||||||
.map(|address_id| stats[&address_id]["sv_idle"])
|
|
||||||
.sum::<i64>()
|
|
||||||
.to_string(),
|
|
||||||
]));
|
]));
|
||||||
res.put(data_row(&vec![
|
res.put(data_row(&vec![
|
||||||
"used_servers".to_string(),
|
"used_servers".to_string(),
|
||||||
stats
|
stats["sv_active"].to_string(),
|
||||||
.keys()
|
|
||||||
.map(|address_id| stats[&address_id]["sv_active"])
|
|
||||||
.sum::<i64>()
|
|
||||||
.to_string(),
|
|
||||||
]));
|
]));
|
||||||
res.put(data_row(&vec!["dns_names".to_string(), "0".to_string()]));
|
res.put(data_row(&vec!["dns_names".to_string(), "0".to_string()]));
|
||||||
res.put(data_row(&vec!["dns_zones".to_string(), "0".to_string()]));
|
res.put(data_row(&vec!["dns_zones".to_string(), "0".to_string()]));
|
||||||
@@ -141,7 +125,7 @@ async fn show_version(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// SHOW POOLS
|
/// SHOW POOLS
|
||||||
async fn show_pools(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
|
async fn show_pools(stream: &mut OwnedWriteHalf, _pool: &ConnectionPool) -> Result<(), Error> {
|
||||||
let stats = get_stats();
|
let stats = get_stats();
|
||||||
let config = {
|
let config = {
|
||||||
let guard = get_config();
|
let guard = get_config();
|
||||||
@@ -167,26 +151,16 @@ async fn show_pools(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul
|
|||||||
let mut res = BytesMut::new();
|
let mut res = BytesMut::new();
|
||||||
res.put(row_description(&columns));
|
res.put(row_description(&columns));
|
||||||
|
|
||||||
for shard in 0..pool.shards() {
|
let mut row = vec![String::from("all"), config.user.name.clone()];
|
||||||
for server in 0..pool.servers(shard) {
|
|
||||||
let address = pool.address(shard, server);
|
|
||||||
let stats = match stats.get(&address.id) {
|
|
||||||
Some(stats) => stats.clone(),
|
|
||||||
None => HashMap::new(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut row = vec![address.name(), config.user.name.clone()];
|
for column in &columns[2..columns.len() - 1] {
|
||||||
|
let value = stats.get(column.0).unwrap_or(&0).to_string();
|
||||||
for column in &columns[2..columns.len() - 1] {
|
row.push(value);
|
||||||
let value = stats.get(column.0).unwrap_or(&0).to_string();
|
|
||||||
row.push(value);
|
|
||||||
}
|
|
||||||
|
|
||||||
row.push(config.general.pool_mode.to_string());
|
|
||||||
res.put(data_row(&row));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
row.push(config.general.pool_mode.to_string());
|
||||||
|
|
||||||
|
res.put(data_row(&row));
|
||||||
res.put(command_complete("SHOW"));
|
res.put(command_complete("SHOW"));
|
||||||
|
|
||||||
res.put_u8(b'Z');
|
res.put_u8(b'Z');
|
||||||
@@ -335,7 +309,7 @@ async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// SHOW STATS
|
/// SHOW STATS
|
||||||
async fn show_stats(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> {
|
async fn show_stats(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
|
||||||
let columns = vec![
|
let columns = vec![
|
||||||
("database", DataType::Text),
|
("database", DataType::Text),
|
||||||
("total_xact_count", DataType::Numeric),
|
("total_xact_count", DataType::Numeric),
|
||||||
@@ -358,24 +332,15 @@ async fn show_stats(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Resul
|
|||||||
let mut res = BytesMut::new();
|
let mut res = BytesMut::new();
|
||||||
res.put(row_description(&columns));
|
res.put(row_description(&columns));
|
||||||
|
|
||||||
for shard in 0..pool.shards() {
|
let mut row = vec![
|
||||||
for server in 0..pool.servers(shard) {
|
String::from("all"), // TODO: per-database stats,
|
||||||
let address = pool.address(shard, server);
|
];
|
||||||
let stats = match stats.get(&address.id) {
|
|
||||||
Some(stats) => stats.clone(),
|
|
||||||
None => HashMap::new(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut row = vec![address.name()];
|
for column in &columns[1..] {
|
||||||
|
row.push(stats.get(column.0).unwrap_or(&0).to_string());
|
||||||
for column in &columns[1..] {
|
|
||||||
row.push(stats.get(column.0).unwrap_or(&0).to_string());
|
|
||||||
}
|
|
||||||
|
|
||||||
res.put(data_row(&row));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
res.put(data_row(&row));
|
||||||
res.put(command_complete("SHOW"));
|
res.put(command_complete("SHOW"));
|
||||||
|
|
||||||
res.put_u8(b'Z');
|
res.put_u8(b'Z');
|
||||||
|
|||||||
@@ -58,12 +58,6 @@ pub struct Client {
|
|||||||
|
|
||||||
// Clients want to talk to admin
|
// Clients want to talk to admin
|
||||||
admin: bool,
|
admin: bool,
|
||||||
|
|
||||||
// Last address the client talked to
|
|
||||||
last_address_id: Option<usize>,
|
|
||||||
|
|
||||||
// Last server process id we talked to
|
|
||||||
last_server_id: Option<i32>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Client {
|
impl Client {
|
||||||
@@ -114,15 +108,18 @@ impl Client {
|
|||||||
// Regular startup message.
|
// Regular startup message.
|
||||||
PROTOCOL_VERSION_NUMBER => {
|
PROTOCOL_VERSION_NUMBER => {
|
||||||
trace!("Got StartupMessage");
|
trace!("Got StartupMessage");
|
||||||
|
|
||||||
// TODO: perform actual auth.
|
|
||||||
let parameters = parse_startup(bytes.clone())?;
|
let parameters = parse_startup(bytes.clone())?;
|
||||||
|
let mut user_name: String = String::new();
|
||||||
|
match parameters.get(&"user") {
|
||||||
|
Some(&user) => user_name = user,
|
||||||
|
None => return Err(Error::ClientBadStartup),
|
||||||
|
}
|
||||||
|
start_auth(&mut stream, &user_name).await?;
|
||||||
|
|
||||||
// Generate random backend ID and secret key
|
// Generate random backend ID and secret key
|
||||||
let process_id: i32 = rand::random();
|
let process_id: i32 = rand::random();
|
||||||
let secret_key: i32 = rand::random();
|
let secret_key: i32 = rand::random();
|
||||||
|
|
||||||
auth_ok(&mut stream).await?;
|
|
||||||
write_all(&mut stream, server_info).await?;
|
write_all(&mut stream, server_info).await?;
|
||||||
backend_key_data(&mut stream, process_id, secret_key).await?;
|
backend_key_data(&mut stream, process_id, secret_key).await?;
|
||||||
ready_for_query(&mut stream).await?;
|
ready_for_query(&mut stream).await?;
|
||||||
@@ -153,8 +150,6 @@ impl Client {
|
|||||||
parameters: parameters,
|
parameters: parameters,
|
||||||
stats: stats,
|
stats: stats,
|
||||||
admin: admin,
|
admin: admin,
|
||||||
last_address_id: None,
|
|
||||||
last_server_id: None,
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -177,8 +172,6 @@ impl Client {
|
|||||||
parameters: HashMap::new(),
|
parameters: HashMap::new(),
|
||||||
stats: stats,
|
stats: stats,
|
||||||
admin: false,
|
admin: false,
|
||||||
last_address_id: None,
|
|
||||||
last_server_id: None,
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -229,6 +222,9 @@ impl Client {
|
|||||||
loop {
|
loop {
|
||||||
trace!("Client idle, waiting for message");
|
trace!("Client idle, waiting for message");
|
||||||
|
|
||||||
|
// Client idle, waiting for messages.
|
||||||
|
self.stats.client_idle(self.process_id);
|
||||||
|
|
||||||
// Read a complete message from the client, which normally would be
|
// Read a complete message from the client, which normally would be
|
||||||
// either a `Q` (query) or `P` (prepare, extended protocol).
|
// either a `Q` (query) or `P` (prepare, extended protocol).
|
||||||
// We can parse it here before grabbing a server from the pool,
|
// We can parse it here before grabbing a server from the pool,
|
||||||
@@ -299,13 +295,13 @@ impl Client {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Waiting for server connection.
|
||||||
|
self.stats.client_waiting(self.process_id);
|
||||||
|
|
||||||
debug!("Waiting for connection from pool");
|
debug!("Waiting for connection from pool");
|
||||||
|
|
||||||
// Grab a server from the pool: the client issued a regular query.
|
// Grab a server from the pool: the client issued a regular query.
|
||||||
let connection = match pool
|
let connection = match pool.get(query_router.shard(), query_router.role()).await {
|
||||||
.get(query_router.shard(), query_router.role(), self.process_id)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(conn) => {
|
Ok(conn) => {
|
||||||
debug!("Got connection from pool");
|
debug!("Got connection from pool");
|
||||||
conn
|
conn
|
||||||
@@ -319,23 +315,15 @@ impl Client {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let mut reference = connection.0;
|
let mut reference = connection.0;
|
||||||
let address = connection.1;
|
let _address = connection.1;
|
||||||
let server = &mut *reference;
|
let server = &mut *reference;
|
||||||
|
|
||||||
// Claim this server as mine for query cancellation.
|
// Claim this server as mine for query cancellation.
|
||||||
server.claim(self.process_id, self.secret_key);
|
server.claim(self.process_id, self.secret_key);
|
||||||
|
|
||||||
// "disconnect" from the previous server stats-wise
|
|
||||||
if let Some(last_address_id) = self.last_address_id {
|
|
||||||
self.stats
|
|
||||||
.client_disconnecting(self.process_id, last_address_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Client active & server active
|
// Client active & server active
|
||||||
self.stats.client_active(self.process_id, address.id);
|
self.stats.client_active(self.process_id);
|
||||||
self.stats.server_active(server.process_id(), address.id);
|
self.stats.server_active(server.process_id());
|
||||||
self.last_address_id = Some(address.id);
|
|
||||||
self.last_server_id = Some(server.process_id());
|
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"Client {:?} talking to server {:?}",
|
"Client {:?} talking to server {:?}",
|
||||||
@@ -407,17 +395,17 @@ impl Client {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Report query executed statistics.
|
// Report query executed statistics.
|
||||||
self.stats.query(self.process_id, address.id);
|
self.stats.query();
|
||||||
|
|
||||||
// The transaction is over, we can release the connection back to the pool.
|
// The transaction is over, we can release the connection back to the pool.
|
||||||
if !server.in_transaction() {
|
if !server.in_transaction() {
|
||||||
// Report transaction executed statistics.
|
// Report transaction executed statistics.
|
||||||
self.stats.transaction(self.process_id, address.id);
|
self.stats.transaction();
|
||||||
|
|
||||||
// Release server back to the pool if we are in transaction mode.
|
// Release server back to the pool if we are in transaction mode.
|
||||||
// If we are in session mode, we keep the server until the client disconnects.
|
// If we are in session mode, we keep the server until the client disconnects.
|
||||||
if self.transaction_mode {
|
if self.transaction_mode {
|
||||||
self.stats.server_idle(server.process_id(), address.id);
|
self.stats.server_idle(server.process_id());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -493,15 +481,15 @@ impl Client {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Report query executed statistics.
|
// Report query executed statistics.
|
||||||
self.stats.query(self.process_id, address.id);
|
self.stats.query();
|
||||||
|
|
||||||
// Release server back to the pool if we are in transaction mode.
|
// Release server back to the pool if we are in transaction mode.
|
||||||
// If we are in session mode, we keep the server until the client disconnects.
|
// If we are in session mode, we keep the server until the client disconnects.
|
||||||
if !server.in_transaction() {
|
if !server.in_transaction() {
|
||||||
self.stats.transaction(self.process_id, address.id);
|
self.stats.transaction();
|
||||||
|
|
||||||
if self.transaction_mode {
|
if self.transaction_mode {
|
||||||
self.stats.server_idle(server.process_id(), address.id);
|
self.stats.server_idle(server.process_id());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -532,10 +520,10 @@ impl Client {
|
|||||||
// Release server back to the pool if we are in transaction mode.
|
// Release server back to the pool if we are in transaction mode.
|
||||||
// If we are in session mode, we keep the server until the client disconnects.
|
// If we are in session mode, we keep the server until the client disconnects.
|
||||||
if !server.in_transaction() {
|
if !server.in_transaction() {
|
||||||
self.stats.transaction(self.process_id, address.id);
|
self.stats.transaction();
|
||||||
|
|
||||||
if self.transaction_mode {
|
if self.transaction_mode {
|
||||||
self.stats.server_idle(server.process_id(), address.id);
|
self.stats.server_idle(server.process_id());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -552,7 +540,6 @@ impl Client {
|
|||||||
// The server is no longer bound to us, we can't cancel it's queries anymore.
|
// The server is no longer bound to us, we can't cancel it's queries anymore.
|
||||||
debug!("Releasing server back into the pool");
|
debug!("Releasing server back into the pool");
|
||||||
self.release();
|
self.release();
|
||||||
self.stats.client_idle(self.process_id, address.id);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -565,14 +552,6 @@ impl Client {
|
|||||||
|
|
||||||
impl Drop for Client {
|
impl Drop for Client {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
// Disconnect the client
|
self.stats.client_disconnecting(self.process_id);
|
||||||
if let Some(address_id) = self.last_address_id {
|
|
||||||
self.stats.client_disconnecting(self.process_id, address_id);
|
|
||||||
|
|
||||||
// The server is now idle
|
|
||||||
if let Some(process_id) = self.last_server_id {
|
|
||||||
self.stats.server_idle(process_id, address_id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -48,7 +48,6 @@ impl PartialEq<Role> for Option<Role> {
|
|||||||
|
|
||||||
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Debug)]
|
#[derive(Clone, PartialEq, Hash, std::cmp::Eq, Debug)]
|
||||||
pub struct Address {
|
pub struct Address {
|
||||||
pub id: usize,
|
|
||||||
pub host: String,
|
pub host: String,
|
||||||
pub port: String,
|
pub port: String,
|
||||||
pub shard: usize,
|
pub shard: usize,
|
||||||
@@ -59,7 +58,6 @@ pub struct Address {
|
|||||||
impl Default for Address {
|
impl Default for Address {
|
||||||
fn default() -> Address {
|
fn default() -> Address {
|
||||||
Address {
|
Address {
|
||||||
id: 0,
|
|
||||||
host: String::from("127.0.0.1"),
|
host: String::from("127.0.0.1"),
|
||||||
port: String::from("5432"),
|
port: String::from("5432"),
|
||||||
shard: 0,
|
shard: 0,
|
||||||
|
|||||||
@@ -8,5 +8,7 @@ pub enum Error {
|
|||||||
// ServerTimeout,
|
// ServerTimeout,
|
||||||
// DirtyServer,
|
// DirtyServer,
|
||||||
BadConfig,
|
BadConfig,
|
||||||
|
BadUserList,
|
||||||
AllServersDown,
|
AllServersDown,
|
||||||
|
AuthenticationError
|
||||||
}
|
}
|
||||||
|
|||||||
25
src/main.rs
25
src/main.rs
@@ -31,6 +31,7 @@ extern crate once_cell;
|
|||||||
extern crate serde;
|
extern crate serde;
|
||||||
extern crate serde_derive;
|
extern crate serde_derive;
|
||||||
extern crate sqlparser;
|
extern crate sqlparser;
|
||||||
|
extern crate statsd;
|
||||||
extern crate tokio;
|
extern crate tokio;
|
||||||
extern crate toml;
|
extern crate toml;
|
||||||
|
|
||||||
@@ -49,6 +50,7 @@ use std::sync::Arc;
|
|||||||
mod admin;
|
mod admin;
|
||||||
mod client;
|
mod client;
|
||||||
mod config;
|
mod config;
|
||||||
|
mod userlist;
|
||||||
mod constants;
|
mod constants;
|
||||||
mod errors;
|
mod errors;
|
||||||
mod messages;
|
mod messages;
|
||||||
@@ -93,6 +95,15 @@ async fn main() {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Prepare user list
|
||||||
|
match userlist::parse("userlist.json").await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(err) => {
|
||||||
|
error!("Userlist parse error: {:?}", err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let config = get_config();
|
let config = get_config();
|
||||||
|
|
||||||
let addr = format!("{}:{}", config.general.host, config.general.port);
|
let addr = format!("{}:{}", config.general.host, config.general.port);
|
||||||
@@ -112,19 +123,15 @@ async fn main() {
|
|||||||
|
|
||||||
// Collect statistics and send them to StatsD
|
// Collect statistics and send them to StatsD
|
||||||
let (tx, rx) = mpsc::channel(100);
|
let (tx, rx) = mpsc::channel(100);
|
||||||
|
let collector_tx = tx.clone();
|
||||||
|
tokio::task::spawn(async move {
|
||||||
|
let mut stats_collector = Collector::new(rx, collector_tx);
|
||||||
|
stats_collector.collect().await;
|
||||||
|
});
|
||||||
|
|
||||||
// Connection pool for all shards and replicas
|
|
||||||
let mut pool =
|
let mut pool =
|
||||||
ConnectionPool::from_config(client_server_map.clone(), Reporter::new(tx.clone())).await;
|
ConnectionPool::from_config(client_server_map.clone(), Reporter::new(tx.clone())).await;
|
||||||
|
|
||||||
let collector_tx = tx.clone();
|
|
||||||
let addresses = pool.databases();
|
|
||||||
tokio::task::spawn(async move {
|
|
||||||
let mut stats_collector = Collector::new(rx, collector_tx);
|
|
||||||
stats_collector.collect(addresses).await;
|
|
||||||
});
|
|
||||||
|
|
||||||
// Connect to all servers and validate their versions.
|
|
||||||
let server_info = match pool.validate().await {
|
let server_info = match pool.validate().await {
|
||||||
Ok(info) => info,
|
Ok(info) => info,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
|||||||
100
src/messages.rs
100
src/messages.rs
@@ -1,5 +1,7 @@
|
|||||||
/// Helper functions to send one-off protocol messages
|
/// Helper functions to send one-off protocol messages
|
||||||
/// and handle TcpStream (TCP socket).
|
/// and handle TcpStream (TCP socket).
|
||||||
|
|
||||||
|
|
||||||
use bytes::{Buf, BufMut, BytesMut};
|
use bytes::{Buf, BufMut, BytesMut};
|
||||||
use md5::{Digest, Md5};
|
use md5::{Digest, Md5};
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
|
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
|
||||||
@@ -7,10 +9,16 @@ use tokio::net::{
|
|||||||
tcp::{OwnedReadHalf, OwnedWriteHalf},
|
tcp::{OwnedReadHalf, OwnedWriteHalf},
|
||||||
TcpStream,
|
TcpStream,
|
||||||
};
|
};
|
||||||
|
use log::{error};
|
||||||
|
|
||||||
use crate::errors::Error;
|
use crate::errors::Error;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use rand::Rng;
|
||||||
|
|
||||||
|
use crate::userlist::get_user_list;
|
||||||
|
|
||||||
|
|
||||||
/// Postgres data type mappings
|
/// Postgres data type mappings
|
||||||
/// used in RowDescription ('T') message.
|
/// used in RowDescription ('T') message.
|
||||||
pub enum DataType {
|
pub enum DataType {
|
||||||
@@ -29,6 +37,98 @@ impl From<&DataType> for i32 {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
1. Generate salt (4 bytes of random data)
|
||||||
|
md5(concat(md5(concat(password, username)), random-salt)))
|
||||||
|
2. Send md5 auth request
|
||||||
|
3. recieve PasswordMessage with salt.
|
||||||
|
4. refactor md5_password function to be reusable
|
||||||
|
5. check username hash combo against file
|
||||||
|
6. AuthenticationOk or ErrorResponse
|
||||||
|
**/
|
||||||
|
pub async fn start_auth(stream: &mut TcpStream, user_name: &String) -> Result<(), Error> {
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
|
||||||
|
//Generate random 4 byte salt
|
||||||
|
let salt = rng.gen::<u32>();
|
||||||
|
|
||||||
|
// Send AuthenticationMD5Password request
|
||||||
|
send_md5_request(stream, salt).await?;
|
||||||
|
|
||||||
|
let code = match stream.read_u8().await {
|
||||||
|
Ok(code) => code as char,
|
||||||
|
Err(_) => return Err(Error::AuthenticationError),
|
||||||
|
};
|
||||||
|
|
||||||
|
match code {
|
||||||
|
// Password response
|
||||||
|
'p' => {
|
||||||
|
fetch_password_and_authenticate(stream, &user_name, &salt).await?;
|
||||||
|
Ok(auth_ok(stream).await?)
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
error!("Unknown code: {}", code);
|
||||||
|
return Err(Error::AuthenticationError);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn send_md5_request(stream: &mut TcpStream, salt: u32) -> Result<(), Error> {
|
||||||
|
let mut authentication_md5password = BytesMut::with_capacity(12);
|
||||||
|
authentication_md5password.put_u8(b'R');
|
||||||
|
authentication_md5password.put_i32(12);
|
||||||
|
authentication_md5password.put_i32(5);
|
||||||
|
authentication_md5password.put_u32(salt);
|
||||||
|
|
||||||
|
// Send AuthenticationMD5Password request
|
||||||
|
Ok(write_all(stream, authentication_md5password).await?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn fetch_password_and_authenticate(stream: &mut TcpStream, user_name: &String, salt: &u32) -> Result<(), Error> {
|
||||||
|
/**
|
||||||
|
1. How do I store the lists of users and paswords? clear text or hash?? wtf
|
||||||
|
2. Add auth to tests
|
||||||
|
**/
|
||||||
|
|
||||||
|
let len = match stream.read_i32().await {
|
||||||
|
Ok(len) => len,
|
||||||
|
Err(_) => return Err(Error::AuthenticationError),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Read whatever is left.
|
||||||
|
let mut password_hash = vec![0u8; len as usize - 4];
|
||||||
|
|
||||||
|
match stream.read_exact(&mut password_hash).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(_) => return Err(Error::AuthenticationError),
|
||||||
|
};
|
||||||
|
|
||||||
|
let user_list = get_user_list();
|
||||||
|
let mut password: String = String::new();
|
||||||
|
match user_list.get(&user_name) {
|
||||||
|
Some(&p) => password = p,
|
||||||
|
None => return Err(Error::AuthenticationError),
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut md5 = Md5::new();
|
||||||
|
|
||||||
|
// concat('md5', md5(concat(md5(concat(password, username)), random-salt)))
|
||||||
|
// First pass
|
||||||
|
md5.update(&password.as_bytes());
|
||||||
|
md5.update(&user_name.as_bytes());
|
||||||
|
let output = md5.finalize_reset();
|
||||||
|
// Second pass
|
||||||
|
md5.update(format!("{:x}", output));
|
||||||
|
md5.update(salt.to_be_bytes().to_vec());
|
||||||
|
|
||||||
|
|
||||||
|
let password_string: String = String::from_utf8(password_hash).expect("Could not get password hash");
|
||||||
|
match format!("md5{:x}", md5.finalize()) == password_string {
|
||||||
|
true => Ok(()),
|
||||||
|
_ => Err(Error::AuthenticationError)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Tell the client that authentication handshake completed successfully.
|
/// Tell the client that authentication handshake completed successfully.
|
||||||
pub async fn auth_ok(stream: &mut TcpStream) -> Result<(), Error> {
|
pub async fn auth_ok(stream: &mut TcpStream) -> Result<(), Error> {
|
||||||
let mut auth_ok = BytesMut::with_capacity(9);
|
let mut auth_ok = BytesMut::with_capacity(9);
|
||||||
|
|||||||
40
src/pool.rs
40
src/pool.rs
@@ -38,7 +38,6 @@ impl ConnectionPool {
|
|||||||
let mut shards = Vec::new();
|
let mut shards = Vec::new();
|
||||||
let mut addresses = Vec::new();
|
let mut addresses = Vec::new();
|
||||||
let mut banlist = Vec::new();
|
let mut banlist = Vec::new();
|
||||||
let mut address_id = 0;
|
|
||||||
let mut shard_ids = config
|
let mut shard_ids = config
|
||||||
.shards
|
.shards
|
||||||
.clone()
|
.clone()
|
||||||
@@ -64,7 +63,6 @@ impl ConnectionPool {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let address = Address {
|
let address = Address {
|
||||||
id: address_id,
|
|
||||||
host: server.0.clone(),
|
host: server.0.clone(),
|
||||||
port: server.1.to_string(),
|
port: server.1.to_string(),
|
||||||
role: role,
|
role: role,
|
||||||
@@ -72,8 +70,6 @@ impl ConnectionPool {
|
|||||||
shard: shard_idx.parse::<usize>().unwrap(),
|
shard: shard_idx.parse::<usize>().unwrap(),
|
||||||
};
|
};
|
||||||
|
|
||||||
address_id += 1;
|
|
||||||
|
|
||||||
if role == Role::Replica {
|
if role == Role::Replica {
|
||||||
replica_number += 1;
|
replica_number += 1;
|
||||||
}
|
}
|
||||||
@@ -125,13 +121,9 @@ impl ConnectionPool {
|
|||||||
pub async fn validate(&mut self) -> Result<BytesMut, Error> {
|
pub async fn validate(&mut self) -> Result<BytesMut, Error> {
|
||||||
let mut server_infos = Vec::new();
|
let mut server_infos = Vec::new();
|
||||||
|
|
||||||
let stats = self.stats.clone();
|
|
||||||
for shard in 0..self.shards() {
|
for shard in 0..self.shards() {
|
||||||
for _ in 0..self.servers(shard) {
|
for _ in 0..self.servers(shard) {
|
||||||
// To keep stats consistent.
|
let connection = match self.get(shard, None).await {
|
||||||
let fake_process_id = 0;
|
|
||||||
|
|
||||||
let connection = match self.get(shard, None, fake_process_id).await {
|
|
||||||
Ok(conn) => conn,
|
Ok(conn) => conn,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("Shard {} down or misconfigured: {:?}", shard, err);
|
error!("Shard {} down or misconfigured: {:?}", shard, err);
|
||||||
@@ -145,8 +137,6 @@ impl ConnectionPool {
|
|||||||
|
|
||||||
let server_info = server.server_info();
|
let server_info = server.server_info();
|
||||||
|
|
||||||
stats.client_disconnecting(fake_process_id, address.id);
|
|
||||||
|
|
||||||
if server_infos.len() > 0 {
|
if server_infos.len() > 0 {
|
||||||
// Compare against the last server checked.
|
// Compare against the last server checked.
|
||||||
if server_info != server_infos[server_infos.len() - 1] {
|
if server_info != server_infos[server_infos.len() - 1] {
|
||||||
@@ -175,7 +165,6 @@ impl ConnectionPool {
|
|||||||
&mut self,
|
&mut self,
|
||||||
shard: usize,
|
shard: usize,
|
||||||
role: Option<Role>,
|
role: Option<Role>,
|
||||||
process_id: i32,
|
|
||||||
) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> {
|
) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let addresses = &self.addresses[shard];
|
let addresses = &self.addresses[shard];
|
||||||
@@ -211,8 +200,6 @@ impl ConnectionPool {
|
|||||||
let index = self.round_robin % addresses.len();
|
let index = self.round_robin % addresses.len();
|
||||||
let address = &addresses[index];
|
let address = &addresses[index];
|
||||||
|
|
||||||
self.stats.client_waiting(process_id, address.id);
|
|
||||||
|
|
||||||
// Make sure you're getting a primary or a replica
|
// Make sure you're getting a primary or a replica
|
||||||
// as per request. If no specific role is requested, the first
|
// as per request. If no specific role is requested, the first
|
||||||
// available will be chosen.
|
// available will be chosen.
|
||||||
@@ -232,9 +219,6 @@ impl ConnectionPool {
|
|||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("Banning replica {}, error: {:?}", index, err);
|
error!("Banning replica {}, error: {:?}", index, err);
|
||||||
self.ban(address, shard);
|
self.ban(address, shard);
|
||||||
self.stats.client_disconnecting(process_id, address.id);
|
|
||||||
self.stats
|
|
||||||
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -243,7 +227,7 @@ impl ConnectionPool {
|
|||||||
let server = &mut *conn;
|
let server = &mut *conn;
|
||||||
let healthcheck_timeout = get_config().general.healthcheck_timeout;
|
let healthcheck_timeout = get_config().general.healthcheck_timeout;
|
||||||
|
|
||||||
self.stats.server_tested(server.process_id(), address.id);
|
self.stats.server_tested(server.process_id());
|
||||||
|
|
||||||
match tokio::time::timeout(
|
match tokio::time::timeout(
|
||||||
tokio::time::Duration::from_millis(healthcheck_timeout),
|
tokio::time::Duration::from_millis(healthcheck_timeout),
|
||||||
@@ -254,9 +238,8 @@ impl ConnectionPool {
|
|||||||
// Check if health check succeeded
|
// Check if health check succeeded
|
||||||
Ok(res) => match res {
|
Ok(res) => match res {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
self.stats
|
self.stats.checkout_time(now.elapsed().as_micros());
|
||||||
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
self.stats.server_idle(conn.process_id());
|
||||||
self.stats.server_idle(conn.process_id(), address.id);
|
|
||||||
return Ok((conn, address.clone()));
|
return Ok((conn, address.clone()));
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
@@ -265,9 +248,6 @@ impl ConnectionPool {
|
|||||||
server.mark_bad();
|
server.mark_bad();
|
||||||
|
|
||||||
self.ban(address, shard);
|
self.ban(address, shard);
|
||||||
self.stats.client_disconnecting(process_id, address.id);
|
|
||||||
self.stats
|
|
||||||
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@@ -278,9 +258,6 @@ impl ConnectionPool {
|
|||||||
server.mark_bad();
|
server.mark_bad();
|
||||||
|
|
||||||
self.ban(address, shard);
|
self.ban(address, shard);
|
||||||
self.stats.client_disconnecting(process_id, address.id);
|
|
||||||
self.stats
|
|
||||||
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -418,14 +395,13 @@ impl ManageConnection for ServerPool {
|
|||||||
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
|
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
|
||||||
info!(
|
info!(
|
||||||
"Creating a new connection to {:?} using user {:?}",
|
"Creating a new connection to {:?} using user {:?}",
|
||||||
self.address.name(),
|
self.address, self.user.name
|
||||||
self.user.name
|
|
||||||
);
|
);
|
||||||
|
|
||||||
// Put a temporary process_id into the stats
|
// Put a temporary process_id into the stats
|
||||||
// for server login.
|
// for server login.
|
||||||
let process_id = rand::random::<i32>();
|
let process_id = rand::random::<i32>();
|
||||||
self.stats.server_login(process_id, self.address.id);
|
self.stats.server_login(process_id);
|
||||||
|
|
||||||
match Server::startup(
|
match Server::startup(
|
||||||
&self.address,
|
&self.address,
|
||||||
@@ -438,12 +414,12 @@ impl ManageConnection for ServerPool {
|
|||||||
{
|
{
|
||||||
Ok(conn) => {
|
Ok(conn) => {
|
||||||
// Remove the temporary process_id from the stats.
|
// Remove the temporary process_id from the stats.
|
||||||
self.stats.server_disconnecting(process_id, self.address.id);
|
self.stats.server_disconnecting(process_id);
|
||||||
Ok(conn)
|
Ok(conn)
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
// Remove the temporary process_id from the stats.
|
// Remove the temporary process_id from the stats.
|
||||||
self.stats.server_disconnecting(process_id, self.address.id);
|
self.stats.server_disconnecting(process_id);
|
||||||
Err(err)
|
Err(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ use sqlparser::parser::Parser;
|
|||||||
|
|
||||||
const CUSTOM_SQL_REGEXES: [&str; 5] = [
|
const CUSTOM_SQL_REGEXES: [&str; 5] = [
|
||||||
r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$",
|
r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$",
|
||||||
r"(?i)^ *SET SHARD TO '?([0-9]+|ANY)'? *;? *$",
|
r"(?i)^ *SET SHARD TO '?([0-9]+)'? *;? *$",
|
||||||
r"(?i)^ *SHOW SHARD *;? *$",
|
r"(?i)^ *SHOW SHARD *;? *$",
|
||||||
r"(?i)^ *SET SERVER ROLE TO '(PRIMARY|REPLICA|ANY|AUTO|DEFAULT)' *;? *$",
|
r"(?i)^ *SET SERVER ROLE TO '(PRIMARY|REPLICA|ANY|AUTO|DEFAULT)' *;? *$",
|
||||||
r"(?i)^ *SHOW SERVER ROLE *;? *$",
|
r"(?i)^ *SHOW SERVER ROLE *;? *$",
|
||||||
@@ -192,10 +192,7 @@ impl QueryRouter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Command::SetShard => {
|
Command::SetShard => {
|
||||||
self.active_shard = match value.to_ascii_uppercase().as_ref() {
|
self.active_shard = Some(value.parse::<usize>().unwrap());
|
||||||
"ANY" => Some(rand::random::<usize>() % self.shards),
|
|
||||||
_ => Some(value.parse::<usize>().unwrap()),
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Command::SetServerRole => {
|
Command::SetServerRole => {
|
||||||
|
|||||||
@@ -268,8 +268,7 @@ impl Server {
|
|||||||
|
|
||||||
/// Send messages to the server from the client.
|
/// Send messages to the server from the client.
|
||||||
pub async fn send(&mut self, messages: BytesMut) -> Result<(), Error> {
|
pub async fn send(&mut self, messages: BytesMut) -> Result<(), Error> {
|
||||||
self.stats
|
self.stats.data_sent(messages.len());
|
||||||
.data_sent(messages.len(), self.process_id, self.address.id);
|
|
||||||
|
|
||||||
match write_all_half(&mut self.write, messages).await {
|
match write_all_half(&mut self.write, messages).await {
|
||||||
Ok(_) => Ok(()),
|
Ok(_) => Ok(()),
|
||||||
@@ -375,8 +374,7 @@ impl Server {
|
|||||||
let bytes = self.buffer.clone();
|
let bytes = self.buffer.clone();
|
||||||
|
|
||||||
// Keep track of how much data we got from the server for stats.
|
// Keep track of how much data we got from the server for stats.
|
||||||
self.stats
|
self.stats.data_received(bytes.len());
|
||||||
.data_received(bytes.len(), self.process_id, self.address.id);
|
|
||||||
|
|
||||||
// Clear the buffer for next query.
|
// Clear the buffer for next query.
|
||||||
self.buffer.clear();
|
self.buffer.clear();
|
||||||
@@ -472,8 +470,7 @@ impl Drop for Server {
|
|||||||
/// the socket is in non-blocking mode, so it may not be ready
|
/// the socket is in non-blocking mode, so it may not be ready
|
||||||
/// for a write.
|
/// for a write.
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.stats
|
self.stats.server_disconnecting(self.process_id());
|
||||||
.server_disconnecting(self.process_id(), self.address.id);
|
|
||||||
|
|
||||||
let mut bytes = BytesMut::with_capacity(4);
|
let mut bytes = BytesMut::with_capacity(4);
|
||||||
bytes.put_u8(b'X');
|
bytes.put_u8(b'X');
|
||||||
|
|||||||
275
src/stats.rs
275
src/stats.rs
@@ -1,20 +1,17 @@
|
|||||||
/// Statistics and reporting.
|
use log::{debug, info};
|
||||||
use log::info;
|
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use std::collections::HashMap;
|
use statsd::Client;
|
||||||
use tokio::sync::mpsc::{Receiver, Sender};
|
use tokio::sync::mpsc::{Receiver, Sender};
|
||||||
|
|
||||||
// Latest stats updated every second; used in SHOW STATS and other admin commands.
|
use std::collections::HashMap;
|
||||||
static LATEST_STATS: Lazy<Mutex<HashMap<usize, HashMap<String, i64>>>> =
|
|
||||||
Lazy::new(|| Mutex::new(HashMap::new()));
|
|
||||||
|
|
||||||
// Statistics period used for average calculations.
|
use crate::config::get_config;
|
||||||
// 15 seconds.
|
|
||||||
static STAT_PERIOD: u64 = 15000;
|
// Stats used in SHOW STATS
|
||||||
|
static LATEST_STATS: Lazy<Mutex<HashMap<String, i64>>> = Lazy::new(|| Mutex::new(HashMap::new()));
|
||||||
|
static STAT_PERIOD: u64 = 15000; //15 seconds
|
||||||
|
|
||||||
/// The names for the events reported
|
|
||||||
/// to the statistics collector.
|
|
||||||
#[derive(Debug, Clone, Copy)]
|
#[derive(Debug, Clone, Copy)]
|
||||||
enum EventName {
|
enum EventName {
|
||||||
CheckoutTime,
|
CheckoutTime,
|
||||||
@@ -31,253 +28,186 @@ enum EventName {
|
|||||||
ServerTested,
|
ServerTested,
|
||||||
ServerLogin,
|
ServerLogin,
|
||||||
ServerDisconnecting,
|
ServerDisconnecting,
|
||||||
UpdateStats,
|
FlushStatsToStatsD,
|
||||||
UpdateAverages,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Event data sent to the collector
|
|
||||||
/// from clients and servers.
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Event {
|
pub struct Event {
|
||||||
/// The name of the event being reported.
|
|
||||||
name: EventName,
|
name: EventName,
|
||||||
|
|
||||||
/// The value being reported. Meaning differs based on event name.
|
|
||||||
value: i64,
|
value: i64,
|
||||||
|
process_id: Option<i32>,
|
||||||
/// The client or server connection reporting the event.
|
|
||||||
process_id: i32,
|
|
||||||
|
|
||||||
/// The server the client is connected to.
|
|
||||||
address_id: usize,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The statistics reporter. An instance is given
|
|
||||||
/// to each possible source of statistics,
|
|
||||||
/// e.g. clients, servers, connection pool.
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct Reporter {
|
pub struct Reporter {
|
||||||
tx: Sender<Event>,
|
tx: Sender<Event>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Reporter {
|
impl Reporter {
|
||||||
/// Create a new Reporter instance.
|
|
||||||
pub fn new(tx: Sender<Event>) -> Reporter {
|
pub fn new(tx: Sender<Event>) -> Reporter {
|
||||||
Reporter { tx: tx }
|
Reporter { tx: tx }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report a query executed by a client against
|
pub fn query(&self) {
|
||||||
/// a server identified by the `address_id`.
|
|
||||||
pub fn query(&self, process_id: i32, address_id: usize) {
|
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::Query,
|
name: EventName::Query,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: process_id,
|
process_id: None,
|
||||||
address_id: address_id,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report a transaction executed by a client against
|
pub fn transaction(&self) {
|
||||||
/// a server identified by the `address_id`.
|
|
||||||
pub fn transaction(&self, process_id: i32, address_id: usize) {
|
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::Transaction,
|
name: EventName::Transaction,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: process_id,
|
process_id: None,
|
||||||
address_id: address_id,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report data sent to a server identified by `address_id`.
|
pub fn data_sent(&self, amount: usize) {
|
||||||
/// The `amount` is measured in bytes.
|
|
||||||
pub fn data_sent(&self, amount: usize, process_id: i32, address_id: usize) {
|
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::DataSent,
|
name: EventName::DataSent,
|
||||||
value: amount as i64,
|
value: amount as i64,
|
||||||
process_id: process_id,
|
process_id: None,
|
||||||
address_id: address_id,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report data received from a server identified by `address_id`.
|
pub fn data_received(&self, amount: usize) {
|
||||||
/// The `amount` is measured in bytes.
|
|
||||||
pub fn data_received(&self, amount: usize, process_id: i32, address_id: usize) {
|
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::DataReceived,
|
name: EventName::DataReceived,
|
||||||
value: amount as i64,
|
value: amount as i64,
|
||||||
process_id: process_id,
|
process_id: None,
|
||||||
address_id: address_id,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Time spent waiting to get a healthy connection from the pool
|
pub fn checkout_time(&self, ms: u128) {
|
||||||
/// for a server identified by `address_id`.
|
|
||||||
/// Measured in milliseconds.
|
|
||||||
pub fn checkout_time(&self, ms: u128, process_id: i32, address_id: usize) {
|
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::CheckoutTime,
|
name: EventName::CheckoutTime,
|
||||||
value: ms as i64,
|
value: ms as i64,
|
||||||
process_id: process_id,
|
process_id: None,
|
||||||
address_id: address_id,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a client identified by `process_id` waiting for a connection
|
pub fn client_waiting(&self, process_id: i32) {
|
||||||
/// to a server identified by `address_id`.
|
|
||||||
pub fn client_waiting(&self, process_id: i32, address_id: usize) {
|
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::ClientWaiting,
|
name: EventName::ClientWaiting,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: process_id,
|
process_id: Some(process_id),
|
||||||
address_id: address_id,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a client identified by `process_id` is done waiting for a connection
|
pub fn client_active(&self, process_id: i32) {
|
||||||
/// to a server identified by `address_id` and is about to query the server.
|
|
||||||
pub fn client_active(&self, process_id: i32, address_id: usize) {
|
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::ClientActive,
|
name: EventName::ClientActive,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: process_id,
|
process_id: Some(process_id),
|
||||||
address_id: address_id,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a client identified by `process_id` is done querying the server
|
pub fn client_idle(&self, process_id: i32) {
|
||||||
/// identified by `address_id` and is no longer active.
|
|
||||||
pub fn client_idle(&self, process_id: i32, address_id: usize) {
|
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::ClientIdle,
|
name: EventName::ClientIdle,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: process_id,
|
process_id: Some(process_id),
|
||||||
address_id: address_id,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a client identified by `process_id` is disconecting from the pooler.
|
pub fn client_disconnecting(&self, process_id: i32) {
|
||||||
/// The last server it was connected to is identified by `address_id`.
|
|
||||||
pub fn client_disconnecting(&self, process_id: i32, address_id: usize) {
|
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::ClientDisconnecting,
|
name: EventName::ClientDisconnecting,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: process_id,
|
process_id: Some(process_id),
|
||||||
address_id: address_id,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` for
|
pub fn server_active(&self, process_id: i32) {
|
||||||
/// a configured server identified by `address_id` is actively used
|
|
||||||
/// by a client.
|
|
||||||
pub fn server_active(&self, process_id: i32, address_id: usize) {
|
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::ServerActive,
|
name: EventName::ServerActive,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: process_id,
|
process_id: Some(process_id),
|
||||||
address_id: address_id,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` for
|
pub fn server_idle(&self, process_id: i32) {
|
||||||
/// a configured server identified by `address_id` is no longer
|
|
||||||
/// actively used by a client and is now idle.
|
|
||||||
pub fn server_idle(&self, process_id: i32, address_id: usize) {
|
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::ServerIdle,
|
name: EventName::ServerIdle,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: process_id,
|
process_id: Some(process_id),
|
||||||
address_id: address_id,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` for
|
pub fn server_login(&self, process_id: i32) {
|
||||||
/// a configured server identified by `address_id` is attempting
|
|
||||||
/// to login.
|
|
||||||
pub fn server_login(&self, process_id: i32, address_id: usize) {
|
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::ServerLogin,
|
name: EventName::ServerLogin,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: process_id,
|
process_id: Some(process_id),
|
||||||
address_id: address_id,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` for
|
pub fn server_tested(&self, process_id: i32) {
|
||||||
/// a configured server identified by `address_id` is being
|
|
||||||
/// tested before being given to a client.
|
|
||||||
pub fn server_tested(&self, process_id: i32, address_id: usize) {
|
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::ServerTested,
|
name: EventName::ServerTested,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: process_id,
|
process_id: Some(process_id),
|
||||||
address_id: address_id,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` is disconecting from the pooler.
|
pub fn server_disconnecting(&self, process_id: i32) {
|
||||||
/// The configured server it was connected to is identified by `address_id`.
|
|
||||||
pub fn server_disconnecting(&self, process_id: i32, address_id: usize) {
|
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::ServerDisconnecting,
|
name: EventName::ServerDisconnecting,
|
||||||
value: 1,
|
value: 1,
|
||||||
process_id: process_id,
|
process_id: Some(process_id),
|
||||||
address_id: address_id,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = self.tx.try_send(event);
|
let _ = self.tx.try_send(event);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The statistics collector which is receiving statistics
|
|
||||||
/// from clients, servers, and the connection pool. There is
|
|
||||||
/// only one collector (kind of like a singleton).
|
|
||||||
/// The collector can trigger events on its own, e.g.
|
|
||||||
/// it updates aggregates every second and averages every
|
|
||||||
/// 15 seconds.
|
|
||||||
pub struct Collector {
|
pub struct Collector {
|
||||||
rx: Receiver<Event>,
|
rx: Receiver<Event>,
|
||||||
tx: Sender<Event>,
|
tx: Sender<Event>,
|
||||||
|
client: Client,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Collector {
|
impl Collector {
|
||||||
/// Create a new collector instance. There should only be one instance
|
|
||||||
/// at a time. This is ensured by mpsc which allows only one receiver.
|
|
||||||
pub fn new(rx: Receiver<Event>, tx: Sender<Event>) -> Collector {
|
pub fn new(rx: Receiver<Event>, tx: Sender<Event>) -> Collector {
|
||||||
Collector { rx, tx }
|
Collector {
|
||||||
|
rx,
|
||||||
|
tx,
|
||||||
|
client: Client::new(&get_config().general.statsd_address, "pgcat").unwrap(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The statistics collection handler. It will collect statistics
|
pub async fn collect(&mut self) {
|
||||||
/// for `address_id`s starting at 0 up to `addresses`.
|
|
||||||
pub async fn collect(&mut self, addresses: usize) {
|
|
||||||
info!("Events reporter started");
|
info!("Events reporter started");
|
||||||
|
|
||||||
let stats_template = HashMap::from([
|
let mut stats = HashMap::from([
|
||||||
("total_query_count", 0),
|
("total_query_count", 0),
|
||||||
("total_xact_count", 0),
|
("total_xact_count", 0),
|
||||||
("total_sent", 0),
|
("total_sent", 0),
|
||||||
@@ -302,47 +232,25 @@ impl Collector {
|
|||||||
("sv_tested", 0),
|
("sv_tested", 0),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
let mut stats = HashMap::new();
|
|
||||||
|
|
||||||
// Stats saved after each iteration of the flush event. Used in calculation
|
// Stats saved after each iteration of the flush event. Used in calculation
|
||||||
// of averages in the last flush period.
|
// of averages in the last flush period.
|
||||||
let mut old_stats: HashMap<usize, HashMap<String, i64>> = HashMap::new();
|
let mut old_stats: HashMap<String, i64> = HashMap::new();
|
||||||
|
|
||||||
// Track which state the client and server are at any given time.
|
// Track which state the client and server are at any given time.
|
||||||
let mut client_server_states: HashMap<usize, HashMap<i32, EventName>> = HashMap::new();
|
let mut client_server_states: HashMap<i32, EventName> = HashMap::new();
|
||||||
|
|
||||||
// Flush stats to StatsD and calculate averages every 15 seconds.
|
// Flush stats to StatsD and calculate averages every 15 seconds.
|
||||||
let tx = self.tx.clone();
|
|
||||||
tokio::task::spawn(async move {
|
|
||||||
let mut interval =
|
|
||||||
tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD / 15));
|
|
||||||
loop {
|
|
||||||
interval.tick().await;
|
|
||||||
for address_id in 0..addresses {
|
|
||||||
let _ = tx.try_send(Event {
|
|
||||||
name: EventName::UpdateStats,
|
|
||||||
value: 0,
|
|
||||||
process_id: -1,
|
|
||||||
address_id: address_id,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let tx = self.tx.clone();
|
let tx = self.tx.clone();
|
||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
let mut interval =
|
let mut interval =
|
||||||
tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD));
|
tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD));
|
||||||
loop {
|
loop {
|
||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
for address_id in 0..addresses {
|
let _ = tx.try_send(Event {
|
||||||
let _ = tx.try_send(Event {
|
name: EventName::FlushStatsToStatsD,
|
||||||
name: EventName::UpdateAverages,
|
value: 0,
|
||||||
value: 0,
|
process_id: None,
|
||||||
process_id: -1,
|
});
|
||||||
address_id: address_id,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -356,14 +264,6 @@ impl Collector {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let stats = stats
|
|
||||||
.entry(stat.address_id)
|
|
||||||
.or_insert(stats_template.clone());
|
|
||||||
let client_server_states = client_server_states
|
|
||||||
.entry(stat.address_id)
|
|
||||||
.or_insert(HashMap::new());
|
|
||||||
let old_stats = old_stats.entry(stat.address_id).or_insert(HashMap::new());
|
|
||||||
|
|
||||||
// Some are counters, some are gauges...
|
// Some are counters, some are gauges...
|
||||||
match stat.name {
|
match stat.name {
|
||||||
EventName::Query => {
|
EventName::Query => {
|
||||||
@@ -413,16 +313,16 @@ impl Collector {
|
|||||||
| EventName::ServerIdle
|
| EventName::ServerIdle
|
||||||
| EventName::ServerTested
|
| EventName::ServerTested
|
||||||
| EventName::ServerLogin => {
|
| EventName::ServerLogin => {
|
||||||
client_server_states.insert(stat.process_id, stat.name);
|
client_server_states.insert(stat.process_id.unwrap(), stat.name);
|
||||||
}
|
}
|
||||||
|
|
||||||
EventName::ClientDisconnecting | EventName::ServerDisconnecting => {
|
EventName::ClientDisconnecting | EventName::ServerDisconnecting => {
|
||||||
client_server_states.remove(&stat.process_id);
|
client_server_states.remove(&stat.process_id.unwrap());
|
||||||
}
|
}
|
||||||
|
|
||||||
EventName::UpdateStats => {
|
EventName::FlushStatsToStatsD => {
|
||||||
// Calculate connection states
|
// Calculate connection states
|
||||||
for (_, state) in client_server_states.iter() {
|
for (_, state) in &client_server_states {
|
||||||
match state {
|
match state {
|
||||||
EventName::ClientActive => {
|
EventName::ClientActive => {
|
||||||
let counter = stats.entry("cl_active").or_insert(0);
|
let counter = stats.entry("cl_active").or_insert(0);
|
||||||
@@ -434,6 +334,11 @@ impl Collector {
|
|||||||
*counter += 1;
|
*counter += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
EventName::ClientIdle => {
|
||||||
|
let counter = stats.entry("cl_idle").or_insert(0);
|
||||||
|
*counter += 1;
|
||||||
|
}
|
||||||
|
|
||||||
EventName::ServerIdle => {
|
EventName::ServerIdle => {
|
||||||
let counter = stats.entry("sv_idle").or_insert(0);
|
let counter = stats.entry("sv_idle").or_insert(0);
|
||||||
*counter += 1;
|
*counter += 1;
|
||||||
@@ -454,20 +359,39 @@ impl Collector {
|
|||||||
*counter += 1;
|
*counter += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
EventName::ClientIdle => {
|
|
||||||
let counter = stats.entry("cl_idle").or_insert(0);
|
|
||||||
*counter += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Calculate averages
|
||||||
|
for stat in &[
|
||||||
|
"avg_query_count",
|
||||||
|
"avgxact_count",
|
||||||
|
"avg_sent",
|
||||||
|
"avg_received",
|
||||||
|
"avg_wait_time",
|
||||||
|
] {
|
||||||
|
let total_name = stat.replace("avg_", "total_");
|
||||||
|
let old_value = old_stats.entry(total_name.clone()).or_insert(0);
|
||||||
|
let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned();
|
||||||
|
let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second
|
||||||
|
|
||||||
|
stats.insert(stat, avg);
|
||||||
|
*old_value = new_value;
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!("{:?}", stats);
|
||||||
|
|
||||||
// Update latest stats used in SHOW STATS
|
// Update latest stats used in SHOW STATS
|
||||||
let mut guard = LATEST_STATS.lock();
|
let mut guard = LATEST_STATS.lock();
|
||||||
|
for (key, value) in &stats {
|
||||||
|
guard.insert(key.to_string(), value.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut pipeline = self.client.pipeline();
|
||||||
|
|
||||||
for (key, value) in stats.iter() {
|
for (key, value) in stats.iter() {
|
||||||
let entry = guard.entry(stat.address_id).or_insert(HashMap::new());
|
pipeline.gauge(key, *value as f64);
|
||||||
entry.insert(key.to_string(), value.clone());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// These are re-calculated every iteration of the loop, so we don't want to add values
|
// These are re-calculated every iteration of the loop, so we don't want to add values
|
||||||
@@ -485,33 +409,14 @@ impl Collector {
|
|||||||
] {
|
] {
|
||||||
stats.insert(stat, 0);
|
stats.insert(stat, 0);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
EventName::UpdateAverages => {
|
pipeline.send(&self.client);
|
||||||
// Calculate averages
|
|
||||||
for stat in &[
|
|
||||||
"avg_query_count",
|
|
||||||
"avgxact_count",
|
|
||||||
"avg_sent",
|
|
||||||
"avg_received",
|
|
||||||
"avg_wait_time",
|
|
||||||
] {
|
|
||||||
let total_name = stat.replace("avg_", "total_");
|
|
||||||
let old_value = old_stats.entry(total_name.clone()).or_insert(0);
|
|
||||||
let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned();
|
|
||||||
let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second
|
|
||||||
|
|
||||||
stats.insert(stat, avg);
|
|
||||||
*old_value = new_value;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get a snapshot of statistics. Updated once a second
|
pub fn get_stats() -> HashMap<String, i64> {
|
||||||
/// by the `Collector`.
|
|
||||||
pub fn get_stats() -> HashMap<usize, HashMap<String, i64>> {
|
|
||||||
LATEST_STATS.lock().clone()
|
LATEST_STATS.lock().clone()
|
||||||
}
|
}
|
||||||
|
|||||||
4
src/userlist.json
Normal file
4
src/userlist.json
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
{
|
||||||
|
"sven": "clear_text_password",
|
||||||
|
"sharding_user": "sharding_user"
|
||||||
|
}
|
||||||
57
src/userlist.rs
Normal file
57
src/userlist.rs
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
use arc_swap::{ArcSwap, Guard};
|
||||||
|
use log::{error};
|
||||||
|
use once_cell::sync::Lazy;
|
||||||
|
use tokio::fs::File;
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
|
|
||||||
|
use std::collections::{HashMap};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use crate::errors::Error;
|
||||||
|
|
||||||
|
pub type UserList = HashMap<String, String>;
|
||||||
|
static USER_LIST: Lazy<ArcSwap<UserList>> = Lazy::new(|| ArcSwap::from_pointee(HashMap::new()));
|
||||||
|
|
||||||
|
pub fn get_user_list() -> Guard<Arc<UserList>> {
|
||||||
|
USER_LIST.load()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parse the user list.
|
||||||
|
pub async fn parse(path: &str) -> Result<(), Error> {
|
||||||
|
let mut contents = String::new();
|
||||||
|
let mut file = match File::open(path).await {
|
||||||
|
Ok(file) => file,
|
||||||
|
Err(err) => {
|
||||||
|
error!("Could not open '{}': {}", path, err.to_string());
|
||||||
|
return Err(Error::BadConfig);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match file.read_to_string(&mut contents).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(err) => {
|
||||||
|
error!("Could not read config file: {}", err.to_string());
|
||||||
|
return Err(Error::BadConfig);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let map: HashMap<String, String> = serde_json::from_str(&contents).expect("JSON was not well-formatted");
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
USER_LIST.store(Arc::new(map.clone()));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_config() {
|
||||||
|
parse("userlist.json").await.unwrap();
|
||||||
|
assert_eq!(get_user_list()["sven"], "clear_text_password");
|
||||||
|
assert_eq!(get_user_list()["sharding_user"], "sharding_user");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,28 +0,0 @@
|
|||||||
|
|
||||||
-- \setrandom aid 1 :naccounts
|
|
||||||
\set aid random(1, 100000)
|
|
||||||
-- \setrandom bid 1 :nbranches
|
|
||||||
\set bid random(1, 100000)
|
|
||||||
-- \setrandom tid 1 :ntellers
|
|
||||||
\set tid random(1, 100000)
|
|
||||||
-- \setrandom delta -5000 5000
|
|
||||||
\set delta random(-5000,5000)
|
|
||||||
|
|
||||||
\set shard random(0, 2)
|
|
||||||
|
|
||||||
SET SHARD TO :shard;
|
|
||||||
|
|
||||||
BEGIN;
|
|
||||||
|
|
||||||
UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;
|
|
||||||
|
|
||||||
SELECT abalance FROM pgbench_accounts WHERE aid = :aid;
|
|
||||||
|
|
||||||
UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;
|
|
||||||
|
|
||||||
UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;
|
|
||||||
|
|
||||||
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
|
|
||||||
|
|
||||||
END;
|
|
||||||
|
|
||||||
Reference in New Issue
Block a user