1
mirror of https://github.com/home-assistant/core synced 2024-10-04 07:58:43 +02:00
ha-core/homeassistant/helpers/trace.py

177 lines
5.0 KiB
Python
Raw Normal View History

"""Helpers for script and condition tracing."""
2021-03-17 18:34:19 +01:00
from __future__ import annotations
from collections import deque
from contextlib import contextmanager
from contextvars import ContextVar
2021-03-17 18:34:19 +01:00
from typing import Any, Deque, Generator, cast
from homeassistant.helpers.typing import TemplateVarsType
import homeassistant.util.dt as dt_util
class TraceElement:
"""Container for trace data."""
def __init__(self, variables: TemplateVarsType, path: str):
"""Container for trace data."""
2021-03-17 18:34:19 +01:00
self._error: Exception | None = None
self.path: str = path
2021-03-17 18:34:19 +01:00
self._result: dict | None = None
self._timestamp = dt_util.utcnow()
if variables is None:
variables = {}
last_variables = variables_cv.get() or {}
variables_cv.set(dict(variables))
changed_variables = {
key: value
for key, value in variables.items()
if key not in last_variables or last_variables[key] != value
}
self._variables = changed_variables
def __repr__(self) -> str:
"""Container for trace data."""
return str(self.as_dict())
def set_error(self, ex: Exception) -> None:
"""Set error."""
self._error = ex
def set_result(self, **kwargs: Any) -> None:
"""Set result."""
self._result = {**kwargs}
2021-03-17 18:34:19 +01:00
def as_dict(self) -> dict[str, Any]:
"""Return dictionary version of this TraceElement."""
2021-03-17 18:34:19 +01:00
result: dict[str, Any] = {"path": self.path, "timestamp": self._timestamp}
if self._variables:
result["changed_variables"] = self._variables
if self._error is not None:
result["error"] = str(self._error)
if self._result is not None:
result["result"] = self._result
return result
# Context variables for tracing
# Current trace
2021-03-17 18:34:19 +01:00
trace_cv: ContextVar[dict[str, Deque[TraceElement]] | None] = ContextVar(
"trace_cv", default=None
)
# Stack of TraceElements
2021-03-17 18:34:19 +01:00
trace_stack_cv: ContextVar[list[TraceElement] | None] = ContextVar(
"trace_stack_cv", default=None
)
# Current location in config tree
2021-03-17 18:34:19 +01:00
trace_path_stack_cv: ContextVar[list[str] | None] = ContextVar(
"trace_path_stack_cv", default=None
)
# Copy of last variables
2021-03-17 18:34:19 +01:00
variables_cv: ContextVar[Any | None] = ContextVar("variables_cv", default=None)
# Automation ID + Run ID
2021-03-17 18:34:19 +01:00
trace_id_cv: ContextVar[tuple[str, str] | None] = ContextVar(
"trace_id_cv", default=None
)
2021-03-17 18:34:19 +01:00
def trace_id_set(trace_id: tuple[str, str]) -> None:
"""Set id of the current trace."""
trace_id_cv.set(trace_id)
2021-03-17 18:34:19 +01:00
def trace_id_get() -> tuple[str, str] | None:
"""Get id if the current trace."""
return trace_id_cv.get()
def trace_stack_push(trace_stack_var: ContextVar, node: Any) -> None:
"""Push an element to the top of a trace stack."""
trace_stack = trace_stack_var.get()
if trace_stack is None:
trace_stack = []
trace_stack_var.set(trace_stack)
trace_stack.append(node)
def trace_stack_pop(trace_stack_var: ContextVar) -> None:
"""Remove the top element from a trace stack."""
trace_stack = trace_stack_var.get()
trace_stack.pop()
2021-03-17 18:34:19 +01:00
def trace_stack_top(trace_stack_var: ContextVar) -> Any | None:
"""Return the element at the top of a trace stack."""
trace_stack = trace_stack_var.get()
return trace_stack[-1] if trace_stack else None
2021-03-17 18:34:19 +01:00
def trace_path_push(suffix: str | list[str]) -> int:
"""Go deeper in the config tree."""
if isinstance(suffix, str):
suffix = [suffix]
for node in suffix:
trace_stack_push(trace_path_stack_cv, node)
return len(suffix)
def trace_path_pop(count: int) -> None:
"""Go n levels up in the config tree."""
for _ in range(count):
trace_stack_pop(trace_path_stack_cv)
def trace_path_get() -> str:
"""Return a string representing the current location in the config tree."""
path = trace_path_stack_cv.get()
if not path:
return ""
return "/".join(path)
def trace_append_element(
trace_element: TraceElement,
2021-03-17 18:34:19 +01:00
maxlen: int | None = None,
) -> None:
"""Append a TraceElement to trace[path]."""
path = trace_element.path
trace = trace_cv.get()
if trace is None:
trace = {}
trace_cv.set(trace)
if path not in trace:
trace[path] = deque(maxlen=maxlen)
trace[path].append(trace_element)
2021-03-17 18:34:19 +01:00
def trace_get(clear: bool = True) -> dict[str, Deque[TraceElement]] | None:
"""Return the current trace."""
if clear:
trace_clear()
return trace_cv.get()
def trace_clear() -> None:
"""Clear the trace."""
trace_cv.set({})
trace_stack_cv.set(None)
trace_path_stack_cv.set(None)
variables_cv.set(None)
def trace_set_result(**kwargs: Any) -> None:
"""Set the result of TraceElement at the top of the stack."""
node = cast(TraceElement, trace_stack_top(trace_stack_cv))
node.set_result(**kwargs)
@contextmanager
2021-03-17 18:34:19 +01:00
def trace_path(suffix: str | list[str]) -> Generator:
"""Go deeper in the config tree."""
count = trace_path_push(suffix)
try:
yield
finally:
trace_path_pop(count)