diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index af34d4dd9f63..a662a457add0 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -68,6 +68,7 @@ CONF_DB_RETRY_WAIT = "db_retry_wait" CONF_PURGE_KEEP_DAYS = "purge_keep_days" CONF_PURGE_INTERVAL = "purge_interval" CONF_EVENT_TYPES = "event_types" +CONF_COMMIT_INTERVAL = "commit_interval" FILTER_SCHEMA = vol.Schema( { @@ -98,6 +99,9 @@ CONFIG_SCHEMA = vol.Schema( vol.Coerce(int), vol.Range(min=0) ), vol.Optional(CONF_DB_URL): cv.string, + vol.Optional(CONF_COMMIT_INTERVAL, default=1): vol.All( + vol.Coerce(int), vol.Range(min=0) + ), vol.Optional( CONF_DB_MAX_RETRIES, default=DEFAULT_DB_MAX_RETRIES ): cv.positive_int, @@ -141,6 +145,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: conf = config[DOMAIN] keep_days = conf.get(CONF_PURGE_KEEP_DAYS) purge_interval = conf.get(CONF_PURGE_INTERVAL) + commit_interval = conf[CONF_COMMIT_INTERVAL] db_max_retries = conf[CONF_DB_MAX_RETRIES] db_retry_wait = conf[CONF_DB_RETRY_WAIT] @@ -154,6 +159,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: hass=hass, keep_days=keep_days, purge_interval=purge_interval, + commit_interval=commit_interval, uri=db_url, db_max_retries=db_max_retries, db_retry_wait=db_retry_wait, @@ -185,6 +191,7 @@ class Recorder(threading.Thread): hass: HomeAssistant, keep_days: int, purge_interval: int, + commit_interval: int, uri: str, db_max_retries: int, db_retry_wait: int, @@ -197,6 +204,7 @@ class Recorder(threading.Thread): self.hass = hass self.keep_days = keep_days self.purge_interval = purge_interval + self.commit_interval = commit_interval self.queue: Any = queue.Queue() self.recording_start = dt_util.utcnow() self.db_url = uri @@ -214,6 +222,8 @@ class Recorder(threading.Thread): ) self.exclude_t = exclude.get(CONF_EVENT_TYPES, []) + self._timechanges_seen = 0 + self.event_session = None self.get_session = None @callback @@ -326,6 +336,10 @@ class Recorder(threading.Thread): self.hass.helpers.event.track_point_in_time(async_purge, run) + self.event_session = self.get_session() + # Use a session for the event read loop + # with a commit every time the event time + # has changed. This reduces the disk io. while True: event = self.queue.get() @@ -340,6 +354,11 @@ class Recorder(threading.Thread): continue if event.event_type == EVENT_TIME_CHANGED: self.queue.task_done() + if self.commit_interval: + self._timechanges_seen += 1 + if self.commit_interval >= self._timechanges_seen: + self._timechanges_seen = 0 + self._commit_event_session_or_retry() continue if event.event_type in self.exclude_t: self.queue.task_done() @@ -351,55 +370,72 @@ class Recorder(threading.Thread): self.queue.task_done() continue - tries = 1 - updated = False - while not updated and tries <= self.db_max_retries: - if tries != 1: - time.sleep(self.db_retry_wait) + try: + dbevent = Events.from_event(event) + self.event_session.add(dbevent) + self.event_session.flush() + except (TypeError, ValueError): + _LOGGER.warning("Event is not JSON serializable: %s", event) + + if dbevent and event.event_type == EVENT_STATE_CHANGED: try: - with session_scope(session=self.get_session()) as session: - try: - dbevent = Events.from_event(event) - session.add(dbevent) - session.flush() - except (TypeError, ValueError): - _LOGGER.warning("Event is not JSON serializable: %s", event) - - if event.event_type == EVENT_STATE_CHANGED: - try: - dbstate = States.from_event(event) - dbstate.event_id = dbevent.event_id - session.add(dbstate) - except (TypeError, ValueError): - _LOGGER.warning( - "State is not JSON serializable: %s", - event.data.get("new_state"), - ) - - updated = True - - except exc.OperationalError as err: - _LOGGER.error( - "Error in database connectivity: %s. " - "(retrying in %s seconds)", - err, - self.db_retry_wait, + dbstate = States.from_event(event) + dbstate.event_id = dbevent.event_id + self.event_session.add(dbstate) + except (TypeError, ValueError): + _LOGGER.warning( + "State is not JSON serializable: %s", + event.data.get("new_state"), ) - tries += 1 - except exc.SQLAlchemyError: - updated = True - _LOGGER.exception("Error saving event: %s", event) - - if not updated: - _LOGGER.error( - "Error in database update. Could not save " - "after %d tries. Giving up", - tries, - ) + # If they do not have a commit interval + # than we commit right away + if not self.commit_interval: + self._commit_event_session_or_retry() self.queue.task_done() + def _commit_event_session_or_retry(self): + tries = 1 + while tries <= self.db_max_retries: + if tries != 1: + time.sleep(self.db_retry_wait) + + try: + self._commit_event_session() + return + + except exc.OperationalError as err: + _LOGGER.error( + "Error in database connectivity: %s. " "(retrying in %s seconds)", + err, + self.db_retry_wait, + ) + tries += 1 + + except exc.SQLAlchemyError: + _LOGGER.exception("Error saving events") + return + + _LOGGER.error( + "Error in database update. Could not save " "after %d tries. Giving up", + tries, + ) + try: + self.event_session.close() + except exc.SQLAlchemyError: + _LOGGER.exception("Failed to close event session.") + + self.event_session = self.get_session() + + def _commit_event_session(self): + try: + self.event_session.commit() + except Exception as err: + _LOGGER.error("Error executing query: %s", err) + self.event_session.rollback() + raise + @callback def event_listener(self, event): """Listen for new events and put them in the process queue.""" @@ -465,7 +501,10 @@ class Recorder(threading.Thread): def _close_run(self): """Save end time for current run.""" - with session_scope(session=self.get_session()) as session: + if self.event_session is not None: self.run_info.end = dt_util.utcnow() - session.add(self.run_info) + self.event_session.add(self.run_info) + self._commit_event_session_or_retry() + self.event_session.close() + self.run_info = None diff --git a/tests/components/history/test_init.py b/tests/components/history/test_init.py index 051024999e4e..65c0a717beec 100644 --- a/tests/components/history/test_init.py +++ b/tests/components/history/test_init.py @@ -14,6 +14,7 @@ from tests.common import ( init_recorder_component, mock_state_change_event, ) +from tests.components.recorder.common import wait_recording_done class TestComponentHistory(unittest.TestCase): @@ -31,12 +32,7 @@ class TestComponentHistory(unittest.TestCase): """Initialize the recorder.""" init_recorder_component(self.hass) self.hass.start() - self.wait_recording_done() - - def wait_recording_done(self): - """Block till recording is done.""" - self.hass.block_till_done() - self.hass.data[recorder.DATA_INSTANCE].block_till_done() + wait_recording_done(self.hass) def test_setup(self): """Test setup method of history.""" @@ -78,7 +74,7 @@ class TestComponentHistory(unittest.TestCase): states.append(state) - self.wait_recording_done() + wait_recording_done(self.hass) future = now + timedelta(seconds=1) with patch( @@ -93,7 +89,7 @@ class TestComponentHistory(unittest.TestCase): mock_state_change_event(self.hass, state) - self.wait_recording_done() + wait_recording_done(self.hass) # Get states returns everything before POINT for state1, state2 in zip( @@ -115,7 +111,7 @@ class TestComponentHistory(unittest.TestCase): def set_state(state): """Set the state.""" self.hass.states.set(entity_id, state) - self.wait_recording_done() + wait_recording_done(self.hass) return self.hass.states.get(entity_id) start = dt_util.utcnow() @@ -156,7 +152,7 @@ class TestComponentHistory(unittest.TestCase): def set_state(state): """Set the state.""" self.hass.states.set(entity_id, state) - self.wait_recording_done() + wait_recording_done(self.hass) return self.hass.states.get(entity_id) start = dt_util.utcnow() - timedelta(minutes=2) @@ -559,7 +555,7 @@ class TestComponentHistory(unittest.TestCase): def set_state(entity_id, state, **kwargs): """Set the state.""" self.hass.states.set(entity_id, state, **kwargs) - self.wait_recording_done() + wait_recording_done(self.hass) return self.hass.states.get(entity_id) zero = dt_util.utcnow() @@ -615,6 +611,7 @@ class TestComponentHistory(unittest.TestCase): ) # state will be skipped since entity is hidden set_state(therm, 22, attributes={"current_temperature": 21, "hidden": True}) + return zero, four, states diff --git a/tests/components/logbook/test_init.py b/tests/components/logbook/test_init.py index cc9a459d2c1b..98653dc5a6ca 100644 --- a/tests/components/logbook/test_init.py +++ b/tests/components/logbook/test_init.py @@ -1,6 +1,7 @@ """The tests for the logbook component.""" # pylint: disable=protected-access,invalid-name from datetime import datetime, timedelta +from functools import partial import logging import unittest @@ -35,6 +36,7 @@ from homeassistant.setup import async_setup_component, setup_component import homeassistant.util.dt as dt_util from tests.common import get_test_home_assistant, init_recorder_component +from tests.components.recorder.common import trigger_db_commit _LOGGER = logging.getLogger(__name__) @@ -1288,6 +1290,7 @@ async def test_logbook_view_period_entity(hass, hass_client): entity_id_second = "switch.second" hass.states.async_set(entity_id_second, STATE_OFF) hass.states.async_set(entity_id_second, STATE_ON) + await hass.async_add_job(partial(trigger_db_commit, hass)) await hass.async_block_till_done() await hass.async_add_job(hass.data[recorder.DATA_INSTANCE].block_till_done) diff --git a/tests/components/recorder/common.py b/tests/components/recorder/common.py new file mode 100644 index 000000000000..07bfcb0bf4f8 --- /dev/null +++ b/tests/components/recorder/common.py @@ -0,0 +1,22 @@ +"""Common test utils for working with recorder.""" + +from homeassistant.components import recorder +from homeassistant.util import dt as dt_util + +from tests.common import fire_time_changed + +DB_COMMIT_INTERVAL = 50 + + +def wait_recording_done(hass): + """Block till recording is done.""" + trigger_db_commit(hass) + hass.block_till_done() + hass.data[recorder.DATA_INSTANCE].block_till_done() + + +def trigger_db_commit(hass): + """Force the recorder to commit.""" + for _ in range(DB_COMMIT_INTERVAL): + # We only commit on time change + fire_time_changed(hass, dt_util.utcnow()) diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index a21ef578ca9c..8a56ba3d9773 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -13,6 +13,8 @@ from homeassistant.const import MATCH_ALL from homeassistant.core import callback from homeassistant.setup import async_setup_component +from .common import wait_recording_done + from tests.common import get_test_home_assistant, init_recorder_component @@ -37,8 +39,7 @@ class TestRecorder(unittest.TestCase): self.hass.states.set(entity_id, state, attributes) - self.hass.block_till_done() - self.hass.data[DATA_INSTANCE].block_till_done() + wait_recording_done(self.hass) with session_scope(hass=self.hass) as session: db_states = list(session.query(States)) @@ -65,7 +66,7 @@ class TestRecorder(unittest.TestCase): self.hass.bus.fire(event_type, event_data) - self.hass.block_till_done() + wait_recording_done(self.hass) assert len(events) == 1 event = events[0] @@ -109,8 +110,7 @@ def _add_entities(hass, entity_ids): attributes = {"test_attr": 5, "test_attr_10": "nice"} for idx, entity_id in enumerate(entity_ids): hass.states.set(entity_id, "state{}".format(idx), attributes) - hass.block_till_done() - hass.data[DATA_INSTANCE].block_till_done() + wait_recording_done(hass) with session_scope(hass=hass) as session: return [st.to_native() for st in session.query(States)] @@ -121,8 +121,7 @@ def _add_events(hass, events): session.query(Events).delete(synchronize_session=False) for event_type in events: hass.bus.fire(event_type) - hass.block_till_done() - hass.data[DATA_INSTANCE].block_till_done() + wait_recording_done(hass) with session_scope(hass=hass) as session: return [ev.to_native() for ev in session.query(Events)] @@ -201,6 +200,7 @@ def test_recorder_setup_failure(): hass, keep_days=7, purge_interval=2, + commit_interval=1, uri="sqlite://", db_max_retries=10, db_retry_wait=3,