Add strict typing to core.py (1) - EventBus (#63239)

This commit is contained in:
Marc Mueller 2022-01-06 03:14:42 +01:00 committed by GitHub
parent 4283b2358c
commit 8207665c3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 41 additions and 19 deletions

View File

@ -20,7 +20,7 @@ from homeassistant.const import (
LENGTH_METERS,
Platform,
)
from homeassistant.core import HomeAssistant
from homeassistant.core import Event, HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.util.distance import convert as convert_distance
@ -109,7 +109,7 @@ class MetDataUpdateCoordinator(DataUpdateCoordinator):
if self._unsub_track_home:
return
async def _async_update_weather_data(_event: str | None = None) -> None:
async def _async_update_weather_data(_event: Event | None = None) -> None:
"""Update weather data."""
if self.weather.set_coordinates():
await self.async_refresh()

View File

@ -22,7 +22,7 @@ from homeassistant.const import (
EVENT_HOMEASSISTANT_START,
EVENT_HOMEASSISTANT_STOP,
)
from homeassistant.core import CoreState, HomeAssistant, ServiceCall
from homeassistant.core import CoreState, Event, HomeAssistant, ServiceCall
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers import (
aiohttp_client,
@ -150,7 +150,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
_webhook_retries = 0
async def unregister_webhook(call: ServiceCall | None) -> None:
async def unregister_webhook(call_or_event: ServiceCall | Event | None) -> None:
if CONF_WEBHOOK_ID not in entry.data:
return
_LOGGER.debug("Unregister Netatmo webhook (%s)", entry.data[CONF_WEBHOOK_ID])
@ -172,7 +172,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
_webhook_retries += 1
async_call_later(hass, 30, register_webhook)
async def register_webhook(call: ServiceCall | None) -> None:
async def register_webhook(call_or_event: ServiceCall | Event | None) -> None:
if CONF_WEBHOOK_ID not in entry.data:
data = {**entry.data, CONF_WEBHOOK_ID: secrets.token_hex()}
hass.config_entries.async_update_entry(entry, data=data)

View File

@ -18,7 +18,16 @@ import re
import threading
from time import monotonic
from types import MappingProxyType
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Optional, TypeVar, cast
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
NamedTuple,
Optional,
TypeVar,
cast,
)
from urllib.parse import urlparse
import attr
@ -673,12 +682,19 @@ class Event:
)
class _FilterableJob(NamedTuple):
"""Event listener job to be executed with optional filter."""
job: HassJob
event_filter: Callable[[Event], bool] | None
class EventBus:
"""Allow the firing of and listening for events."""
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize a new event bus."""
self._listeners: dict[str, list[tuple[HassJob, Callable | None]]] = {}
self._listeners: dict[str, list[_FilterableJob]] = {}
self._hass = hass
@callback
@ -697,7 +713,7 @@ class EventBus:
def fire(
self,
event_type: str,
event_data: dict | None = None,
event_data: dict[str, Any] | None = None,
origin: EventOrigin = EventOrigin.local,
context: Context | None = None,
) -> None:
@ -726,7 +742,7 @@ class EventBus:
listeners = self._listeners.get(event_type, [])
# EVENT_HOMEASSISTANT_CLOSE should go only to his listeners
# EVENT_HOMEASSISTANT_CLOSE should go only to this listeners
match_all_listeners = self._listeners.get(MATCH_ALL)
if match_all_listeners is not None and event_type != EVENT_HOMEASSISTANT_CLOSE:
listeners = match_all_listeners + listeners
@ -749,7 +765,11 @@ class EventBus:
continue
self._hass.async_add_hass_job(job, event)
def listen(self, event_type: str, listener: Callable) -> CALLBACK_TYPE:
def listen(
self,
event_type: str,
listener: Callable[[Event], None | Awaitable[None]],
) -> CALLBACK_TYPE:
"""Listen for all events or events of a specific type.
To listen to all events specify the constant ``MATCH_ALL``
@ -769,8 +789,8 @@ class EventBus:
def async_listen(
self,
event_type: str,
listener: Callable,
event_filter: Callable | None = None,
listener: Callable[[Event], None | Awaitable[None]],
event_filter: Callable[[Event], bool] | None = None,
) -> CALLBACK_TYPE:
"""Listen for all events or events of a specific type.
@ -786,12 +806,12 @@ class EventBus:
if event_filter is not None and not is_callback(event_filter):
raise HomeAssistantError(f"Event filter {event_filter} is not a callback")
return self._async_listen_filterable_job(
event_type, (HassJob(listener), event_filter)
event_type, _FilterableJob(HassJob(listener), event_filter)
)
@callback
def _async_listen_filterable_job(
self, event_type: str, filterable_job: tuple[HassJob, Callable | None]
self, event_type: str, filterable_job: _FilterableJob
) -> CALLBACK_TYPE:
self._listeners.setdefault(event_type, []).append(filterable_job)
@ -802,7 +822,7 @@ class EventBus:
return remove_listener
def listen_once(
self, event_type: str, listener: Callable[[Event], None]
self, event_type: str, listener: Callable[[Event], None | Awaitable[None]]
) -> CALLBACK_TYPE:
"""Listen once for event of a specific type.
@ -822,7 +842,9 @@ class EventBus:
return remove_listener
@callback
def async_listen_once(self, event_type: str, listener: Callable) -> CALLBACK_TYPE:
def async_listen_once(
self, event_type: str, listener: Callable[[Event], None | Awaitable[None]]
) -> CALLBACK_TYPE:
"""Listen once for event of a specific type.
To listen to all events specify the constant ``MATCH_ALL``
@ -832,7 +854,7 @@ class EventBus:
This method must be run in the event loop.
"""
filterable_job: tuple[HassJob, Callable | None] | None = None
filterable_job: _FilterableJob | None = None
@callback
def _onetime_listener(event: Event) -> None:
@ -854,13 +876,13 @@ class EventBus:
_onetime_listener, listener, ("__name__", "__qualname__", "__module__"), []
)
filterable_job = (HassJob(_onetime_listener), None)
filterable_job = _FilterableJob(HassJob(_onetime_listener), None)
return self._async_listen_filterable_job(event_type, filterable_job)
@callback
def _async_remove_listener(
self, event_type: str, filterable_job: tuple[HassJob, Callable | None]
self, event_type: str, filterable_job: _FilterableJob
) -> None:
"""Remove a listener of a specific event_type.