From e5df179ac97e6e9cf298e2a12def603d90a45304 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Wed, 15 Mar 2023 17:58:45 -0500 Subject: [PATCH] Reduce memory and CPU footprint of mirroring (#369) The experimental mirroring feature used a lot of memory and CPU when put under production traffic. This change attempts to reduce memory and CPU usage. Memory footprint is reduced by making the channel smaller. CPU usage is reduced by avoiding allocations if the channel is full or is closed. We might lose more messages this way if the mirror falls behind but that is more acceptable than crashing the entire process when it goes out-of-memory (OOM) --- src/mirrors.rs | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/src/mirrors.rs b/src/mirrors.rs index 6a59172..ab2b2dc 100644 --- a/src/mirrors.rs +++ b/src/mirrors.rs @@ -119,7 +119,7 @@ impl MirroringManager { let mut exit_senders: Vec> = vec![]; addresses.iter().for_each(|mirror| { - let (bytes_tx, bytes_rx) = channel::(500); + let (bytes_tx, bytes_rx) = channel::(10); let (exit_tx, exit_rx) = channel::<()>(1); let mut addr = mirror.clone(); addr.role = Role::Mirror; @@ -142,15 +142,25 @@ impl MirroringManager { } pub fn send(self: &mut Self, bytes: &BytesMut) { - let cpy = bytes.clone().freeze(); - self.byte_senders - .iter_mut() - .for_each(|sender| match sender.try_send(cpy.clone()) { + // We want to avoid performing an allocation if we won't be able to send the message + // There is a possibility of a race here where we check the capacity and then the channel is + // closed or the capacity is reduced to 0, but mirroring is best effort anyway + if self + .byte_senders + .iter() + .all(|sender| sender.capacity() == 0 || sender.is_closed()) + { + return; + } + let immutable_bytes = bytes.clone().freeze(); + self.byte_senders.iter_mut().for_each(|sender| { + match sender.try_send(immutable_bytes.clone()) { Ok(_) => {} Err(err) => { warn!("Failed to send bytes to a mirror channel {}", err); } - }); + } + }); } pub fn disconnect(self: &mut Self) {