Source code for faust.utils.tracing

"""OpenTracing utilities."""
import asyncio
import sys
import typing

from contextvars import ContextVar
from functools import wraps
from typing import Any, Callable, Optional, Tuple

import opentracing
from mode import shortlabel

__all__ = [
    'current_span',
    'noop_span',
    'set_current_span',
    'finish_span',
    'operation_name_from_fun',
    'traced_from_parent_span',
    'call_with_trace',
]

if typing.TYPE_CHECKING:
    _current_span: ContextVar[opentracing.Span]
_current_span = ContextVar('current_span')


[docs]def current_span() -> Optional[opentracing.Span]: """Get the current span for this context (if any).""" return _current_span.get(None)
[docs]def set_current_span(span: opentracing.Span) -> None: """Set the current span for the current context.""" _current_span.set(span)
[docs]def noop_span() -> opentracing.Span: """Return a span that does nothing when traced.""" return opentracing.Tracer()._noop_span
[docs]def finish_span(span: Optional[opentracing.Span], *, error: BaseException = None) -> None: """Finish span, and optionally set error tag.""" if span is not None: if error: span.__exit__(type(error), error, error.__traceback__) else: span.finish()
[docs]def operation_name_from_fun(fun: Any) -> str: """Generate opentracing name from function.""" obj = getattr(fun, '__self__', None) if obj is not None: objlabel = shortlabel(obj) funlabel = shortlabel(fun) if funlabel.startswith(objlabel): # remove obj name from function label funlabel = funlabel[len(objlabel):] return f'{objlabel}-{funlabel}' else: return f'{shortlabel(fun)}'
[docs]def traced_from_parent_span(parent_span: opentracing.Span = None, callback: Callable = None, **extra_context: Any) -> Callable: """Decorate function to be traced from parent span.""" def _wrapper(fun: Callable, **more_context: Any) -> Callable: operation_name = operation_name_from_fun(fun) @wraps(fun) def _inner(*args: Any, **kwargs: Any) -> Any: parent = parent_span if parent is None: parent = current_span() if parent is not None: child = parent.tracer.start_span( operation_name=operation_name, child_of=parent, tags={**extra_context, **more_context}, ) if callback is not None: callback(child) on_exit = (_restore_span, (parent, child)) set_current_span(child) return call_with_trace( child, fun, on_exit, *args, **kwargs) return fun(*args, **kwargs) return _inner return _wrapper
def _restore_span(span: opentracing.Span, expected_current_span: opentracing.Span) -> None: current = current_span() assert current is expected_current_span set_current_span(span)
[docs]def call_with_trace(span: opentracing.Span, fun: Callable, callback: Optional[Tuple[Callable, Tuple[Any, ...]]], *args: Any, **kwargs: Any) -> Any: """Call function and trace it from parent span.""" cb: Optional[Callable] = None cb_args: Tuple = () if callback: cb, cb_args = callback span.__enter__() try: ret = fun(*args, **kwargs) except BaseException: span.__exit__(*sys.exc_info()) raise else: if asyncio.iscoroutine(ret): # if async def method, we attach our span to # when it completes. async def corowrapped() -> Any: await_ret = None try: await_ret = await ret except BaseException: span.__exit__(*sys.exc_info()) if cb: cb(*cb_args) raise else: span.__exit__(None, None, None) if cb: cb(*cb_args) return await_ret return corowrapped() else: # for non async def method, we just exit the span. span.__exit__(None, None, None) if cb: cb(*cb_args) return ret