Move legacy database queries and models to prepare for schema v38 (#89532)

This commit is contained in:
J. Nick Koston 2023-03-11 11:26:30 -10:00 committed by GitHub
parent 16b420d660
commit 50c31a5355
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 724 additions and 568 deletions

View File

@ -0,0 +1,22 @@
"""Provide pre-made queries on top of the recorder component."""
from __future__ import annotations
from .const import NEED_ATTRIBUTE_DOMAINS, SIGNIFICANT_DOMAINS
from .legacy import (
get_full_significant_states_with_session,
get_last_state_changes,
get_significant_states,
get_significant_states_with_session,
state_changes_during_period,
)
# Public API of the history package, re-exported from .const and .legacy.
__all__ = [
    "NEED_ATTRIBUTE_DOMAINS",
    "SIGNIFICANT_DOMAINS",
    "get_full_significant_states_with_session",
    "get_last_state_changes",
    "get_significant_states",
    "get_significant_states_with_session",
    "state_changes_during_period",
]

View File

@ -0,0 +1,10 @@
"""Common functions for history."""
from __future__ import annotations
from homeassistant.core import HomeAssistant
from ... import recorder
def _schema_version(hass: HomeAssistant) -> int:
    """Return the database schema version of the recorder instance for hass."""
    return recorder.get_instance(hass).schema_version

View File

@ -0,0 +1,23 @@
"""Constants for history."""
# Keys used when serializing a state into a dict (consumed by .legacy).
STATE_KEY = "state"
LAST_CHANGED_KEY = "last_changed"

# Domains whose states are always treated as significant by the history
# queries (imported by .legacy for filtering).
SIGNIFICANT_DOMAINS = {
    "climate",
    "device_tracker",
    "humidifier",
    "thermostat",
    "water_heater",
}
# SQL LIKE patterns matching any entity_id in the significant domains.
SIGNIFICANT_DOMAINS_ENTITY_ID_LIKE = [f"{domain}.%" for domain in SIGNIFICANT_DOMAINS]
# Domains excluded from history results.
IGNORE_DOMAINS = {"zone", "scene"}
# SQL LIKE patterns matching any entity_id in the ignored domains.
IGNORE_DOMAINS_ENTITY_ID_LIKE = [f"{domain}.%" for domain in IGNORE_DOMAINS]
# Domains whose state attributes must be loaded for history output.
NEED_ATTRIBUTE_DOMAINS = {
    "climate",
    "humidifier",
    "input_datetime",
    "thermostat",
    "water_heater",
}

View File

@ -22,43 +22,30 @@ from homeassistant.const import COMPRESSED_STATE_LAST_UPDATED, COMPRESSED_STATE_
from homeassistant.core import HomeAssistant, State, split_entity_id
import homeassistant.util.dt as dt_util
from .. import recorder
from .db_schema import RecorderRuns, StateAttributes, States
from .filters import Filters
from .models import (
from ... import recorder
from ..db_schema import RecorderRuns, StateAttributes, States
from ..filters import Filters
from ..models import (
LazyState,
LazyStatePreSchema31,
process_datetime_to_timestamp,
process_timestamp,
process_timestamp_to_utc_isoformat,
row_to_compressed_state,
row_to_compressed_state_pre_schema_31,
)
from .util import execute_stmt_lambda_element, session_scope
from ..models.legacy import LazyStatePreSchema31, row_to_compressed_state_pre_schema_31
from ..util import execute_stmt_lambda_element, session_scope
from .common import _schema_version
from .const import (
IGNORE_DOMAINS_ENTITY_ID_LIKE,
LAST_CHANGED_KEY,
NEED_ATTRIBUTE_DOMAINS,
SIGNIFICANT_DOMAINS,
SIGNIFICANT_DOMAINS_ENTITY_ID_LIKE,
STATE_KEY,
)
_LOGGER = logging.getLogger(__name__)
STATE_KEY = "state"
LAST_CHANGED_KEY = "last_changed"
SIGNIFICANT_DOMAINS = {
"climate",
"device_tracker",
"humidifier",
"thermostat",
"water_heater",
}
SIGNIFICANT_DOMAINS_ENTITY_ID_LIKE = [f"{domain}.%" for domain in SIGNIFICANT_DOMAINS]
IGNORE_DOMAINS = {"zone", "scene"}
IGNORE_DOMAINS_ENTITY_ID_LIKE = [f"{domain}.%" for domain in IGNORE_DOMAINS]
NEED_ATTRIBUTE_DOMAINS = {
"climate",
"humidifier",
"input_datetime",
"thermostat",
"water_heater",
}
_BASE_STATES = (
States.entity_id,
@ -151,11 +138,7 @@ _FIELD_MAP_PRE_SCHEMA_31 = {
}
def _schema_version(hass: HomeAssistant) -> int:
return recorder.get_instance(hass).schema_version
def lambda_stmt_and_join_attributes(
def _lambda_stmt_and_join_attributes(
schema_version: int, no_attributes: bool, include_last_changed: bool = True
) -> tuple[StatementLambdaElement, bool]:
"""Return the lambda_stmt and if StateAttributes should be joined.
@ -268,7 +251,7 @@ def _significant_states_stmt(
no_attributes: bool,
) -> StatementLambdaElement:
"""Query the database for significant state changes."""
stmt, join_attributes = lambda_stmt_and_join_attributes(
stmt, join_attributes = _lambda_stmt_and_join_attributes(
schema_version, no_attributes, include_last_changed=not significant_changes_only
)
if (
@ -442,7 +425,7 @@ def _state_changed_during_period_stmt(
descending: bool,
limit: int | None,
) -> StatementLambdaElement:
stmt, join_attributes = lambda_stmt_and_join_attributes(
stmt, join_attributes = _lambda_stmt_and_join_attributes(
schema_version, no_attributes, include_last_changed=False
)
if schema_version >= 31:
@ -534,7 +517,7 @@ def state_changes_during_period(
def _get_last_state_changes_stmt(
schema_version: int, number_of_states: int, entity_id: str
) -> StatementLambdaElement:
stmt, join_attributes = lambda_stmt_and_join_attributes(
stmt, join_attributes = _lambda_stmt_and_join_attributes(
schema_version, False, include_last_changed=False
)
if schema_version >= 31:
@ -601,7 +584,7 @@ def _get_states_for_entities_stmt(
no_attributes: bool,
) -> StatementLambdaElement:
"""Baked query to get states for specific entities."""
stmt, join_attributes = lambda_stmt_and_join_attributes(
stmt, join_attributes = _lambda_stmt_and_join_attributes(
schema_version, no_attributes, include_last_changed=True
)
# We got an include-list of entities, accelerate the query by filtering already
@ -673,7 +656,7 @@ def _get_states_for_all_stmt(
no_attributes: bool,
) -> StatementLambdaElement:
"""Baked query to get states for all entities."""
stmt, join_attributes = lambda_stmt_and_join_attributes(
stmt, join_attributes = _lambda_stmt_and_join_attributes(
schema_version, no_attributes, include_last_changed=True
)
# We did not get an include-list of entities, query all states in the inner
@ -787,7 +770,7 @@ def _get_single_entity_states_stmt(
) -> StatementLambdaElement:
# Use an entirely different (and extremely fast) query if we only
# have a single entity id
stmt, join_attributes = lambda_stmt_and_join_attributes(
stmt, join_attributes = _lambda_stmt_and_join_attributes(
schema_version, no_attributes, include_last_changed=True
)
if schema_version >= 31:

View File

@ -1,521 +0,0 @@
"""Models for Recorder."""
from __future__ import annotations
from contextlib import suppress
from dataclasses import dataclass
from datetime import datetime, timedelta
from functools import lru_cache
import logging
from typing import Any, Literal, TypedDict, overload
from uuid import UUID
from awesomeversion import AwesomeVersion
from sqlalchemy.engine.row import Row
from homeassistant.const import (
COMPRESSED_STATE_ATTRIBUTES,
COMPRESSED_STATE_LAST_CHANGED,
COMPRESSED_STATE_LAST_UPDATED,
COMPRESSED_STATE_STATE,
)
from homeassistant.core import Context, State
import homeassistant.util.dt as dt_util
from homeassistant.util.json import json_loads_object
from homeassistant.util.ulid import bytes_to_ulid, ulid_to_bytes
from .const import SupportedDialect
# pylint: disable=invalid-name
_LOGGER = logging.getLogger(__name__)

# UTC offset suffix appended to naive ISO timestamps read from the database
# (see process_timestamp_to_utc_isoformat below).
DB_TIMEZONE = "+00:00"
# Serialized form of an empty attributes dict; short-circuits JSON decoding.
EMPTY_JSON_OBJECT = "{}"
class UnsupportedDialect(Exception):
    """The dialect or its version is not supported."""


class StatisticResult(TypedDict):
    """Statistic result data class.

    Allows multiple datapoints for the same statistic_id.
    """

    meta: StatisticMetaData
    stat: StatisticData


class StatisticDataTimestampBase(TypedDict):
    """Mandatory fields for statistic data class with a timestamp."""

    # Epoch timestamp of the period start.
    start_ts: float


class StatisticDataBase(TypedDict):
    """Mandatory fields for statistic data class."""

    # Datetime of the period start.
    start: datetime


class StatisticMixIn(TypedDict, total=False):
    """Optional numeric fields shared by the statistic data classes."""

    state: float
    sum: float
    min: float
    max: float
    mean: float


class StatisticData(StatisticDataBase, StatisticMixIn, total=False):
    """Statistic data class."""

    last_reset: datetime | None


class StatisticDataTimestamp(StatisticDataTimestampBase, StatisticMixIn, total=False):
    """Statistic data class with a timestamp."""

    last_reset_ts: float | None


class StatisticMetaData(TypedDict):
    """Statistic meta data class."""

    has_mean: bool
    has_sum: bool
    name: str | None
    source: str
    statistic_id: str
    unit_of_measurement: str | None
@overload
def process_timestamp(ts: None) -> None:
    ...


@overload
def process_timestamp(ts: datetime) -> datetime:
    ...


def process_timestamp(ts: datetime | None) -> datetime | None:
    """Process a timestamp into datetime object.

    Naive datetimes are assumed to already be UTC and are tagged as such;
    aware datetimes are converted to UTC.
    """
    if ts is None:
        return None
    if ts.tzinfo is not None:
        return dt_util.as_utc(ts)
    return ts.replace(tzinfo=dt_util.UTC)
@overload
def process_timestamp_to_utc_isoformat(ts: None) -> None:
    ...


@overload
def process_timestamp_to_utc_isoformat(ts: datetime) -> str:
    ...


def process_timestamp_to_utc_isoformat(ts: datetime | None) -> str | None:
    """Process a timestamp into UTC isotime.

    Naive input is treated as UTC by appending the DB_TIMEZONE suffix,
    avoiding a timezone conversion.
    """
    if ts is None:
        return None
    if ts.tzinfo is None:
        return f"{ts.isoformat()}{DB_TIMEZONE}"
    if ts.tzinfo == dt_util.UTC:
        return ts.isoformat()
    return ts.astimezone(dt_util.UTC).isoformat()
def process_datetime_to_timestamp(ts: datetime) -> float:
    """Process a database datetime to epoch.

    Mirrors the behavior of process_timestamp_to_utc_isoformat
    except it returns the epoch time.
    """
    tzinfo = ts.tzinfo
    if tzinfo is not None and tzinfo != dt_util.UTC:
        return ts.timestamp()
    # Naive values are assumed to already be UTC.
    return dt_util.utc_to_timestamp(ts)
def datetime_to_timestamp_or_none(dt: datetime | None) -> float | None:
    """Convert a datetime to an epoch timestamp, passing None through."""
    return None if dt is None else dt_util.utc_to_timestamp(dt)


def timestamp_to_datetime_or_none(ts: float | None) -> datetime | None:
    """Convert an epoch timestamp to a UTC datetime.

    Falsy input (None or 0) maps to None.
    """
    return dt_util.utc_from_timestamp(ts) if ts else None
def ulid_to_bytes_or_none(ulid: str | None) -> bytes | None:
    """Convert an ulid to bytes, passing None through."""
    return None if ulid is None else ulid_to_bytes(ulid)


def bytes_to_ulid_or_none(_bytes: bytes | None) -> str | None:
    """Convert bytes to a ulid, passing None through."""
    return None if _bytes is None else bytes_to_ulid(_bytes)
@lru_cache(maxsize=16)
def uuid_hex_to_bytes_or_none(uuid_hex: str | None) -> bytes | None:
"""Convert a uuid hex to bytes."""
if uuid_hex is None:
return None
with suppress(ValueError):
return UUID(hex=uuid_hex).bytes
return None
@lru_cache(maxsize=16)
def bytes_to_uuid_hex_or_none(_bytes: bytes | None) -> str | None:
"""Convert bytes to a uuid hex."""
if _bytes is None:
return None
with suppress(ValueError):
return UUID(bytes=_bytes).hex
return None
class LazyStatePreSchema31(State):
    """A lazy version of core State before schema 31.

    Attribute decoding and timestamp processing are deferred until first
    access so rows whose attributes or datetimes are never read do not
    pay the conversion cost.
    """

    __slots__ = [
        "_row",
        "_attributes",
        "_last_changed",
        "_last_updated",
        "_context",
        "attr_cache",
    ]

    def __init__(  # pylint: disable=super-init-not-called
        self,
        row: Row,
        attr_cache: dict[str, dict[str, Any]],
        start_time: datetime | None,
    ) -> None:
        """Init the lazy state."""
        self._row = row
        self.entity_id: str = self._row.entity_id
        self.state = self._row.state or ""
        self._attributes: dict[str, Any] | None = None
        # start_time, when given, pre-seeds both datetimes so the row's
        # datetime columns need not be processed at all.
        self._last_changed: datetime | None = start_time
        self._last_updated: datetime | None = start_time
        self._context: Context | None = None
        self.attr_cache = attr_cache

    @property  # type: ignore[override]
    def attributes(self) -> dict[str, Any]:
        """State attributes."""
        if self._attributes is None:
            self._attributes = decode_attributes_from_row(self._row, self.attr_cache)
        return self._attributes

    @attributes.setter
    def attributes(self, value: dict[str, Any]) -> None:
        """Set attributes."""
        self._attributes = value

    @property
    def context(self) -> Context:
        """State context."""
        # Rows do not carry the original context; synthesize an empty one.
        if self._context is None:
            self._context = Context(id=None)
        return self._context

    @context.setter
    def context(self, value: Context) -> None:
        """Set context."""
        self._context = value

    @property
    def last_changed(self) -> datetime:
        """Last changed datetime."""
        if self._last_changed is None:
            if (last_changed := self._row.last_changed) is not None:
                self._last_changed = process_timestamp(last_changed)
            else:
                # NULL last_changed means "changed when last updated".
                self._last_changed = self.last_updated
        return self._last_changed

    @last_changed.setter
    def last_changed(self, value: datetime) -> None:
        """Set last changed datetime."""
        self._last_changed = value

    @property
    def last_updated(self) -> datetime:
        """Last updated datetime."""
        if self._last_updated is None:
            self._last_updated = process_timestamp(self._row.last_updated)
        return self._last_updated

    @last_updated.setter
    def last_updated(self, value: datetime) -> None:
        """Set last updated datetime."""
        self._last_updated = value

    def as_dict(self) -> dict[str, Any]:  # type: ignore[override]
        """Return a dict representation of the LazyState.

        Async friendly.

        To be used for JSON serialization.
        """
        if self._last_changed is None and self._last_updated is None:
            # Datetimes were never materialized: format straight from the
            # row to skip intermediate datetime objects.
            last_updated_isoformat = process_timestamp_to_utc_isoformat(
                self._row.last_updated
            )
            if (
                self._row.last_changed is None
                or self._row.last_changed == self._row.last_updated
            ):
                last_changed_isoformat = last_updated_isoformat
            else:
                last_changed_isoformat = process_timestamp_to_utc_isoformat(
                    self._row.last_changed
                )
        else:
            last_updated_isoformat = self.last_updated.isoformat()
            if self.last_changed == self.last_updated:
                last_changed_isoformat = last_updated_isoformat
            else:
                last_changed_isoformat = self.last_changed.isoformat()
        return {
            "entity_id": self.entity_id,
            "state": self.state,
            "attributes": self._attributes or self.attributes,
            "last_changed": last_changed_isoformat,
            "last_updated": last_updated_isoformat,
        }
class LazyState(State):
    """A lazy version of core State after schema 31.

    Attribute decoding and timestamp-to-datetime conversion are deferred
    until first access so rows whose attributes or datetimes are never
    read do not pay the conversion cost.
    """

    __slots__ = [
        "_row",
        "_attributes",
        "_last_changed_ts",
        "_last_updated_ts",
        "_context",
        "attr_cache",
    ]

    def __init__(  # pylint: disable=super-init-not-called
        self,
        row: Row,
        attr_cache: dict[str, dict[str, Any]],
        start_time: datetime | None,
    ) -> None:
        """Init the lazy state."""
        self._row = row
        self.entity_id: str = self._row.entity_id
        self.state = self._row.state or ""
        self._attributes: dict[str, Any] | None = None
        # Fall back to start_time when the row carries no timestamp.
        self._last_updated_ts: float | None = self._row.last_updated_ts or (
            dt_util.utc_to_timestamp(start_time) if start_time else None
        )
        # A row without last_changed_ts means "changed when last updated".
        self._last_changed_ts: float | None = (
            self._row.last_changed_ts or self._last_updated_ts
        )
        self._context: Context | None = None
        self.attr_cache = attr_cache

    @property  # type: ignore[override]
    def attributes(self) -> dict[str, Any]:
        """State attributes."""
        if self._attributes is None:
            self._attributes = decode_attributes_from_row(self._row, self.attr_cache)
        return self._attributes

    @attributes.setter
    def attributes(self, value: dict[str, Any]) -> None:
        """Set attributes."""
        self._attributes = value

    @property
    def context(self) -> Context:
        """State context."""
        # Rows do not carry the original context; synthesize an empty one.
        if self._context is None:
            self._context = Context(id=None)
        return self._context

    @context.setter
    def context(self, value: Context) -> None:
        """Set context."""
        self._context = value

    @property
    def last_changed(self) -> datetime:
        """Last changed datetime."""
        assert self._last_changed_ts is not None
        return dt_util.utc_from_timestamp(self._last_changed_ts)

    @last_changed.setter
    def last_changed(self, value: datetime) -> None:
        """Set last changed datetime."""
        self._last_changed_ts = process_timestamp(value).timestamp()

    @property
    def last_updated(self) -> datetime:
        """Last updated datetime."""
        assert self._last_updated_ts is not None
        return dt_util.utc_from_timestamp(self._last_updated_ts)

    @last_updated.setter
    def last_updated(self, value: datetime) -> None:
        """Set last updated datetime."""
        self._last_updated_ts = process_timestamp(value).timestamp()

    def as_dict(self) -> dict[str, Any]:  # type: ignore[override]
        """Return a dict representation of the LazyState.

        Async friendly.

        To be used for JSON serialization.
        """
        last_updated_isoformat = self.last_updated.isoformat()
        # Avoid a second conversion when changed == updated (the common case).
        if self._last_changed_ts == self._last_updated_ts:
            last_changed_isoformat = last_updated_isoformat
        else:
            last_changed_isoformat = self.last_changed.isoformat()
        return {
            "entity_id": self.entity_id,
            "state": self.state,
            "attributes": self._attributes or self.attributes,
            "last_changed": last_changed_isoformat,
            "last_updated": last_updated_isoformat,
        }
def decode_attributes_from_row(
    row: Row, attr_cache: dict[str, dict[str, Any]]
) -> dict[str, Any]:
    """Decode attributes from a database row.

    Decoded JSON is memoized in attr_cache keyed on the raw source string.
    """
    source: str = row.shared_attrs or row.attributes
    cached = attr_cache.get(source)
    if cached is not None:
        return cached
    if not source or source == EMPTY_JSON_OBJECT:
        return {}
    try:
        attributes = json_loads_object(source)
    except ValueError:
        _LOGGER.exception("Error converting row to state attributes: %s", source)
        attributes = {}
    attr_cache[source] = attributes
    return attributes
def row_to_compressed_state(
    row: Row,
    attr_cache: dict[str, dict[str, Any]],
    start_time: datetime | None,
) -> dict[str, Any]:
    """Convert a database row to a compressed state schema 31 and later."""
    comp_state = {
        COMPRESSED_STATE_STATE: row.state,
        COMPRESSED_STATE_ATTRIBUTES: decode_attributes_from_row(row, attr_cache),
    }
    if start_time:
        # A caller-supplied window start overrides the row timestamp.
        comp_state[COMPRESSED_STATE_LAST_UPDATED] = dt_util.utc_to_timestamp(start_time)
        return comp_state
    last_updated_ts: float = row.last_updated_ts
    comp_state[COMPRESSED_STATE_LAST_UPDATED] = last_updated_ts
    last_changed_ts = row.last_changed_ts
    # last_changed is only emitted when it differs from last_updated.
    if last_changed_ts and last_updated_ts != last_changed_ts:
        comp_state[COMPRESSED_STATE_LAST_CHANGED] = last_changed_ts
    return comp_state
def row_to_compressed_state_pre_schema_31(
    row: Row,
    attr_cache: dict[str, dict[str, Any]],
    start_time: datetime | None,
) -> dict[str, Any]:
    """Convert a database row to a compressed state before schema 31."""
    comp_state = {
        COMPRESSED_STATE_STATE: row.state,
        COMPRESSED_STATE_ATTRIBUTES: decode_attributes_from_row(row, attr_cache),
    }
    if start_time:
        # A caller-supplied window start overrides the row timestamp.
        comp_state[COMPRESSED_STATE_LAST_UPDATED] = start_time.timestamp()
        return comp_state
    last_updated: datetime = row.last_updated
    comp_state[COMPRESSED_STATE_LAST_UPDATED] = process_datetime_to_timestamp(
        last_updated
    )
    last_changed = row.last_changed
    # last_changed is only emitted when it differs from last_updated.
    if last_changed and last_updated != last_changed:
        comp_state[COMPRESSED_STATE_LAST_CHANGED] = process_datetime_to_timestamp(
            last_changed
        )
    return comp_state
class CalendarStatisticPeriod(TypedDict, total=False):
    """Statistic period definition."""

    # Calendar unit the period covers.
    period: Literal["hour", "day", "week", "month", "year"]
    # Shift of the period in whole units — TODO confirm sign convention.
    offset: int


class FixedStatisticPeriod(TypedDict, total=False):
    """Statistic period definition."""

    end_time: datetime
    start_time: datetime


class RollingWindowStatisticPeriod(TypedDict, total=False):
    """Statistic period definition."""

    duration: timedelta
    offset: timedelta


class StatisticPeriod(TypedDict, total=False):
    """Statistic period definition."""

    # NOTE(review): presumably only one of these keys is set at a time —
    # confirm against the consumers of StatisticPeriod.
    calendar: CalendarStatisticPeriod
    fixed_period: FixedStatisticPeriod
    rolling_window: RollingWindowStatisticPeriod
@dataclass
class DatabaseEngine:
    """Properties of the database engine."""

    # Dialect of the configured database (see SupportedDialect in .const).
    dialect: SupportedDialect
    # Workaround flags for the engine's query optimizer.
    optimizer: DatabaseOptimizer
    # Engine version; None presumably means it could not be detected —
    # TODO confirm against the connection setup code.
    version: AwesomeVersion | None


@dataclass
class DatabaseOptimizer:
    """Properties of the database optimizer for the configured database engine."""

    # Some MariaDB versions have a bug that causes a slow query when using
    # a range in a select statement with an IN clause.
    #
    # https://jira.mariadb.org/browse/MDEV-25020
    #
    slow_range_in_select: bool

View File

@ -0,0 +1,53 @@
"""Models for Recorder."""
from __future__ import annotations
from .context import (
bytes_to_ulid_or_none,
bytes_to_uuid_hex_or_none,
ulid_to_bytes_or_none,
uuid_hex_to_bytes_or_none,
)
from .database import DatabaseEngine, DatabaseOptimizer, UnsupportedDialect
from .state import LazyState, row_to_compressed_state
from .statistics import (
CalendarStatisticPeriod,
FixedStatisticPeriod,
RollingWindowStatisticPeriod,
StatisticData,
StatisticDataTimestamp,
StatisticMetaData,
StatisticPeriod,
StatisticResult,
)
from .time import (
datetime_to_timestamp_or_none,
process_datetime_to_timestamp,
process_timestamp,
process_timestamp_to_utc_isoformat,
timestamp_to_datetime_or_none,
)
# Public API of the models package, re-exported from the submodules above
# (kept alphabetically sorted, uppercase names first).
__all__ = [
    "CalendarStatisticPeriod",
    "DatabaseEngine",
    "DatabaseOptimizer",
    "FixedStatisticPeriod",
    "LazyState",
    "RollingWindowStatisticPeriod",
    "StatisticData",
    "StatisticDataTimestamp",
    "StatisticMetaData",
    "StatisticPeriod",
    "StatisticResult",
    "UnsupportedDialect",
    "bytes_to_ulid_or_none",
    "bytes_to_uuid_hex_or_none",
    "datetime_to_timestamp_or_none",
    "process_datetime_to_timestamp",
    "process_timestamp",
    "process_timestamp_to_utc_isoformat",
    "row_to_compressed_state",
    "timestamp_to_datetime_or_none",
    "ulid_to_bytes_or_none",
    "uuid_hex_to_bytes_or_none",
]

View File

@ -0,0 +1,42 @@
"""Models for Recorder."""
from __future__ import annotations
from contextlib import suppress
from functools import lru_cache
from uuid import UUID
from homeassistant.util.ulid import bytes_to_ulid, ulid_to_bytes
def ulid_to_bytes_or_none(ulid: str | None) -> bytes | None:
    """Convert an ulid to bytes, passing None through."""
    return None if ulid is None else ulid_to_bytes(ulid)


def bytes_to_ulid_or_none(_bytes: bytes | None) -> str | None:
    """Convert bytes to a ulid, passing None through."""
    return None if _bytes is None else bytes_to_ulid(_bytes)
@lru_cache(maxsize=16)
def uuid_hex_to_bytes_or_none(uuid_hex: str | None) -> bytes | None:
"""Convert a uuid hex to bytes."""
if uuid_hex is None:
return None
with suppress(ValueError):
return UUID(hex=uuid_hex).bytes
return None
@lru_cache(maxsize=16)
def bytes_to_uuid_hex_or_none(_bytes: bytes | None) -> str | None:
"""Convert bytes to a uuid hex."""
if _bytes is None:
return None
with suppress(ValueError):
return UUID(bytes=_bytes).hex
return None

View File

@ -0,0 +1,33 @@
"""Models for the database in the Recorder."""
from __future__ import annotations
from dataclasses import dataclass
from awesomeversion import AwesomeVersion
from ..const import SupportedDialect
class UnsupportedDialect(Exception):
    """The dialect or its version is not supported."""


@dataclass
class DatabaseEngine:
    """Properties of the database engine."""

    # Dialect of the configured database (see SupportedDialect in ..const).
    dialect: SupportedDialect
    # Workaround flags for the engine's query optimizer.
    optimizer: DatabaseOptimizer
    # Engine version; None presumably means it could not be detected —
    # TODO confirm against the connection setup code.
    version: AwesomeVersion | None


@dataclass
class DatabaseOptimizer:
    """Properties of the database optimizer for the configured database engine."""

    # Some MariaDB versions have a bug that causes a slow query when using
    # a range in a select statement with an IN clause.
    #
    # https://jira.mariadb.org/browse/MDEV-25020
    #
    slow_range_in_select: bool

View File

@ -0,0 +1,164 @@
"""Models for Recorder."""
from __future__ import annotations
from datetime import datetime
from typing import Any
from sqlalchemy.engine.row import Row
from homeassistant.const import (
COMPRESSED_STATE_ATTRIBUTES,
COMPRESSED_STATE_LAST_CHANGED,
COMPRESSED_STATE_LAST_UPDATED,
COMPRESSED_STATE_STATE,
)
from homeassistant.core import Context, State
from .state_attributes import decode_attributes_from_row
from .time import (
process_datetime_to_timestamp,
process_timestamp,
process_timestamp_to_utc_isoformat,
)
# pylint: disable=invalid-name
class LazyStatePreSchema31(State):
    """A lazy version of core State before schema 31.

    Attribute decoding and timestamp processing are deferred until first
    access so rows whose attributes or datetimes are never read do not
    pay the conversion cost.
    """

    __slots__ = [
        "_row",
        "_attributes",
        "_last_changed",
        "_last_updated",
        "_context",
        "attr_cache",
    ]

    def __init__(  # pylint: disable=super-init-not-called
        self,
        row: Row,
        attr_cache: dict[str, dict[str, Any]],
        start_time: datetime | None,
    ) -> None:
        """Init the lazy state."""
        self._row = row
        self.entity_id: str = self._row.entity_id
        self.state = self._row.state or ""
        self._attributes: dict[str, Any] | None = None
        # start_time, when given, pre-seeds both datetimes so the row's
        # datetime columns need not be processed at all.
        self._last_changed: datetime | None = start_time
        self._last_updated: datetime | None = start_time
        self._context: Context | None = None
        self.attr_cache = attr_cache

    @property  # type: ignore[override]
    def attributes(self) -> dict[str, Any]:
        """State attributes."""
        if self._attributes is None:
            self._attributes = decode_attributes_from_row(self._row, self.attr_cache)
        return self._attributes

    @attributes.setter
    def attributes(self, value: dict[str, Any]) -> None:
        """Set attributes."""
        self._attributes = value

    @property
    def context(self) -> Context:
        """State context."""
        # Rows do not carry the original context; synthesize an empty one.
        if self._context is None:
            self._context = Context(id=None)
        return self._context

    @context.setter
    def context(self, value: Context) -> None:
        """Set context."""
        self._context = value

    @property
    def last_changed(self) -> datetime:
        """Last changed datetime."""
        if self._last_changed is None:
            if (last_changed := self._row.last_changed) is not None:
                self._last_changed = process_timestamp(last_changed)
            else:
                # NULL last_changed means "changed when last updated".
                self._last_changed = self.last_updated
        return self._last_changed

    @last_changed.setter
    def last_changed(self, value: datetime) -> None:
        """Set last changed datetime."""
        self._last_changed = value

    @property
    def last_updated(self) -> datetime:
        """Last updated datetime."""
        if self._last_updated is None:
            self._last_updated = process_timestamp(self._row.last_updated)
        return self._last_updated

    @last_updated.setter
    def last_updated(self, value: datetime) -> None:
        """Set last updated datetime."""
        self._last_updated = value

    def as_dict(self) -> dict[str, Any]:  # type: ignore[override]
        """Return a dict representation of the LazyState.

        Async friendly.

        To be used for JSON serialization.
        """
        if self._last_changed is None and self._last_updated is None:
            # Datetimes were never materialized: format straight from the
            # row to skip intermediate datetime objects.
            last_updated_isoformat = process_timestamp_to_utc_isoformat(
                self._row.last_updated
            )
            if (
                self._row.last_changed is None
                or self._row.last_changed == self._row.last_updated
            ):
                last_changed_isoformat = last_updated_isoformat
            else:
                last_changed_isoformat = process_timestamp_to_utc_isoformat(
                    self._row.last_changed
                )
        else:
            last_updated_isoformat = self.last_updated.isoformat()
            if self.last_changed == self.last_updated:
                last_changed_isoformat = last_updated_isoformat
            else:
                last_changed_isoformat = self.last_changed.isoformat()
        return {
            "entity_id": self.entity_id,
            "state": self.state,
            "attributes": self._attributes or self.attributes,
            "last_changed": last_changed_isoformat,
            "last_updated": last_updated_isoformat,
        }
def row_to_compressed_state_pre_schema_31(
    row: Row,
    attr_cache: dict[str, dict[str, Any]],
    start_time: datetime | None,
) -> dict[str, Any]:
    """Convert a database row to a compressed state before schema 31."""
    comp_state = {
        COMPRESSED_STATE_STATE: row.state,
        COMPRESSED_STATE_ATTRIBUTES: decode_attributes_from_row(row, attr_cache),
    }
    if start_time:
        # A caller-supplied window start overrides the row timestamp.
        comp_state[COMPRESSED_STATE_LAST_UPDATED] = start_time.timestamp()
        return comp_state
    last_updated: datetime = row.last_updated
    comp_state[COMPRESSED_STATE_LAST_UPDATED] = process_datetime_to_timestamp(
        last_updated
    )
    last_changed = row.last_changed
    # last_changed is only emitted when it differs from last_updated.
    if last_changed and last_updated != last_changed:
        comp_state[COMPRESSED_STATE_LAST_CHANGED] = process_datetime_to_timestamp(
            last_changed
        )
    return comp_state

View File

@ -0,0 +1,145 @@
"""Models states in for Recorder."""
from __future__ import annotations
from datetime import datetime
import logging
from typing import Any
from sqlalchemy.engine.row import Row
from homeassistant.const import (
COMPRESSED_STATE_ATTRIBUTES,
COMPRESSED_STATE_LAST_CHANGED,
COMPRESSED_STATE_LAST_UPDATED,
COMPRESSED_STATE_STATE,
)
from homeassistant.core import Context, State
import homeassistant.util.dt as dt_util
from .state_attributes import decode_attributes_from_row
from .time import process_timestamp
# pylint: disable=invalid-name
_LOGGER = logging.getLogger(__name__)
class LazyState(State):
    """A lazy version of core State after schema 31.

    Attribute decoding and timestamp-to-datetime conversion are deferred
    until first access so rows whose attributes or datetimes are never
    read do not pay the conversion cost.
    """

    __slots__ = [
        "_row",
        "_attributes",
        "_last_changed_ts",
        "_last_updated_ts",
        "_context",
        "attr_cache",
    ]

    def __init__(  # pylint: disable=super-init-not-called
        self,
        row: Row,
        attr_cache: dict[str, dict[str, Any]],
        start_time: datetime | None,
    ) -> None:
        """Init the lazy state."""
        self._row = row
        self.entity_id: str = self._row.entity_id
        self.state = self._row.state or ""
        self._attributes: dict[str, Any] | None = None
        # Fall back to start_time when the row carries no timestamp.
        self._last_updated_ts: float | None = self._row.last_updated_ts or (
            dt_util.utc_to_timestamp(start_time) if start_time else None
        )
        # A row without last_changed_ts means "changed when last updated".
        self._last_changed_ts: float | None = (
            self._row.last_changed_ts or self._last_updated_ts
        )
        self._context: Context | None = None
        self.attr_cache = attr_cache

    @property  # type: ignore[override]
    def attributes(self) -> dict[str, Any]:
        """State attributes."""
        if self._attributes is None:
            self._attributes = decode_attributes_from_row(self._row, self.attr_cache)
        return self._attributes

    @attributes.setter
    def attributes(self, value: dict[str, Any]) -> None:
        """Set attributes."""
        self._attributes = value

    @property
    def context(self) -> Context:
        """State context."""
        # Rows do not carry the original context; synthesize an empty one.
        if self._context is None:
            self._context = Context(id=None)
        return self._context

    @context.setter
    def context(self, value: Context) -> None:
        """Set context."""
        self._context = value

    @property
    def last_changed(self) -> datetime:
        """Last changed datetime."""
        assert self._last_changed_ts is not None
        return dt_util.utc_from_timestamp(self._last_changed_ts)

    @last_changed.setter
    def last_changed(self, value: datetime) -> None:
        """Set last changed datetime."""
        self._last_changed_ts = process_timestamp(value).timestamp()

    @property
    def last_updated(self) -> datetime:
        """Last updated datetime."""
        assert self._last_updated_ts is not None
        return dt_util.utc_from_timestamp(self._last_updated_ts)

    @last_updated.setter
    def last_updated(self, value: datetime) -> None:
        """Set last updated datetime."""
        self._last_updated_ts = process_timestamp(value).timestamp()

    def as_dict(self) -> dict[str, Any]:  # type: ignore[override]
        """Return a dict representation of the LazyState.

        Async friendly.

        To be used for JSON serialization.
        """
        last_updated_isoformat = self.last_updated.isoformat()
        # Avoid a second conversion when changed == updated (the common case).
        if self._last_changed_ts == self._last_updated_ts:
            last_changed_isoformat = last_updated_isoformat
        else:
            last_changed_isoformat = self.last_changed.isoformat()
        return {
            "entity_id": self.entity_id,
            "state": self.state,
            "attributes": self._attributes or self.attributes,
            "last_changed": last_changed_isoformat,
            "last_updated": last_updated_isoformat,
        }
def row_to_compressed_state(
    row: Row,
    attr_cache: dict[str, dict[str, Any]],
    start_time: datetime | None,
) -> dict[str, Any]:
    """Convert a database row to a compressed state schema 31 and later."""
    comp_state = {
        COMPRESSED_STATE_STATE: row.state,
        COMPRESSED_STATE_ATTRIBUTES: decode_attributes_from_row(row, attr_cache),
    }
    if start_time:
        # A caller-supplied window start overrides the row timestamp.
        comp_state[COMPRESSED_STATE_LAST_UPDATED] = dt_util.utc_to_timestamp(start_time)
        return comp_state
    last_updated_ts: float = row.last_updated_ts
    comp_state[COMPRESSED_STATE_LAST_UPDATED] = last_updated_ts
    last_changed_ts = row.last_changed_ts
    # last_changed is only emitted when it differs from last_updated.
    if last_changed_ts and last_updated_ts != last_changed_ts:
        comp_state[COMPRESSED_STATE_LAST_CHANGED] = last_changed_ts
    return comp_state

View File

@ -0,0 +1,30 @@
"""State attributes models."""
from __future__ import annotations
import logging
from typing import Any
from sqlalchemy.engine.row import Row
from homeassistant.util.json import json_loads_object
# Serialized form of an empty attributes dict; short-circuits JSON decoding.
EMPTY_JSON_OBJECT = "{}"

_LOGGER = logging.getLogger(__name__)
def decode_attributes_from_row(
    row: Row, attr_cache: dict[str, dict[str, Any]]
) -> dict[str, Any]:
    """Decode attributes from a database row.

    Decoded JSON is memoized in attr_cache keyed on the raw source string.
    """
    source: str = row.shared_attrs or row.attributes
    cached = attr_cache.get(source)
    if cached is not None:
        return cached
    if not source or source == EMPTY_JSON_OBJECT:
        return {}
    try:
        attributes = json_loads_object(source)
    except ValueError:
        _LOGGER.exception("Error converting row to state attributes: %s", source)
        attributes = {}
    attr_cache[source] = attributes
    return attributes

View File

@ -0,0 +1,89 @@
"""Models for statistics in the Recorder."""
from __future__ import annotations
from datetime import datetime, timedelta
from typing import Literal, TypedDict
class StatisticResult(TypedDict):
"""Statistic result data class.
Allows multiple datapoints for the same statistic_id.
"""
meta: StatisticMetaData
stat: StatisticData
class StatisticDataTimestampBase(TypedDict):
"""Mandatory fields for statistic data class with a timestamp."""
start_ts: float
class StatisticDataBase(TypedDict):
"""Mandatory fields for statistic data class."""
start: datetime
class StatisticMixIn(TypedDict, total=False):
"""Mandatory fields for statistic data class."""
state: float
sum: float
min: float
max: float
mean: float
class StatisticData(StatisticDataBase, StatisticMixIn, total=False):
"""Statistic data class."""
last_reset: datetime | None
class StatisticDataTimestamp(StatisticDataTimestampBase, StatisticMixIn, total=False):
    """Statistic data class with a timestamp."""

    # Epoch-timestamp counterpart of StatisticData.last_reset.
    last_reset_ts: float | None
class StatisticMetaData(TypedDict):
    """Statistic meta data class."""

    has_mean: bool  # whether mean values are tracked for this statistic
    has_sum: bool  # whether sum values are tracked for this statistic
    name: str | None  # human-readable name, if any
    source: str  # origin of the statistic — presumably an integration domain; verify against callers
    statistic_id: str  # unique identifier of the statistic
    unit_of_measurement: str | None  # unit the values are expressed in, if any
class CalendarStatisticPeriod(TypedDict, total=False):
    """Statistic period definition aligned to calendar units."""

    period: Literal["hour", "day", "week", "month", "year"]
    # Shift in whole periods — presumably 0 is the current period; confirm with consumers.
    offset: int
class FixedStatisticPeriod(TypedDict, total=False):
    """Statistic period definition with explicit start/end datetimes."""

    end_time: datetime
    start_time: datetime
class RollingWindowStatisticPeriod(TypedDict, total=False):
    """Statistic period definition for a rolling time window."""

    duration: timedelta  # length of the window
    offset: timedelta  # shift applied to the window
class StatisticPeriod(TypedDict, total=False):
    """Statistic period definition.

    Each key selects one way of describing the period; all keys are optional.
    """

    calendar: CalendarStatisticPeriod  # calendar-aligned period
    fixed_period: FixedStatisticPeriod  # explicit start/end datetimes
    rolling_window: RollingWindowStatisticPeriod  # duration + offset window

View File

@ -0,0 +1,82 @@
"""Models for Recorder."""
from __future__ import annotations
from datetime import datetime
import logging
from typing import overload
import homeassistant.util.dt as dt_util
# pylint: disable=invalid-name
_LOGGER = logging.getLogger(__name__)
DB_TIMEZONE = "+00:00"
EMPTY_JSON_OBJECT = "{}"
@overload
def process_timestamp(ts: None) -> None:
    ...


@overload
def process_timestamp(ts: datetime) -> datetime:
    ...


def process_timestamp(ts: datetime | None) -> datetime | None:
    """Return *ts* as a timezone-aware UTC datetime; None passes through.

    Naive datetimes are treated as already being in UTC and merely tagged;
    aware datetimes are converted to UTC.
    """
    if ts is None:
        return None
    # A naive value only needs the UTC tzinfo attached — no conversion.
    return ts.replace(tzinfo=dt_util.UTC) if ts.tzinfo is None else dt_util.as_utc(ts)
@overload
def process_timestamp_to_utc_isoformat(ts: None) -> None:
    ...


@overload
def process_timestamp_to_utc_isoformat(ts: datetime) -> str:
    ...


def process_timestamp_to_utc_isoformat(ts: datetime | None) -> str | None:
    """Return the UTC ISO 8601 string for *ts*; None passes through."""
    if ts is None:
        return None
    if ts.tzinfo is None:
        # Naive values are treated as UTC; appending the fixed "+00:00"
        # suffix avoids an astimezone() round trip.
        return f"{ts.isoformat()}{DB_TIMEZONE}"
    if ts.tzinfo == dt_util.UTC:
        return ts.isoformat()
    return ts.astimezone(dt_util.UTC).isoformat()
def process_datetime_to_timestamp(ts: datetime) -> float:
    """Process a database datetime to epoch.

    Mirrors the behavior of process_timestamp_to_utc_isoformat
    except it returns the epoch time.
    """
    # Aware, non-UTC datetimes already carry their own offset; everything
    # else (naive or explicitly UTC) is converted as a UTC value.
    if ts.tzinfo is not None and ts.tzinfo != dt_util.UTC:
        return ts.timestamp()
    return dt_util.utc_to_timestamp(ts)
def datetime_to_timestamp_or_none(dt: datetime | None) -> float | None:
    """Convert a datetime to an epoch timestamp; None passes through."""
    return None if dt is None else dt_util.utc_to_timestamp(dt)
def timestamp_to_datetime_or_none(ts: float | None) -> datetime | None:
    """Convert an epoch timestamp to a UTC datetime.

    Falsy inputs (None and 0) deliberately map to None, matching how the
    database stores "no timestamp".
    """
    return dt_util.utc_from_timestamp(ts) if ts else None

View File

@ -20,11 +20,9 @@ from homeassistant.components.recorder.db_schema import (
StateAttributes,
States,
)
from homeassistant.components.recorder.models import (
LazyState,
LazyStatePreSchema31,
process_timestamp,
)
from homeassistant.components.recorder.history import legacy
from homeassistant.components.recorder.models import LazyState, process_timestamp
from homeassistant.components.recorder.models.legacy import LazyStatePreSchema31
from homeassistant.components.recorder.util import session_scope
import homeassistant.core as ha
from homeassistant.core import HomeAssistant, State
@ -63,7 +61,7 @@ async def _async_get_states(
attr_cache = {}
return [
klass(row, attr_cache, None)
for row in history._get_rows_with_session(
for row in legacy._get_rows_with_session(
hass,
session,
utc_point_in_time,

View File

@ -15,9 +15,12 @@ from sqlalchemy.sql.elements import TextClause
from sqlalchemy.sql.lambdas import StatementLambdaElement
from homeassistant.components import recorder
from homeassistant.components.recorder import history, util
from homeassistant.components.recorder import util
from homeassistant.components.recorder.const import DOMAIN, SQLITE_URL_PREFIX
from homeassistant.components.recorder.db_schema import RecorderRuns
from homeassistant.components.recorder.history.legacy import (
_get_single_entity_states_stmt,
)
from homeassistant.components.recorder.models import (
UnsupportedDialect,
process_timestamp,
@ -905,7 +908,7 @@ def test_execute_stmt_lambda_element(
with session_scope(hass=hass) as session:
# No time window, we always get a list
stmt = history._get_single_entity_states_stmt(
stmt = _get_single_entity_states_stmt(
instance.schema_version, dt_util.utcnow(), "sensor.on", False
)
rows = util.execute_stmt_lambda_element(session, stmt)