1
mirror of https://github.com/home-assistant/core synced 2024-10-04 07:58:43 +02:00

Add the ability to register for shelly event callbacks (#82052)

This commit is contained in:
J. Nick Koston 2022-11-14 12:58:10 -06:00 committed by GitHub
parent 13577981f9
commit 956120662e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 91 additions and 30 deletions

View File

@ -1,7 +1,7 @@
"""Coordinators for the Shelly integration."""
from __future__ import annotations
from collections.abc import Coroutine
from collections.abc import Callable, Coroutine
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, cast
@ -13,7 +13,7 @@ from aioshelly.rpc_device import RpcDevice
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import ATTR_DEVICE_ID, CONF_HOST, EVENT_HOMEASSISTANT_STOP
from homeassistant.core import Event, HomeAssistant, callback
from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant, callback
from homeassistant.helpers import device_registry
from homeassistant.helpers.debounce import Debouncer
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
@ -337,6 +337,7 @@ class ShellyRpcCoordinator(DataUpdateCoordinator):
self.entry = entry
self.device = device
self._event_listeners: list[Callable[[dict[str, Any]], None]] = []
self._debounced_reload: Debouncer[Coroutine[Any, Any, None]] = Debouncer(
hass,
LOGGER,
@ -346,10 +347,8 @@ class ShellyRpcCoordinator(DataUpdateCoordinator):
)
entry.async_on_unload(self._debounced_reload.async_cancel)
entry.async_on_unload(
self.async_add_listener(self._async_device_updates_handler)
)
self._last_event: dict[str, Any] | None = None
self._last_status: dict[str, Any] | None = None
entry.async_on_unload(
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self._handle_ha_stop)
@ -379,24 +378,32 @@ class ShellyRpcCoordinator(DataUpdateCoordinator):
return True
@callback
def _async_device_updates_handler(self) -> None:
"""Handle device updates."""
if (
not self.device.initialized
or not self.device.event
or self.device.event == self._last_event
):
return
def async_subscribe_events(
self, event_callback: Callable[[dict[str, Any]], None]
) -> CALLBACK_TYPE:
"""Subscribe to events."""
def _unsubscribe() -> None:
self._event_listeners.remove(event_callback)
self._event_listeners.append(event_callback)
return _unsubscribe
@callback
def _async_device_event_handler(self, event_data: dict[str, Any]) -> None:
"""Handle device events."""
self.update_sleep_period()
events: list[dict[str, Any]] = event_data["events"]
self._last_event = self.device.event
for event in self.device.event["events"]:
for event in events:
event_type = event.get("event")
if event_type is None:
continue
for event_callback in self._event_listeners:
event_callback(event)
if event_type == "config_changed":
LOGGER.info(
"Config for %s changed, reloading entry in %s seconds",
@ -453,6 +460,22 @@ class ShellyRpcCoordinator(DataUpdateCoordinator):
"""Firmware version of the device."""
return self.device.firmware_version if self.device.initialized else ""
@callback
def _async_handle_update(self, device_: RpcDevice) -> None:
"""Handle device update."""
device = self.device
if not device.initialized:
return
event = device.event
status = device.status
if event and event != self._last_event:
self._last_event = event
self._async_device_event_handler(event)
if status and status != self._last_status:
self._last_status = status
self.async_set_updated_data(device)
def async_setup(self) -> None:
"""Set up the coordinator."""
dev_reg = device_registry.async_get(self.hass)
@ -467,7 +490,7 @@ class ShellyRpcCoordinator(DataUpdateCoordinator):
configuration_url=f"http://{self.entry.data[CONF_HOST]}",
)
self.device_id = entry.id
self.device.subscribe_updates(self.async_set_updated_data)
self.device.subscribe_updates(self._async_handle_update)
async def shutdown(self) -> None:
"""Shutdown the coordinator."""

View File

@ -1,4 +1,10 @@
"""Tests for the Shelly integration."""
from copy import deepcopy
from typing import Any
from unittest.mock import Mock
import pytest
from homeassistant.components.shelly.const import CONF_SLEEP_PERIOD, DOMAIN
from homeassistant.const import CONF_HOST
from homeassistant.core import HomeAssistant
@ -26,3 +32,16 @@ async def init_integration(
await hass.async_block_till_done()
return entry
def mutate_rpc_device_status(
monkeypatch: pytest.MonkeyPatch,
mock_rpc_device: Mock,
top_level_key: str,
key: str,
value: Any,
) -> None:
"""Mutate status for rpc device."""
new_status = deepcopy(mock_rpc_device.status)
new_status[top_level_key][key] = value
monkeypatch.setattr(mock_rpc_device, "status", new_status)

View File

@ -1,4 +1,8 @@
"""Tests for Shelly cover platform."""
from unittest.mock import Mock
import pytest
from homeassistant.components.cover import (
ATTR_CURRENT_POSITION,
ATTR_POSITION,
@ -13,8 +17,9 @@ from homeassistant.components.cover import (
STATE_OPENING,
)
from homeassistant.const import ATTR_ENTITY_ID
from homeassistant.core import HomeAssistant
from . import init_integration
from . import init_integration, mutate_rpc_device_status
ROLLER_BLOCK_ID = 1
@ -77,7 +82,9 @@ async def test_block_device_no_roller_blocks(hass, mock_block_device, monkeypatc
assert hass.states.get("cover.test_name") is None
async def test_rpc_device_services(hass, mock_rpc_device, monkeypatch):
async def test_rpc_device_services(
hass: HomeAssistant, mock_rpc_device: Mock, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Test RPC device cover services."""
await init_integration(hass, 2)
@ -90,7 +97,9 @@ async def test_rpc_device_services(hass, mock_rpc_device, monkeypatch):
state = hass.states.get("cover.test_cover_0")
assert state.attributes[ATTR_CURRENT_POSITION] == 50
monkeypatch.setitem(mock_rpc_device.status["cover:0"], "state", "opening")
mutate_rpc_device_status(
monkeypatch, mock_rpc_device, "cover:0", "state", "opening"
)
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_OPEN_COVER,
@ -100,7 +109,9 @@ async def test_rpc_device_services(hass, mock_rpc_device, monkeypatch):
mock_rpc_device.mock_update()
assert hass.states.get("cover.test_cover_0").state == STATE_OPENING
monkeypatch.setitem(mock_rpc_device.status["cover:0"], "state", "closing")
mutate_rpc_device_status(
monkeypatch, mock_rpc_device, "cover:0", "state", "closing"
)
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_CLOSE_COVER,
@ -110,7 +121,7 @@ async def test_rpc_device_services(hass, mock_rpc_device, monkeypatch):
mock_rpc_device.mock_update()
assert hass.states.get("cover.test_cover_0").state == STATE_CLOSING
monkeypatch.setitem(mock_rpc_device.status["cover:0"], "state", "closed")
mutate_rpc_device_status(monkeypatch, mock_rpc_device, "cover:0", "state", "closed")
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_STOP_COVER,
@ -121,26 +132,34 @@ async def test_rpc_device_services(hass, mock_rpc_device, monkeypatch):
assert hass.states.get("cover.test_cover_0").state == STATE_CLOSED
async def test_rpc_device_no_cover_keys(hass, mock_rpc_device, monkeypatch):
async def test_rpc_device_no_cover_keys(
hass: HomeAssistant, mock_rpc_device: Mock, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Test RPC device without cover keys."""
monkeypatch.delitem(mock_rpc_device.status, "cover:0")
await init_integration(hass, 2)
assert hass.states.get("cover.test_cover_0") is None
async def test_rpc_device_update(hass, mock_rpc_device, monkeypatch):
async def test_rpc_device_update(
hass: HomeAssistant, mock_rpc_device: Mock, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Test RPC device update."""
monkeypatch.setitem(mock_rpc_device.status["cover:0"], "state", "closed")
mutate_rpc_device_status(monkeypatch, mock_rpc_device, "cover:0", "state", "closed")
await init_integration(hass, 2)
assert hass.states.get("cover.test_cover_0").state == STATE_CLOSED
monkeypatch.setitem(mock_rpc_device.status["cover:0"], "state", "open")
mutate_rpc_device_status(monkeypatch, mock_rpc_device, "cover:0", "state", "open")
mock_rpc_device.mock_update()
assert hass.states.get("cover.test_cover_0").state == STATE_OPEN
async def test_rpc_device_no_position_control(hass, mock_rpc_device, monkeypatch):
async def test_rpc_device_no_position_control(
hass: HomeAssistant, mock_rpc_device: Mock, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Test RPC device with no position control."""
monkeypatch.setitem(mock_rpc_device.status["cover:0"], "pos_control", False)
mutate_rpc_device_status(
monkeypatch, mock_rpc_device, "cover:0", "pos_control", False
)
await init_integration(hass, 2)
assert hass.states.get("cover.test_cover_0").state == STATE_OPEN

View File

@ -25,7 +25,7 @@ from homeassistant.const import (
STATE_ON,
)
from . import init_integration
from . import init_integration, mutate_rpc_device_status
RELAY_BLOCK_ID = 0
LIGHT_BLOCK_ID = 2
@ -374,7 +374,7 @@ async def test_rpc_device_switch_type_lights_mode(hass, mock_rpc_device, monkeyp
)
assert hass.states.get("light.test_switch_0").state == STATE_ON
monkeypatch.setitem(mock_rpc_device.status["switch:0"], "output", False)
mutate_rpc_device_status(monkeypatch, mock_rpc_device, "switch:0", "output", False)
await hass.services.async_call(
LIGHT_DOMAIN,
SERVICE_TURN_OFF,