Add a commit interval setting to recorder (#32596)

* Add a commit interval setting to recorder

* Make the default every 1s instead of immediate

* See attached py-spy flamegraphs for why 1s

* This avoids disk thrashing during event storms

* Make Home Assistant significantly more responsive on busy systems

* remove debug

* Add commit forces for tests that expect commits to be immediate

* Add commit forces for tests that expect commits to be immediate

* make sure _trigger_db_commit is in the right place (all effective "wait_recording_done" calls)

* De-duplicate wait_recording_done code
This commit is contained in:
J. Nick Koston 2020-03-09 19:43:26 -05:00 committed by GitHub
parent 8a46d93be4
commit 3a680bf7b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 124 additions and 63 deletions

View File

@ -68,6 +68,7 @@ CONF_DB_RETRY_WAIT = "db_retry_wait"
CONF_PURGE_KEEP_DAYS = "purge_keep_days"
CONF_PURGE_INTERVAL = "purge_interval"
CONF_EVENT_TYPES = "event_types"
CONF_COMMIT_INTERVAL = "commit_interval"
FILTER_SCHEMA = vol.Schema(
{
@ -98,6 +99,9 @@ CONFIG_SCHEMA = vol.Schema(
vol.Coerce(int), vol.Range(min=0)
),
vol.Optional(CONF_DB_URL): cv.string,
vol.Optional(CONF_COMMIT_INTERVAL, default=1): vol.All(
vol.Coerce(int), vol.Range(min=0)
),
vol.Optional(
CONF_DB_MAX_RETRIES, default=DEFAULT_DB_MAX_RETRIES
): cv.positive_int,
@ -141,6 +145,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
conf = config[DOMAIN]
keep_days = conf.get(CONF_PURGE_KEEP_DAYS)
purge_interval = conf.get(CONF_PURGE_INTERVAL)
commit_interval = conf[CONF_COMMIT_INTERVAL]
db_max_retries = conf[CONF_DB_MAX_RETRIES]
db_retry_wait = conf[CONF_DB_RETRY_WAIT]
@ -154,6 +159,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
hass=hass,
keep_days=keep_days,
purge_interval=purge_interval,
commit_interval=commit_interval,
uri=db_url,
db_max_retries=db_max_retries,
db_retry_wait=db_retry_wait,
@ -185,6 +191,7 @@ class Recorder(threading.Thread):
hass: HomeAssistant,
keep_days: int,
purge_interval: int,
commit_interval: int,
uri: str,
db_max_retries: int,
db_retry_wait: int,
@ -197,6 +204,7 @@ class Recorder(threading.Thread):
self.hass = hass
self.keep_days = keep_days
self.purge_interval = purge_interval
self.commit_interval = commit_interval
self.queue: Any = queue.Queue()
self.recording_start = dt_util.utcnow()
self.db_url = uri
@ -214,6 +222,8 @@ class Recorder(threading.Thread):
)
self.exclude_t = exclude.get(CONF_EVENT_TYPES, [])
self._timechanges_seen = 0
self.event_session = None
self.get_session = None
@callback
@ -326,6 +336,10 @@ class Recorder(threading.Thread):
self.hass.helpers.event.track_point_in_time(async_purge, run)
self.event_session = self.get_session()
# Use a session for the event read loop
# with a commit every time the event time
# has changed. This reduces the disk io.
while True:
event = self.queue.get()
@ -340,6 +354,11 @@ class Recorder(threading.Thread):
continue
if event.event_type == EVENT_TIME_CHANGED:
self.queue.task_done()
if self.commit_interval:
self._timechanges_seen += 1
if self.commit_interval >= self._timechanges_seen:
self._timechanges_seen = 0
self._commit_event_session_or_retry()
continue
if event.event_type in self.exclude_t:
self.queue.task_done()
@ -351,55 +370,72 @@ class Recorder(threading.Thread):
self.queue.task_done()
continue
tries = 1
updated = False
while not updated and tries <= self.db_max_retries:
if tries != 1:
time.sleep(self.db_retry_wait)
try:
dbevent = Events.from_event(event)
self.event_session.add(dbevent)
self.event_session.flush()
except (TypeError, ValueError):
_LOGGER.warning("Event is not JSON serializable: %s", event)
if dbevent and event.event_type == EVENT_STATE_CHANGED:
try:
with session_scope(session=self.get_session()) as session:
try:
dbevent = Events.from_event(event)
session.add(dbevent)
session.flush()
except (TypeError, ValueError):
_LOGGER.warning("Event is not JSON serializable: %s", event)
if event.event_type == EVENT_STATE_CHANGED:
try:
dbstate = States.from_event(event)
dbstate.event_id = dbevent.event_id
session.add(dbstate)
except (TypeError, ValueError):
_LOGGER.warning(
"State is not JSON serializable: %s",
event.data.get("new_state"),
)
updated = True
except exc.OperationalError as err:
_LOGGER.error(
"Error in database connectivity: %s. "
"(retrying in %s seconds)",
err,
self.db_retry_wait,
dbstate = States.from_event(event)
dbstate.event_id = dbevent.event_id
self.event_session.add(dbstate)
except (TypeError, ValueError):
_LOGGER.warning(
"State is not JSON serializable: %s",
event.data.get("new_state"),
)
tries += 1
except exc.SQLAlchemyError:
updated = True
_LOGGER.exception("Error saving event: %s", event)
if not updated:
_LOGGER.error(
"Error in database update. Could not save "
"after %d tries. Giving up",
tries,
)
# If they do not have a commit interval
# then we commit right away
if not self.commit_interval:
self._commit_event_session_or_retry()
self.queue.task_done()
def _commit_event_session_or_retry(self):
    """Commit the pending event session, retrying on connectivity errors.

    Retries up to ``db_max_retries`` times, sleeping ``db_retry_wait``
    seconds before each retry.  If every attempt fails, the session is
    closed and replaced with a fresh one so the recorder can keep running.
    """
    attempt = 1
    while attempt <= self.db_max_retries:
        if attempt > 1:
            # Back off before every attempt after the first.
            time.sleep(self.db_retry_wait)
        try:
            self._commit_event_session()
            return
        except exc.OperationalError as err:
            # Transient connectivity problem - log and retry.
            _LOGGER.error(
                "Error in database connectivity: %s. (retrying in %s seconds)",
                err,
                self.db_retry_wait,
            )
            attempt += 1
        except exc.SQLAlchemyError:
            # Non-connectivity database error - retrying will not help.
            _LOGGER.exception("Error saving events")
            return

    _LOGGER.error(
        "Error in database update. Could not save after %d tries. Giving up",
        attempt,
    )
    # Give up on this session entirely; start over with a new one.
    try:
        self.event_session.close()
    except exc.SQLAlchemyError:
        _LOGGER.exception("Failed to close event session.")
    self.event_session = self.get_session()
def _commit_event_session(self):
    """Commit the current event session, rolling back on any failure.

    Re-raises the original exception so the caller can decide whether
    to retry.
    """
    session = self.event_session
    try:
        session.commit()
    except Exception as err:
        # Roll back so the session remains usable, then propagate.
        _LOGGER.error("Error executing query: %s", err)
        session.rollback()
        raise
@callback
def event_listener(self, event):
"""Listen for new events and put them in the process queue."""
@ -465,7 +501,10 @@ class Recorder(threading.Thread):
def _close_run(self):
"""Save end time for current run."""
with session_scope(session=self.get_session()) as session:
if self.event_session is not None:
self.run_info.end = dt_util.utcnow()
session.add(self.run_info)
self.event_session.add(self.run_info)
self._commit_event_session_or_retry()
self.event_session.close()
self.run_info = None

View File

@ -14,6 +14,7 @@ from tests.common import (
init_recorder_component,
mock_state_change_event,
)
from tests.components.recorder.common import wait_recording_done
class TestComponentHistory(unittest.TestCase):
@ -31,12 +32,7 @@ class TestComponentHistory(unittest.TestCase):
"""Initialize the recorder."""
init_recorder_component(self.hass)
self.hass.start()
self.wait_recording_done()
def wait_recording_done(self):
"""Block till recording is done."""
self.hass.block_till_done()
self.hass.data[recorder.DATA_INSTANCE].block_till_done()
wait_recording_done(self.hass)
def test_setup(self):
"""Test setup method of history."""
@ -78,7 +74,7 @@ class TestComponentHistory(unittest.TestCase):
states.append(state)
self.wait_recording_done()
wait_recording_done(self.hass)
future = now + timedelta(seconds=1)
with patch(
@ -93,7 +89,7 @@ class TestComponentHistory(unittest.TestCase):
mock_state_change_event(self.hass, state)
self.wait_recording_done()
wait_recording_done(self.hass)
# Get states returns everything before POINT
for state1, state2 in zip(
@ -115,7 +111,7 @@ class TestComponentHistory(unittest.TestCase):
def set_state(state):
"""Set the state."""
self.hass.states.set(entity_id, state)
self.wait_recording_done()
wait_recording_done(self.hass)
return self.hass.states.get(entity_id)
start = dt_util.utcnow()
@ -156,7 +152,7 @@ class TestComponentHistory(unittest.TestCase):
def set_state(state):
"""Set the state."""
self.hass.states.set(entity_id, state)
self.wait_recording_done()
wait_recording_done(self.hass)
return self.hass.states.get(entity_id)
start = dt_util.utcnow() - timedelta(minutes=2)
@ -559,7 +555,7 @@ class TestComponentHistory(unittest.TestCase):
def set_state(entity_id, state, **kwargs):
"""Set the state."""
self.hass.states.set(entity_id, state, **kwargs)
self.wait_recording_done()
wait_recording_done(self.hass)
return self.hass.states.get(entity_id)
zero = dt_util.utcnow()
@ -615,6 +611,7 @@ class TestComponentHistory(unittest.TestCase):
)
# state will be skipped since entity is hidden
set_state(therm, 22, attributes={"current_temperature": 21, "hidden": True})
return zero, four, states

View File

@ -1,6 +1,7 @@
"""The tests for the logbook component."""
# pylint: disable=protected-access,invalid-name
from datetime import datetime, timedelta
from functools import partial
import logging
import unittest
@ -35,6 +36,7 @@ from homeassistant.setup import async_setup_component, setup_component
import homeassistant.util.dt as dt_util
from tests.common import get_test_home_assistant, init_recorder_component
from tests.components.recorder.common import trigger_db_commit
_LOGGER = logging.getLogger(__name__)
@ -1288,6 +1290,7 @@ async def test_logbook_view_period_entity(hass, hass_client):
entity_id_second = "switch.second"
hass.states.async_set(entity_id_second, STATE_OFF)
hass.states.async_set(entity_id_second, STATE_ON)
await hass.async_add_job(partial(trigger_db_commit, hass))
await hass.async_block_till_done()
await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done)

View File

@ -0,0 +1,22 @@
"""Common test utils for working with recorder."""
from homeassistant.components import recorder
from homeassistant.util import dt as dt_util
from tests.common import fire_time_changed
DB_COMMIT_INTERVAL = 50
def wait_recording_done(hass):
    """Block till recording is done."""
    # Force a commit first, since the recorder may otherwise be waiting
    # for its commit interval to elapse.
    trigger_db_commit(hass)
    hass.block_till_done()
    instance = hass.data[recorder.DATA_INSTANCE]
    instance.block_till_done()
def trigger_db_commit(hass):
    """Force the recorder to commit."""
    # The recorder only commits when it sees time-changed events, so fire
    # enough of them to guarantee a commit happens.
    remaining = DB_COMMIT_INTERVAL
    while remaining:
        fire_time_changed(hass, dt_util.utcnow())
        remaining -= 1

View File

@ -13,6 +13,8 @@ from homeassistant.const import MATCH_ALL
from homeassistant.core import callback
from homeassistant.setup import async_setup_component
from .common import wait_recording_done
from tests.common import get_test_home_assistant, init_recorder_component
@ -37,8 +39,7 @@ class TestRecorder(unittest.TestCase):
self.hass.states.set(entity_id, state, attributes)
self.hass.block_till_done()
self.hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(self.hass)
with session_scope(hass=self.hass) as session:
db_states = list(session.query(States))
@ -65,7 +66,7 @@ class TestRecorder(unittest.TestCase):
self.hass.bus.fire(event_type, event_data)
self.hass.block_till_done()
wait_recording_done(self.hass)
assert len(events) == 1
event = events[0]
@ -109,8 +110,7 @@ def _add_entities(hass, entity_ids):
attributes = {"test_attr": 5, "test_attr_10": "nice"}
for idx, entity_id in enumerate(entity_ids):
hass.states.set(entity_id, "state{}".format(idx), attributes)
hass.block_till_done()
hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(hass)
with session_scope(hass=hass) as session:
return [st.to_native() for st in session.query(States)]
@ -121,8 +121,7 @@ def _add_events(hass, events):
session.query(Events).delete(synchronize_session=False)
for event_type in events:
hass.bus.fire(event_type)
hass.block_till_done()
hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(hass)
with session_scope(hass=hass) as session:
return [ev.to_native() for ev in session.query(Events)]
@ -201,6 +200,7 @@ def test_recorder_setup_failure():
hass,
keep_days=7,
purge_interval=2,
commit_interval=1,
uri="sqlite://",
db_max_retries=10,
db_retry_wait=3,