mirror of https://github.com/home-assistant/core
Initial draft of statistics (#49852)
This commit is contained in:
parent
703456abea
commit
89dd3292ba
|
@ -41,6 +41,7 @@ homeassistant.components.persistent_notification.*
|
|||
homeassistant.components.proximity.*
|
||||
homeassistant.components.recorder.purge
|
||||
homeassistant.components.recorder.repack
|
||||
homeassistant.components.recorder.statistics
|
||||
homeassistant.components.remote.*
|
||||
homeassistant.components.scene.*
|
||||
homeassistant.components.sensor.*
|
||||
|
|
|
@ -55,7 +55,7 @@ CONFIG_SCHEMA = vol.Schema(
|
|||
|
||||
@deprecated_function("homeassistant.components.recorder.history.get_significant_states")
|
||||
def get_significant_states(hass, *args, **kwargs):
|
||||
"""Wrap _get_significant_states with a sql session."""
|
||||
"""Wrap _get_significant_states with an sql session."""
|
||||
return history.get_significant_states(hass, *args, **kwargs)
|
||||
|
||||
|
||||
|
|
|
@ -35,12 +35,18 @@ from homeassistant.helpers.entityfilter import (
|
|||
INCLUDE_EXCLUDE_FILTER_SCHEMA_INNER,
|
||||
convert_include_exclude_filter,
|
||||
)
|
||||
from homeassistant.helpers.event import async_track_time_interval, track_time_change
|
||||
from homeassistant.helpers.event import (
|
||||
async_track_time_change,
|
||||
async_track_time_interval,
|
||||
)
|
||||
from homeassistant.helpers.integration_platform import (
|
||||
async_process_integration_platforms,
|
||||
)
|
||||
from homeassistant.helpers.typing import ConfigType
|
||||
from homeassistant.loader import bind_hass
|
||||
import homeassistant.util.dt as dt_util
|
||||
|
||||
from . import history, migration, purge
|
||||
from . import history, migration, purge, statistics
|
||||
from .const import CONF_DB_INTEGRITY_CHECK, DATA_INSTANCE, DOMAIN, SQLITE_URL_PREFIX
|
||||
from .models import Base, Events, RecorderRuns, States
|
||||
from .pool import RecorderPool
|
||||
|
@ -56,6 +62,7 @@ from .util import (
|
|||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
SERVICE_PURGE = "purge"
|
||||
SERVICE_STATISTICS = "statistics"
|
||||
SERVICE_ENABLE = "enable"
|
||||
SERVICE_DISABLE = "disable"
|
||||
|
||||
|
@ -194,6 +201,7 @@ def run_information_with_session(session, point_in_time: datetime | None = None)
|
|||
|
||||
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||
"""Set up the recorder."""
|
||||
hass.data[DOMAIN] = {}
|
||||
conf = config[DOMAIN]
|
||||
entity_filter = convert_include_exclude_filter(conf)
|
||||
auto_purge = conf[CONF_AUTO_PURGE]
|
||||
|
@ -221,10 +229,17 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
|||
instance.start()
|
||||
_async_register_services(hass, instance)
|
||||
history.async_setup(hass)
|
||||
statistics.async_setup(hass)
|
||||
await async_process_integration_platforms(hass, DOMAIN, _process_recorder_platform)
|
||||
|
||||
return await instance.async_db_ready
|
||||
|
||||
|
||||
async def _process_recorder_platform(hass, domain, platform):
|
||||
"""Process a recorder platform."""
|
||||
hass.data[DOMAIN][domain] = platform
|
||||
|
||||
|
||||
@callback
|
||||
def _async_register_services(hass, instance):
|
||||
"""Register recorder services."""
|
||||
|
@ -263,6 +278,12 @@ class PurgeTask(NamedTuple):
|
|||
apply_filter: bool
|
||||
|
||||
|
||||
class StatisticsTask(NamedTuple):
|
||||
"""An object to insert into the recorder queue to run a statistics task."""
|
||||
|
||||
start: datetime.datetime
|
||||
|
||||
|
||||
class WaitTask:
|
||||
"""An object to insert into the recorder queue to tell it set the _queue_watch event."""
|
||||
|
||||
|
@ -389,6 +410,13 @@ class Recorder(threading.Thread):
|
|||
|
||||
self.queue.put(PurgeTask(keep_days, repack, apply_filter))
|
||||
|
||||
def do_adhoc_statistics(self, **kwargs):
|
||||
"""Trigger an adhoc statistics run."""
|
||||
start = kwargs.get("start")
|
||||
if not start:
|
||||
start = statistics.get_start_time()
|
||||
self.queue.put(StatisticsTask(start))
|
||||
|
||||
@callback
|
||||
def async_register(self, shutdown_task, hass_started):
|
||||
"""Post connection initialize."""
|
||||
|
@ -451,7 +479,8 @@ class Recorder(threading.Thread):
|
|||
|
||||
@callback
|
||||
def _async_recorder_ready(self):
|
||||
"""Mark recorder ready."""
|
||||
"""Finish start and mark recorder ready."""
|
||||
self._async_setup_periodic_tasks()
|
||||
self.async_recorder_ready.set()
|
||||
|
||||
@callback
|
||||
|
@ -459,6 +488,24 @@ class Recorder(threading.Thread):
|
|||
"""Trigger the purge."""
|
||||
self.queue.put(PurgeTask(self.keep_days, repack=False, apply_filter=False))
|
||||
|
||||
@callback
|
||||
def async_hourly_statistics(self, now):
|
||||
"""Trigger the hourly statistics run."""
|
||||
start = statistics.get_start_time()
|
||||
self.queue.put(StatisticsTask(start))
|
||||
|
||||
def _async_setup_periodic_tasks(self):
|
||||
"""Prepare periodic tasks."""
|
||||
if self.auto_purge:
|
||||
# Purge every night at 4:12am
|
||||
async_track_time_change(
|
||||
self.hass, self.async_purge, hour=4, minute=12, second=0
|
||||
)
|
||||
# Compile hourly statistics every hour at *:12
|
||||
async_track_time_change(
|
||||
self.hass, self.async_hourly_statistics, minute=12, second=0
|
||||
)
|
||||
|
||||
def run(self):
|
||||
"""Start processing events to save."""
|
||||
shutdown_task = object()
|
||||
|
@ -507,11 +554,6 @@ class Recorder(threading.Thread):
|
|||
self._shutdown()
|
||||
return
|
||||
|
||||
# Start periodic purge
|
||||
if self.auto_purge:
|
||||
# Purge every night at 4:12am
|
||||
track_time_change(self.hass, self.async_purge, hour=4, minute=12, second=0)
|
||||
|
||||
_LOGGER.debug("Recorder processing the queue")
|
||||
self.hass.add_job(self._async_recorder_ready)
|
||||
self._run_event_loop()
|
||||
|
@ -608,11 +650,21 @@ class Recorder(threading.Thread):
|
|||
# Schedule a new purge task if this one didn't finish
|
||||
self.queue.put(PurgeTask(keep_days, repack, apply_filter))
|
||||
|
||||
def _run_statistics(self, start):
|
||||
"""Run statistics task."""
|
||||
if statistics.compile_statistics(self, start):
|
||||
return
|
||||
# Schedule a new statistics task if this one didn't finish
|
||||
self.queue.put(StatisticsTask(start))
|
||||
|
||||
def _process_one_event(self, event):
|
||||
"""Process one event."""
|
||||
if isinstance(event, PurgeTask):
|
||||
self._run_purge(event.keep_days, event.repack, event.apply_filter)
|
||||
return
|
||||
if isinstance(event, StatisticsTask):
|
||||
self._run_statistics(event.start)
|
||||
return
|
||||
if isinstance(event, WaitTask):
|
||||
self._queue_watch.set()
|
||||
return
|
||||
|
|
|
@ -52,7 +52,7 @@ QUERY_STATES = [
|
|||
States.last_updated,
|
||||
]
|
||||
|
||||
HISTORY_BAKERY = "history_bakery"
|
||||
HISTORY_BAKERY = "recorder_history_bakery"
|
||||
|
||||
|
||||
def async_setup(hass):
|
||||
|
|
|
@ -6,6 +6,7 @@ from sqlalchemy import (
|
|||
Boolean,
|
||||
Column,
|
||||
DateTime,
|
||||
Float,
|
||||
ForeignKey,
|
||||
Index,
|
||||
Integer,
|
||||
|
@ -38,7 +39,15 @@ TABLE_STATES = "states"
|
|||
TABLE_RECORDER_RUNS = "recorder_runs"
|
||||
TABLE_SCHEMA_CHANGES = "schema_changes"
|
||||
|
||||
ALL_TABLES = [TABLE_STATES, TABLE_EVENTS, TABLE_RECORDER_RUNS, TABLE_SCHEMA_CHANGES]
|
||||
TABLE_STATISTICS = "statistics"
|
||||
|
||||
ALL_TABLES = [
|
||||
TABLE_STATES,
|
||||
TABLE_EVENTS,
|
||||
TABLE_RECORDER_RUNS,
|
||||
TABLE_SCHEMA_CHANGES,
|
||||
TABLE_STATISTICS,
|
||||
]
|
||||
|
||||
DATETIME_TYPE = DateTime(timezone=True).with_variant(
|
||||
mysql.DATETIME(timezone=True, fsp=6), "mysql"
|
||||
|
@ -198,6 +207,39 @@ class States(Base): # type: ignore
|
|||
return None
|
||||
|
||||
|
||||
class Statistics(Base): # type: ignore
|
||||
"""Statistics."""
|
||||
|
||||
__table_args__ = {
|
||||
"mysql_default_charset": "utf8mb4",
|
||||
"mysql_collate": "utf8mb4_unicode_ci",
|
||||
}
|
||||
__tablename__ = TABLE_STATISTICS
|
||||
id = Column(Integer, primary_key=True)
|
||||
created = Column(DATETIME_TYPE, default=dt_util.utcnow)
|
||||
source = Column(String(32))
|
||||
statistic_id = Column(String(255))
|
||||
start = Column(DATETIME_TYPE, index=True)
|
||||
mean = Column(Float())
|
||||
min = Column(Float())
|
||||
max = Column(Float())
|
||||
|
||||
__table_args__ = (
|
||||
# Used for fetching statistics for a certain entity at a specific time
|
||||
Index("ix_statistics_statistic_id_start", "statistic_id", "start"),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def from_stats(source, statistic_id, start, stats):
|
||||
"""Create object from a statistics."""
|
||||
return Statistics(
|
||||
source=source,
|
||||
statistic_id=statistic_id,
|
||||
start=start,
|
||||
**stats,
|
||||
)
|
||||
|
||||
|
||||
class RecorderRuns(Base): # type: ignore
|
||||
"""Representation of recorder run."""
|
||||
|
||||
|
|
|
@ -3,10 +3,8 @@ from __future__ import annotations
|
|||
|
||||
from datetime import datetime, timedelta
|
||||
import logging
|
||||
import time
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from sqlalchemy.exc import OperationalError
|
||||
from sqlalchemy.orm.session import Session
|
||||
from sqlalchemy.sql.expression import distinct
|
||||
|
||||
|
@ -15,20 +13,15 @@ import homeassistant.util.dt as dt_util
|
|||
from .const import MAX_ROWS_TO_PURGE
|
||||
from .models import Events, RecorderRuns, States
|
||||
from .repack import repack_database
|
||||
from .util import session_scope
|
||||
from .util import retryable_database_job, session_scope
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from . import Recorder
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
# Retry when one of the following MySQL errors occurred:
|
||||
RETRYABLE_MYSQL_ERRORS = (1205, 1206, 1213)
|
||||
# 1205: Lock wait timeout exceeded; try restarting transaction
|
||||
# 1206: The total number of locks exceeds the lock table size
|
||||
# 1213: Deadlock found when trying to get lock; try restarting transaction
|
||||
|
||||
|
||||
@retryable_database_job("purge")
|
||||
def purge_old_data(
|
||||
instance: Recorder, purge_days: int, repack: bool, apply_filter: bool = False
|
||||
) -> bool:
|
||||
|
@ -41,36 +34,25 @@ def purge_old_data(
|
|||
"Purging states and events before target %s",
|
||||
purge_before.isoformat(sep=" ", timespec="seconds"),
|
||||
)
|
||||
try:
|
||||
with session_scope(session=instance.get_session()) as session: # type: ignore
|
||||
# Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record
|
||||
event_ids = _select_event_ids_to_purge(session, purge_before)
|
||||
state_ids = _select_state_ids_to_purge(session, purge_before, event_ids)
|
||||
if state_ids:
|
||||
_purge_state_ids(session, state_ids)
|
||||
if event_ids:
|
||||
_purge_event_ids(session, event_ids)
|
||||
# If states or events purging isn't processing the purge_before yet,
|
||||
# return false, as we are not done yet.
|
||||
_LOGGER.debug("Purging hasn't fully completed yet")
|
||||
return False
|
||||
if apply_filter and _purge_filtered_data(instance, session) is False:
|
||||
_LOGGER.debug("Cleanup filtered data hasn't fully completed yet")
|
||||
return False
|
||||
_purge_old_recorder_runs(instance, session, purge_before)
|
||||
if repack:
|
||||
repack_database(instance)
|
||||
except OperationalError as err:
|
||||
if (
|
||||
instance.engine.dialect.name == "mysql"
|
||||
and err.orig.args[0] in RETRYABLE_MYSQL_ERRORS
|
||||
):
|
||||
_LOGGER.info("%s; purge not completed, retrying", err.orig.args[1])
|
||||
time.sleep(instance.db_retry_wait)
|
||||
|
||||
with session_scope(session=instance.get_session()) as session: # type: ignore
|
||||
# Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record
|
||||
event_ids = _select_event_ids_to_purge(session, purge_before)
|
||||
state_ids = _select_state_ids_to_purge(session, purge_before, event_ids)
|
||||
if state_ids:
|
||||
_purge_state_ids(session, state_ids)
|
||||
if event_ids:
|
||||
_purge_event_ids(session, event_ids)
|
||||
# If states or events purging isn't processing the purge_before yet,
|
||||
# return false, as we are not done yet.
|
||||
_LOGGER.debug("Purging hasn't fully completed yet")
|
||||
return False
|
||||
|
||||
_LOGGER.warning("Error purging history: %s", err)
|
||||
|
||||
if apply_filter and _purge_filtered_data(instance, session) is False:
|
||||
_LOGGER.debug("Cleanup filtered data hasn't fully completed yet")
|
||||
return False
|
||||
_purge_old_recorder_runs(instance, session, purge_before)
|
||||
if repack:
|
||||
repack_database(instance)
|
||||
return True
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,138 @@
|
|||
"""Statistics helper."""
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timedelta
|
||||
from itertools import groupby
|
||||
import logging
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from sqlalchemy import bindparam
|
||||
from sqlalchemy.ext import baked
|
||||
|
||||
import homeassistant.util.dt as dt_util
|
||||
|
||||
from .const import DOMAIN
|
||||
from .models import Statistics, process_timestamp_to_utc_isoformat
|
||||
from .util import execute, retryable_database_job, session_scope
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from . import Recorder
|
||||
|
||||
QUERY_STATISTICS = [
|
||||
Statistics.statistic_id,
|
||||
Statistics.start,
|
||||
Statistics.mean,
|
||||
Statistics.min,
|
||||
Statistics.max,
|
||||
]
|
||||
|
||||
STATISTICS_BAKERY = "recorder_statistics_bakery"
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def async_setup(hass):
|
||||
"""Set up the history hooks."""
|
||||
hass.data[STATISTICS_BAKERY] = baked.bakery()
|
||||
|
||||
|
||||
def get_start_time() -> datetime.datetime:
|
||||
"""Return start time."""
|
||||
last_hour = dt_util.utcnow() - timedelta(hours=1)
|
||||
start = last_hour.replace(minute=0, second=0, microsecond=0)
|
||||
return start
|
||||
|
||||
|
||||
@retryable_database_job("statistics")
|
||||
def compile_statistics(instance: Recorder, start: datetime.datetime) -> bool:
|
||||
"""Compile statistics."""
|
||||
start = dt_util.as_utc(start)
|
||||
end = start + timedelta(hours=1)
|
||||
_LOGGER.debug(
|
||||
"Compiling statistics for %s-%s",
|
||||
start,
|
||||
end,
|
||||
)
|
||||
platform_stats = []
|
||||
for domain, platform in instance.hass.data[DOMAIN].items():
|
||||
if not hasattr(platform, "compile_statistics"):
|
||||
continue
|
||||
platform_stats.append(platform.compile_statistics(instance.hass, start, end))
|
||||
_LOGGER.debug(
|
||||
"Statistics for %s during %s-%s: %s", domain, start, end, platform_stats[-1]
|
||||
)
|
||||
|
||||
with session_scope(session=instance.get_session()) as session: # type: ignore
|
||||
for stats in platform_stats:
|
||||
for entity_id, stat in stats.items():
|
||||
session.add(Statistics.from_stats(DOMAIN, entity_id, start, stat))
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def statistics_during_period(hass, start_time, end_time=None, statistic_id=None):
|
||||
"""Return states changes during UTC period start_time - end_time."""
|
||||
with session_scope(hass=hass) as session:
|
||||
baked_query = hass.data[STATISTICS_BAKERY](
|
||||
lambda session: session.query(*QUERY_STATISTICS)
|
||||
)
|
||||
|
||||
baked_query += lambda q: q.filter(Statistics.start >= bindparam("start_time"))
|
||||
|
||||
if end_time is not None:
|
||||
baked_query += lambda q: q.filter(Statistics.start < bindparam("end_time"))
|
||||
|
||||
if statistic_id is not None:
|
||||
baked_query += lambda q: q.filter_by(statistic_id=bindparam("statistic_id"))
|
||||
statistic_id = statistic_id.lower()
|
||||
|
||||
baked_query += lambda q: q.order_by(Statistics.statistic_id, Statistics.start)
|
||||
|
||||
stats = execute(
|
||||
baked_query(session).params(
|
||||
start_time=start_time, end_time=end_time, statistic_id=statistic_id
|
||||
)
|
||||
)
|
||||
|
||||
statistic_ids = [statistic_id] if statistic_id is not None else None
|
||||
|
||||
return _sorted_statistics_to_dict(
|
||||
hass, session, stats, start_time, statistic_ids
|
||||
)
|
||||
|
||||
|
||||
def _sorted_statistics_to_dict(
|
||||
hass,
|
||||
session,
|
||||
stats,
|
||||
start_time,
|
||||
statistic_ids,
|
||||
):
|
||||
"""Convert SQL results into JSON friendly data structure."""
|
||||
result = defaultdict(list)
|
||||
# Set all statistic IDs to empty lists in result set to maintain the order
|
||||
if statistic_ids is not None:
|
||||
for stat_id in statistic_ids:
|
||||
result[stat_id] = []
|
||||
|
||||
# Called in a tight loop so cache the function
|
||||
# here
|
||||
_process_timestamp_to_utc_isoformat = process_timestamp_to_utc_isoformat
|
||||
|
||||
# Append all changes to it
|
||||
for ent_id, group in groupby(stats, lambda state: state.statistic_id):
|
||||
ent_results = result[ent_id]
|
||||
ent_results.extend(
|
||||
{
|
||||
"statistic_id": db_state.statistic_id,
|
||||
"start": _process_timestamp_to_utc_isoformat(db_state.start),
|
||||
"mean": db_state.mean,
|
||||
"min": db_state.min,
|
||||
"max": db_state.max,
|
||||
}
|
||||
for db_state in group
|
||||
)
|
||||
|
||||
# Filter out the empty lists if some states had 0 results.
|
||||
return {key: val for key, val in result.items() if val}
|
|
@ -4,9 +4,11 @@ from __future__ import annotations
|
|||
from collections.abc import Generator
|
||||
from contextlib import contextmanager
|
||||
from datetime import timedelta
|
||||
import functools
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from sqlalchemy.exc import OperationalError, SQLAlchemyError
|
||||
from sqlalchemy.orm.session import Session
|
||||
|
@ -19,10 +21,14 @@ from .models import (
|
|||
ALL_TABLES,
|
||||
TABLE_RECORDER_RUNS,
|
||||
TABLE_SCHEMA_CHANGES,
|
||||
TABLE_STATISTICS,
|
||||
RecorderRuns,
|
||||
process_timestamp,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from . import Recorder
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
RETRIES = 3
|
||||
|
@ -34,6 +40,12 @@ SQLITE3_POSTFIXES = ["", "-wal", "-shm"]
|
|||
# should do a check on the sqlite3 database.
|
||||
MAX_RESTART_TIME = timedelta(minutes=10)
|
||||
|
||||
# Retry when one of the following MySQL errors occurred:
|
||||
RETRYABLE_MYSQL_ERRORS = (1205, 1206, 1213)
|
||||
# 1205: Lock wait timeout exceeded; try restarting transaction
|
||||
# 1206: The total number of locks exceeds the lock table size
|
||||
# 1213: Deadlock found when trying to get lock; try restarting transaction
|
||||
|
||||
|
||||
@contextmanager
|
||||
def session_scope(
|
||||
|
@ -167,6 +179,8 @@ def basic_sanity_check(cursor):
|
|||
"""Check tables to make sure select does not fail."""
|
||||
|
||||
for table in ALL_TABLES:
|
||||
if table == TABLE_STATISTICS:
|
||||
continue
|
||||
if table in (TABLE_RECORDER_RUNS, TABLE_SCHEMA_CHANGES):
|
||||
cursor.execute(f"SELECT * FROM {table};") # nosec # not injection
|
||||
else:
|
||||
|
@ -270,3 +284,36 @@ def end_incomplete_runs(session, start_time):
|
|||
"Ended unfinished session (id=%s from %s)", run.run_id, run.start
|
||||
)
|
||||
session.add(run)
|
||||
|
||||
|
||||
def retryable_database_job(description: str):
|
||||
"""Try to execute a database job.
|
||||
|
||||
The job should return True if it finished, and False if it needs to be rescheduled.
|
||||
"""
|
||||
|
||||
def decorator(job: callable):
|
||||
@functools.wraps(job)
|
||||
def wrapper(instance: Recorder, *args, **kwargs):
|
||||
try:
|
||||
return job(instance, *args, **kwargs)
|
||||
except OperationalError as err:
|
||||
if (
|
||||
instance.engine.dialect.name == "mysql"
|
||||
and err.orig.args[0] in RETRYABLE_MYSQL_ERRORS
|
||||
):
|
||||
_LOGGER.info(
|
||||
"%s; %s not completed, retrying", err.orig.args[1], description
|
||||
)
|
||||
time.sleep(instance.db_retry_wait)
|
||||
# Failed with retryable error
|
||||
return False
|
||||
|
||||
_LOGGER.warning("Error executing %s: %s", description, err)
|
||||
|
||||
# Failed with permanent error
|
||||
return True
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
|
|
@ -3,5 +3,6 @@
|
|||
"name": "Sensor",
|
||||
"documentation": "https://www.home-assistant.io/integrations/sensor",
|
||||
"codeowners": [],
|
||||
"quality_scale": "internal"
|
||||
"quality_scale": "internal",
|
||||
"after_dependencies": ["recorder"]
|
||||
}
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
"""Statistics helper for sensor."""
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
import statistics
|
||||
|
||||
from homeassistant.components.recorder import history
|
||||
from homeassistant.components.sensor import ATTR_STATE_CLASS, STATE_CLASS_MEASUREMENT
|
||||
from homeassistant.const import ATTR_DEVICE_CLASS
|
||||
from homeassistant.core import HomeAssistant
|
||||
|
||||
from . import DOMAIN
|
||||
|
||||
DEVICE_CLASS_STATISTICS = {"temperature": {"mean", "min", "max"}}
|
||||
|
||||
|
||||
def _get_entities(hass: HomeAssistant) -> list[tuple[str, str]]:
|
||||
"""Get (entity_id, device_class) of all sensors for which to compile statistics."""
|
||||
all_sensors = hass.states.all(DOMAIN)
|
||||
entity_ids = []
|
||||
|
||||
for state in all_sensors:
|
||||
device_class = state.attributes.get(ATTR_DEVICE_CLASS)
|
||||
state_class = state.attributes.get(ATTR_STATE_CLASS)
|
||||
if not state_class or state_class != STATE_CLASS_MEASUREMENT:
|
||||
continue
|
||||
if not device_class or device_class not in DEVICE_CLASS_STATISTICS:
|
||||
continue
|
||||
entity_ids.append((state.entity_id, device_class))
|
||||
return entity_ids
|
||||
|
||||
|
||||
# Faster than try/except
|
||||
# From https://stackoverflow.com/a/23639915
|
||||
def _is_number(s: str) -> bool: # pylint: disable=invalid-name
|
||||
"""Return True if string is a number."""
|
||||
return s.replace(".", "", 1).isdigit()
|
||||
|
||||
|
||||
def compile_statistics(
|
||||
hass: HomeAssistant, start: datetime.datetime, end: datetime.datetime
|
||||
) -> dict:
|
||||
"""Compile statistics for all entities during start-end.
|
||||
|
||||
Note: This will query the database and must not be run in the event loop
|
||||
"""
|
||||
result: dict = {}
|
||||
|
||||
entities = _get_entities(hass)
|
||||
|
||||
# Get history between start and end
|
||||
history_list = history.get_significant_states( # type: ignore
|
||||
hass, start, end, [i[0] for i in entities]
|
||||
)
|
||||
|
||||
for entity_id, device_class in entities:
|
||||
wanted_statistics = DEVICE_CLASS_STATISTICS[device_class]
|
||||
|
||||
if entity_id not in history_list:
|
||||
continue
|
||||
|
||||
entity_history = history_list[entity_id]
|
||||
fstates = [float(el.state) for el in entity_history if _is_number(el.state)]
|
||||
|
||||
if not fstates:
|
||||
continue
|
||||
|
||||
result[entity_id] = {}
|
||||
|
||||
# Make calculations
|
||||
if "max" in wanted_statistics:
|
||||
result[entity_id]["max"] = max(fstates)
|
||||
if "min" in wanted_statistics:
|
||||
result[entity_id]["min"] = min(fstates)
|
||||
|
||||
# Note: The average calculation will be incorrect for unevenly spaced readings,
|
||||
# this needs to be improved by weighting with time between measurements
|
||||
if "mean" in wanted_statistics:
|
||||
result[entity_id]["mean"] = statistics.fmean(fstates)
|
||||
|
||||
return result
|
11
mypy.ini
11
mypy.ini
|
@ -462,6 +462,17 @@ no_implicit_optional = true
|
|||
warn_return_any = true
|
||||
warn_unreachable = true
|
||||
|
||||
[mypy-homeassistant.components.recorder.statistics]
|
||||
check_untyped_defs = true
|
||||
disallow_incomplete_defs = true
|
||||
disallow_subclassing_any = true
|
||||
disallow_untyped_calls = true
|
||||
disallow_untyped_decorators = true
|
||||
disallow_untyped_defs = true
|
||||
no_implicit_optional = true
|
||||
warn_return_any = true
|
||||
warn_unreachable = true
|
||||
|
||||
[mypy-homeassistant.components.remote.*]
|
||||
check_untyped_defs = true
|
||||
disallow_incomplete_defs = true
|
||||
|
|
|
@ -590,7 +590,7 @@ def run_tasks_at_time(hass, test_time):
|
|||
|
||||
|
||||
def test_auto_purge(hass_recorder):
|
||||
"""Test periodic purge alarm scheduling."""
|
||||
"""Test periodic purge scheduling."""
|
||||
hass = hass_recorder()
|
||||
|
||||
original_tz = dt_util.DEFAULT_TIME_ZONE
|
||||
|
@ -598,9 +598,10 @@ def test_auto_purge(hass_recorder):
|
|||
tz = dt_util.get_time_zone("Europe/Copenhagen")
|
||||
dt_util.set_default_time_zone(tz)
|
||||
|
||||
# Purging is schedule to happen at 4:12am every day. Exercise this behavior
|
||||
# by firing alarms and advancing the clock around this time. Pick an arbitrary
|
||||
# year in the future to avoid boundary conditions relative to the current date.
|
||||
# Purging is scheduled to happen at 4:12am every day. Exercise this behavior by
|
||||
# firing time changed events and advancing the clock around this time. Pick an
|
||||
# arbitrary year in the future to avoid boundary conditions relative to the current
|
||||
# date.
|
||||
#
|
||||
# The clock is started at 4:15am then advanced forward below
|
||||
now = dt_util.utcnow()
|
||||
|
@ -637,6 +638,56 @@ def test_auto_purge(hass_recorder):
|
|||
dt_util.set_default_time_zone(original_tz)
|
||||
|
||||
|
||||
def test_auto_statistics(hass_recorder):
|
||||
"""Test periodic statistics scheduling."""
|
||||
hass = hass_recorder()
|
||||
|
||||
original_tz = dt_util.DEFAULT_TIME_ZONE
|
||||
|
||||
tz = dt_util.get_time_zone("Europe/Copenhagen")
|
||||
dt_util.set_default_time_zone(tz)
|
||||
|
||||
# Statistics is scheduled to happen at *:12am every hour. Exercise this behavior by
|
||||
# firing time changed events and advancing the clock around this time. Pick an
|
||||
# arbitrary year in the future to avoid boundary conditions relative to the current
|
||||
# date.
|
||||
#
|
||||
# The clock is started at 4:15am then advanced forward below
|
||||
now = dt_util.utcnow()
|
||||
test_time = datetime(now.year + 2, 1, 1, 4, 15, 0, tzinfo=tz)
|
||||
run_tasks_at_time(hass, test_time)
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.recorder.statistics.compile_statistics",
|
||||
return_value=True,
|
||||
) as compile_statistics:
|
||||
# Advance one hour, and the statistics task should run
|
||||
test_time = test_time + timedelta(hours=1)
|
||||
run_tasks_at_time(hass, test_time)
|
||||
assert len(compile_statistics.mock_calls) == 1
|
||||
|
||||
compile_statistics.reset_mock()
|
||||
|
||||
# Advance one hour, and the statistics task should run again
|
||||
test_time = test_time + timedelta(hours=1)
|
||||
run_tasks_at_time(hass, test_time)
|
||||
assert len(compile_statistics.mock_calls) == 1
|
||||
|
||||
compile_statistics.reset_mock()
|
||||
|
||||
# Advance less than one full hour. The task should not run.
|
||||
test_time = test_time + timedelta(minutes=50)
|
||||
run_tasks_at_time(hass, test_time)
|
||||
assert len(compile_statistics.mock_calls) == 0
|
||||
|
||||
# Advance to the next hour, and the statistics task should run again
|
||||
test_time = test_time + timedelta(hours=1)
|
||||
run_tasks_at_time(hass, test_time)
|
||||
assert len(compile_statistics.mock_calls) == 1
|
||||
|
||||
dt_util.set_default_time_zone(original_tz)
|
||||
|
||||
|
||||
def test_saving_sets_old_state(hass_recorder):
|
||||
"""Test saving sets old state."""
|
||||
hass = hass_recorder()
|
||||
|
|
|
@ -104,7 +104,7 @@ async def test_purge_old_states_encounters_temporary_mysql_error(
|
|||
mysql_exception.orig = MagicMock(args=(1205, "retryable"))
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.recorder.purge.time.sleep"
|
||||
"homeassistant.components.recorder.util.time.sleep"
|
||||
) as sleep_mock, patch(
|
||||
"homeassistant.components.recorder.purge._purge_old_recorder_runs",
|
||||
side_effect=[mysql_exception, None],
|
||||
|
@ -147,7 +147,7 @@ async def test_purge_old_states_encounters_operational_error(
|
|||
await async_wait_recording_done_without_instance(hass)
|
||||
|
||||
assert "retrying" not in caplog.text
|
||||
assert "Error purging history" in caplog.text
|
||||
assert "Error executing purge" in caplog.text
|
||||
|
||||
|
||||
async def test_purge_old_events(
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
"""The tests for sensor recorder platform."""
|
||||
# pylint: disable=protected-access,invalid-name
|
||||
from datetime import timedelta
|
||||
from unittest.mock import patch, sentinel
|
||||
|
||||
from homeassistant.components.recorder import history
|
||||
from homeassistant.components.recorder.const import DATA_INSTANCE
|
||||
from homeassistant.components.recorder.models import process_timestamp_to_utc_isoformat
|
||||
from homeassistant.components.recorder.statistics import statistics_during_period
|
||||
from homeassistant.setup import setup_component
|
||||
import homeassistant.util.dt as dt_util
|
||||
|
||||
from tests.components.recorder.common import wait_recording_done
|
||||
|
||||
|
||||
def test_compile_hourly_statistics(hass_recorder):
|
||||
"""Test compiling hourly statistics."""
|
||||
hass = hass_recorder()
|
||||
recorder = hass.data[DATA_INSTANCE]
|
||||
setup_component(hass, "sensor", {})
|
||||
zero, four, states = record_states(hass)
|
||||
hist = history.get_significant_states(hass, zero, four)
|
||||
assert dict(states) == dict(hist)
|
||||
|
||||
recorder.do_adhoc_statistics(period="hourly", start=zero)
|
||||
wait_recording_done(hass)
|
||||
stats = statistics_during_period(hass, zero)
|
||||
assert stats == {
|
||||
"sensor.test1": [
|
||||
{
|
||||
"statistic_id": "sensor.test1",
|
||||
"start": process_timestamp_to_utc_isoformat(zero),
|
||||
"mean": 15.0,
|
||||
"min": 10.0,
|
||||
"max": 20.0,
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
def record_states(hass):
|
||||
"""Record some test states.
|
||||
|
||||
We inject a bunch of state updates temperature sensors.
|
||||
"""
|
||||
mp = "media_player.test"
|
||||
sns1 = "sensor.test1"
|
||||
sns2 = "sensor.test2"
|
||||
sns3 = "sensor.test3"
|
||||
sns1_attr = {"device_class": "temperature", "state_class": "measurement"}
|
||||
sns2_attr = {"device_class": "temperature"}
|
||||
sns3_attr = {}
|
||||
|
||||
def set_state(entity_id, state, **kwargs):
|
||||
"""Set the state."""
|
||||
hass.states.set(entity_id, state, **kwargs)
|
||||
wait_recording_done(hass)
|
||||
return hass.states.get(entity_id)
|
||||
|
||||
zero = dt_util.utcnow()
|
||||
one = zero + timedelta(minutes=1)
|
||||
two = one + timedelta(minutes=15)
|
||||
three = two + timedelta(minutes=30)
|
||||
four = three + timedelta(minutes=15)
|
||||
|
||||
states = {mp: [], sns1: [], sns2: [], sns3: []}
|
||||
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=one):
|
||||
states[mp].append(
|
||||
set_state(mp, "idle", attributes={"media_title": str(sentinel.mt1)})
|
||||
)
|
||||
states[mp].append(
|
||||
set_state(mp, "YouTube", attributes={"media_title": str(sentinel.mt2)})
|
||||
)
|
||||
states[sns1].append(set_state(sns1, "10", attributes=sns1_attr))
|
||||
states[sns2].append(set_state(sns2, "10", attributes=sns2_attr))
|
||||
states[sns3].append(set_state(sns3, "10", attributes=sns3_attr))
|
||||
|
||||
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=two):
|
||||
states[sns1].append(set_state(sns1, "15", attributes=sns1_attr))
|
||||
states[sns2].append(set_state(sns2, "15", attributes=sns2_attr))
|
||||
states[sns3].append(set_state(sns3, "15", attributes=sns3_attr))
|
||||
|
||||
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=three):
|
||||
states[sns1].append(set_state(sns1, "20", attributes=sns1_attr))
|
||||
states[sns2].append(set_state(sns2, "20", attributes=sns2_attr))
|
||||
states[sns3].append(set_state(sns3, "20", attributes=sns3_attr))
|
||||
|
||||
return zero, four, states
|
|
@ -0,0 +1,224 @@
|
|||
"""The tests for sensor recorder platform."""
|
||||
# pylint: disable=protected-access,invalid-name
|
||||
from datetime import timedelta
|
||||
from unittest.mock import patch, sentinel
|
||||
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.recorder import history
|
||||
from homeassistant.components.recorder.const import DATA_INSTANCE
|
||||
from homeassistant.components.recorder.models import process_timestamp_to_utc_isoformat
|
||||
from homeassistant.components.recorder.statistics import statistics_during_period
|
||||
from homeassistant.const import STATE_UNAVAILABLE
|
||||
from homeassistant.setup import setup_component
|
||||
import homeassistant.util.dt as dt_util
|
||||
|
||||
from tests.common import get_test_home_assistant, init_recorder_component
|
||||
from tests.components.recorder.common import wait_recording_done
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def hass_recorder():
|
||||
"""Home Assistant fixture with in-memory recorder."""
|
||||
hass = get_test_home_assistant()
|
||||
|
||||
def setup_recorder(config=None):
|
||||
"""Set up with params."""
|
||||
init_recorder_component(hass, config)
|
||||
hass.start()
|
||||
hass.block_till_done()
|
||||
hass.data[DATA_INSTANCE].block_till_done()
|
||||
return hass
|
||||
|
||||
yield setup_recorder
|
||||
hass.stop()
|
||||
|
||||
|
||||
def test_compile_hourly_statistics(hass_recorder):
|
||||
"""Test compiling hourly statistics."""
|
||||
hass = hass_recorder()
|
||||
recorder = hass.data[DATA_INSTANCE]
|
||||
setup_component(hass, "sensor", {})
|
||||
zero, four, states = record_states(hass)
|
||||
hist = history.get_significant_states(hass, zero, four)
|
||||
assert dict(states) == dict(hist)
|
||||
|
||||
recorder.do_adhoc_statistics(period="hourly", start=zero)
|
||||
wait_recording_done(hass)
|
||||
stats = statistics_during_period(hass, zero)
|
||||
assert stats == {
|
||||
"sensor.test1": [
|
||||
{
|
||||
"statistic_id": "sensor.test1",
|
||||
"start": process_timestamp_to_utc_isoformat(zero),
|
||||
"mean": 15.0,
|
||||
"min": 10.0,
|
||||
"max": 20.0,
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
def test_compile_hourly_statistics_unchanged(hass_recorder):
|
||||
"""Test compiling hourly statistics, with no changes during the hour."""
|
||||
hass = hass_recorder()
|
||||
recorder = hass.data[DATA_INSTANCE]
|
||||
setup_component(hass, "sensor", {})
|
||||
zero, four, states = record_states(hass)
|
||||
hist = history.get_significant_states(hass, zero, four)
|
||||
assert dict(states) == dict(hist)
|
||||
|
||||
recorder.do_adhoc_statistics(period="hourly", start=four)
|
||||
wait_recording_done(hass)
|
||||
stats = statistics_during_period(hass, four)
|
||||
assert stats == {
|
||||
"sensor.test1": [
|
||||
{
|
||||
"statistic_id": "sensor.test1",
|
||||
"start": process_timestamp_to_utc_isoformat(four),
|
||||
"mean": 20.0,
|
||||
"min": 20.0,
|
||||
"max": 20.0,
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
def test_compile_hourly_statistics_partially_unavailable(hass_recorder):
|
||||
"""Test compiling hourly statistics, with the sensor being partially unavailable."""
|
||||
hass = hass_recorder()
|
||||
recorder = hass.data[DATA_INSTANCE]
|
||||
setup_component(hass, "sensor", {})
|
||||
zero, four, states = record_states_partially_unavailable(hass)
|
||||
hist = history.get_significant_states(hass, zero, four)
|
||||
assert dict(states) == dict(hist)
|
||||
|
||||
recorder.do_adhoc_statistics(period="hourly", start=zero)
|
||||
wait_recording_done(hass)
|
||||
stats = statistics_during_period(hass, zero)
|
||||
assert stats == {
|
||||
"sensor.test1": [
|
||||
{
|
||||
"statistic_id": "sensor.test1",
|
||||
"start": process_timestamp_to_utc_isoformat(zero),
|
||||
"mean": 17.5,
|
||||
"min": 10.0,
|
||||
"max": 25.0,
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
def test_compile_hourly_statistics_unavailable(hass_recorder):
|
||||
"""Test compiling hourly statistics, with the sensor being unavailable."""
|
||||
hass = hass_recorder()
|
||||
recorder = hass.data[DATA_INSTANCE]
|
||||
setup_component(hass, "sensor", {})
|
||||
zero, four, states = record_states_partially_unavailable(hass)
|
||||
hist = history.get_significant_states(hass, zero, four)
|
||||
assert dict(states) == dict(hist)
|
||||
|
||||
recorder.do_adhoc_statistics(period="hourly", start=four)
|
||||
wait_recording_done(hass)
|
||||
stats = statistics_during_period(hass, four)
|
||||
assert stats == {}
|
||||
|
||||
|
||||
def record_states(hass):
|
||||
"""Record some test states.
|
||||
|
||||
We inject a bunch of state updates temperature sensors.
|
||||
"""
|
||||
mp = "media_player.test"
|
||||
sns1 = "sensor.test1"
|
||||
sns2 = "sensor.test2"
|
||||
sns3 = "sensor.test3"
|
||||
sns1_attr = {"device_class": "temperature", "state_class": "measurement"}
|
||||
sns2_attr = {"device_class": "temperature"}
|
||||
sns3_attr = {}
|
||||
|
||||
def set_state(entity_id, state, **kwargs):
|
||||
"""Set the state."""
|
||||
hass.states.set(entity_id, state, **kwargs)
|
||||
wait_recording_done(hass)
|
||||
return hass.states.get(entity_id)
|
||||
|
||||
zero = dt_util.utcnow()
|
||||
one = zero + timedelta(minutes=1)
|
||||
two = one + timedelta(minutes=15)
|
||||
three = two + timedelta(minutes=30)
|
||||
four = three + timedelta(minutes=15)
|
||||
|
||||
states = {mp: [], sns1: [], sns2: [], sns3: []}
|
||||
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=one):
|
||||
states[mp].append(
|
||||
set_state(mp, "idle", attributes={"media_title": str(sentinel.mt1)})
|
||||
)
|
||||
states[mp].append(
|
||||
set_state(mp, "YouTube", attributes={"media_title": str(sentinel.mt2)})
|
||||
)
|
||||
states[sns1].append(set_state(sns1, "10", attributes=sns1_attr))
|
||||
states[sns2].append(set_state(sns2, "10", attributes=sns2_attr))
|
||||
states[sns3].append(set_state(sns3, "10", attributes=sns3_attr))
|
||||
|
||||
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=two):
|
||||
states[sns1].append(set_state(sns1, "15", attributes=sns1_attr))
|
||||
states[sns2].append(set_state(sns2, "15", attributes=sns2_attr))
|
||||
states[sns3].append(set_state(sns3, "15", attributes=sns3_attr))
|
||||
|
||||
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=three):
|
||||
states[sns1].append(set_state(sns1, "20", attributes=sns1_attr))
|
||||
states[sns2].append(set_state(sns2, "20", attributes=sns2_attr))
|
||||
states[sns3].append(set_state(sns3, "20", attributes=sns3_attr))
|
||||
|
||||
return zero, four, states
|
||||
|
||||
|
||||
def record_states_partially_unavailable(hass):
|
||||
"""Record some test states.
|
||||
|
||||
We inject a bunch of state updates temperature sensors.
|
||||
"""
|
||||
mp = "media_player.test"
|
||||
sns1 = "sensor.test1"
|
||||
sns2 = "sensor.test2"
|
||||
sns3 = "sensor.test3"
|
||||
sns1_attr = {"device_class": "temperature", "state_class": "measurement"}
|
||||
sns2_attr = {"device_class": "temperature"}
|
||||
sns3_attr = {}
|
||||
|
||||
def set_state(entity_id, state, **kwargs):
|
||||
"""Set the state."""
|
||||
hass.states.set(entity_id, state, **kwargs)
|
||||
wait_recording_done(hass)
|
||||
return hass.states.get(entity_id)
|
||||
|
||||
zero = dt_util.utcnow()
|
||||
one = zero + timedelta(minutes=1)
|
||||
two = one + timedelta(minutes=15)
|
||||
three = two + timedelta(minutes=30)
|
||||
four = three + timedelta(minutes=15)
|
||||
|
||||
states = {mp: [], sns1: [], sns2: [], sns3: []}
|
||||
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=one):
|
||||
states[mp].append(
|
||||
set_state(mp, "idle", attributes={"media_title": str(sentinel.mt1)})
|
||||
)
|
||||
states[mp].append(
|
||||
set_state(mp, "YouTube", attributes={"media_title": str(sentinel.mt2)})
|
||||
)
|
||||
states[sns1].append(set_state(sns1, "10", attributes=sns1_attr))
|
||||
states[sns2].append(set_state(sns2, "10", attributes=sns2_attr))
|
||||
states[sns3].append(set_state(sns3, "10", attributes=sns3_attr))
|
||||
|
||||
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=two):
|
||||
states[sns1].append(set_state(sns1, "25", attributes=sns1_attr))
|
||||
states[sns2].append(set_state(sns2, "25", attributes=sns2_attr))
|
||||
states[sns3].append(set_state(sns3, "25", attributes=sns3_attr))
|
||||
|
||||
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=three):
|
||||
states[sns1].append(set_state(sns1, STATE_UNAVAILABLE, attributes=sns1_attr))
|
||||
states[sns2].append(set_state(sns2, STATE_UNAVAILABLE, attributes=sns2_attr))
|
||||
states[sns3].append(set_state(sns3, STATE_UNAVAILABLE, attributes=sns3_attr))
|
||||
|
||||
return zero, four, states
|
Loading…
Reference in New Issue