diff --git a/homeassistant/components/tomorrowio/__init__.py b/homeassistant/components/tomorrowio/__init__.py index 6d1b84ec5d70..41fa8158624e 100644 --- a/homeassistant/components/tomorrowio/__init__.py +++ b/homeassistant/components/tomorrowio/__init__.py @@ -221,7 +221,10 @@ class TomorrowioDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): await self.async_refresh() self.update_interval = async_set_update_interval(self.hass, self._api) - self._schedule_refresh() + self._next_refresh = None + self._async_unsub_refresh() + if self._listeners: + self._schedule_refresh() async def async_unload_entry(self, entry: ConfigEntry) -> bool | None: """Unload a config entry from coordinator. diff --git a/homeassistant/helpers/event.py b/homeassistant/helpers/event.py index daad994bbd42..11bfe04473af 100644 --- a/homeassistant/helpers/event.py +++ b/homeassistant/helpers/event.py @@ -1434,6 +1434,37 @@ def async_track_point_in_utc_time( track_point_in_utc_time = threaded_listener_factory(async_track_point_in_utc_time) +@callback +@bind_hass +def async_call_at( + hass: HomeAssistant, + action: HassJob[[datetime], Coroutine[Any, Any, None] | None] + | Callable[[datetime], Coroutine[Any, Any, None] | None], + loop_time: float, +) -> CALLBACK_TYPE: + """Add a listener that is called at .""" + + @callback + def run_action(job: HassJob[[datetime], Coroutine[Any, Any, None] | None]) -> None: + """Call the action.""" + hass.async_run_hass_job(job, time_tracker_utcnow()) + + job = ( + action + if isinstance(action, HassJob) + else HassJob(action, f"call_at {loop_time}") + ) + cancel_callback = hass.loop.call_at(loop_time, run_action, job) + + @callback + def unsub_call_later_listener() -> None: + """Cancel the call_later.""" + assert cancel_callback is not None + cancel_callback.cancel() + + return unsub_call_later_listener + + @callback @bind_hass def async_call_later( diff --git a/homeassistant/helpers/update_coordinator.py b/homeassistant/helpers/update_coordinator.py index a050c0da9e40..34651fcaf9d1 100644 --- a/homeassistant/helpers/update_coordinator.py +++ b/homeassistant/helpers/update_coordinator.py @@ -81,6 +81,7 @@ class DataUpdateCoordinator(BaseDataUpdateCoordinatorProtocol, Generic[_DataT]): self._shutdown_requested = False self.config_entry = config_entries.current_entry.get() self.always_update = always_update + self._next_refresh: float | None = None # It's None before the first successful update. # Components should call async_config_entry_first_refresh @@ -89,10 +90,11 @@ class DataUpdateCoordinator(BaseDataUpdateCoordinatorProtocol, Generic[_DataT]): # when it was already checked during setup. self.data: _DataT = None # type: ignore[assignment] - # Pick a random microsecond to stagger the refreshes + # Pick a random microsecond in range 0.05..0.50 to stagger the refreshes # and avoid a thundering herd. - self._microsecond = randint( - event.RANDOM_MICROSECOND_MIN, event.RANDOM_MICROSECOND_MAX + self._microsecond = ( + randint(event.RANDOM_MICROSECOND_MIN, event.RANDOM_MICROSECOND_MAX) + / 10**6 ) self._listeners: dict[CALLBACK_TYPE, tuple[CALLBACK_TYPE, object | None]] = {} @@ -182,6 +184,7 @@ class DataUpdateCoordinator(BaseDataUpdateCoordinatorProtocol, Generic[_DataT]): """Unschedule any pending refresh since there is no longer any listeners.""" self._async_unsub_refresh() self._debounced_refresh.async_cancel() + self._next_refresh = None def async_contexts(self) -> Generator[Any, None, None]: """Return all registered contexts.""" @@ -214,20 +217,16 @@ class DataUpdateCoordinator(BaseDataUpdateCoordinatorProtocol, Generic[_DataT]): # than the debouncer cooldown, this would cause the debounce to never be called self._async_unsub_refresh() - # We _floor_ utcnow to create a schedule on a rounded second, - # minimizing the time between the point and the real activation. - # That way we obtain a constant update frequency, - # as long as the update process takes less than 500ms - # - # We do not align everything to happen at microsecond 0 - # since it increases the risk of a thundering herd - # when multiple coordinators are scheduled to update at the same time. - # - # https://github.com/home-assistant/core/issues/82231 - self._unsub_refresh = event.async_track_point_in_utc_time( + # We use event.async_call_at because DataUpdateCoordinator does + # not need an exact update interval. + now = self.hass.loop.time() + if self._next_refresh is None or self._next_refresh <= now: + self._next_refresh = int(now) + self._microsecond + self._next_refresh += self.update_interval.total_seconds() + self._unsub_refresh = event.async_call_at( self.hass, self._job, - utcnow().replace(microsecond=self._microsecond) + self.update_interval, + self._next_refresh, ) async def _handle_refresh_interval(self, _now: datetime) -> None: @@ -266,6 +265,7 @@ class DataUpdateCoordinator(BaseDataUpdateCoordinatorProtocol, Generic[_DataT]): async def async_refresh(self) -> None: """Refresh data and log errors.""" + self._next_refresh = None await self._async_refresh(log_failures=True) async def _async_refresh( # noqa: C901 @@ -405,6 +405,7 @@ class DataUpdateCoordinator(BaseDataUpdateCoordinatorProtocol, Generic[_DataT]): """Manually update data, notify listeners and reset refresh interval.""" self._async_unsub_refresh() self._debounced_refresh.async_cancel() + self._next_refresh = None self.data = data self.last_update_success = True diff --git a/tests/common.py b/tests/common.py index 6ccb804ee737..48bb38383c7c 100644 --- a/tests/common.py +++ b/tests/common.py @@ -412,12 +412,9 @@ def async_fire_time_changed( else: utc_datetime = dt_util.as_utc(datetime_) - if utc_datetime.microsecond < event.RANDOM_MICROSECOND_MAX: - # Allow up to 500000 microseconds to be added to the time - # to handle update_coordinator's and - # async_track_time_interval's - # staggering to avoid thundering herd. - utc_datetime = utc_datetime.replace(microsecond=event.RANDOM_MICROSECOND_MAX) + # Increase the mocked time by 0.5 s to account for up to 0.5 s delay + # added to events scheduled by update_coordinator and async_track_time_interval + utc_datetime += timedelta(microseconds=event.RANDOM_MICROSECOND_MAX) _async_fire_time_changed(hass, utc_datetime, fire_all) diff --git a/tests/helpers/test_event.py b/tests/helpers/test_event.py index 572a0d22e927..dc06b9d94c84 100644 --- a/tests/helpers/test_event.py +++ b/tests/helpers/test_event.py @@ -4174,27 +4174,27 @@ async def test_periodic_task_entering_dst_2( ) freezer.move_to(f"{today} 01:59:59.999999+01:00") - async_fire_time_changed(hass) + async_fire_time_changed_exact(hass) await hass.async_block_till_done() assert len(specific_runs) == 0 freezer.move_to(f"{today} 03:00:00.999999+02:00") - async_fire_time_changed(hass) + async_fire_time_changed_exact(hass) await hass.async_block_till_done() assert len(specific_runs) == 1 freezer.move_to(f"{today} 03:00:01.999999+02:00") - async_fire_time_changed(hass) + async_fire_time_changed_exact(hass) await hass.async_block_till_done() assert len(specific_runs) == 2 freezer.move_to(f"{tomorrow} 01:59:59.999999+02:00") - async_fire_time_changed(hass) + async_fire_time_changed_exact(hass) await hass.async_block_till_done() assert len(specific_runs) == 3 freezer.move_to(f"{tomorrow} 02:00:00.999999+02:00") - async_fire_time_changed(hass) + async_fire_time_changed_exact(hass) await hass.async_block_till_done() assert len(specific_runs) == 4 diff --git a/tests/helpers/test_update_coordinator.py b/tests/helpers/test_update_coordinator.py index 4258a508c347..182ed6c3cb46 100644 --- a/tests/helpers/test_update_coordinator.py +++ b/tests/helpers/test_update_coordinator.py @@ -6,6 +6,7 @@ from unittest.mock import AsyncMock, Mock, patch import urllib.error import aiohttp +from freezegun.api import FrozenDateTimeFactory import pytest import requests @@ -329,11 +330,14 @@ async def test_refresh_no_update_method( async def test_update_interval( - hass: HomeAssistant, crd: update_coordinator.DataUpdateCoordinator[int] + hass: HomeAssistant, + freezer: FrozenDateTimeFactory, + crd: update_coordinator.DataUpdateCoordinator[int], ) -> None: """Test update interval works.""" # Test we don't update without subscriber - async_fire_time_changed(hass, utcnow() + crd.update_interval) + freezer.tick(crd.update_interval) + async_fire_time_changed(hass) await hass.async_block_till_done() assert crd.data is None @@ -342,18 +346,21 @@ async def test_update_interval( unsub = crd.async_add_listener(update_callback) # Test twice we update with subscriber - async_fire_time_changed(hass, utcnow() + crd.update_interval) + freezer.tick(crd.update_interval) + async_fire_time_changed(hass) await hass.async_block_till_done() assert crd.data == 1 - async_fire_time_changed(hass, utcnow() + crd.update_interval) + freezer.tick(crd.update_interval) + async_fire_time_changed(hass) await hass.async_block_till_done() assert crd.data == 2 # Test removing listener unsub() - async_fire_time_changed(hass, utcnow() + crd.update_interval) + freezer.tick(crd.update_interval) + async_fire_time_changed(hass) await hass.async_block_till_done() # Test we stop updating after we lose last subscriber