Split out event handling from Axis hub (#113837)

* Split out event handling from Axis hub

* Improve test coverage

* Mark internal methods with '_'

* Rename to event source
This commit is contained in:
Robert Svensson 2024-04-24 20:47:22 +02:00 committed by GitHub
parent 4b53471b60
commit f8c38fad00
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 126 additions and 68 deletions

View File

@ -0,0 +1,93 @@
"""Axis network device abstraction."""
from __future__ import annotations
import axis
from axis.errors import Unauthorized
from axis.interfaces.mqtt import mqtt_json_to_event
from axis.models.mqtt import ClientState
from axis.stream_manager import Signal, State
from homeassistant.components import mqtt
from homeassistant.components.mqtt import DOMAIN as MQTT_DOMAIN
from homeassistant.components.mqtt.models import ReceiveMessage
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.setup import async_when_setup
class AxisEventSource:
"""Manage connection to event sources from an Axis device."""
def __init__(
self, hass: HomeAssistant, config_entry: ConfigEntry, api: axis.AxisDevice
) -> None:
"""Initialize the device."""
self.hass = hass
self.config_entry = config_entry
self.api = api
self.signal_reachable = f"axis_reachable_{config_entry.entry_id}"
self.available = True
@callback
def setup(self) -> None:
"""Set up the device events."""
self.api.stream.connection_status_callback.append(self._connection_status_cb)
self.api.enable_events()
self.api.stream.start()
if self.api.vapix.mqtt.supported:
async_when_setup(self.hass, MQTT_DOMAIN, self._async_use_mqtt)
@callback
def teardown(self) -> None:
"""Tear down connections."""
self._disconnect_from_stream()
@callback
def _disconnect_from_stream(self) -> None:
"""Stop stream."""
if self.api.stream.state != State.STOPPED:
self.api.stream.connection_status_callback.clear()
self.api.stream.stop()
async def _async_use_mqtt(self, hass: HomeAssistant, component: str) -> None:
"""Set up to use MQTT."""
try:
status = await self.api.vapix.mqtt.get_client_status()
except Unauthorized:
# This means the user has too low privileges
return
if status.status.state == ClientState.ACTIVE:
self.config_entry.async_on_unload(
await mqtt.async_subscribe(
hass, f"{status.config.device_topic_prefix}/#", self._mqtt_message
)
)
@callback
def _mqtt_message(self, message: ReceiveMessage) -> None:
"""Receive Axis MQTT message."""
self._disconnect_from_stream()
if message.topic.endswith("event/connection"):
return
event = mqtt_json_to_event(message.payload)
self.api.event.handler(event)
@callback
def _connection_status_cb(self, status: Signal) -> None:
"""Handle signals of device connection status.
This is called on every RTSP keep-alive message.
Only signal state change if state change is true.
"""
if self.available != (status == Signal.PLAYING):
self.available = not self.available
async_dispatcher_send(self.hass, self.signal_reachable)

View File

@ -5,24 +5,17 @@ from __future__ import annotations
from typing import Any
import axis
from axis.errors import Unauthorized
from axis.interfaces.mqtt import mqtt_json_to_event
from axis.models.mqtt import ClientState
from axis.stream_manager import Signal, State
from homeassistant.components import mqtt
from homeassistant.components.mqtt import DOMAIN as MQTT_DOMAIN
from homeassistant.components.mqtt.models import ReceiveMessage
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import Event, HomeAssistant, callback
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC, format_mac
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.setup import async_when_setup
from ..const import ATTR_MANUFACTURER, DOMAIN as AXIS_DOMAIN
from .config import AxisConfig
from .entity_loader import AxisEntityLoader
from .event_source import AxisEventSource
class AxisHub:
@ -35,9 +28,9 @@ class AxisHub:
self.hass = hass
self.config = AxisConfig.from_config_entry(config_entry)
self.entity_loader = AxisEntityLoader(self)
self.event_source = AxisEventSource(hass, config_entry, api)
self.api = api
self.available = True
self.fw_version = api.vapix.firmware_version
self.product_type = api.vapix.product_type
self.unique_id = format_mac(api.vapix.serial_number)
@ -51,32 +44,23 @@ class AxisHub:
hub: AxisHub = hass.data[AXIS_DOMAIN][config_entry.entry_id]
return hub
@property
def available(self) -> bool:
"""Connection state to the device."""
return self.event_source.available
# Signals
@property
def signal_reachable(self) -> str:
"""Device specific event to signal a change in connection status."""
return f"axis_reachable_{self.config.entry.entry_id}"
return self.event_source.signal_reachable
@property
def signal_new_address(self) -> str:
"""Device specific event to signal a change in device address."""
return f"axis_new_address_{self.config.entry.entry_id}"
# Callbacks
@callback
def connection_status_callback(self, status: Signal) -> None:
"""Handle signals of device connection status.
This is called on every RTSP keep-alive message.
Only signal state change if state change is true.
"""
if self.available != (status == Signal.PLAYING):
self.available = not self.available
async_dispatcher_send(self.hass, self.signal_reachable)
@staticmethod
async def async_new_address_callback(
hass: HomeAssistant, config_entry: ConfigEntry
@ -89,6 +73,7 @@ class AxisHub:
"""
hub = AxisHub.get_hub(hass, config_entry)
hub.config = AxisConfig.from_config_entry(config_entry)
hub.event_source.config_entry = config_entry
hub.api.config.host = hub.config.host
async_dispatcher_send(hass, hub.signal_new_address)
@ -106,57 +91,19 @@ class AxisHub:
sw_version=self.fw_version,
)
async def async_use_mqtt(self, hass: HomeAssistant, component: str) -> None:
"""Set up to use MQTT."""
try:
status = await self.api.vapix.mqtt.get_client_status()
except Unauthorized:
# This means the user has too low privileges
return
if status.status.state == ClientState.ACTIVE:
self.config.entry.async_on_unload(
await mqtt.async_subscribe(
hass, f"{status.config.device_topic_prefix}/#", self.mqtt_message
)
)
@callback
def mqtt_message(self, message: ReceiveMessage) -> None:
"""Receive Axis MQTT message."""
self.disconnect_from_stream()
if message.topic.endswith("event/connection"):
return
event = mqtt_json_to_event(message.payload)
self.api.event.handler(event)
# Setup and teardown methods
@callback
def setup(self) -> None:
"""Set up the device events."""
self.entity_loader.initialize_platforms()
self.api.stream.connection_status_callback.append(
self.connection_status_callback
)
self.api.enable_events()
self.api.stream.start()
if self.api.vapix.mqtt.supported:
async_when_setup(self.hass, MQTT_DOMAIN, self.async_use_mqtt)
@callback
def disconnect_from_stream(self) -> None:
"""Stop stream."""
if self.api.stream.state != State.STOPPED:
self.api.stream.connection_status_callback.clear()
self.api.stream.stop()
self.event_source.setup()
async def shutdown(self, event: Event) -> None:
"""Stop the event stream."""
self.disconnect_from_stream()
self.event_source.teardown()
@callback
def teardown(self) -> None:
"""Reset this device to default state."""
self.disconnect_from_stream()
self.event_source.teardown()

View File

@ -114,6 +114,7 @@ def default_request_fixture(
port_management_payload: dict[str, Any],
param_properties_payload: dict[str, Any],
param_ports_payload: dict[str, Any],
mqtt_status_code: int,
) -> Callable[[str], None]:
"""Mock default Vapix requests responses."""
@ -131,7 +132,7 @@ def default_request_fixture(
json=port_management_payload,
)
respx.post("/axis-cgi/mqtt/client.cgi").respond(
json=MQTT_CLIENT_RESPONSE,
json=MQTT_CLIENT_RESPONSE, status_code=mqtt_status_code
)
respx.post("/axis-cgi/streamprofile.cgi").respond(
json=STREAM_PROFILES_RESPONSE,
@ -239,6 +240,12 @@ def param_ports_data_fixture() -> dict[str, Any]:
return PORTS_RESPONSE
@pytest.fixture(name="mqtt_status_code")
def mqtt_status_code_fixture():
"""Property parameter data."""
return 200
@pytest.fixture(name="setup_default_vapix_requests")
def default_vapix_requests_fixture(mock_vapix_requests: Callable[[str], None]) -> None:
"""Mock default Vapix requests responses."""

View File

@ -2,7 +2,7 @@
from ipaddress import ip_address
from unittest import mock
from unittest.mock import Mock, patch
from unittest.mock import Mock, call, patch
import axis as axislib
import pytest
@ -91,7 +91,8 @@ async def test_device_support_mqtt(
hass: HomeAssistant, mqtt_mock: MqttMockHAClient, setup_config_entry
) -> None:
"""Successful setup."""
mqtt_mock.async_subscribe.assert_called_with(f"axis/{MAC}/#", mock.ANY, 0, "utf-8")
mqtt_call = call(f"axis/{MAC}/#", mock.ANY, 0, "utf-8")
assert mqtt_call in mqtt_mock.async_subscribe.call_args_list
topic = f"axis/{MAC}/event/tns:onvif/Device/tns:axis/Sensor/PIR/$source/sensor/0"
message = (
@ -109,6 +110,16 @@ async def test_device_support_mqtt(
assert pir.name == f"{NAME} PIR 0"
@pytest.mark.parametrize("api_discovery_items", [API_DISCOVERY_MQTT])
@pytest.mark.parametrize("mqtt_status_code", [401])
async def test_device_support_mqtt_low_privilege(
hass: HomeAssistant, mqtt_mock: MqttMockHAClient, setup_config_entry
) -> None:
"""Successful setup."""
mqtt_call = call(f"{MAC}/#", mock.ANY, 0, "utf-8")
assert mqtt_call not in mqtt_mock.async_subscribe.call_args_list
async def test_update_address(
hass: HomeAssistant, setup_config_entry, mock_vapix_requests
) -> None: