diff --git a/src/stats.rs b/src/stats.rs index 773140b..a380dda 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -42,6 +42,26 @@ enum EventName { UpdateAverages, } +/// Send an event via the channel and log +/// an error if it fails. +fn send(tx: &Sender, event: Event) { + let name = event.name; + let result = tx.try_send(event); + + match result { + Ok(_) => trace!( + "{:?} event reported successfully, capacity: {}", + name, + tx.capacity() + ), + + Err(err) => match err { + TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name), + TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name), + }, + }; +} + /// Event data sent to the collector /// from clients and servers. #[derive(Debug, Clone)] @@ -80,25 +100,6 @@ impl Reporter { Reporter { tx: tx } } - /// Send statistics to the task keeping track of stats. - fn send(&self, event: Event) { - let name = event.name; - let result = self.tx.try_send(event); - - match result { - Ok(_) => trace!( - "{:?} event reported successfully, capacity: {}", - name, - self.tx.capacity() - ), - - Err(err) => match err { - TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name), - TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name), - }, - }; - } - /// Report a query executed by a client against /// a server identified by the `address_id`. pub fn query(&self, process_id: i32, address_id: usize) { @@ -109,7 +110,7 @@ impl Reporter { address_id: address_id, }; - self.send(event); + send(&self.tx, event); } /// Report a transaction executed by a client against @@ -122,7 +123,7 @@ impl Reporter { address_id: address_id, }; - self.send(event) + send(&self.tx, event) } /// Report data sent to a server identified by `address_id`. @@ -135,7 +136,7 @@ impl Reporter { address_id: address_id, }; - self.send(event) + send(&self.tx, event) } /// Report data received from a server identified by `address_id`. @@ -148,7 +149,7 @@ impl Reporter { address_id: address_id, }; - self.send(event) + send(&self.tx, event) } /// Time spent waiting to get a healthy connection from the pool @@ -162,7 +163,7 @@ impl Reporter { address_id: address_id, }; - self.send(event) + send(&self.tx, event) } /// Reports a client identified by `process_id` waiting for a connection @@ -175,7 +176,7 @@ impl Reporter { address_id: address_id, }; - self.send(event) + send(&self.tx, event) } /// Reports a client identified by `process_id` is done waiting for a connection @@ -188,7 +189,7 @@ impl Reporter { address_id: address_id, }; - self.send(event) + send(&self.tx, event) } /// Reports a client identified by `process_id` is done querying the server @@ -201,7 +202,7 @@ impl Reporter { address_id: address_id, }; - self.send(event) + send(&self.tx, event) } /// Reports a client identified by `process_id` is disconecting from the pooler. @@ -214,7 +215,7 @@ impl Reporter { address_id: address_id, }; - self.send(event) + send(&self.tx, event) } /// Reports a server connection identified by `process_id` for @@ -228,7 +229,7 @@ impl Reporter { address_id: address_id, }; - self.send(event) + send(&self.tx, event) } /// Reports a server connection identified by `process_id` for @@ -242,7 +243,7 @@ impl Reporter { address_id: address_id, }; - self.send(event) + send(&self.tx, event) } /// Reports a server connection identified by `process_id` for @@ -256,7 +257,7 @@ impl Reporter { address_id: address_id, }; - self.send(event) + send(&self.tx, event) } /// Reports a server connection identified by `process_id` for @@ -270,7 +271,7 @@ impl Reporter { address_id: address_id, }; - self.send(event) + send(&self.tx, event) } /// Reports a server connection identified by `process_id` is disconecting from the pooler. @@ -283,7 +284,7 @@ impl Reporter { address_id: address_id, }; - self.send(event) + send(&self.tx, event) } } @@ -305,24 +306,6 @@ impl Collector { Collector { rx, tx } } - fn send(tx: &Sender, event: Event) { - let name = event.name; - let result = tx.try_send(event); - - match result { - Ok(_) => trace!( - "{:?} event reported successfully, capacity: {}", - name, - tx.capacity() - ), - - Err(err) => match err { - TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name), - TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name), - }, - }; - } - /// The statistics collection handler. It will collect statistics /// for `address_id`s starting at 0 up to `addresses`. pub async fn collect(&mut self) { @@ -372,7 +355,7 @@ impl Collector { interval.tick().await; let address_count = get_number_of_addresses(); for address_id in 0..address_count { - Self::send( + send( &tx, Event { name: EventName::UpdateStats, @@ -393,7 +376,7 @@ impl Collector { interval.tick().await; let address_count = get_number_of_addresses(); for address_id in 0..address_count { - Self::send( + send( &tx, Event { name: EventName::UpdateAverages,