mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-28 11:16:29 +00:00
Compare commits
5 Commits
levkk-drop
...
levkk-log-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
61b9756ded | ||
|
|
2cd9e15849 | ||
|
|
fd57fae280 | ||
|
|
a460a645f5 | ||
|
|
f7d33fba7a |
122
src/client.rs
122
src/client.rs
@@ -667,6 +667,7 @@ where
|
|||||||
.client_disconnecting(self.process_id, last_address_id);
|
.client_disconnecting(self.process_id, last_address_id);
|
||||||
}
|
}
|
||||||
self.stats.client_active(self.process_id, address.id);
|
self.stats.client_active(self.process_id, address.id);
|
||||||
|
self.stats.server_active(server.process_id(), address.id);
|
||||||
|
|
||||||
self.last_address_id = Some(address.id);
|
self.last_address_id = Some(address.id);
|
||||||
self.last_server_id = Some(server.process_id());
|
self.last_server_id = Some(server.process_id());
|
||||||
@@ -730,16 +731,44 @@ where
|
|||||||
'Q' => {
|
'Q' => {
|
||||||
debug!("Sending query to server");
|
debug!("Sending query to server");
|
||||||
|
|
||||||
self.send_and_receive_loop(
|
self.send_server_message(
|
||||||
code,
|
|
||||||
original,
|
|
||||||
server,
|
server,
|
||||||
|
original,
|
||||||
&address,
|
&address,
|
||||||
query_router.shard(),
|
query_router.shard(),
|
||||||
&pool,
|
&pool,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
// Read all data the server has to offer, which can be multiple messages
|
||||||
|
// buffered in 8196 bytes chunks.
|
||||||
|
loop {
|
||||||
|
let response = self
|
||||||
|
.receive_server_message(
|
||||||
|
server,
|
||||||
|
&address,
|
||||||
|
query_router.shard(),
|
||||||
|
&pool,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Send server reply to the client.
|
||||||
|
match write_all_half(&mut self.write, response).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(err) => {
|
||||||
|
server.mark_bad();
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if !server.is_data_available() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Report query executed statistics.
|
||||||
|
self.stats.query(self.process_id, address.id);
|
||||||
|
|
||||||
if !server.in_transaction() {
|
if !server.in_transaction() {
|
||||||
// Report transaction executed statistics.
|
// Report transaction executed statistics.
|
||||||
self.stats.transaction(self.process_id, address.id);
|
self.stats.transaction(self.process_id, address.id);
|
||||||
@@ -747,6 +776,7 @@ where
|
|||||||
// Release server back to the pool if we are in transaction mode.
|
// Release server back to the pool if we are in transaction mode.
|
||||||
// If we are in session mode, we keep the server until the client disconnects.
|
// If we are in session mode, we keep the server until the client disconnects.
|
||||||
if self.transaction_mode {
|
if self.transaction_mode {
|
||||||
|
self.stats.server_idle(server.process_id(), address.id);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -759,13 +789,9 @@ where
|
|||||||
// Pgbouncer closes the connection which leads to
|
// Pgbouncer closes the connection which leads to
|
||||||
// connection thrashing when clients misbehave.
|
// connection thrashing when clients misbehave.
|
||||||
if server.in_transaction() {
|
if server.in_transaction() {
|
||||||
// server.query("ROLLBACK").await?;
|
server.query("ROLLBACK").await?;
|
||||||
// server.query("DISCARD ALL").await?;
|
server.query("DISCARD ALL").await?;
|
||||||
// server.set_name("pgcat").await?;
|
server.set_name("pgcat").await?;
|
||||||
|
|
||||||
// TODO: Figure out a clever way to ensure
|
|
||||||
// the server has no more messages for us.
|
|
||||||
server.mark_bad();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
self.release();
|
self.release();
|
||||||
@@ -804,10 +830,9 @@ where
|
|||||||
|
|
||||||
self.buffer.put(&original[..]);
|
self.buffer.put(&original[..]);
|
||||||
|
|
||||||
self.send_and_receive_loop(
|
self.send_server_message(
|
||||||
code,
|
|
||||||
self.buffer.clone(),
|
|
||||||
server,
|
server,
|
||||||
|
self.buffer.clone(),
|
||||||
&address,
|
&address,
|
||||||
query_router.shard(),
|
query_router.shard(),
|
||||||
&pool,
|
&pool,
|
||||||
@@ -816,12 +841,41 @@ where
|
|||||||
|
|
||||||
self.buffer.clear();
|
self.buffer.clear();
|
||||||
|
|
||||||
|
// Read all data the server has to offer, which can be multiple messages
|
||||||
|
// buffered in 8196 bytes chunks.
|
||||||
|
loop {
|
||||||
|
let response = self
|
||||||
|
.receive_server_message(
|
||||||
|
server,
|
||||||
|
&address,
|
||||||
|
query_router.shard(),
|
||||||
|
&pool,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
match write_all_half(&mut self.write, response).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(err) => {
|
||||||
|
server.mark_bad();
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if !server.is_data_available() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Report query executed statistics.
|
||||||
|
self.stats.query(self.process_id, address.id);
|
||||||
|
|
||||||
if !server.in_transaction() {
|
if !server.in_transaction() {
|
||||||
self.stats.transaction(self.process_id, address.id);
|
self.stats.transaction(self.process_id, address.id);
|
||||||
|
|
||||||
// Release server back to the pool if we are in transaction mode.
|
// Release server back to the pool if we are in transaction mode.
|
||||||
// If we are in session mode, we keep the server until the client disconnects.
|
// If we are in session mode, we keep the server until the client disconnects.
|
||||||
if self.transaction_mode {
|
if self.transaction_mode {
|
||||||
|
self.stats.server_idle(server.process_id(), address.id);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -871,6 +925,7 @@ where
|
|||||||
// Release server back to the pool if we are in transaction mode.
|
// Release server back to the pool if we are in transaction mode.
|
||||||
// If we are in session mode, we keep the server until the client disconnects.
|
// If we are in session mode, we keep the server until the client disconnects.
|
||||||
if self.transaction_mode {
|
if self.transaction_mode {
|
||||||
|
self.stats.server_idle(server.process_id(), address.id);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -886,7 +941,6 @@ where
|
|||||||
|
|
||||||
// The server is no longer bound to us, we can't cancel it's queries anymore.
|
// The server is no longer bound to us, we can't cancel it's queries anymore.
|
||||||
debug!("Releasing server back into the pool");
|
debug!("Releasing server back into the pool");
|
||||||
self.stats.server_idle(server.process_id(), address.id);
|
|
||||||
self.release();
|
self.release();
|
||||||
self.stats.client_idle(self.process_id, address.id);
|
self.stats.client_idle(self.process_id, address.id);
|
||||||
}
|
}
|
||||||
@@ -898,46 +952,6 @@ where
|
|||||||
guard.remove(&(self.process_id, self.secret_key));
|
guard.remove(&(self.process_id, self.secret_key));
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_and_receive_loop(
|
|
||||||
&mut self,
|
|
||||||
code: char,
|
|
||||||
message: BytesMut,
|
|
||||||
server: &mut Server,
|
|
||||||
address: &Address,
|
|
||||||
shard: usize,
|
|
||||||
pool: &ConnectionPool,
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
debug!("Sending {} to server", code);
|
|
||||||
|
|
||||||
self.send_server_message(server, message, &address, shard, &pool)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
// Read all data the server has to offer, which can be multiple messages
|
|
||||||
// buffered in 8196 bytes chunks.
|
|
||||||
loop {
|
|
||||||
let response = self
|
|
||||||
.receive_server_message(server, &address, shard, &pool)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
match write_all_half(&mut self.write, response).await {
|
|
||||||
Ok(_) => (),
|
|
||||||
Err(err) => {
|
|
||||||
server.mark_bad();
|
|
||||||
return Err(err);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if !server.is_data_available() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Report query executed statistics.
|
|
||||||
self.stats.query(self.process_id, address.id);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn send_server_message(
|
async fn send_server_message(
|
||||||
&self,
|
&self,
|
||||||
server: &mut Server,
|
server: &mut Server,
|
||||||
|
|||||||
@@ -64,8 +64,6 @@ pub struct Address {
|
|||||||
pub database: String,
|
pub database: String,
|
||||||
pub role: Role,
|
pub role: Role,
|
||||||
pub replica_number: usize,
|
pub replica_number: usize,
|
||||||
pub username: String,
|
|
||||||
pub poolname: String,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Address {
|
impl Default for Address {
|
||||||
@@ -78,8 +76,6 @@ impl Default for Address {
|
|||||||
replica_number: 0,
|
replica_number: 0,
|
||||||
database: String::from("database"),
|
database: String::from("database"),
|
||||||
role: Role::Replica,
|
role: Role::Replica,
|
||||||
username: String::from("username"),
|
|
||||||
poolname: String::from("poolname"),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -88,11 +84,11 @@ impl Address {
|
|||||||
/// Address name (aka database) used in `SHOW STATS`, `SHOW DATABASES`, and `SHOW POOLS`.
|
/// Address name (aka database) used in `SHOW STATS`, `SHOW DATABASES`, and `SHOW POOLS`.
|
||||||
pub fn name(&self) -> String {
|
pub fn name(&self) -> String {
|
||||||
match self.role {
|
match self.role {
|
||||||
Role::Primary => format!("{}_shard_{}_primary", self.poolname, self.shard),
|
Role::Primary => format!("{}_shard_{}_primary", self.database, self.shard),
|
||||||
|
|
||||||
Role::Replica => format!(
|
Role::Replica => format!(
|
||||||
"{}_shard_{}_replica_{}",
|
"{}_shard_{}_replica_{}",
|
||||||
self.poolname, self.shard, self.replica_number
|
self.database, self.shard, self.replica_number
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -114,14 +114,12 @@ impl ConnectionPool {
|
|||||||
|
|
||||||
let address = Address {
|
let address = Address {
|
||||||
id: address_id,
|
id: address_id,
|
||||||
database: shard.database.clone(),
|
database: pool_name.clone(),
|
||||||
host: server.0.clone(),
|
host: server.0.clone(),
|
||||||
port: server.1.to_string(),
|
port: server.1.to_string(),
|
||||||
role: role,
|
role: role,
|
||||||
replica_number,
|
replica_number,
|
||||||
shard: shard_idx.parse::<usize>().unwrap(),
|
shard: shard_idx.parse::<usize>().unwrap(),
|
||||||
username: user_info.username.clone(),
|
|
||||||
poolname: pool_name.clone(),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
address_id += 1;
|
address_id += 1;
|
||||||
@@ -335,7 +333,7 @@ impl ConnectionPool {
|
|||||||
if !require_healthcheck {
|
if !require_healthcheck {
|
||||||
self.stats
|
self.stats
|
||||||
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
||||||
self.stats.server_active(conn.process_id(), address.id);
|
self.stats.server_idle(conn.process_id(), address.id);
|
||||||
return Ok((conn, address.clone()));
|
return Ok((conn, address.clone()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -354,7 +352,7 @@ impl ConnectionPool {
|
|||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
self.stats
|
self.stats
|
||||||
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
||||||
self.stats.server_active(conn.process_id(), address.id);
|
self.stats.server_idle(conn.process_id(), address.id);
|
||||||
return Ok((conn, address.clone()));
|
return Ok((conn, address.clone()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
124
src/stats.rs
124
src/stats.rs
@@ -4,6 +4,7 @@ use log::{error, info, trace};
|
|||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use std::time::SystemTime;
|
||||||
use tokio::sync::mpsc::error::TrySendError;
|
use tokio::sync::mpsc::error::TrySendError;
|
||||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||||
|
|
||||||
@@ -42,6 +43,26 @@ enum EventName {
|
|||||||
UpdateAverages,
|
UpdateAverages,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Send an event via the channel and log
|
||||||
|
/// an error if it fails.
|
||||||
|
fn send(tx: &Sender<Event>, event: Event) {
|
||||||
|
let name = event.name;
|
||||||
|
let result = tx.try_send(event);
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(_) => trace!(
|
||||||
|
"{:?} event reported successfully, capacity: {}",
|
||||||
|
name,
|
||||||
|
tx.capacity()
|
||||||
|
),
|
||||||
|
|
||||||
|
Err(err) => match err {
|
||||||
|
TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name),
|
||||||
|
TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
/// Event data sent to the collector
|
/// Event data sent to the collector
|
||||||
/// from clients and servers.
|
/// from clients and servers.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
@@ -80,25 +101,6 @@ impl Reporter {
|
|||||||
Reporter { tx: tx }
|
Reporter { tx: tx }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send statistics to the task keeping track of stats.
|
|
||||||
fn send(&self, event: Event) {
|
|
||||||
let name = event.name;
|
|
||||||
let result = self.tx.try_send(event);
|
|
||||||
|
|
||||||
match result {
|
|
||||||
Ok(_) => trace!(
|
|
||||||
"{:?} event reported successfully, capacity: {}",
|
|
||||||
name,
|
|
||||||
self.tx.capacity()
|
|
||||||
),
|
|
||||||
|
|
||||||
Err(err) => match err {
|
|
||||||
TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name),
|
|
||||||
TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name),
|
|
||||||
},
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Report a query executed by a client against
|
/// Report a query executed by a client against
|
||||||
/// a server identified by the `address_id`.
|
/// a server identified by the `address_id`.
|
||||||
pub fn query(&self, process_id: i32, address_id: usize) {
|
pub fn query(&self, process_id: i32, address_id: usize) {
|
||||||
@@ -109,7 +111,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event);
|
send(&self.tx, event);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report a transaction executed by a client against
|
/// Report a transaction executed by a client against
|
||||||
@@ -122,7 +124,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report data sent to a server identified by `address_id`.
|
/// Report data sent to a server identified by `address_id`.
|
||||||
@@ -135,7 +137,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report data received from a server identified by `address_id`.
|
/// Report data received from a server identified by `address_id`.
|
||||||
@@ -148,7 +150,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Time spent waiting to get a healthy connection from the pool
|
/// Time spent waiting to get a healthy connection from the pool
|
||||||
@@ -162,7 +164,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a client identified by `process_id` waiting for a connection
|
/// Reports a client identified by `process_id` waiting for a connection
|
||||||
@@ -175,7 +177,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a client identified by `process_id` is done waiting for a connection
|
/// Reports a client identified by `process_id` is done waiting for a connection
|
||||||
@@ -188,7 +190,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a client identified by `process_id` is done querying the server
|
/// Reports a client identified by `process_id` is done querying the server
|
||||||
@@ -201,7 +203,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a client identified by `process_id` is disconecting from the pooler.
|
/// Reports a client identified by `process_id` is disconecting from the pooler.
|
||||||
@@ -214,7 +216,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` for
|
/// Reports a server connection identified by `process_id` for
|
||||||
@@ -228,7 +230,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` for
|
/// Reports a server connection identified by `process_id` for
|
||||||
@@ -242,7 +244,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` for
|
/// Reports a server connection identified by `process_id` for
|
||||||
@@ -256,7 +258,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` for
|
/// Reports a server connection identified by `process_id` for
|
||||||
@@ -270,7 +272,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` is disconecting from the pooler.
|
/// Reports a server connection identified by `process_id` is disconecting from the pooler.
|
||||||
@@ -283,7 +285,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -345,6 +347,9 @@ impl Collector {
|
|||||||
// Track which state the client and server are at any given time.
|
// Track which state the client and server are at any given time.
|
||||||
let mut client_server_states: HashMap<usize, HashMap<i32, EventName>> = HashMap::new();
|
let mut client_server_states: HashMap<usize, HashMap<i32, EventName>> = HashMap::new();
|
||||||
|
|
||||||
|
// Average update times
|
||||||
|
let mut last_updated_avg: HashMap<usize, SystemTime> = HashMap::new();
|
||||||
|
|
||||||
// Flush stats to StatsD and calculate averages every 15 seconds.
|
// Flush stats to StatsD and calculate averages every 15 seconds.
|
||||||
let tx = self.tx.clone();
|
let tx = self.tx.clone();
|
||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
@@ -354,12 +359,15 @@ impl Collector {
|
|||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
let address_count = get_number_of_addresses();
|
let address_count = get_number_of_addresses();
|
||||||
for address_id in 0..address_count {
|
for address_id in 0..address_count {
|
||||||
let _ = tx.try_send(Event {
|
send(
|
||||||
name: EventName::UpdateStats,
|
&tx,
|
||||||
value: 0,
|
Event {
|
||||||
process_id: -1,
|
name: EventName::UpdateStats,
|
||||||
address_id: address_id,
|
value: 0,
|
||||||
});
|
process_id: -1,
|
||||||
|
address_id: address_id,
|
||||||
|
},
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -372,12 +380,15 @@ impl Collector {
|
|||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
let address_count = get_number_of_addresses();
|
let address_count = get_number_of_addresses();
|
||||||
for address_id in 0..address_count {
|
for address_id in 0..address_count {
|
||||||
let _ = tx.try_send(Event {
|
send(
|
||||||
name: EventName::UpdateAverages,
|
&tx,
|
||||||
value: 0,
|
Event {
|
||||||
process_id: -1,
|
name: EventName::UpdateAverages,
|
||||||
address_id: address_id,
|
value: 0,
|
||||||
});
|
process_id: -1,
|
||||||
|
address_id: address_id,
|
||||||
|
},
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -399,6 +410,9 @@ impl Collector {
|
|||||||
.entry(stat.address_id)
|
.entry(stat.address_id)
|
||||||
.or_insert(HashMap::new());
|
.or_insert(HashMap::new());
|
||||||
let old_stats = old_stats.entry(stat.address_id).or_insert(HashMap::new());
|
let old_stats = old_stats.entry(stat.address_id).or_insert(HashMap::new());
|
||||||
|
let last_updated_avg = last_updated_avg
|
||||||
|
.entry(stat.address_id)
|
||||||
|
.or_insert(SystemTime::now());
|
||||||
|
|
||||||
// Some are counters, some are gauges...
|
// Some are counters, some are gauges...
|
||||||
match stat.name {
|
match stat.name {
|
||||||
@@ -524,6 +538,24 @@ impl Collector {
|
|||||||
}
|
}
|
||||||
|
|
||||||
EventName::UpdateAverages => {
|
EventName::UpdateAverages => {
|
||||||
|
let elapsed = match last_updated_avg.elapsed() {
|
||||||
|
Ok(elapsed) => elapsed.as_secs(),
|
||||||
|
Err(err) => {
|
||||||
|
error!(
|
||||||
|
"Could not get elapsed time, averages may be incorrect: {:?}",
|
||||||
|
err
|
||||||
|
);
|
||||||
|
STAT_PERIOD / 1_000
|
||||||
|
}
|
||||||
|
} as i64;
|
||||||
|
|
||||||
|
*last_updated_avg = SystemTime::now();
|
||||||
|
|
||||||
|
// Tokio triggers the interval on first tick and then sleeps.
|
||||||
|
if elapsed == 0 {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// Calculate averages
|
// Calculate averages
|
||||||
for stat in &[
|
for stat in &[
|
||||||
"avg_query_count",
|
"avg_query_count",
|
||||||
@@ -541,7 +573,7 @@ impl Collector {
|
|||||||
|
|
||||||
let old_value = old_stats.entry(total_name.clone()).or_insert(0);
|
let old_value = old_stats.entry(total_name.clone()).or_insert(0);
|
||||||
let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned();
|
let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned();
|
||||||
let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second
|
let avg = (new_value - *old_value) / elapsed; // Avg / second
|
||||||
|
|
||||||
stats.insert(stat, avg);
|
stats.insert(stat, avg);
|
||||||
*old_value = new_value;
|
*old_value = new_value;
|
||||||
|
|||||||
Reference in New Issue
Block a user