mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 01:16:30 +00:00
Compare commits
5 Commits
mostafa_we
...
levkk-log-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
61b9756ded | ||
|
|
2cd9e15849 | ||
|
|
fd57fae280 | ||
|
|
a460a645f5 | ||
|
|
f7d33fba7a |
124
src/stats.rs
124
src/stats.rs
@@ -4,6 +4,7 @@ use log::{error, info, trace};
|
||||
use once_cell::sync::Lazy;
|
||||
use parking_lot::Mutex;
|
||||
use std::collections::HashMap;
|
||||
use std::time::SystemTime;
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
|
||||
@@ -42,6 +43,26 @@ enum EventName {
|
||||
UpdateAverages,
|
||||
}
|
||||
|
||||
/// Send an event via the channel and log
|
||||
/// an error if it fails.
|
||||
fn send(tx: &Sender<Event>, event: Event) {
|
||||
let name = event.name;
|
||||
let result = tx.try_send(event);
|
||||
|
||||
match result {
|
||||
Ok(_) => trace!(
|
||||
"{:?} event reported successfully, capacity: {}",
|
||||
name,
|
||||
tx.capacity()
|
||||
),
|
||||
|
||||
Err(err) => match err {
|
||||
TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name),
|
||||
TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/// Event data sent to the collector
|
||||
/// from clients and servers.
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -80,25 +101,6 @@ impl Reporter {
|
||||
Reporter { tx: tx }
|
||||
}
|
||||
|
||||
/// Send statistics to the task keeping track of stats.
|
||||
fn send(&self, event: Event) {
|
||||
let name = event.name;
|
||||
let result = self.tx.try_send(event);
|
||||
|
||||
match result {
|
||||
Ok(_) => trace!(
|
||||
"{:?} event reported successfully, capacity: {}",
|
||||
name,
|
||||
self.tx.capacity()
|
||||
),
|
||||
|
||||
Err(err) => match err {
|
||||
TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name),
|
||||
TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/// Report a query executed by a client against
|
||||
/// a server identified by the `address_id`.
|
||||
pub fn query(&self, process_id: i32, address_id: usize) {
|
||||
@@ -109,7 +111,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event);
|
||||
send(&self.tx, event);
|
||||
}
|
||||
|
||||
/// Report a transaction executed by a client against
|
||||
@@ -122,7 +124,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Report data sent to a server identified by `address_id`.
|
||||
@@ -135,7 +137,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Report data received from a server identified by `address_id`.
|
||||
@@ -148,7 +150,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Time spent waiting to get a healthy connection from the pool
|
||||
@@ -162,7 +164,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Reports a client identified by `process_id` waiting for a connection
|
||||
@@ -175,7 +177,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Reports a client identified by `process_id` is done waiting for a connection
|
||||
@@ -188,7 +190,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Reports a client identified by `process_id` is done querying the server
|
||||
@@ -201,7 +203,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Reports a client identified by `process_id` is disconecting from the pooler.
|
||||
@@ -214,7 +216,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Reports a server connection identified by `process_id` for
|
||||
@@ -228,7 +230,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Reports a server connection identified by `process_id` for
|
||||
@@ -242,7 +244,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Reports a server connection identified by `process_id` for
|
||||
@@ -256,7 +258,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Reports a server connection identified by `process_id` for
|
||||
@@ -270,7 +272,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
|
||||
/// Reports a server connection identified by `process_id` is disconecting from the pooler.
|
||||
@@ -283,7 +285,7 @@ impl Reporter {
|
||||
address_id: address_id,
|
||||
};
|
||||
|
||||
self.send(event)
|
||||
send(&self.tx, event)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -345,6 +347,9 @@ impl Collector {
|
||||
// Track which state the client and server are at any given time.
|
||||
let mut client_server_states: HashMap<usize, HashMap<i32, EventName>> = HashMap::new();
|
||||
|
||||
// Average update times
|
||||
let mut last_updated_avg: HashMap<usize, SystemTime> = HashMap::new();
|
||||
|
||||
// Flush stats to StatsD and calculate averages every 15 seconds.
|
||||
let tx = self.tx.clone();
|
||||
tokio::task::spawn(async move {
|
||||
@@ -354,12 +359,15 @@ impl Collector {
|
||||
interval.tick().await;
|
||||
let address_count = get_number_of_addresses();
|
||||
for address_id in 0..address_count {
|
||||
let _ = tx.try_send(Event {
|
||||
name: EventName::UpdateStats,
|
||||
value: 0,
|
||||
process_id: -1,
|
||||
address_id: address_id,
|
||||
});
|
||||
send(
|
||||
&tx,
|
||||
Event {
|
||||
name: EventName::UpdateStats,
|
||||
value: 0,
|
||||
process_id: -1,
|
||||
address_id: address_id,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -372,12 +380,15 @@ impl Collector {
|
||||
interval.tick().await;
|
||||
let address_count = get_number_of_addresses();
|
||||
for address_id in 0..address_count {
|
||||
let _ = tx.try_send(Event {
|
||||
name: EventName::UpdateAverages,
|
||||
value: 0,
|
||||
process_id: -1,
|
||||
address_id: address_id,
|
||||
});
|
||||
send(
|
||||
&tx,
|
||||
Event {
|
||||
name: EventName::UpdateAverages,
|
||||
value: 0,
|
||||
process_id: -1,
|
||||
address_id: address_id,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -399,6 +410,9 @@ impl Collector {
|
||||
.entry(stat.address_id)
|
||||
.or_insert(HashMap::new());
|
||||
let old_stats = old_stats.entry(stat.address_id).or_insert(HashMap::new());
|
||||
let last_updated_avg = last_updated_avg
|
||||
.entry(stat.address_id)
|
||||
.or_insert(SystemTime::now());
|
||||
|
||||
// Some are counters, some are gauges...
|
||||
match stat.name {
|
||||
@@ -524,6 +538,24 @@ impl Collector {
|
||||
}
|
||||
|
||||
EventName::UpdateAverages => {
|
||||
let elapsed = match last_updated_avg.elapsed() {
|
||||
Ok(elapsed) => elapsed.as_secs(),
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Could not get elapsed time, averages may be incorrect: {:?}",
|
||||
err
|
||||
);
|
||||
STAT_PERIOD / 1_000
|
||||
}
|
||||
} as i64;
|
||||
|
||||
*last_updated_avg = SystemTime::now();
|
||||
|
||||
// Tokio triggers the interval on first tick and then sleeps.
|
||||
if elapsed == 0 {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Calculate averages
|
||||
for stat in &[
|
||||
"avg_query_count",
|
||||
@@ -541,7 +573,7 @@ impl Collector {
|
||||
|
||||
let old_value = old_stats.entry(total_name.clone()).or_insert(0);
|
||||
let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned();
|
||||
let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second
|
||||
let avg = (new_value - *old_value) / elapsed; // Avg / second
|
||||
|
||||
stats.insert(stat, avg);
|
||||
*old_value = new_value;
|
||||
|
||||
Reference in New Issue
Block a user