mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-26 10:26:30 +00:00
Fix time based average stats (#442)
* keep track of current stats and zero them after updating averages * Try tests * typo * remove commented test stuff * Avoid dividing by zero * Fix test * refactor, get rid of iterator. do it manually * trigger build * Fix
This commit is contained in:
@@ -113,6 +113,7 @@ impl Collector {
|
|||||||
for stats in server_stats.values() {
|
for stats in server_stats.values() {
|
||||||
if !stats.check_address_stat_average_is_updated_status() {
|
if !stats.check_address_stat_average_is_updated_status() {
|
||||||
stats.address_stats().update_averages();
|
stats.address_stats().update_averages();
|
||||||
|
stats.address_stats().reset_current_counts();
|
||||||
stats.set_address_stat_average_is_updated_status(true);
|
stats.set_address_stat_average_is_updated_status(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,35 +1,26 @@
|
|||||||
use std::sync::atomic::*;
|
use std::sync::atomic::*;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Default)]
|
||||||
|
struct AddressStatFields {
|
||||||
|
xact_count: Arc<AtomicU64>,
|
||||||
|
query_count: Arc<AtomicU64>,
|
||||||
|
bytes_received: Arc<AtomicU64>,
|
||||||
|
bytes_sent: Arc<AtomicU64>,
|
||||||
|
xact_time: Arc<AtomicU64>,
|
||||||
|
query_time: Arc<AtomicU64>,
|
||||||
|
wait_time: Arc<AtomicU64>,
|
||||||
|
errors: Arc<AtomicU64>,
|
||||||
|
}
|
||||||
|
|
||||||
/// Internal address stats
|
/// Internal address stats
|
||||||
#[derive(Debug, Clone, Default)]
|
#[derive(Debug, Clone, Default)]
|
||||||
pub struct AddressStats {
|
pub struct AddressStats {
|
||||||
pub total_xact_count: Arc<AtomicU64>,
|
total: AddressStatFields,
|
||||||
pub total_query_count: Arc<AtomicU64>,
|
|
||||||
pub total_received: Arc<AtomicU64>,
|
|
||||||
pub total_sent: Arc<AtomicU64>,
|
|
||||||
pub total_xact_time: Arc<AtomicU64>,
|
|
||||||
pub total_query_time: Arc<AtomicU64>,
|
|
||||||
pub total_wait_time: Arc<AtomicU64>,
|
|
||||||
pub total_errors: Arc<AtomicU64>,
|
|
||||||
|
|
||||||
pub old_total_xact_count: Arc<AtomicU64>,
|
current: AddressStatFields,
|
||||||
pub old_total_query_count: Arc<AtomicU64>,
|
|
||||||
pub old_total_received: Arc<AtomicU64>,
|
|
||||||
pub old_total_sent: Arc<AtomicU64>,
|
|
||||||
pub old_total_xact_time: Arc<AtomicU64>,
|
|
||||||
pub old_total_query_time: Arc<AtomicU64>,
|
|
||||||
pub old_total_wait_time: Arc<AtomicU64>,
|
|
||||||
pub old_total_errors: Arc<AtomicU64>,
|
|
||||||
|
|
||||||
pub avg_query_count: Arc<AtomicU64>,
|
averages: AddressStatFields,
|
||||||
pub avg_query_time: Arc<AtomicU64>,
|
|
||||||
pub avg_recv: Arc<AtomicU64>,
|
|
||||||
pub avg_sent: Arc<AtomicU64>,
|
|
||||||
pub avg_errors: Arc<AtomicU64>,
|
|
||||||
pub avg_xact_time: Arc<AtomicU64>,
|
|
||||||
pub avg_xact_count: Arc<AtomicU64>,
|
|
||||||
pub avg_wait_time: Arc<AtomicU64>,
|
|
||||||
|
|
||||||
// Determines if the averages have been updated since the last time they were reported
|
// Determines if the averages have been updated since the last time they were reported
|
||||||
pub averages_updated: Arc<AtomicBool>,
|
pub averages_updated: Arc<AtomicBool>,
|
||||||
@@ -43,67 +34,67 @@ impl IntoIterator for AddressStats {
|
|||||||
vec![
|
vec![
|
||||||
(
|
(
|
||||||
"total_xact_count".to_string(),
|
"total_xact_count".to_string(),
|
||||||
self.total_xact_count.load(Ordering::Relaxed),
|
self.total.xact_count.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"total_query_count".to_string(),
|
"total_query_count".to_string(),
|
||||||
self.total_query_count.load(Ordering::Relaxed),
|
self.total.query_count.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"total_received".to_string(),
|
"total_received".to_string(),
|
||||||
self.total_received.load(Ordering::Relaxed),
|
self.total.bytes_received.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"total_sent".to_string(),
|
"total_sent".to_string(),
|
||||||
self.total_sent.load(Ordering::Relaxed),
|
self.total.bytes_sent.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"total_xact_time".to_string(),
|
"total_xact_time".to_string(),
|
||||||
self.total_xact_time.load(Ordering::Relaxed),
|
self.total.xact_time.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"total_query_time".to_string(),
|
"total_query_time".to_string(),
|
||||||
self.total_query_time.load(Ordering::Relaxed),
|
self.total.query_time.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"total_wait_time".to_string(),
|
"total_wait_time".to_string(),
|
||||||
self.total_wait_time.load(Ordering::Relaxed),
|
self.total.wait_time.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"total_errors".to_string(),
|
"total_errors".to_string(),
|
||||||
self.total_errors.load(Ordering::Relaxed),
|
self.total.errors.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"avg_xact_count".to_string(),
|
"avg_xact_count".to_string(),
|
||||||
self.avg_xact_count.load(Ordering::Relaxed),
|
self.averages.xact_count.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"avg_query_count".to_string(),
|
"avg_query_count".to_string(),
|
||||||
self.avg_query_count.load(Ordering::Relaxed),
|
self.averages.query_count.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"avg_recv".to_string(),
|
"avg_recv".to_string(),
|
||||||
self.avg_recv.load(Ordering::Relaxed),
|
self.averages.bytes_received.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"avg_sent".to_string(),
|
"avg_sent".to_string(),
|
||||||
self.avg_sent.load(Ordering::Relaxed),
|
self.averages.bytes_sent.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"avg_errors".to_string(),
|
"avg_errors".to_string(),
|
||||||
self.avg_errors.load(Ordering::Relaxed),
|
self.averages.errors.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"avg_xact_time".to_string(),
|
"avg_xact_time".to_string(),
|
||||||
self.avg_xact_time.load(Ordering::Relaxed),
|
self.averages.xact_time.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"avg_query_time".to_string(),
|
"avg_query_time".to_string(),
|
||||||
self.avg_query_time.load(Ordering::Relaxed),
|
self.averages.query_time.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"avg_wait_time".to_string(),
|
"avg_wait_time".to_string(),
|
||||||
self.avg_wait_time.load(Ordering::Relaxed),
|
self.averages.wait_time.load(Ordering::Relaxed),
|
||||||
),
|
),
|
||||||
]
|
]
|
||||||
.into_iter()
|
.into_iter()
|
||||||
@@ -111,21 +102,120 @@ impl IntoIterator for AddressStats {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl AddressStats {
|
impl AddressStats {
|
||||||
|
pub fn xact_count_add(&self) {
|
||||||
|
self.total.xact_count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
self.current.xact_count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn query_count_add(&self) {
|
||||||
|
self.total.query_count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
self.current.query_count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn bytes_received_add(&self, bytes: u64) {
|
||||||
|
self.total
|
||||||
|
.bytes_received
|
||||||
|
.fetch_add(bytes, Ordering::Relaxed);
|
||||||
|
self.current
|
||||||
|
.bytes_received
|
||||||
|
.fetch_add(bytes, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn bytes_sent_add(&self, bytes: u64) {
|
||||||
|
self.total.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
|
||||||
|
self.current.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn xact_time_add(&self, time: u64) {
|
||||||
|
self.total.xact_time.fetch_add(time, Ordering::Relaxed);
|
||||||
|
self.current.xact_time.fetch_add(time, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn query_time_add(&self, time: u64) {
|
||||||
|
self.total.query_time.fetch_add(time, Ordering::Relaxed);
|
||||||
|
self.current.query_time.fetch_add(time, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn wait_time_add(&self, time: u64) {
|
||||||
|
self.total.wait_time.fetch_add(time, Ordering::Relaxed);
|
||||||
|
self.current.wait_time.fetch_add(time, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
pub fn error(&self) {
|
pub fn error(&self) {
|
||||||
self.total_errors.fetch_add(1, Ordering::Relaxed);
|
self.total.errors.fetch_add(1, Ordering::Relaxed);
|
||||||
|
self.current.errors.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn update_averages(&self) {
|
pub fn update_averages(&self) {
|
||||||
let (totals, averages, old_totals) = self.fields_iterators();
|
let stat_period_per_second = crate::stats::STAT_PERIOD / 1_000;
|
||||||
for (total, average, old_total) in itertools::izip!(totals, averages, old_totals) {
|
|
||||||
let total_value = total.load(Ordering::Relaxed);
|
// xact_count
|
||||||
let old_total_value = old_total.load(Ordering::Relaxed);
|
let current_xact_count = self.current.xact_count.load(Ordering::Relaxed);
|
||||||
average.store(
|
let current_xact_time = self.current.xact_time.load(Ordering::Relaxed);
|
||||||
(total_value - old_total_value) / (crate::stats::STAT_PERIOD / 1_000),
|
self.averages.xact_count.store(
|
||||||
Ordering::Relaxed,
|
current_xact_count / stat_period_per_second,
|
||||||
); // Avg / second
|
Ordering::Relaxed,
|
||||||
old_total.store(total_value, Ordering::Relaxed);
|
);
|
||||||
|
if current_xact_count == 0 {
|
||||||
|
self.averages.xact_time.store(0, Ordering::Relaxed);
|
||||||
|
} else {
|
||||||
|
self.averages
|
||||||
|
.xact_time
|
||||||
|
.store(current_xact_time / current_xact_count, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// query_count
|
||||||
|
let current_query_count = self.current.query_count.load(Ordering::Relaxed);
|
||||||
|
let current_query_time = self.current.query_time.load(Ordering::Relaxed);
|
||||||
|
self.averages.query_count.store(
|
||||||
|
current_query_count / stat_period_per_second,
|
||||||
|
Ordering::Relaxed,
|
||||||
|
);
|
||||||
|
if current_query_count == 0 {
|
||||||
|
self.averages.query_time.store(0, Ordering::Relaxed);
|
||||||
|
} else {
|
||||||
|
self.averages
|
||||||
|
.query_time
|
||||||
|
.store(current_query_time / current_query_count, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
// bytes_received
|
||||||
|
let current_bytes_received = self.current.bytes_received.load(Ordering::Relaxed);
|
||||||
|
self.averages.bytes_received.store(
|
||||||
|
current_bytes_received / stat_period_per_second,
|
||||||
|
Ordering::Relaxed,
|
||||||
|
);
|
||||||
|
|
||||||
|
// bytes_sent
|
||||||
|
let current_bytes_sent = self.current.bytes_sent.load(Ordering::Relaxed);
|
||||||
|
self.averages.bytes_sent.store(
|
||||||
|
current_bytes_sent / stat_period_per_second,
|
||||||
|
Ordering::Relaxed,
|
||||||
|
);
|
||||||
|
|
||||||
|
// wait_time
|
||||||
|
let current_wait_time = self.current.wait_time.load(Ordering::Relaxed);
|
||||||
|
self.averages.wait_time.store(
|
||||||
|
current_wait_time / stat_period_per_second,
|
||||||
|
Ordering::Relaxed,
|
||||||
|
);
|
||||||
|
|
||||||
|
// errors
|
||||||
|
let current_errors = self.current.errors.load(Ordering::Relaxed);
|
||||||
|
self.averages
|
||||||
|
.errors
|
||||||
|
.store(current_errors / stat_period_per_second, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn reset_current_counts(&self) {
|
||||||
|
self.current.xact_count.store(0, Ordering::Relaxed);
|
||||||
|
self.current.xact_time.store(0, Ordering::Relaxed);
|
||||||
|
self.current.query_count.store(0, Ordering::Relaxed);
|
||||||
|
self.current.query_time.store(0, Ordering::Relaxed);
|
||||||
|
self.current.bytes_received.store(0, Ordering::Relaxed);
|
||||||
|
self.current.bytes_sent.store(0, Ordering::Relaxed);
|
||||||
|
self.current.wait_time.store(0, Ordering::Relaxed);
|
||||||
|
self.current.errors.store(0, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn populate_row(&self, row: &mut Vec<String>) {
|
pub fn populate_row(&self, row: &mut Vec<String>) {
|
||||||
@@ -133,43 +223,4 @@ impl AddressStats {
|
|||||||
row.push(value.to_string());
|
row.push(value.to_string());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn fields_iterators(
|
|
||||||
&self,
|
|
||||||
) -> (
|
|
||||||
Vec<Arc<AtomicU64>>,
|
|
||||||
Vec<Arc<AtomicU64>>,
|
|
||||||
Vec<Arc<AtomicU64>>,
|
|
||||||
) {
|
|
||||||
let mut totals: Vec<Arc<AtomicU64>> = Vec::new();
|
|
||||||
let mut averages: Vec<Arc<AtomicU64>> = Vec::new();
|
|
||||||
let mut old_totals: Vec<Arc<AtomicU64>> = Vec::new();
|
|
||||||
|
|
||||||
totals.push(self.total_xact_count.clone());
|
|
||||||
old_totals.push(self.old_total_xact_count.clone());
|
|
||||||
averages.push(self.avg_xact_count.clone());
|
|
||||||
totals.push(self.total_query_count.clone());
|
|
||||||
old_totals.push(self.old_total_query_count.clone());
|
|
||||||
averages.push(self.avg_query_count.clone());
|
|
||||||
totals.push(self.total_received.clone());
|
|
||||||
old_totals.push(self.old_total_received.clone());
|
|
||||||
averages.push(self.avg_recv.clone());
|
|
||||||
totals.push(self.total_sent.clone());
|
|
||||||
old_totals.push(self.old_total_sent.clone());
|
|
||||||
averages.push(self.avg_sent.clone());
|
|
||||||
totals.push(self.total_xact_time.clone());
|
|
||||||
old_totals.push(self.old_total_xact_time.clone());
|
|
||||||
averages.push(self.avg_xact_time.clone());
|
|
||||||
totals.push(self.total_query_time.clone());
|
|
||||||
old_totals.push(self.old_total_query_time.clone());
|
|
||||||
averages.push(self.avg_query_time.clone());
|
|
||||||
totals.push(self.total_wait_time.clone());
|
|
||||||
old_totals.push(self.old_total_wait_time.clone());
|
|
||||||
averages.push(self.avg_wait_time.clone());
|
|
||||||
totals.push(self.total_errors.clone());
|
|
||||||
old_totals.push(self.old_total_errors.clone());
|
|
||||||
averages.push(self.avg_errors.clone());
|
|
||||||
|
|
||||||
(totals, averages, old_totals)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -177,12 +177,9 @@ impl ServerStats {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn checkout_time(&self, microseconds: u64, application_name: String) {
|
pub fn checkout_time(&self, microseconds: u64, application_name: String) {
|
||||||
// Update server stats and address aggergation stats
|
// Update server stats and address aggregation stats
|
||||||
self.set_application(application_name);
|
self.set_application(application_name);
|
||||||
self.address
|
self.address.stats.wait_time_add(microseconds);
|
||||||
.stats
|
|
||||||
.total_wait_time
|
|
||||||
.fetch_add(microseconds, Ordering::Relaxed);
|
|
||||||
self.pool_stats
|
self.pool_stats
|
||||||
.maxwait
|
.maxwait
|
||||||
.fetch_max(microseconds, Ordering::Relaxed);
|
.fetch_max(microseconds, Ordering::Relaxed);
|
||||||
@@ -191,13 +188,8 @@ impl ServerStats {
|
|||||||
/// Report a query executed by a client against a server
|
/// Report a query executed by a client against a server
|
||||||
pub fn query(&self, milliseconds: u64, application_name: &str) {
|
pub fn query(&self, milliseconds: u64, application_name: &str) {
|
||||||
self.set_application(application_name.to_string());
|
self.set_application(application_name.to_string());
|
||||||
let address_stats = self.address_stats();
|
self.address.stats.query_count_add();
|
||||||
address_stats
|
self.address.stats.query_time_add(milliseconds);
|
||||||
.total_query_count
|
|
||||||
.fetch_add(1, Ordering::Relaxed);
|
|
||||||
address_stats
|
|
||||||
.total_query_time
|
|
||||||
.fetch_add(milliseconds, Ordering::Relaxed);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report a transaction executed by a client a server
|
/// Report a transaction executed by a client a server
|
||||||
@@ -208,29 +200,20 @@ impl ServerStats {
|
|||||||
self.set_application(application_name.to_string());
|
self.set_application(application_name.to_string());
|
||||||
|
|
||||||
self.transaction_count.fetch_add(1, Ordering::Relaxed);
|
self.transaction_count.fetch_add(1, Ordering::Relaxed);
|
||||||
self.address
|
self.address.stats.xact_count_add();
|
||||||
.stats
|
|
||||||
.total_xact_count
|
|
||||||
.fetch_add(1, Ordering::Relaxed);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report data sent to a server
|
/// Report data sent to a server
|
||||||
pub fn data_sent(&self, amount_bytes: usize) {
|
pub fn data_sent(&self, amount_bytes: usize) {
|
||||||
self.bytes_sent
|
self.bytes_sent
|
||||||
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
|
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
|
||||||
self.address
|
self.address.stats.bytes_sent_add(amount_bytes as u64);
|
||||||
.stats
|
|
||||||
.total_sent
|
|
||||||
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report data received from a server
|
/// Report data received from a server
|
||||||
pub fn data_received(&self, amount_bytes: usize) {
|
pub fn data_received(&self, amount_bytes: usize) {
|
||||||
self.bytes_received
|
self.bytes_received
|
||||||
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
|
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
|
||||||
self.address
|
self.address.stats.bytes_received_add(amount_bytes as u64);
|
||||||
.stats
|
|
||||||
.total_received
|
|
||||||
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ describe "Admin" do
|
|||||||
results = admin_conn.async_exec("SHOW STATS")[0]
|
results = admin_conn.async_exec("SHOW STATS")[0]
|
||||||
admin_conn.close
|
admin_conn.close
|
||||||
expect(results["total_query_time"].to_i).to be_within(200).of(750)
|
expect(results["total_query_time"].to_i).to be_within(200).of(750)
|
||||||
expect(results["avg_query_time"].to_i).to be_within(20).of(50)
|
expect(results["avg_query_time"].to_i).to be_within(50).of(250)
|
||||||
|
|
||||||
expect(results["total_wait_time"].to_i).to_not eq(0)
|
expect(results["total_wait_time"].to_i).to_not eq(0)
|
||||||
expect(results["avg_wait_time"].to_i).to_not eq(0)
|
expect(results["avg_wait_time"].to_i).to_not eq(0)
|
||||||
|
|||||||
Reference in New Issue
Block a user