Teach CoordinatorWeatherEntity about multiple coordinators (#98830)

This commit is contained in:
Erik Montnemery 2023-08-24 11:28:20 +02:00 committed by GitHub
parent f395147f7c
commit c47983621c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 332 additions and 158 deletions

View File

@ -11,8 +11,8 @@ from homeassistant.components.weather import (
ATTR_FORECAST_TIME,
ATTR_FORECAST_WIND_BEARING,
DOMAIN as WEATHER_DOMAIN,
CoordinatorWeatherEntity,
Forecast,
SingleCoordinatorWeatherEntity,
WeatherEntityFeature,
)
from homeassistant.config_entries import ConfigEntry
@ -22,7 +22,7 @@ from homeassistant.const import (
UnitOfSpeed,
UnitOfTemperature,
)
from homeassistant.core import HomeAssistant
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.entity_platform import AddEntitiesCallback
@ -110,7 +110,7 @@ async def async_setup_entry(
async_add_entities(entities, False)
class AemetWeather(CoordinatorWeatherEntity[WeatherUpdateCoordinator]):
class AemetWeather(SingleCoordinatorWeatherEntity[WeatherUpdateCoordinator]):
"""Implementation of an AEMET OpenData sensor."""
_attr_attribution = ATTRIBUTION
@ -160,11 +160,13 @@ class AemetWeather(CoordinatorWeatherEntity[WeatherUpdateCoordinator]):
"""Return the forecast array."""
return self._forecast(self._forecast_mode)
async def async_forecast_daily(self) -> list[Forecast]:
@callback
def _async_forecast_daily(self) -> list[Forecast]:
"""Return the daily forecast in native units."""
return self._forecast(FORECAST_MODE_DAILY)
async def async_forecast_hourly(self) -> list[Forecast]:
@callback
def _async_forecast_hourly(self) -> list[Forecast]:
"""Return the hourly forecast in native units."""
return self._forecast(FORECAST_MODE_HOURLY)

View File

@ -22,8 +22,8 @@ from homeassistant.components.weather import (
ATTR_FORECAST_PRECIPITATION_PROBABILITY,
ATTR_FORECAST_TIME,
DOMAIN as WEATHER_DOMAIN,
CoordinatorWeatherEntity,
Forecast,
SingleCoordinatorWeatherEntity,
WeatherEntityFeature,
)
from homeassistant.config_entries import ConfigEntry
@ -33,7 +33,7 @@ from homeassistant.const import (
UnitOfSpeed,
UnitOfTemperature,
)
from homeassistant.core import HomeAssistant
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.util import dt as dt_util
@ -86,7 +86,7 @@ def _calculate_unique_id(config_entry_unique_id: str | None, hourly: bool) -> st
return f"{config_entry_unique_id}{'-hourly' if hourly else '-daily'}"
class ECWeather(CoordinatorWeatherEntity):
class ECWeather(SingleCoordinatorWeatherEntity):
"""Representation of a weather condition."""
_attr_has_entity_name = True
@ -182,11 +182,13 @@ class ECWeather(CoordinatorWeatherEntity):
"""Return the forecast array."""
return get_forecast(self.ec_data, self._hourly)
async def async_forecast_daily(self) -> list[Forecast] | None:
@callback
def _async_forecast_daily(self) -> list[Forecast] | None:
"""Return the daily forecast in native units."""
return get_forecast(self.ec_data, False)
async def async_forecast_hourly(self) -> list[Forecast] | None:
@callback
def _async_forecast_hourly(self) -> list[Forecast] | None:
"""Return the hourly forecast in native units."""
return get_forecast(self.ec_data, True)

View File

@ -16,8 +16,8 @@ from homeassistant.components.weather import (
ATTR_WEATHER_WIND_GUST_SPEED,
ATTR_WEATHER_WIND_SPEED,
DOMAIN as WEATHER_DOMAIN,
CoordinatorWeatherEntity,
Forecast,
SingleCoordinatorWeatherEntity,
WeatherEntityFeature,
)
from homeassistant.config_entries import ConfigEntry
@ -30,7 +30,7 @@ from homeassistant.const import (
UnitOfSpeed,
UnitOfTemperature,
)
from homeassistant.core import HomeAssistant
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
@ -91,7 +91,7 @@ def format_condition(condition: str) -> str:
return condition
class MetWeather(CoordinatorWeatherEntity[MetDataUpdateCoordinator]):
class MetWeather(SingleCoordinatorWeatherEntity[MetDataUpdateCoordinator]):
"""Implementation of a Met.no weather condition."""
_attr_attribution = (
@ -239,11 +239,13 @@ class MetWeather(CoordinatorWeatherEntity[MetDataUpdateCoordinator]):
"""Return the forecast array."""
return self._forecast(self._hourly)
async def async_forecast_daily(self) -> list[Forecast] | None:
@callback
def _async_forecast_daily(self) -> list[Forecast] | None:
"""Return the daily forecast in native units."""
return self._forecast(False)
async def async_forecast_hourly(self) -> list[Forecast] | None:
@callback
def _async_forecast_hourly(self) -> list[Forecast] | None:
"""Return the hourly forecast in native units."""
return self._forecast(True)

View File

@ -7,8 +7,8 @@ from homeassistant.components.weather import (
ATTR_FORECAST_CONDITION,
ATTR_FORECAST_TIME,
DOMAIN as WEATHER_DOMAIN,
CoordinatorWeatherEntity,
Forecast,
SingleCoordinatorWeatherEntity,
WeatherEntityFeature,
)
from homeassistant.config_entries import ConfigEntry
@ -21,7 +21,7 @@ from homeassistant.const import (
UnitOfSpeed,
UnitOfTemperature,
)
from homeassistant.core import HomeAssistant
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
@ -75,7 +75,7 @@ def _calculate_unique_id(config: MappingProxyType[str, Any], hourly: bool) -> st
class MetEireannWeather(
CoordinatorWeatherEntity[DataUpdateCoordinator[MetEireannWeatherData]]
SingleCoordinatorWeatherEntity[DataUpdateCoordinator[MetEireannWeatherData]]
):
"""Implementation of a Met Éireann weather condition."""
@ -182,11 +182,13 @@ class MetEireannWeather(
"""Return the forecast array."""
return self._forecast(self._hourly)
async def async_forecast_daily(self) -> list[Forecast]:
@callback
def _async_forecast_daily(self) -> list[Forecast]:
"""Return the daily forecast in native units."""
return self._forecast(False)
async def async_forecast_hourly(self) -> list[Forecast]:
@callback
def _async_forecast_hourly(self) -> list[Forecast]:
"""Return the hourly forecast in native units."""
return self._forecast(True)

View File

@ -16,7 +16,7 @@ from homeassistant.helpers import debounce
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.event import async_track_point_in_utc_time
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from homeassistant.helpers.update_coordinator import TimestampDataUpdateCoordinator
from homeassistant.util.dt import utcnow
from .const import CONF_STATION, DOMAIN, UPDATE_TIME_PERIOD
@ -45,7 +45,7 @@ class NWSData:
coordinator_forecast_hourly: NwsDataUpdateCoordinator
class NwsDataUpdateCoordinator(DataUpdateCoordinator[None]):
class NwsDataUpdateCoordinator(TimestampDataUpdateCoordinator[None]):
"""NWS data update coordinator.
Implements faster data update intervals for failed updates and exposes a last successful update time.
@ -72,7 +72,6 @@ class NwsDataUpdateCoordinator(DataUpdateCoordinator[None]):
request_refresh_debouncer=request_refresh_debouncer,
)
self.failed_update_interval = failed_update_interval
self.last_update_success_time: datetime.datetime | None = None
@callback
def _schedule_refresh(self) -> None:
@ -98,23 +97,6 @@ class NwsDataUpdateCoordinator(DataUpdateCoordinator[None]):
utcnow().replace(microsecond=0) + update_interval,
)
async def _async_refresh(
self,
log_failures: bool = True,
raise_on_auth_failed: bool = False,
scheduled: bool = False,
raise_on_entry_error: bool = False,
) -> None:
"""Refresh data."""
await super()._async_refresh(
log_failures,
raise_on_auth_failed,
scheduled,
raise_on_entry_error,
)
if self.last_update_success:
self.last_update_success_time = utcnow()
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up a National Weather Service entry."""

View File

@ -1,9 +1,8 @@
"""Support for NWS weather service."""
from __future__ import annotations
from collections.abc import Callable
from types import MappingProxyType
from typing import TYPE_CHECKING, Any, Literal, cast
from typing import TYPE_CHECKING, Any, cast
from homeassistant.components.weather import (
ATTR_CONDITION_CLEAR_NIGHT,
@ -18,8 +17,8 @@ from homeassistant.components.weather import (
ATTR_FORECAST_TIME,
ATTR_FORECAST_WIND_BEARING,
DOMAIN as WEATHER_DOMAIN,
CoordinatorWeatherEntity,
Forecast,
WeatherEntity,
WeatherEntityFeature,
)
from homeassistant.config_entries import ConfigEntry
@ -38,13 +37,7 @@ from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.util.dt import utcnow
from homeassistant.util.unit_conversion import SpeedConverter, TemperatureConverter
from . import (
DEFAULT_SCAN_INTERVAL,
NWSData,
NwsDataUpdateCoordinator,
base_unique_id,
device_info,
)
from . import NWSData, base_unique_id, device_info
from .const import (
ATTR_FORECAST_DETAILED_DESCRIPTION,
ATTRIBUTION,
@ -120,7 +113,7 @@ def _calculate_unique_id(entry_data: MappingProxyType[str, Any], mode: str) -> s
return f"{base_unique_id(latitude, longitude)}_{mode}"
class NWSWeather(WeatherEntity):
class NWSWeather(CoordinatorWeatherEntity):
"""Representation of a weather condition."""
_attr_attribution = ATTRIBUTION
@ -136,19 +129,21 @@ class NWSWeather(WeatherEntity):
mode: str,
) -> None:
"""Initialise the platform with a data instance and station name."""
super().__init__(
observation_coordinator=nws_data.coordinator_observation,
hourly_coordinator=nws_data.coordinator_forecast_hourly,
twice_daily_coordinator=nws_data.coordinator_forecast,
hourly_forecast_valid=FORECAST_VALID_TIME,
twice_daily_forecast_valid=FORECAST_VALID_TIME,
)
self.nws = nws_data.api
self.latitude = entry_data[CONF_LATITUDE]
self.longitude = entry_data[CONF_LONGITUDE]
self.coordinator_forecast_hourly = nws_data.coordinator_forecast_hourly
self.coordinator_forecast_twice_daily = nws_data.coordinator_forecast
self.coordinator_observation = nws_data.coordinator_observation
if mode == DAYNIGHT:
self.coordinator_forecast_legacy = nws_data.coordinator_forecast
else:
self.coordinator_forecast_legacy = nws_data.coordinator_forecast_hourly
self.station = self.nws.station
self._unsub_hourly_forecast: Callable[[], None] | None = None
self._unsub_twice_daily_forecast: Callable[[], None] | None = None
self.mode = mode
@ -161,76 +156,42 @@ class NWSWeather(WeatherEntity):
async def async_added_to_hass(self) -> None:
"""Set up a listener and load data."""
await super().async_added_to_hass()
self.async_on_remove(
self.coordinator_observation.async_add_listener(self._update_callback)
)
self.async_on_remove(
self.coordinator_forecast_legacy.async_add_listener(self._update_callback)
)
self.async_on_remove(self._remove_hourly_forecast_listener)
self.async_on_remove(self._remove_twice_daily_forecast_listener)
self._update_callback()
def _remove_hourly_forecast_listener(self) -> None:
"""Remove hourly forecast listener."""
if self._unsub_hourly_forecast:
self._unsub_hourly_forecast()
self._unsub_hourly_forecast = None
def _remove_twice_daily_forecast_listener(self) -> None:
"""Remove hourly forecast listener."""
if self._unsub_twice_daily_forecast:
self._unsub_twice_daily_forecast()
self._unsub_twice_daily_forecast = None
@callback
def _async_subscription_started(
self,
forecast_type: Literal["daily", "hourly", "twice_daily"],
) -> None:
"""Start subscription to forecast_type."""
if forecast_type == "hourly" and self.mode == DAYNIGHT:
self._unsub_hourly_forecast = (
self.coordinator_forecast_hourly.async_add_listener(
self._update_callback
)
self.coordinator_forecast_legacy.async_add_listener(
self._handle_legacy_forecast_coordinator_update
)
return
if forecast_type == "twice_daily" and self.mode == HOURLY:
self._unsub_twice_daily_forecast = (
self.coordinator_forecast_twice_daily.async_add_listener(
self._update_callback
)
)
return
)
# Load initial data from coordinators
self._handle_coordinator_update()
self._handle_hourly_forecast_coordinator_update()
self._handle_twice_daily_forecast_coordinator_update()
self._handle_legacy_forecast_coordinator_update()
@callback
def _async_subscription_ended(
self,
forecast_type: Literal["daily", "hourly", "twice_daily"],
) -> None:
"""End subscription to forecast_type."""
if forecast_type == "hourly" and self.mode == DAYNIGHT:
self._remove_hourly_forecast_listener()
if forecast_type == "twice_daily" and self.mode == HOURLY:
self._remove_twice_daily_forecast_listener()
@callback
def _update_callback(self) -> None:
def _handle_coordinator_update(self) -> None:
"""Load data from integration."""
self.observation = self.nws.observation
self.async_write_ha_state()
@callback
def _handle_hourly_forecast_coordinator_update(self) -> None:
"""Handle updated data from the hourly forecast coordinator."""
self._forecast_hourly = self.nws.forecast_hourly
@callback
def _handle_twice_daily_forecast_coordinator_update(self) -> None:
"""Handle updated data from the twice daily forecast coordinator."""
self._forecast_twice_daily = self.nws.forecast
@callback
def _handle_legacy_forecast_coordinator_update(self) -> None:
"""Handle updated data from the legacy forecast coordinator."""
if self.mode == DAYNIGHT:
self._forecast_legacy = self.nws.forecast
else:
self._forecast_legacy = self.nws.forecast_hourly
self.async_write_ha_state()
assert self.platform.config_entry
self.platform.config_entry.async_create_task(
self.hass, self.async_update_listeners(("hourly", "twice_daily"))
)
@property
def name(self) -> str:
@ -373,50 +334,29 @@ class NWSWeather(WeatherEntity):
"""Return forecast."""
return self._forecast(self._forecast_legacy, self.mode)
async def _async_forecast(
self,
coordinator: NwsDataUpdateCoordinator,
nws_forecast: list[dict[str, Any]] | None,
mode: str,
) -> list[Forecast] | None:
"""Refresh stale forecast and return it in native units."""
if (
not (last_success_time := coordinator.last_update_success_time)
or utcnow() - last_success_time >= DEFAULT_SCAN_INTERVAL
):
await coordinator.async_refresh()
if (
not (last_success_time := coordinator.last_update_success_time)
or utcnow() - last_success_time >= FORECAST_VALID_TIME
):
return None
return self._forecast(nws_forecast, mode)
async def async_forecast_hourly(self) -> list[Forecast] | None:
@callback
def _async_forecast_hourly(self) -> list[Forecast] | None:
"""Return the hourly forecast in native units."""
coordinator = self.coordinator_forecast_hourly
return await self._async_forecast(coordinator, self._forecast_hourly, HOURLY)
return self._forecast(self._forecast_hourly, HOURLY)
async def async_forecast_twice_daily(self) -> list[Forecast] | None:
@callback
def _async_forecast_twice_daily(self) -> list[Forecast] | None:
"""Return the twice daily forecast in native units."""
coordinator = self.coordinator_forecast_twice_daily
return await self._async_forecast(
coordinator, self._forecast_twice_daily, DAYNIGHT
)
return self._forecast(self._forecast_twice_daily, DAYNIGHT)
@property
def available(self) -> bool:
"""Return if state is available."""
last_success = (
self.coordinator_observation.last_update_success
self.coordinator.last_update_success
and self.coordinator_forecast_legacy.last_update_success
)
if (
self.coordinator_observation.last_update_success_time
self.coordinator.last_update_success_time
and self.coordinator_forecast_legacy.last_update_success_time
):
last_success_time = (
utcnow() - self.coordinator_observation.last_update_success_time
utcnow() - self.coordinator.last_update_success_time
< OBSERVATION_VALID_TIME
and utcnow() - self.coordinator_forecast_legacy.last_update_success_time
< FORECAST_VALID_TIME
@ -430,7 +370,7 @@ class NWSWeather(WeatherEntity):
Only used by the generic entity update service.
"""
await self.coordinator_observation.async_request_refresh()
await self.coordinator.async_request_refresh()
await self.coordinator_forecast_legacy.async_request_refresh()
@property

View File

@ -4,13 +4,13 @@ from __future__ import annotations
from open_meteo import Forecast as OpenMeteoForecast
from homeassistant.components.weather import (
CoordinatorWeatherEntity,
Forecast,
SingleCoordinatorWeatherEntity,
WeatherEntityFeature,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import UnitOfPrecipitationDepth, UnitOfSpeed, UnitOfTemperature
from homeassistant.core import HomeAssistant
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
@ -29,7 +29,7 @@ async def async_setup_entry(
class OpenMeteoWeatherEntity(
CoordinatorWeatherEntity[DataUpdateCoordinator[OpenMeteoForecast]]
SingleCoordinatorWeatherEntity[DataUpdateCoordinator[OpenMeteoForecast]]
):
"""Defines an Open-Meteo weather entity."""
@ -124,6 +124,7 @@ class OpenMeteoWeatherEntity(
return forecasts
async def async_forecast_daily(self) -> list[Forecast] | None:
@callback
def _async_forecast_daily(self) -> list[Forecast] | None:
"""Return the daily forecast in native units."""
return self.forecast

View File

@ -17,8 +17,8 @@ from homeassistant.components.weather import (
ATTR_FORECAST_TIME,
ATTR_FORECAST_WIND_BEARING,
DOMAIN as WEATHER_DOMAIN,
CoordinatorWeatherEntity,
Forecast,
SingleCoordinatorWeatherEntity,
WeatherEntityFeature,
)
from homeassistant.config_entries import ConfigEntry
@ -31,7 +31,7 @@ from homeassistant.const import (
UnitOfSpeed,
UnitOfTemperature,
)
from homeassistant.core import HomeAssistant
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.sun import is_up
@ -93,7 +93,7 @@ def _calculate_unique_id(config_entry_unique_id: str | None, forecast_type: str)
return f"{config_entry_unique_id}_{forecast_type}"
class TomorrowioWeatherEntity(TomorrowioEntity, CoordinatorWeatherEntity):
class TomorrowioWeatherEntity(TomorrowioEntity, SingleCoordinatorWeatherEntity):
"""Entity that talks to Tomorrow.io v4 API to retrieve weather data."""
_attr_native_precipitation_unit = UnitOfPrecipitationDepth.MILLIMETERS
@ -303,10 +303,12 @@ class TomorrowioWeatherEntity(TomorrowioEntity, CoordinatorWeatherEntity):
"""Return the forecast array."""
return self._forecast(self.forecast_type)
async def async_forecast_daily(self) -> list[Forecast] | None:
@callback
def _async_forecast_daily(self) -> list[Forecast] | None:
"""Return the daily forecast in native units."""
return self._forecast(DAILY)
async def async_forecast_hourly(self) -> list[Forecast] | None:
@callback
def _async_forecast_hourly(self) -> list[Forecast] | None:
"""Return the hourly forecast in native units."""
return self._forecast(HOURLY)

View File

@ -6,9 +6,20 @@ from collections.abc import Callable, Iterable
from contextlib import suppress
from dataclasses import dataclass
from datetime import timedelta
from functools import partial
import inspect
import logging
from typing import Any, Final, Literal, Required, TypedDict, TypeVar, final
from typing import (
Any,
Final,
Generic,
Literal,
Required,
TypedDict,
TypeVar,
cast,
final,
)
import voluptuous as vol
@ -40,7 +51,9 @@ from homeassistant.helpers.typing import ConfigType
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
TimestampDataUpdateCoordinator,
)
from homeassistant.util.dt import utcnow
from homeassistant.util.json import JsonValueType
from homeassistant.util.unit_system import US_CUSTOMARY_SYSTEM
@ -121,8 +134,22 @@ ROUNDING_PRECISION = 2
SERVICE_GET_FORECAST: Final = "get_forecast"
_DataUpdateCoordinatorT = TypeVar(
"_DataUpdateCoordinatorT", bound="DataUpdateCoordinator[Any]"
_ObservationUpdateCoordinatorT = TypeVar(
"_ObservationUpdateCoordinatorT", bound="DataUpdateCoordinator[Any]"
)
# Note:
# Mypy bug https://github.com/python/mypy/issues/9424 prevents us from making the
# forecast cooordinators optional, bound=TimestampDataUpdateCoordinator[Any] | None
_DailyForecastUpdateCoordinatorT = TypeVar(
"_DailyForecastUpdateCoordinatorT", bound="TimestampDataUpdateCoordinator[Any]"
)
_HourlyForecastUpdateCoordinatorT = TypeVar(
"_HourlyForecastUpdateCoordinatorT", bound="TimestampDataUpdateCoordinator[Any]"
)
_TwiceDailyForecastUpdateCoordinatorT = TypeVar(
"_TwiceDailyForecastUpdateCoordinatorT", bound="TimestampDataUpdateCoordinator[Any]"
)
# mypy: disallow-any-generics
@ -1187,9 +1214,200 @@ async def async_get_forecast_service(
class CoordinatorWeatherEntity(
CoordinatorEntity[_DataUpdateCoordinatorT], WeatherEntity
CoordinatorEntity[_ObservationUpdateCoordinatorT],
WeatherEntity,
Generic[
_ObservationUpdateCoordinatorT,
_DailyForecastUpdateCoordinatorT,
_HourlyForecastUpdateCoordinatorT,
_TwiceDailyForecastUpdateCoordinatorT,
],
):
"""A class for weather entities using a single DataUpdateCoordinator."""
"""A class for weather entities using DataUpdateCoordinators."""
def __init__(
self,
observation_coordinator: _ObservationUpdateCoordinatorT,
*,
context: Any = None,
daily_coordinator: _DailyForecastUpdateCoordinatorT | None = None,
hourly_coordinator: _DailyForecastUpdateCoordinatorT | None = None,
twice_daily_coordinator: _DailyForecastUpdateCoordinatorT | None = None,
daily_forecast_valid: timedelta | None = None,
hourly_forecast_valid: timedelta | None = None,
twice_daily_forecast_valid: timedelta | None = None,
) -> None:
"""Initialize."""
super().__init__(observation_coordinator, context)
self.forecast_coordinators = {
"daily": daily_coordinator,
"hourly": hourly_coordinator,
"twice_daily": twice_daily_coordinator,
}
self.forecast_valid = {
"daily": daily_forecast_valid,
"hourly": hourly_forecast_valid,
"twice_daily": twice_daily_forecast_valid,
}
self.unsub_forecast: dict[str, Callable[[], None] | None] = {
"daily": None,
"hourly": None,
"twice_daily": None,
}
async def async_added_to_hass(self) -> None:
"""When entity is added to hass."""
await super().async_added_to_hass()
self.async_on_remove(partial(self._remove_forecast_listener, "daily"))
self.async_on_remove(partial(self._remove_forecast_listener, "hourly"))
self.async_on_remove(partial(self._remove_forecast_listener, "twice_daily"))
def _remove_forecast_listener(
self, forecast_type: Literal["daily", "hourly", "twice_daily"]
) -> None:
"""Remove weather forecast listener."""
if unsub_fn := self.unsub_forecast[forecast_type]:
unsub_fn()
self.unsub_forecast[forecast_type] = None
@callback
def _async_subscription_started(
self,
forecast_type: Literal["daily", "hourly", "twice_daily"],
) -> None:
"""Start subscription to forecast_type."""
if not (coordinator := self.forecast_coordinators[forecast_type]):
return
self.unsub_forecast[forecast_type] = coordinator.async_add_listener(
partial(self._handle_forecast_update, forecast_type)
)
@callback
def _handle_daily_forecast_coordinator_update(self) -> None:
"""Handle updated data from the daily forecast coordinator."""
@callback
def _handle_hourly_forecast_coordinator_update(self) -> None:
"""Handle updated data from the hourly forecast coordinator."""
@callback
def _handle_twice_daily_forecast_coordinator_update(self) -> None:
"""Handle updated data from the twice daily forecast coordinator."""
@final
@callback
def _handle_forecast_update(
self, forecast_type: Literal["daily", "hourly", "twice_daily"]
) -> None:
"""Update forecast data."""
coordinator = self.forecast_coordinators[forecast_type]
assert coordinator
assert coordinator.config_entry is not None
getattr(self, f"_handle_{forecast_type}_forecast_coordinator_update")()
coordinator.config_entry.async_create_task(
self.hass, self.async_update_listeners((forecast_type,))
)
@callback
def _async_subscription_ended(
self,
forecast_type: Literal["daily", "hourly", "twice_daily"],
) -> None:
"""End subscription to forecast_type."""
self._remove_forecast_listener(forecast_type)
@final
async def _async_refresh_forecast(
self,
coordinator: TimestampDataUpdateCoordinator[Any],
forecast_valid_time: timedelta | None,
) -> bool:
"""Refresh stale forecast if needed."""
if coordinator.update_interval is None:
return True
if forecast_valid_time is None:
forecast_valid_time = coordinator.update_interval
if (
not (last_success_time := coordinator.last_update_success_time)
or utcnow() - last_success_time >= coordinator.update_interval
):
await coordinator.async_refresh()
if (
not (last_success_time := coordinator.last_update_success_time)
or utcnow() - last_success_time >= forecast_valid_time
):
return False
return True
@callback
def _async_forecast_daily(self) -> list[Forecast] | None:
"""Return the daily forecast in native units."""
raise NotImplementedError
@callback
def _async_forecast_hourly(self) -> list[Forecast] | None:
"""Return the hourly forecast in native units."""
raise NotImplementedError
@callback
def _async_forecast_twice_daily(self) -> list[Forecast] | None:
"""Return the twice daily forecast in native units."""
raise NotImplementedError
@final
async def _async_forecast(
self, forecast_type: Literal["daily", "hourly", "twice_daily"]
) -> list[Forecast] | None:
"""Return the forecast in native units."""
coordinator = self.forecast_coordinators[forecast_type]
if coordinator and not await self._async_refresh_forecast(
coordinator, self.forecast_valid[forecast_type]
):
return None
return cast(
list[Forecast] | None, getattr(self, f"_async_forecast_{forecast_type}")()
)
@final
async def async_forecast_daily(self) -> list[Forecast] | None:
"""Return the daily forecast in native units."""
return await self._async_forecast("daily")
@final
async def async_forecast_hourly(self) -> list[Forecast] | None:
"""Return the hourly forecast in native units."""
return await self._async_forecast("hourly")
@final
async def async_forecast_twice_daily(self) -> list[Forecast] | None:
"""Return the twice daily forecast in native units."""
return await self._async_forecast("twice_daily")
class SingleCoordinatorWeatherEntity(
CoordinatorWeatherEntity[
_ObservationUpdateCoordinatorT,
TimestampDataUpdateCoordinator[None],
TimestampDataUpdateCoordinator[None],
TimestampDataUpdateCoordinator[None],
],
):
"""A class for weather entities using a single DataUpdateCoordinators.
This class is added as a convenience because:
- Deriving from CoordinatorWeatherEntity requires specifying all type parameters
until we upgrade to Python 3.12 which supports defaults
- Mypy bug https://github.com/python/mypy/issues/9424 prevents us from making the
forecast cooordinator type vars optional
"""
def __init__(
self,
coordinator: _ObservationUpdateCoordinatorT,
context: Any = None,
) -> None:
"""Initialize."""
super().__init__(coordinator, context=context)
@callback
def _handle_coordinator_update(self) -> None:

View File

@ -419,6 +419,29 @@ class DataUpdateCoordinator(BaseDataUpdateCoordinatorProtocol, Generic[_DataT]):
self.async_update_listeners()
class TimestampDataUpdateCoordinator(DataUpdateCoordinator[_DataT]):
"""DataUpdateCoordinator which keeps track of the last successful update."""
last_update_success_time: datetime | None = None
async def _async_refresh(
self,
log_failures: bool = True,
raise_on_auth_failed: bool = False,
scheduled: bool = False,
raise_on_entry_error: bool = False,
) -> None:
"""Refresh data."""
await super()._async_refresh(
log_failures,
raise_on_auth_failed,
scheduled,
raise_on_entry_error,
)
if self.last_update_success:
self.last_update_success_time = utcnow()
class BaseCoordinatorEntity(entity.Entity, Generic[_BaseDataUpdateCoordinatorT]):
"""Base class for all Coordinator entities."""