Skip to content

Commit

Permalink
Merge pull request #1189 from dperl-dls/bluesky_1682_add_opentelemetr…
Browse files Browse the repository at this point in the history
…y_tracing

(Bluesky #1682) add OpenTelemetry tracing to status
  • Loading branch information
mrakitin authored Jul 16, 2024
2 parents 6f87aa4 + 07e4093 commit e9f51dd
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 7 deletions.
4 changes: 2 additions & 2 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@

# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {
"python": ("https://docs.python.org/3", None),
"python": ("https://docs.python.org/3/", None),
"bluesky": ("https://blueskyproject.io/bluesky/main/", None),
"numpy": ("https://numpy.org/devdocs/", None),
"databroker": ("https://blueskyproject.io/databroker/", None),
"event-model": ("https://blueskyproject.io/event-model/main", None),
"event-model": ("https://blueskyproject.io/event-model/main/", None),
}

# If False and a module has the __all__ attribute set, autosummary documents
Expand Down
4 changes: 4 additions & 0 deletions docs/user/explanations/otel-tracing
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Tracing with OpenTelemetry
==========================

Ophyd is instrumented with `OpenTelemetry <https://opentelemetry.io/>`_ tracing span hooks on the lifetime of ``Status`` objects and their ``.wait()`` method. Please see the `Bluesky documentation <https://blueskyproject.io/bluesky/main/otel_tracing.html>`_ for examples of how to make use of this.
72 changes: 68 additions & 4 deletions ophyd/status.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import threading
import time
from collections import deque
Expand All @@ -6,6 +7,7 @@
from warnings import warn

import numpy as np
from opentelemetry import trace

from .log import logger
from .utils import (
Expand All @@ -16,6 +18,9 @@
adapt_old_callback_signature,
)

tracer = trace.get_tracer(__name__)
_TRACE_PREFIX = "Ophyd Status"


class UseNewProperty(RuntimeError):
...
Expand Down Expand Up @@ -80,6 +85,14 @@ class StatusBase:

def __init__(self, *, timeout=None, settle_time=0, done=None, success=None):
super().__init__()
self._tracing_span = tracer.start_span(_TRACE_PREFIX)
self._trace_attributes = {
"status_type": self.__class__.__name__,
"settle_time": settle_time,
}
self._trace_attributes.update(
{"timeout": timeout} if timeout else {"no_timeout_given": True}
)
self._tname = None
self._lock = threading.RLock()
self._event = threading.Event() # state associated with done-ness
Expand Down Expand Up @@ -134,6 +147,8 @@ def __init__(self, *, timeout=None, settle_time=0, done=None, success=None):
)
self.set_exception(exc)

self._trace_attributes["object_repr"] = repr(self)

@property
def timeout(self):
"""
Expand Down Expand Up @@ -290,6 +305,7 @@ def set_exception(self, exc):
----------
exc: Exception
"""
self._trace_attributes["exception"] = exc
# Since we rely on this being raise-able later, check proactively to
# avoid potentially very confusing failures.
if not (
Expand Down Expand Up @@ -329,6 +345,7 @@ def set_exception(self, exc):
self._exception = exc
self._settled_event.set()

self._close_trace()
if self._callback_thread is None:
self._run_callbacks()

Expand Down Expand Up @@ -364,6 +381,7 @@ def settle_done():

if self._callback_thread is None:
self._run_callbacks()
self._close_trace()

def _finished(self, success=True, **kwargs):
"""
Expand Down Expand Up @@ -420,6 +438,7 @@ def exception(self, timeout=None):
raise WaitTimeoutError(f"Status {self!r} has not completed yet.")
return self._exception

@tracer.start_as_current_span(f"{_TRACE_PREFIX} wait")
def wait(self, timeout=None):
"""
Block until the action completes.
Expand All @@ -446,6 +465,7 @@ def wait(self, timeout=None):
indicates that the action itself raised ``TimeoutError``, distinct
from ``WaitTimeoutError`` above.
"""
_set_trace_attributes(trace.get_current_span(), self._trace_attributes)
if not self._event.wait(timeout=timeout):
raise WaitTimeoutError(f"Status {self!r} has not completed yet.")
if self._exception is not None:
Expand Down Expand Up @@ -532,6 +552,13 @@ def finished_cb(self, cb):
"method instead."
)

def _update_trace_attributes(self):
    """Copy every entry of ``self._trace_attributes`` onto ``self._tracing_span``."""
    _set_trace_attributes(self._tracing_span, self._trace_attributes)

def _close_trace(self):
    """Push the collected trace attributes to the tracing span, then end the span."""
    # Attributes are written first so they are present on the span when it ends.
    self._update_trace_attributes()
    self._tracing_span.end()

def __and__(self, other):
"""
Returns a new 'composite' status object, AndStatus,
Expand All @@ -546,9 +573,11 @@ class AndStatus(StatusBase):
"a Status that has composes two other Status objects using logical and"

def __init__(self, left, right, **kwargs):
super().__init__(**kwargs)
self.left = left
self.right = right
super().__init__(**kwargs)
self._trace_attributes["left"] = self.left._trace_attributes
self._trace_attributes["right"] = self.right._trace_attributes

def inner(status):
with self._lock:
Expand Down Expand Up @@ -628,6 +657,8 @@ def __init__(self, obj=None, timeout=None, settle_time=0, done=None, success=Non
super().__init__(
timeout=timeout, settle_time=settle_time, done=done, success=success
)
self._trace_attributes["obj"] = obj
self._trace_attributes["no_obj_given"] = not bool(obj)

def __str__(self):
return (
Expand Down Expand Up @@ -664,17 +695,24 @@ def __init__(self, device, **kwargs):
self.device = device
self._watchers = []
super().__init__(**kwargs)
self._trace_attributes.update(
{"device_name": device.name, "device_type": device.__class__.__name__}
if device
else {"no_device_given": True}
)
self._trace_attributes["kwargs"] = json.dumps(kwargs, default=repr)

def _handle_failure(self):
    """Run the parent class's failure handling, then try to stop the device."""
    super()._handle_failure()
    self.log.debug("Trying to stop %s", repr(self.device))
    # NOTE(review): other code in this class treats ``self.device`` as possibly
    # falsy/None; this call would raise in that case -- confirm whether a guard
    # is needed.
    self.device.stop()

def __str__(self):
device_name = self.device.name if self.device else "None"
return (
"{0}(device={1.device.name}, done={1.done}, "
"{0}(device={2}, done={1.done}, "
"success={1.success})"
"".format(self.__class__.__name__, self)
"".format(self.__class__.__name__, self, device_name)
)

def watch(self, func):
Expand Down Expand Up @@ -846,7 +884,7 @@ def check_value(self, *args, **kwargs):
try:
success = self.callback(*args, **kwargs)

# If successfull start a timer for completion
# If successful start a timer for completion
if success:
if not self._stable_timer.is_alive():
self._stable_timer.start()
Expand Down Expand Up @@ -948,6 +986,21 @@ def __init__(self, positioner, target, *, start_ts=None, **kwargs):
if not self.done:
self.pos.subscribe(self._notify_watchers, event_type=self.pos.SUB_READBACK)

self._trace_attributes.update(
{
"target": target,
"start_time": start_ts,
"start_pos ": self.pos.position,
"unit": self._unit,
"positioner_name": self._name,
}
)
self._trace_attributes.update(
{"positioner": repr(self.pos)}
if self.pos
else {"no_positioner_given": True}
)

def watch(self, func):
"""
Subscribe to notifications about partial progress.
Expand Down Expand Up @@ -1023,6 +1076,12 @@ def _settled(self):
self._watchers.clear()
self.finish_ts = time.time()
self.finish_pos = self.pos.position
self._trace_attributes.update(
{
"finish_time": self.finish_ts,
"finish_pos": self.finish_pos,
}
)

@property
def elapsed(self):
Expand All @@ -1043,6 +1102,11 @@ def __str__(self):
__repr__ = __str__


def _set_trace_attributes(span, trace_attributes):
for k, v in trace_attributes.items():
span.set_attribute(k, v)


def wait(status, timeout=None, *, poll_rate="DEPRECATED"):
"""(Blocking) wait for the status object to complete
Expand Down
4 changes: 3 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ setup_requires =
# Specify any package dependencies below.
install_requires =
networkx>=2.0
numpy
numpy<2.0
opentelemetry-api
packaging
pint

Expand Down Expand Up @@ -64,6 +65,7 @@ dev =
pytest-rerunfailures
pytest-timeout
pipdeptree
setuptools>=64
setuptools_scm[toml]>=6.2
sphinx-autobuild
sphinx-design
Expand Down

0 comments on commit e9f51dd

Please sign in to comment.