Skip to content

Commit

Permalink
Remove all allocations in metrics system
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Aug 13, 2024
1 parent 026e282 commit f2792f6
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 73 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,4 @@ rpath = false
large_futures = "warn"

[workspace.lints.rust]
unused_imports = "deny"
#unused_imports = "deny"
9 changes: 1 addition & 8 deletions datafusion/physical-plan/src/metrics/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,7 @@ impl<'a> MetricBuilder<'a> {

/// Consume self and create a metric of the specified value
/// registered with the MetricsSet
pub fn build(self, value: MetricValue) {
let Self {
labels,
partition,
metrics,
} = self;
let metric = Arc::new(Metric::new_with_labels(value, partition, labels));
metrics.register(metric);
pub fn build(self, _value: MetricValue) {
}

/// Consume self and create a new counter for recording output rows
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,8 @@ impl ExecutionPlanMetricsSet {
}

/// Add the specified metric to the underlying metric set
pub fn register(&self, metric: Arc<Metric>) {
self.inner.lock().push(metric)
pub fn register(&self, _metric: Arc<Metric>) {
//self.inner.lock().push(metric)
}

/// Return a clone of the inner [`MetricsSet`]
Expand Down
78 changes: 16 additions & 62 deletions datafusion/physical-plan/src/metrics/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,12 @@ use std::{

use chrono::{DateTime, Utc};
use datafusion_common::instant::Instant;
use parking_lot::Mutex;

/// A counter to record things such as number of input or output rows
///
/// Note `clone`ing counters update the same underlying metrics
#[derive(Debug, Clone)]
pub struct Count {
/// value of the metric counter
value: std::sync::Arc<AtomicUsize>,
}

impl PartialEq for Count {
Expand All @@ -62,20 +59,16 @@ impl Count {
/// create a new counter
pub fn new() -> Self {
Self {
value: Arc::new(AtomicUsize::new(0)),
}
}

/// Add `n` to the metric's value
pub fn add(&self, n: usize) {
// relaxed ordering for operations on `value` poses no issues
// we're purely using atomic ops with no associated memory ops
self.value.fetch_add(n, Ordering::Relaxed);
pub fn add(&self, _n: usize) {
}

/// Get the current value
pub fn value(&self) -> usize {
self.value.load(Ordering::Relaxed)
0
}
}

Expand All @@ -85,8 +78,6 @@ impl Count {
/// Note `clone`ing gauge update the same underlying metrics
#[derive(Debug, Clone)]
pub struct Gauge {
/// value of the metric gauge
value: std::sync::Arc<AtomicUsize>,
}

impl PartialEq for Gauge {
Expand All @@ -111,47 +102,35 @@ impl Gauge {
/// create a new gauge
pub fn new() -> Self {
Self {
value: Arc::new(AtomicUsize::new(0)),
}
}

/// Add `n` to the metric's value
pub fn add(&self, n: usize) {
// relaxed ordering for operations on `value` poses no issues
// we're purely using atomic ops with no associated memory ops
self.value.fetch_add(n, Ordering::Relaxed);
pub fn add(&self, _n: usize) {
}

/// Sub `n` from the metric's value
pub fn sub(&self, n: usize) {
// relaxed ordering for operations on `value` poses no issues
// we're purely using atomic ops with no associated memory ops
self.value.fetch_sub(n, Ordering::Relaxed);
pub fn sub(&self, _n: usize) {
}

/// Set metric's value to maximum of `n` and current value
pub fn set_max(&self, n: usize) {
self.value.fetch_max(n, Ordering::Relaxed);
pub fn set_max(&self, _n: usize) {
}

/// Set the metric's value to `n` and return the previous value
pub fn set(&self, n: usize) -> usize {
// relaxed ordering for operations on `value` poses no issues
// we're purely using atomic ops with no associated memory ops
self.value.swap(n, Ordering::Relaxed)
pub fn set(&self, _n: usize) -> usize {
0
}

/// Get the current value
pub fn value(&self) -> usize {
self.value.load(Ordering::Relaxed)
0
}
}

/// Measure a potentially non contiguous duration of time
#[derive(Debug, Clone)]
pub struct Time {
/// elapsed time, in nanoseconds
nanos: Arc<AtomicUsize>,
}

impl Default for Time {
Expand All @@ -178,13 +157,11 @@ impl Time {
/// times for operations.
pub fn new() -> Self {
Self {
nanos: Arc::new(AtomicUsize::new(0)),
}
}

/// Add elapsed nanoseconds since `start`to self
pub fn add_elapsed(&self, start: Instant) {
self.add_duration(start.elapsed());
pub fn add_elapsed(&self, _start: Instant) {
}

/// Add duration of time to self
Expand All @@ -197,14 +174,11 @@ impl Time {
/// This is based on the assumption that the timing logic in most cases is likely
/// to take at least a nanosecond, and so this is reasonable mechanism to avoid
/// ambiguity, especially on systems with low-resolution monotonic clocks
pub fn add_duration(&self, duration: Duration) {
let more_nanos = duration.as_nanos() as usize;
self.nanos.fetch_add(more_nanos.max(1), Ordering::Relaxed);
pub fn add_duration(&self, _duration: Duration) {
}

/// Add the number of nanoseconds of other `Time` to self
pub fn add(&self, other: &Time) {
self.add_duration(Duration::from_nanos(other.value() as u64))
pub fn add(&self, _other: &Time) {
}

/// return a scoped guard that adds the amount of time elapsed
Expand All @@ -219,16 +193,14 @@ impl Time {

/// Get the number of nanoseconds record by this Time metric
pub fn value(&self) -> usize {
self.nanos.load(Ordering::Relaxed)
0
}
}

/// Stores a single timestamp, stored as the number of nanoseconds
/// elapsed from Jan 1, 1970 UTC
#[derive(Debug, Clone)]
pub struct Timestamp {
/// Time thing started
timestamp: Arc<Mutex<Option<DateTime<Utc>>>>,
}

impl Default for Timestamp {
Expand All @@ -241,7 +213,6 @@ impl Timestamp {
/// Create a new timestamp and sets its value to 0
pub fn new() -> Self {
Self {
timestamp: Arc::new(Mutex::new(None)),
}
}

Expand All @@ -251,40 +222,23 @@ impl Timestamp {
}

/// Sets the timestamps value to a specified time
pub fn set(&self, now: DateTime<Utc>) {
*self.timestamp.lock() = Some(now);
pub fn set(&self, _now: DateTime<Utc>) {
}

/// return the timestamps value at the last time `record()` was
/// called.
///
/// Returns `None` if `record()` has not been called
pub fn value(&self) -> Option<DateTime<Utc>> {
*self.timestamp.lock()
None
}

/// sets the value of this timestamp to the minimum of this and other
pub fn update_to_min(&self, other: &Timestamp) {
let min = match (self.value(), other.value()) {
(None, None) => None,
(Some(v), None) => Some(v),
(None, Some(v)) => Some(v),
(Some(v1), Some(v2)) => Some(if v1 < v2 { v1 } else { v2 }),
};

*self.timestamp.lock() = min;
pub fn update_to_min(&self, _other: &Timestamp) {
}

/// sets the value of this timestamp to the maximum of this and other
pub fn update_to_max(&self, other: &Timestamp) {
let max = match (self.value(), other.value()) {
(None, None) => None,
(Some(v), None) => Some(v),
(None, Some(v)) => Some(v),
(Some(v1), Some(v2)) => Some(if v1 < v2 { v2 } else { v1 }),
};

*self.timestamp.lock() = max;
pub fn update_to_max(&self, _other: &Timestamp) {
}
}

Expand Down

0 comments on commit f2792f6

Please sign in to comment.