mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-27 18:56:30 +00:00
quick refactor
This commit is contained in:
89
src/stats.rs
89
src/stats.rs
@@ -42,6 +42,26 @@ enum EventName {
|
|||||||
UpdateAverages,
|
UpdateAverages,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Send an event via the channel and log
|
||||||
|
/// an error if it fails.
|
||||||
|
fn send(tx: &Sender<Event>, event: Event) {
|
||||||
|
let name = event.name;
|
||||||
|
let result = tx.try_send(event);
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(_) => trace!(
|
||||||
|
"{:?} event reported successfully, capacity: {}",
|
||||||
|
name,
|
||||||
|
tx.capacity()
|
||||||
|
),
|
||||||
|
|
||||||
|
Err(err) => match err {
|
||||||
|
TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name),
|
||||||
|
TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
/// Event data sent to the collector
|
/// Event data sent to the collector
|
||||||
/// from clients and servers.
|
/// from clients and servers.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
@@ -80,25 +100,6 @@ impl Reporter {
|
|||||||
Reporter { tx: tx }
|
Reporter { tx: tx }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send statistics to the task keeping track of stats.
|
|
||||||
fn send(&self, event: Event) {
|
|
||||||
let name = event.name;
|
|
||||||
let result = self.tx.try_send(event);
|
|
||||||
|
|
||||||
match result {
|
|
||||||
Ok(_) => trace!(
|
|
||||||
"{:?} event reported successfully, capacity: {}",
|
|
||||||
name,
|
|
||||||
self.tx.capacity()
|
|
||||||
),
|
|
||||||
|
|
||||||
Err(err) => match err {
|
|
||||||
TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name),
|
|
||||||
TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name),
|
|
||||||
},
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Report a query executed by a client against
|
/// Report a query executed by a client against
|
||||||
/// a server identified by the `address_id`.
|
/// a server identified by the `address_id`.
|
||||||
pub fn query(&self, process_id: i32, address_id: usize) {
|
pub fn query(&self, process_id: i32, address_id: usize) {
|
||||||
@@ -109,7 +110,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event);
|
send(&self.tx, event);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report a transaction executed by a client against
|
/// Report a transaction executed by a client against
|
||||||
@@ -122,7 +123,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report data sent to a server identified by `address_id`.
|
/// Report data sent to a server identified by `address_id`.
|
||||||
@@ -135,7 +136,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report data received from a server identified by `address_id`.
|
/// Report data received from a server identified by `address_id`.
|
||||||
@@ -148,7 +149,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Time spent waiting to get a healthy connection from the pool
|
/// Time spent waiting to get a healthy connection from the pool
|
||||||
@@ -162,7 +163,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a client identified by `process_id` waiting for a connection
|
/// Reports a client identified by `process_id` waiting for a connection
|
||||||
@@ -175,7 +176,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a client identified by `process_id` is done waiting for a connection
|
/// Reports a client identified by `process_id` is done waiting for a connection
|
||||||
@@ -188,7 +189,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a client identified by `process_id` is done querying the server
|
/// Reports a client identified by `process_id` is done querying the server
|
||||||
@@ -201,7 +202,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a client identified by `process_id` is disconecting from the pooler.
|
/// Reports a client identified by `process_id` is disconecting from the pooler.
|
||||||
@@ -214,7 +215,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` for
|
/// Reports a server connection identified by `process_id` for
|
||||||
@@ -228,7 +229,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` for
|
/// Reports a server connection identified by `process_id` for
|
||||||
@@ -242,7 +243,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` for
|
/// Reports a server connection identified by `process_id` for
|
||||||
@@ -256,7 +257,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` for
|
/// Reports a server connection identified by `process_id` for
|
||||||
@@ -270,7 +271,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reports a server connection identified by `process_id` is disconecting from the pooler.
|
/// Reports a server connection identified by `process_id` is disconecting from the pooler.
|
||||||
@@ -283,7 +284,7 @@ impl Reporter {
|
|||||||
address_id: address_id,
|
address_id: address_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send(event)
|
send(&self.tx, event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -305,24 +306,6 @@ impl Collector {
|
|||||||
Collector { rx, tx }
|
Collector { rx, tx }
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send(tx: &Sender<Event>, event: Event) {
|
|
||||||
let name = event.name;
|
|
||||||
let result = tx.try_send(event);
|
|
||||||
|
|
||||||
match result {
|
|
||||||
Ok(_) => trace!(
|
|
||||||
"{:?} event reported successfully, capacity: {}",
|
|
||||||
name,
|
|
||||||
tx.capacity()
|
|
||||||
),
|
|
||||||
|
|
||||||
Err(err) => match err {
|
|
||||||
TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name),
|
|
||||||
TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name),
|
|
||||||
},
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The statistics collection handler. It will collect statistics
|
/// The statistics collection handler. It will collect statistics
|
||||||
/// for `address_id`s starting at 0 up to `addresses`.
|
/// for `address_id`s starting at 0 up to `addresses`.
|
||||||
pub async fn collect(&mut self) {
|
pub async fn collect(&mut self) {
|
||||||
@@ -372,7 +355,7 @@ impl Collector {
|
|||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
let address_count = get_number_of_addresses();
|
let address_count = get_number_of_addresses();
|
||||||
for address_id in 0..address_count {
|
for address_id in 0..address_count {
|
||||||
Self::send(
|
send(
|
||||||
&tx,
|
&tx,
|
||||||
Event {
|
Event {
|
||||||
name: EventName::UpdateStats,
|
name: EventName::UpdateStats,
|
||||||
@@ -393,7 +376,7 @@ impl Collector {
|
|||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
let address_count = get_number_of_addresses();
|
let address_count = get_number_of_addresses();
|
||||||
for address_id in 0..address_count {
|
for address_id in 0..address_count {
|
||||||
Self::send(
|
send(
|
||||||
&tx,
|
&tx,
|
||||||
Event {
|
Event {
|
||||||
name: EventName::UpdateAverages,
|
name: EventName::UpdateAverages,
|
||||||
|
|||||||
Reference in New Issue
Block a user