1
mirror of https://github.com/home-assistant/core synced 2024-09-25 00:41:32 +02:00

Breakout heartbeat monitor and poe command queue in UniFi (#112529)

* Split out entity helper functionality to own class

* Split out heartbeat to own class

* Break out poe command

* Make more parts private

* Make more things private and simplify naming

* Sort initialize

* Fix ruff
This commit is contained in:
Robert Svensson 2024-04-23 21:45:20 +02:00 committed by GitHub
parent 3e0a45eee2
commit 8bf3c87336
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 193 additions and 91 deletions

View File

@ -240,7 +240,7 @@ class UnifiScannerEntity(UnifiEntity[HandlerT, ApiItemT], ScannerEntity):
self._ignore_events = False
self._is_connected = description.is_connected_fn(self.hub, self._obj_id)
if self.is_connected:
self.hub.async_heartbeat(
self.hub.update_heartbeat(
self.unique_id,
dt_util.utcnow()
+ description.heartbeat_timedelta_fn(self.hub, self._obj_id),
@ -301,12 +301,12 @@ class UnifiScannerEntity(UnifiEntity[HandlerT, ApiItemT], ScannerEntity):
# From unifi.entity.async_signal_reachable_callback
# Controller connection state has changed and entity is unavailable
# Cancel heartbeat
self.hub.async_heartbeat(self.unique_id)
self.hub.remove_heartbeat(self.unique_id)
return
if is_connected := description.is_connected_fn(self.hub, self._obj_id):
self._is_connected = is_connected
self.hub.async_heartbeat(
self.hub.update_heartbeat(
self.unique_id,
dt_util.utcnow()
+ description.heartbeat_timedelta_fn(self.hub, self._obj_id),
@ -319,12 +319,12 @@ class UnifiScannerEntity(UnifiEntity[HandlerT, ApiItemT], ScannerEntity):
return
if event.key in self._event_is_on:
self.hub.async_heartbeat(self.unique_id)
self.hub.remove_heartbeat(self.unique_id)
self._is_connected = True
self.async_write_ha_state()
return
self.hub.async_heartbeat(
self.hub.update_heartbeat(
self.unique_id,
dt_util.utcnow()
+ self.entity_description.heartbeat_timedelta_fn(self.hub, self._obj_id),
@ -344,7 +344,7 @@ class UnifiScannerEntity(UnifiEntity[HandlerT, ApiItemT], ScannerEntity):
async def async_will_remove_from_hass(self) -> None:
"""Disconnect object when removed."""
await super().async_will_remove_from_hass()
self.hub.async_heartbeat(self.unique_id)
self.hub.remove_heartbeat(self.unique_id)
@property
def extra_state_attributes(self) -> Mapping[str, Any] | None:

View File

@ -0,0 +1,156 @@
"""UniFi Network entity helper."""
from __future__ import annotations
from datetime import datetime, timedelta
import aiounifi
from aiounifi.models.device import DeviceSetPoePortModeRequest
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.event import async_call_later, async_track_time_interval
import homeassistant.util.dt as dt_util
class UnifiEntityHelper:
"""UniFi Network integration handling platforms for entity registration."""
def __init__(self, hass: HomeAssistant, api: aiounifi.Controller) -> None:
"""Initialize the UniFi entity loader."""
self.hass = hass
self.api = api
self._device_command = UnifiDeviceCommand(hass, api)
self._heartbeat = UnifiEntityHeartbeat(hass)
@callback
def reset(self) -> None:
"""Cancel timers."""
self._device_command.reset()
self._heartbeat.reset()
@callback
def initialize(self) -> None:
"""Initialize entity helper."""
self._heartbeat.initialize()
@property
def signal_heartbeat(self) -> str:
"""Event to signal new heartbeat missed."""
return self._heartbeat.signal
@callback
def update_heartbeat(self, unique_id: str, heartbeat_expire_time: datetime) -> None:
"""Update device time in heartbeat monitor."""
self._heartbeat.update(unique_id, heartbeat_expire_time)
@callback
def remove_heartbeat(self, unique_id: str) -> None:
"""Update device time in heartbeat monitor."""
self._heartbeat.remove(unique_id)
@callback
def queue_poe_port_command(
self, device_id: str, port_idx: int, poe_mode: str
) -> None:
"""Queue commands to execute them together per device."""
self._device_command.queue_poe_command(device_id, port_idx, poe_mode)
class UnifiEntityHeartbeat:
"""UniFi entity heartbeat monitor."""
CHECK_HEARTBEAT_INTERVAL = timedelta(seconds=1)
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize the heartbeat monitor."""
self.hass = hass
self._cancel_heartbeat_check: CALLBACK_TYPE | None = None
self._heartbeat_time: dict[str, datetime] = {}
@callback
def reset(self) -> None:
"""Cancel timers."""
if self._cancel_heartbeat_check:
self._cancel_heartbeat_check()
self._cancel_heartbeat_check = None
@callback
def initialize(self) -> None:
"""Initialize heartbeat monitor."""
self._cancel_heartbeat_check = async_track_time_interval(
self.hass, self._check_for_stale, self.CHECK_HEARTBEAT_INTERVAL
)
@property
def signal(self) -> str:
"""Event to signal new heartbeat missed."""
return "unifi-heartbeat-missed"
@callback
def update(self, unique_id: str, heartbeat_expire_time: datetime) -> None:
"""Update device time in heartbeat monitor."""
self._heartbeat_time[unique_id] = heartbeat_expire_time
@callback
def remove(self, unique_id: str) -> None:
"""Remove device from heartbeat monitor."""
self._heartbeat_time.pop(unique_id, None)
@callback
def _check_for_stale(self, *_: datetime) -> None:
"""Check for any devices scheduled to be marked disconnected."""
now = dt_util.utcnow()
unique_ids_to_remove = []
for unique_id, heartbeat_expire_time in self._heartbeat_time.items():
if now > heartbeat_expire_time:
async_dispatcher_send(self.hass, f"{self.signal}_{unique_id}")
unique_ids_to_remove.append(unique_id)
for unique_id in unique_ids_to_remove:
del self._heartbeat_time[unique_id]
class UnifiDeviceCommand:
"""UniFi Device command helper class."""
COMMAND_DELAY = 5
def __init__(self, hass: HomeAssistant, api: aiounifi.Controller) -> None:
"""Initialize device command helper."""
self.hass = hass
self.api = api
self._command_queue: dict[str, dict[int, str]] = {}
self._cancel_command: CALLBACK_TYPE | None = None
@callback
def reset(self) -> None:
"""Cancel timers."""
if self._cancel_command:
self._cancel_command()
self._cancel_command = None
@callback
def queue_poe_command(self, device_id: str, port_idx: int, poe_mode: str) -> None:
"""Queue commands to execute them together per device."""
self.reset()
device_queue = self._command_queue.setdefault(device_id, {})
device_queue[port_idx] = poe_mode
async def _command(now: datetime) -> None:
"""Execute previously queued commands."""
queue = self._command_queue.copy()
self._command_queue.clear()
for device_id, device_commands in queue.items():
device = self.api.devices[device_id]
commands = list(device_commands.items())
await self.api.request(
DeviceSetPoePortModeRequest.create(device, targets=commands)
)
self._cancel_command = async_call_later(self.hass, self.COMMAND_DELAY, _command)

View File

@ -2,13 +2,12 @@
from __future__ import annotations
from datetime import datetime, timedelta
from datetime import datetime
import aiounifi
from aiounifi.models.device import DeviceSetPoePortModeRequest
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant, callback
from homeassistant.core import Event, HomeAssistant, callback
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.device_registry import (
DeviceEntry,
@ -16,16 +15,13 @@ from homeassistant.helpers.device_registry import (
DeviceInfo,
)
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.event import async_call_later, async_track_time_interval
import homeassistant.util.dt as dt_util
from ..const import ATTR_MANUFACTURER, CONF_SITE_ID, DOMAIN as UNIFI_DOMAIN, PLATFORMS
from .config import UnifiConfig
from .entity_helper import UnifiEntityHelper
from .entity_loader import UnifiEntityLoader
from .websocket import UnifiWebsocket
CHECK_HEARTBEAT_INTERVAL = timedelta(seconds=1)
class UnifiHub:
"""Manages a single UniFi Network instance."""
@ -38,17 +34,12 @@ class UnifiHub:
self.api = api
self.config = UnifiConfig.from_config_entry(config_entry)
self.entity_loader = UnifiEntityLoader(self)
self._entity_helper = UnifiEntityHelper(hass, api)
self.websocket = UnifiWebsocket(hass, api, self.signal_reachable)
self.site = config_entry.data[CONF_SITE_ID]
self.is_admin = False
self._cancel_heartbeat_check: CALLBACK_TYPE | None = None
self._heartbeat_time: dict[str, datetime] = {}
self.poe_command_queue: dict[str, dict[int, str]] = {}
self._cancel_poe_command: CALLBACK_TYPE | None = None
@callback
@staticmethod
def get_hub(hass: HomeAssistant, config_entry: ConfigEntry) -> UnifiHub:
@ -61,6 +52,28 @@ class UnifiHub:
"""Websocket connection state."""
return self.websocket.available
@property
def signal_heartbeat_missed(self) -> str:
"""Event to signal new heartbeat missed."""
return self._entity_helper.signal_heartbeat
@callback
def update_heartbeat(self, unique_id: str, heartbeat_expire_time: datetime) -> None:
"""Update device time in heartbeat monitor."""
self._entity_helper.update_heartbeat(unique_id, heartbeat_expire_time)
@callback
def remove_heartbeat(self, unique_id: str) -> None:
"""Update device time in heartbeat monitor."""
self._entity_helper.remove_heartbeat(unique_id)
@callback
def queue_poe_port_command(
self, device_id: str, port_idx: int, poe_mode: str
) -> None:
"""Queue commands to execute them together per device."""
self._entity_helper.queue_poe_port_command(device_id, port_idx, poe_mode)
@property
def signal_reachable(self) -> str:
"""Integration specific event to signal a change in connection status."""
@ -71,77 +84,16 @@ class UnifiHub:
"""Event specific per UniFi entry to signal new options."""
return f"unifi-options-{self.config.entry.entry_id}"
@property
def signal_heartbeat_missed(self) -> str:
"""Event specific per UniFi device tracker to signal new heartbeat missed."""
return "unifi-heartbeat-missed"
async def initialize(self) -> None:
"""Set up a UniFi Network instance."""
await self.entity_loader.initialize()
self._entity_helper.initialize()
assert self.config.entry.unique_id is not None
self.is_admin = self.api.sites[self.config.entry.unique_id].role == "admin"
self.config.entry.add_update_listener(self.async_config_entry_updated)
self._cancel_heartbeat_check = async_track_time_interval(
self.hass, self._async_check_for_stale, CHECK_HEARTBEAT_INTERVAL
)
@callback
def async_heartbeat(
self, unique_id: str, heartbeat_expire_time: datetime | None = None
) -> None:
"""Signal when a device has fresh home state."""
if heartbeat_expire_time is not None:
self._heartbeat_time[unique_id] = heartbeat_expire_time
return
if unique_id in self._heartbeat_time:
del self._heartbeat_time[unique_id]
@callback
def _async_check_for_stale(self, *_: datetime) -> None:
"""Check for any devices scheduled to be marked disconnected."""
now = dt_util.utcnow()
unique_ids_to_remove = []
for unique_id, heartbeat_expire_time in self._heartbeat_time.items():
if now > heartbeat_expire_time:
async_dispatcher_send(
self.hass, f"{self.signal_heartbeat_missed}_{unique_id}"
)
unique_ids_to_remove.append(unique_id)
for unique_id in unique_ids_to_remove:
del self._heartbeat_time[unique_id]
@callback
def async_queue_poe_port_command(
self, device_id: str, port_idx: int, poe_mode: str
) -> None:
"""Queue commands to execute them together per device."""
if self._cancel_poe_command:
self._cancel_poe_command()
self._cancel_poe_command = None
device_queue = self.poe_command_queue.setdefault(device_id, {})
device_queue[port_idx] = poe_mode
async def async_execute_command(now: datetime) -> None:
"""Execute previously queued commands."""
queue = self.poe_command_queue.copy()
self.poe_command_queue.clear()
for device_id, device_commands in queue.items():
device = self.api.devices[device_id]
commands = list(device_commands.items())
await self.api.request(
DeviceSetPoePortModeRequest.create(device, targets=commands)
)
self._cancel_poe_command = async_call_later(self.hass, 5, async_execute_command)
@property
def device_info(self) -> DeviceInfo:
"""UniFi Network device info."""
@ -205,12 +157,6 @@ class UnifiHub:
if not unload_ok:
return False
if self._cancel_heartbeat_check:
self._cancel_heartbeat_check()
self._cancel_heartbeat_check = None
if self._cancel_poe_command:
self._cancel_poe_command()
self._cancel_poe_command = None
self._entity_helper.reset()
return True

View File

@ -460,7 +460,7 @@ class UnifiSensorEntity(UnifiEntity[HandlerT, ApiItemT], SensorEntity):
if description.is_connected_fn is not None:
# Send heartbeat if client is connected
if description.is_connected_fn(self.hub, self._obj_id):
self.hub.async_heartbeat(
self.hub.update_heartbeat(
self._attr_unique_id,
dt_util.utcnow() + self.hub.config.option_detection_time,
)
@ -485,4 +485,4 @@ class UnifiSensorEntity(UnifiEntity[HandlerT, ApiItemT], SensorEntity):
if self.entity_description.is_connected_fn is not None:
# Remove heartbeat registration
self.hub.async_heartbeat(self._attr_unique_id)
self.hub.remove_heartbeat(self._attr_unique_id)

View File

@ -147,7 +147,7 @@ async def async_poe_port_control_fn(hub: UnifiHub, obj_id: str, target: bool) ->
port = hub.api.ports[obj_id]
on_state = "auto" if port.raw["poe_caps"] != 8 else "passthrough"
state = on_state if target else "off"
hub.async_queue_poe_port_command(mac, int(index), state)
hub.queue_poe_port_command(mac, int(index), state)
async def async_port_forward_control_fn(