Move processing of recorder service call arguments into services.py (#71260)

J. Nick Koston 2022-05-03 15:56:22 -05:00 committed by GitHub
parent e9abfad361
commit e30940ef2a
7 changed files with 104 additions and 108 deletions
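
Taken together, the per-file diffs below make the Recorder's queue private (self._queue) and route every producer through a small public surface: queue_task() to enqueue a RecorderTask and a backlog property instead of queue.qsize(). A minimal sketch of the new calling convention, assuming the module paths implied by the imports in this commit (illustrative, not code from the diff):

from datetime import datetime

from homeassistant.components.recorder.core import Recorder
from homeassistant.components.recorder.tasks import StatisticsTask


def schedule_statistics(instance: Recorder, start: datetime) -> None:
    """Enqueue a statistics run through the new public helper."""
    # Before this commit callers wrote: instance.queue.put(StatisticsTask(start))
    instance.queue_task(StatisticsTask(start))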

View File

@@ -179,4 +179,4 @@ async def _process_recorder_platform(
) -> None:
"""Process a recorder platform."""
instance: Recorder = hass.data[DATA_INSTANCE]
instance.queue.put(AddRecorderPlatformTask(domain, platform))
instance.queue_task(AddRecorderPlatformTask(domain, platform))

View File

@@ -28,7 +28,6 @@ from homeassistant.const import (
MATCH_ALL,
)
from homeassistant.core import CALLBACK_TYPE, CoreState, Event, HomeAssistant, callback
from homeassistant.helpers.entityfilter import generate_filter
from homeassistant.helpers.event import (
async_track_time_change,
async_track_time_interval,
@@ -38,9 +37,6 @@ import homeassistant.util.dt as dt_util
from . import migration, statistics
from .const import (
ATTR_APPLY_FILTER,
ATTR_KEEP_DAYS,
ATTR_REPACK,
DB_WORKER_PREFIX,
KEEPALIVE_TIME,
MAX_QUEUE_BACKLOG,
@@ -70,7 +66,6 @@ from .tasks import (
ExternalStatisticsTask,
KeepAliveTask,
PerodicCleanupTask,
PurgeEntitiesTask,
PurgeTask,
RecorderTask,
StatisticsTask,
@@ -112,6 +107,7 @@ SHUTDOWN_TASK = object()
COMMIT_TASK = CommitTask()
KEEP_ALIVE_TASK = KeepAliveTask()
WAIT_TASK = WaitTask()
DB_LOCK_TIMEOUT = 30
DB_LOCK_QUEUE_CHECK_TIMEOUT = 1
@@ -152,7 +148,7 @@ class Recorder(threading.Thread):
self.keep_days = keep_days
self._hass_started: asyncio.Future[object] = asyncio.Future()
self.commit_interval = commit_interval
self.queue: queue.SimpleQueue[RecorderTask] = queue.SimpleQueue()
self._queue: queue.SimpleQueue[RecorderTask] = queue.SimpleQueue()
self.db_url = uri
self.db_max_retries = db_max_retries
self.db_retry_wait = db_retry_wait
@@ -175,21 +171,42 @@ class Recorder(threading.Thread):
self.event_session: Session | None = None
self.get_session: Callable[[], Session] | None = None
self._completed_first_database_setup: bool | None = None
self._event_listener: CALLBACK_TYPE | None = None
self.async_migration_event = asyncio.Event()
self.migration_in_progress = False
self._queue_watcher: CALLBACK_TYPE | None = None
self._db_supports_row_number = True
self._database_lock_task: DatabaseLockTask | None = None
self._db_executor: DBInterruptibleThreadPoolExecutor | None = None
self._exclude_attributes_by_domain = exclude_attributes_by_domain
self._event_listener: CALLBACK_TYPE | None = None
self._queue_watcher: CALLBACK_TYPE | None = None
self._keep_alive_listener: CALLBACK_TYPE | None = None
self._commit_listener: CALLBACK_TYPE | None = None
self._periodic_listener: CALLBACK_TYPE | None = None
self._nightly_listener: CALLBACK_TYPE | None = None
self.enabled = True
@property
def backlog(self) -> int:
"""Return the number of items in the recorder backlog."""
return self._queue.qsize()
@property
def _using_file_sqlite(self) -> bool:
"""Short version to check if we are using sqlite3 as a file."""
return self.db_url != SQLITE_URL_PREFIX and self.db_url.startswith(
SQLITE_URL_PREFIX
)
@property
def recording(self) -> bool:
"""Return if the recorder is recording."""
return self._event_listener is not None
def queue_task(self, task: RecorderTask) -> None:
"""Add a task to the recorder queue."""
self._queue.put(task)
def set_enable(self, enable: bool) -> None:
"""Enable or disable recording events and states."""
self.enabled = enable
@@ -222,7 +239,7 @@
def _async_keep_alive(self, now: datetime) -> None:
"""Queue a keep alive."""
if self._event_listener:
self.queue.put(KEEP_ALIVE_TASK)
self.queue_task(KEEP_ALIVE_TASK)
@callback
def _async_commit(self, now: datetime) -> None:
@@ -232,7 +249,7 @@
and not self._database_lock_task
and self._event_session_has_pending_writes()
):
self.queue.put(COMMIT_TASK)
self.queue_task(COMMIT_TASK)
@callback
def async_add_executor_job(
@@ -253,7 +270,7 @@
The queue grows during migration or if something really goes wrong.
"""
size = self.queue.qsize()
size = self.backlog
_LOGGER.debug("Recorder queue size is: %s", size)
if size <= MAX_QUEUE_BACKLOG:
return
@@ -314,73 +331,52 @@
# Unknown what it is.
return True
def do_adhoc_purge(self, **kwargs: Any) -> None:
"""Trigger an adhoc purge retaining keep_days worth of data."""
keep_days = kwargs.get(ATTR_KEEP_DAYS, self.keep_days)
repack = cast(bool, kwargs[ATTR_REPACK])
apply_filter = cast(bool, kwargs[ATTR_APPLY_FILTER])
purge_before = dt_util.utcnow() - timedelta(days=keep_days)
self.queue.put(PurgeTask(purge_before, repack, apply_filter))
def do_adhoc_purge_entities(
self, entity_ids: set[str], domains: list[str], entity_globs: list[str]
) -> None:
"""Trigger an adhoc purge of requested entities."""
entity_filter = generate_filter(domains, list(entity_ids), [], [], entity_globs)
self.queue.put(PurgeEntitiesTask(entity_filter))
def do_adhoc_statistics(self, **kwargs: Any) -> None:
"""Trigger an adhoc statistics run."""
if not (start := kwargs.get("start")):
start = statistics.get_start_time()
self.queue.put(StatisticsTask(start))
self.queue_task(StatisticsTask(start))
def _empty_queue(self, event: Event) -> None:
"""Empty the queue if its still present at final write."""
# If the queue is full of events to be processed because
# the database is so broken that every event results in a retry
# we will never be able to get through the events to shut down in time.
#
# We drain all the events in the queue and then insert
# an empty one to ensure the next thing the recorder sees
# is a request to shutdown.
while True:
try:
self._queue.get_nowait()
except queue.Empty:
break
self.queue_task(StopTask())
async def _async_shutdown(self, event: Event) -> None:
"""Shut down the Recorder."""
if not self._hass_started.done():
self._hass_started.set_result(SHUTDOWN_TASK)
self.queue_task(StopTask())
self._async_stop_listeners()
await self.hass.async_add_executor_job(self.join)
@callback
def _async_hass_started(self, event: Event) -> None:
"""Notify that hass has started."""
self._hass_started.set_result(None)
@callback
def async_register(self) -> None:
"""Post connection initialize."""
def _empty_queue(event: Event) -> None:
"""Empty the queue if its still present at final write."""
# If the queue is full of events to be processed because
# the database is so broken that every event results in a retry
# we will never be able to get through the events to shut down in time.
#
# We drain all the events in the queue and then insert
# an empty one to ensure the next thing the recorder sees
# is a request to shutdown.
while True:
try:
self.queue.get_nowait()
except queue.Empty:
break
self.queue.put(StopTask())
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_FINAL_WRITE, _empty_queue)
async def _async_shutdown(event: Event) -> None:
"""Shut down the Recorder."""
if not self._hass_started.done():
self._hass_started.set_result(SHUTDOWN_TASK)
self.queue.put(StopTask())
self._async_stop_listeners()
await self.hass.async_add_executor_job(self.join)
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_shutdown)
bus = self.hass.bus
bus.async_listen_once(EVENT_HOMEASSISTANT_FINAL_WRITE, self._empty_queue)
bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self._async_shutdown)
if self.hass.state == CoreState.running:
self._hass_started.set_result(None)
return
@callback
def _async_hass_started(event: Event) -> None:
"""Notify that hass has started."""
self._hass_started.set_result(None)
self.hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_STARTED, _async_hass_started
)
bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, self._async_hass_started)
@callback
def async_connection_failed(self) -> None:
@@ -414,9 +410,9 @@
# until after the database is vacuumed
repack = self.auto_repack and is_second_sunday(now)
purge_before = dt_util.utcnow() - timedelta(days=self.keep_days)
self.queue.put(PurgeTask(purge_before, repack=repack, apply_filter=False))
self.queue_task(PurgeTask(purge_before, repack=repack, apply_filter=False))
else:
self.queue.put(PerodicCleanupTask())
self.queue_task(PerodicCleanupTask())
@callback
def async_periodic_statistics(self, now: datetime) -> None:
@@ -425,33 +421,33 @@
Short term statistics run every 5 minutes
"""
start = statistics.get_start_time()
self.queue.put(StatisticsTask(start))
self.queue_task(StatisticsTask(start))
@callback
def async_adjust_statistics(
self, statistic_id: str, start_time: datetime, sum_adjustment: float
) -> None:
"""Adjust statistics."""
self.queue.put(AdjustStatisticsTask(statistic_id, start_time, sum_adjustment))
self.queue_task(AdjustStatisticsTask(statistic_id, start_time, sum_adjustment))
@callback
def async_clear_statistics(self, statistic_ids: list[str]) -> None:
"""Clear statistics for a list of statistic_ids."""
self.queue.put(ClearStatisticsTask(statistic_ids))
self.queue_task(ClearStatisticsTask(statistic_ids))
@callback
def async_update_statistics_metadata(
self, statistic_id: str, unit_of_measurement: str | None
) -> None:
"""Update statistics metadata for a statistic_id."""
self.queue.put(UpdateStatisticsMetadataTask(statistic_id, unit_of_measurement))
self.queue_task(UpdateStatisticsMetadataTask(statistic_id, unit_of_measurement))
@callback
def async_external_statistics(
self, metadata: StatisticMetaData, stats: Iterable[StatisticData]
) -> None:
"""Schedule external statistics."""
self.queue.put(ExternalStatisticsTask(metadata, stats))
self.queue_task(ExternalStatisticsTask(metadata, stats))
@callback
def using_sqlite(self) -> bool:
@@ -553,7 +549,7 @@
# has changed. This reduces the disk io.
self.stop_requested = False
while not self.stop_requested:
task = self.queue.get()
task = self._queue.get()
_LOGGER.debug("Processing task: %s", task)
try:
self._process_one_task_or_recover(task)
@@ -643,7 +639,7 @@
# Notify that lock is being held, wait until database can be used again.
self.hass.add_job(_async_set_database_locked, task)
while not task.database_unlock.wait(timeout=DB_LOCK_QUEUE_CHECK_TIMEOUT):
if self.queue.qsize() > MAX_QUEUE_BACKLOG * 0.9:
if self.backlog > MAX_QUEUE_BACKLOG * 0.9:
_LOGGER.warning(
"Database queue backlog reached more than 90% of maximum queue "
"length while waiting for backup to finish; recorder will now "
@@ -654,7 +650,7 @@
break
_LOGGER.info(
"Database queue backlog reached %d entries during backup",
self.queue.qsize(),
self.backlog,
)
def _process_one_event(self, event: Event) -> None:
@@ -908,7 +904,7 @@
@callback
def event_listener(self, event: Event) -> None:
"""Listen for new events and put them in the process queue."""
self.queue.put(EventTask(event))
self.queue_task(EventTask(event))
def block_till_done(self) -> None:
"""Block till all events processed.
@@ -923,7 +919,7 @@
is in the database.
"""
self._queue_watch.clear()
self.queue.put(WaitTask())
self.queue_task(WAIT_TASK)
self._queue_watch.wait()
async def lock_database(self) -> bool:
@@ -940,7 +936,7 @@
database_locked = asyncio.Event()
task = DatabaseLockTask(database_locked, threading.Event(), False)
self.queue.put(task)
self.queue_task(task)
try:
await asyncio.wait_for(database_locked.wait(), timeout=DB_LOCK_TIMEOUT)
except asyncio.TimeoutError as err:
@@ -1013,13 +1009,6 @@
self.get_session = scoped_session(sessionmaker(bind=self.engine, future=True))
_LOGGER.debug("Connected to recorder database")
@property
def _using_file_sqlite(self) -> bool:
"""Short version to check if we are using sqlite3 as a file."""
return self.db_url != SQLITE_URL_PREFIX and self.db_url.startswith(
SQLITE_URL_PREFIX
)
def _close_connection(self) -> None:
"""Close the connection."""
assert self.engine is not None
@@ -1053,7 +1042,7 @@
while start < last_period:
end = start + timedelta(minutes=5)
_LOGGER.debug("Compiling missing statistics for %s-%s", start, end)
self.queue.put(StatisticsTask(start))
self.queue_task(StatisticsTask(start))
start = end
def _end_session(self) -> None:
@@ -1075,8 +1064,3 @@
self._stop_executor()
self._end_session()
self._close_connection()
@property
def recording(self) -> bool:
"""Return if the recorder is recording."""
return self._event_listener is not None
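
That closes out the core.py changes: the queue handling, the shutdown hooks, and the backlog/recording/_using_file_sqlite properties now all live near the top of the class. For consumers, the observable surface is the two read-only properties plus queue_task(); a hedged sketch of a status read, in the spirit of the websocket handler change further down (the dict keys are illustrative, not from the commit):

from homeassistant.components.recorder.core import Recorder


def recorder_status(instance: Recorder) -> dict[str, object]:
    """Snapshot recorder state using the new public properties."""
    return {
        "backlog": instance.backlog,          # replaces instance.queue.qsize()
        "recording": instance.recording,      # True while the event listener is attached
        "thread_alive": instance.is_alive(),  # Recorder subclasses threading.Thread
    }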

View File

@@ -1,21 +1,26 @@
"""Support for recorder services."""
from __future__ import annotations
from datetime import timedelta
from typing import cast
import voluptuous as vol
from homeassistant.core import HomeAssistant, ServiceCall, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entityfilter import generate_filter
from homeassistant.helpers.service import async_extract_entity_ids
import homeassistant.util.dt as dt_util
from .const import ATTR_APPLY_FILTER, ATTR_KEEP_DAYS, ATTR_REPACK, DOMAIN
from .core import Recorder
from .tasks import PurgeEntitiesTask, PurgeTask
SERVICE_PURGE = "purge"
SERVICE_PURGE_ENTITIES = "purge_entities"
SERVICE_ENABLE = "enable"
SERVICE_DISABLE = "disable"
SERVICE_PURGE_SCHEMA = vol.Schema(
{
vol.Optional(ATTR_KEEP_DAYS): cv.positive_int,
@@ -44,7 +49,12 @@ SERVICE_DISABLE_SCHEMA = vol.Schema({})
def _async_register_purge_service(hass: HomeAssistant, instance: Recorder) -> None:
async def async_handle_purge_service(service: ServiceCall) -> None:
"""Handle calls to the purge service."""
instance.do_adhoc_purge(**service.data)
kwargs = service.data
keep_days = kwargs.get(ATTR_KEEP_DAYS, instance.keep_days)
repack = cast(bool, kwargs[ATTR_REPACK])
apply_filter = cast(bool, kwargs[ATTR_APPLY_FILTER])
purge_before = dt_util.utcnow() - timedelta(days=keep_days)
instance.queue_task(PurgeTask(purge_before, repack, apply_filter))
hass.services.async_register(
DOMAIN, SERVICE_PURGE, async_handle_purge_service, schema=SERVICE_PURGE_SCHEMA
@@ -60,8 +70,8 @@ def _async_register_purge_entities_service(
entity_ids = await async_extract_entity_ids(hass, service)
domains = service.data.get(ATTR_DOMAINS, [])
entity_globs = service.data.get(ATTR_ENTITY_GLOBS, [])
instance.do_adhoc_purge_entities(entity_ids, domains, entity_globs)
entity_filter = generate_filter(domains, list(entity_ids), [], [], entity_globs)
instance.queue_task(PurgeEntitiesTask(entity_filter))
hass.services.async_register(
DOMAIN,

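With the argument handling folded into the handlers, the services keep their existing fields; only where the work is built and queued changes. A hedged usage example, assuming the ATTR_* constants map to the string keys "keep_days", "repack", and "apply_filter":

from homeassistant.core import HomeAssistant


async def purge_five_days(hass: HomeAssistant) -> None:
    """Example call into recorder.purge; the handler builds the PurgeTask itself."""
    await hass.services.async_call(
        "recorder",  # DOMAIN
        "purge",     # SERVICE_PURGE
        {"keep_days": 5, "repack": False, "apply_filter": False},
        blocking=True,
    )
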
View File

@@ -78,7 +78,9 @@ class PurgeTask(RecorderTask):
periodic_db_cleanups(instance)
return
# Schedule a new purge task if this one didn't finish
instance.queue.put(PurgeTask(self.purge_before, self.repack, self.apply_filter))
instance.queue_task(
PurgeTask(self.purge_before, self.repack, self.apply_filter)
)
@dataclass
@@ -92,7 +94,7 @@ class PurgeEntitiesTask(RecorderTask):
if purge.purge_entity_data(instance, self.entity_filter):
return
# Schedule a new purge task if this one didn't finish
instance.queue.put(PurgeEntitiesTask(self.entity_filter))
instance.queue_task(PurgeEntitiesTask(self.entity_filter))
@dataclass
@@ -115,7 +117,7 @@ class StatisticsTask(RecorderTask):
if statistics.compile_statistics(instance, self.start):
return
# Schedule a new statistics task if this one didn't finish
instance.queue.put(StatisticsTask(self.start))
instance.queue_task(StatisticsTask(self.start))
@dataclass
@@ -130,7 +132,7 @@ class ExternalStatisticsTask(RecorderTask):
if statistics.add_external_statistics(instance, self.metadata, self.statistics):
return
# Schedule a new statistics task if this one didn't finish
instance.queue.put(ExternalStatisticsTask(self.metadata, self.statistics))
instance.queue_task(ExternalStatisticsTask(self.metadata, self.statistics))
@dataclass
@@ -151,7 +153,7 @@ class AdjustStatisticsTask(RecorderTask):
):
return
# Schedule a new adjust statistics task if this one didn't finish
instance.queue.put(
instance.queue_task(
AdjustStatisticsTask(
self.statistic_id, self.start_time, self.sum_adjustment
)

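All of the task classes above share the same retry convention, now expressed through queue_task(): run() does a bounded amount of work and, if it could not finish, enqueues a fresh copy of itself. A hypothetical task sketching that pattern (ExampleChunkedTask is not part of the commit; it assumes the run(instance) hook that the RecorderTask subclasses above implement):

from dataclasses import dataclass

from homeassistant.components.recorder.core import Recorder
from homeassistant.components.recorder.tasks import RecorderTask


@dataclass
class ExampleChunkedTask(RecorderTask):
    """Hypothetical task, for illustration only."""

    remaining: int

    def run(self, instance: Recorder) -> None:
        """Process one chunk, then reschedule if work remains."""
        if self.remaining <= 0:
            return
        # ... one bounded unit of work would go here ...
        instance.queue_task(ExampleChunkedTask(self.remaining - 1))
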
View File

@@ -148,7 +148,7 @@ def ws_info(
"""Return status of the recorder."""
instance: Recorder = hass.data[DATA_INSTANCE]
backlog = instance.queue.qsize() if instance and instance.queue else None
backlog = instance.backlog if instance else None
migration_in_progress = async_migration_in_progress(hass)
recording = instance.recording if instance else False
thread_alive = instance.is_alive() if instance else False

View File

@@ -1395,7 +1395,7 @@ async def test_database_lock_timeout(hass, recorder_mock):
self.event.wait()
block_task = BlockQueue()
instance.queue.put(block_task)
instance.queue_task(block_task)
with patch.object(recorder.core, "DB_LOCK_TIMEOUT", 0.1):
try:
with pytest.raises(TimeoutError):

View File

@@ -557,7 +557,7 @@ async def test_purge_cutoff_date(
assert events.filter(Events.event_type == "PURGE").count() == rows - 1
assert events.filter(Events.event_type == "KEEP").count() == 1
instance.queue.put(PurgeTask(cutoff, repack=False, apply_filter=False))
instance.queue_task(PurgeTask(cutoff, repack=False, apply_filter=False))
await hass.async_block_till_done()
await async_recorder_block_till_done(hass)
await async_wait_purge_done(hass)
@@ -588,7 +588,7 @@ async def test_purge_cutoff_date(
assert events.filter(Events.event_type == "KEEP").count() == 1
# Make sure we can purge everything
instance.queue.put(PurgeTask(dt_util.utcnow(), repack=False, apply_filter=False))
instance.queue_task(PurgeTask(dt_util.utcnow(), repack=False, apply_filter=False))
await async_recorder_block_till_done(hass)
await async_wait_purge_done(hass)
@@ -599,7 +599,7 @@ async def test_purge_cutoff_date(
assert state_attributes.count() == 0
# Make sure we can purge everything when the db is already empty
instance.queue.put(PurgeTask(dt_util.utcnow(), repack=False, apply_filter=False))
instance.queue_task(PurgeTask(dt_util.utcnow(), repack=False, apply_filter=False))
await async_recorder_block_till_done(hass)
await async_wait_purge_done(hass)