mirror of https://github.com/home-assistant/core
Add webhook support to onvif (#91485)
This commit is contained in:
parent
a491859875
commit
3beb6e9718
|
@ -24,7 +24,7 @@ async def async_setup_entry(
|
|||
async_add_entities: AddEntitiesCallback,
|
||||
) -> None:
|
||||
"""Set up a ONVIF binary sensor."""
|
||||
device = hass.data[DOMAIN][config_entry.unique_id]
|
||||
device: ONVIFDevice = hass.data[DOMAIN][config_entry.unique_id]
|
||||
|
||||
entities = {
|
||||
event.uid: ONVIFBinarySensor(event.uid, device)
|
||||
|
@ -39,16 +39,20 @@ async def async_setup_entry(
|
|||
)
|
||||
|
||||
async_add_entities(entities.values())
|
||||
uids_by_platform = device.events.get_uids_by_platform("binary_sensor")
|
||||
|
||||
@callback
|
||||
def async_check_entities():
|
||||
def async_check_entities() -> None:
|
||||
"""Check if we have added an entity for the event."""
|
||||
new_entities = []
|
||||
for event in device.events.get_platform("binary_sensor"):
|
||||
if event.uid not in entities:
|
||||
entities[event.uid] = ONVIFBinarySensor(event.uid, device)
|
||||
new_entities.append(entities[event.uid])
|
||||
async_add_entities(new_entities)
|
||||
nonlocal uids_by_platform
|
||||
if not (missing := uids_by_platform.difference(entities)):
|
||||
return
|
||||
new_entities: dict[str, ONVIFBinarySensor] = {
|
||||
uid: ONVIFBinarySensor(uid, device) for uid in missing
|
||||
}
|
||||
if new_entities:
|
||||
entities.update(new_entities)
|
||||
async_add_entities(new_entities.values())
|
||||
|
||||
device.events.async_add_listener(async_check_entities)
|
||||
|
||||
|
|
|
@ -101,7 +101,7 @@ class ONVIFDevice:
|
|||
|
||||
# Create event manager
|
||||
assert self.config_entry.unique_id
|
||||
self.events = EventManager(self.hass, self.device, self.config_entry.unique_id)
|
||||
self.events = EventManager(self.hass, self.device, self.config_entry, self.name)
|
||||
|
||||
# Fetch basic device info and capabilities
|
||||
self.info = await self.async_get_device_info()
|
||||
|
@ -159,10 +159,10 @@ class ONVIFDevice:
|
|||
|
||||
async def async_check_date_and_time(self) -> None:
|
||||
"""Warns if device and system date not synced."""
|
||||
LOGGER.debug("Setting up the ONVIF device management service")
|
||||
LOGGER.debug("%s: Setting up the ONVIF device management service", self.name)
|
||||
device_mgmt = self.device.create_devicemgmt_service()
|
||||
|
||||
LOGGER.debug("Retrieving current device date/time")
|
||||
LOGGER.debug("%s: Retrieving current device date/time", self.name)
|
||||
try:
|
||||
system_date = dt_util.utcnow()
|
||||
device_time = await device_mgmt.GetSystemDateAndTime()
|
||||
|
@ -174,7 +174,7 @@ class ONVIFDevice:
|
|||
)
|
||||
return
|
||||
|
||||
LOGGER.debug("Device time: %s", device_time)
|
||||
LOGGER.debug("%s: Device time: %s", self.name, device_time)
|
||||
|
||||
tzone = dt_util.DEFAULT_TIME_ZONE
|
||||
cdate = device_time.LocalDateTime
|
||||
|
@ -185,7 +185,9 @@ class ONVIFDevice:
|
|||
tzone = dt_util.get_time_zone(device_time.TimeZone.TZ) or tzone
|
||||
|
||||
if cdate is None:
|
||||
LOGGER.warning("Could not retrieve date/time on this camera")
|
||||
LOGGER.warning(
|
||||
"%s: Could not retrieve date/time on this camera", self.name
|
||||
)
|
||||
else:
|
||||
cam_date = dt.datetime(
|
||||
cdate.Date.Year,
|
||||
|
@ -201,7 +203,8 @@ class ONVIFDevice:
|
|||
cam_date_utc = cam_date.astimezone(dt_util.UTC)
|
||||
|
||||
LOGGER.debug(
|
||||
"Device date/time: %s | System date/time: %s",
|
||||
"%s: Device date/time: %s | System date/time: %s",
|
||||
self.name,
|
||||
cam_date_utc,
|
||||
system_date,
|
||||
)
|
||||
|
@ -266,10 +269,6 @@ class ONVIFDevice:
|
|||
media_capabilities = await media_service.GetServiceCapabilities()
|
||||
snapshot = media_capabilities and media_capabilities.SnapshotUri
|
||||
|
||||
pullpoint = False
|
||||
with suppress(ONVIFError, Fault, RequestError, XMLParseError):
|
||||
pullpoint = await self.events.async_start()
|
||||
|
||||
ptz = False
|
||||
with suppress(ONVIFError, Fault, RequestError):
|
||||
self.device.get_definition("ptz")
|
||||
|
@ -280,7 +279,11 @@ class ONVIFDevice:
|
|||
self.device.create_imaging_service()
|
||||
imaging = True
|
||||
|
||||
return Capabilities(snapshot, pullpoint, ptz, imaging)
|
||||
events = False
|
||||
with suppress(ONVIFError, Fault, RequestError, XMLParseError):
|
||||
events = await self.events.async_start()
|
||||
|
||||
return Capabilities(snapshot, events, ptz, imaging)
|
||||
|
||||
async def async_get_profiles(self) -> list[Profile]:
|
||||
"""Obtain media profiles for this device."""
|
||||
|
|
|
@ -28,5 +28,9 @@ async def async_get_config_entry_diagnostics(
|
|||
"capabilities": asdict(device.capabilities),
|
||||
"profiles": [asdict(profile) for profile in device.profiles],
|
||||
}
|
||||
data["events"] = {
|
||||
"webhook_manager_state": device.events.webhook_manager.state,
|
||||
"pullpoint_manager_state": device.events.pullpoint_manager.state,
|
||||
}
|
||||
|
||||
return data
|
||||
|
|
|
@ -5,24 +5,57 @@ import asyncio
|
|||
from collections.abc import Callable
|
||||
from contextlib import suppress
|
||||
import datetime as dt
|
||||
from logging import DEBUG, WARNING
|
||||
|
||||
from httpx import RemoteProtocolError, TransportError
|
||||
from aiohttp.web import Request
|
||||
from httpx import RemoteProtocolError, RequestError, TransportError
|
||||
from onvif import ONVIFCamera, ONVIFService
|
||||
from onvif.client import NotificationManager
|
||||
from onvif.exceptions import ONVIFError
|
||||
from zeep.exceptions import Fault, XMLParseError
|
||||
|
||||
from homeassistant.core import CALLBACK_TYPE, CoreState, HomeAssistant, callback
|
||||
from homeassistant.components import webhook
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.core import (
|
||||
CALLBACK_TYPE,
|
||||
CoreState,
|
||||
HassJob,
|
||||
HomeAssistant,
|
||||
callback,
|
||||
)
|
||||
from homeassistant.helpers.event import async_call_later
|
||||
from homeassistant.util import dt as dt_util
|
||||
from homeassistant.helpers.network import NoURLAvailableError, get_url
|
||||
|
||||
from .const import LOGGER
|
||||
from .models import Event
|
||||
from .const import DOMAIN, LOGGER
|
||||
from .models import Event, PullPointManagerState, WebHookManagerState
|
||||
from .parsers import PARSERS
|
||||
|
||||
UNHANDLED_TOPICS: set[str] = set()
|
||||
|
||||
SUBSCRIPTION_ERRORS = (Fault, asyncio.TimeoutError, TransportError)
|
||||
CREATE_ERRORS = (ONVIFError, Fault, RequestError, XMLParseError)
|
||||
SET_SYNCHRONIZATION_POINT_ERRORS = (*SUBSCRIPTION_ERRORS, TypeError)
|
||||
UNSUBSCRIBE_ERRORS = (XMLParseError, *SUBSCRIPTION_ERRORS)
|
||||
RENEW_ERRORS = (ONVIFError, RequestError, XMLParseError, *SUBSCRIPTION_ERRORS)
|
||||
#
|
||||
# We only keep the subscription alive for 3 minutes, and will keep
|
||||
# renewing it every 1.5 minutes. This is to avoid the camera
|
||||
# accumulating subscriptions which will be impossible to clean up
|
||||
# since ONVIF does not provide a way to list existing subscriptions.
|
||||
#
|
||||
# If we max out the number of subscriptions, the camera will stop
|
||||
# sending events to us, and we will not be able to recover until
|
||||
# the subscriptions expire or the camera is rebooted.
|
||||
#
|
||||
SUBSCRIPTION_TIME = dt.timedelta(minutes=3)
|
||||
SUBSCRIPTION_RELATIVE_TIME = (
|
||||
"PT3M" # use relative time since the time on the camera is not reliable
|
||||
)
|
||||
SUBSCRIPTION_RENEW_INTERVAL = SUBSCRIPTION_TIME.total_seconds() / 2
|
||||
SUBSCRIPTION_RENEW_INTERVAL_ON_ERROR = 60.0
|
||||
|
||||
PULLPOINT_POLL_TIME = dt.timedelta(seconds=60)
|
||||
PULLPOINT_MESSAGE_LIMIT = 100
|
||||
PULLPOINT_COOLDOWN_TIME = 0.75
|
||||
|
||||
|
||||
def _stringify_onvif_error(error: Exception) -> str:
|
||||
|
@ -32,45 +65,49 @@ def _stringify_onvif_error(error: Exception) -> str:
|
|||
return str(error)
|
||||
|
||||
|
||||
def _get_next_termination_time() -> str:
|
||||
"""Get next termination time."""
|
||||
return (
|
||||
(dt_util.utcnow() + dt.timedelta(days=1))
|
||||
.isoformat(timespec="seconds")
|
||||
.replace("+00:00", "Z")
|
||||
)
|
||||
|
||||
|
||||
class EventManager:
|
||||
"""ONVIF Event Manager."""
|
||||
|
||||
def __init__(
|
||||
self, hass: HomeAssistant, device: ONVIFCamera, unique_id: str
|
||||
self,
|
||||
hass: HomeAssistant,
|
||||
device: ONVIFCamera,
|
||||
config_entry: ConfigEntry,
|
||||
name: str,
|
||||
) -> None:
|
||||
"""Initialize event manager."""
|
||||
self.hass: HomeAssistant = hass
|
||||
self.device: ONVIFCamera = device
|
||||
self.unique_id: str = unique_id
|
||||
self.started: bool = False
|
||||
self.hass = hass
|
||||
self.device = device
|
||||
self.config_entry = config_entry
|
||||
self.unique_id = config_entry.unique_id
|
||||
self.name = name
|
||||
|
||||
self._subscription: ONVIFService = None
|
||||
self.webhook_manager = WebHookManager(self)
|
||||
self.pullpoint_manager = PullPointManager(self)
|
||||
|
||||
self._uid_by_platform: dict[str, set[str]] = {}
|
||||
self._events: dict[str, Event] = {}
|
||||
self._listeners: list[CALLBACK_TYPE] = []
|
||||
self._unsub_refresh: CALLBACK_TYPE | None = None
|
||||
|
||||
super().__init__()
|
||||
|
||||
@property
|
||||
def platforms(self) -> set[str]:
|
||||
"""Return platforms to setup."""
|
||||
return {event.platform for event in self._events.values()}
|
||||
def started(self) -> bool:
|
||||
"""Return True if event manager is started."""
|
||||
return (
|
||||
self.webhook_manager.state == WebHookManagerState.STARTED
|
||||
or self.pullpoint_manager.state == PullPointManagerState.STARTED
|
||||
)
|
||||
|
||||
@property
|
||||
def has_listeners(self) -> bool:
|
||||
"""Return if there are listeners."""
|
||||
return bool(self._listeners)
|
||||
|
||||
@callback
|
||||
def async_add_listener(self, update_callback: CALLBACK_TYPE) -> Callable[[], None]:
|
||||
"""Listen for data updates."""
|
||||
# This is the first listener, set up polling.
|
||||
if not self._listeners:
|
||||
self.async_schedule_pull()
|
||||
self.pullpoint_manager.async_schedule_pull_messages()
|
||||
|
||||
self._listeners.append(update_callback)
|
||||
|
||||
|
@ -87,160 +124,33 @@ class EventManager:
|
|||
if update_callback in self._listeners:
|
||||
self._listeners.remove(update_callback)
|
||||
|
||||
if not self._listeners and self._unsub_refresh:
|
||||
self._unsub_refresh()
|
||||
self._unsub_refresh = None
|
||||
if not self._listeners:
|
||||
self.pullpoint_manager.async_cancel_pull_messages()
|
||||
|
||||
async def async_start(self) -> bool:
|
||||
"""Start polling events."""
|
||||
if not await self.device.create_pullpoint_subscription(
|
||||
{"InitialTerminationTime": _get_next_termination_time()}
|
||||
):
|
||||
return False
|
||||
|
||||
# Create subscription manager
|
||||
self._subscription = self.device.create_subscription_service(
|
||||
"PullPointSubscription"
|
||||
)
|
||||
|
||||
# Renew immediately
|
||||
await self.async_renew()
|
||||
|
||||
# Initialize events
|
||||
pullpoint = self.device.create_pullpoint_service()
|
||||
with suppress(*SET_SYNCHRONIZATION_POINT_ERRORS):
|
||||
await pullpoint.SetSynchronizationPoint()
|
||||
response = await pullpoint.PullMessages(
|
||||
{"MessageLimit": 100, "Timeout": dt.timedelta(seconds=5)}
|
||||
)
|
||||
|
||||
# Parse event initialization
|
||||
await self.async_parse_messages(response.NotificationMessage)
|
||||
|
||||
self.started = True
|
||||
return True
|
||||
# Always start pull point first, since it will populate the event list
|
||||
event_via_pull_point = await self.pullpoint_manager.async_start()
|
||||
events_via_webhook = await self.webhook_manager.async_start()
|
||||
return events_via_webhook or event_via_pull_point
|
||||
|
||||
async def async_stop(self) -> None:
|
||||
"""Unsubscribe from events."""
|
||||
self._listeners = []
|
||||
self.started = False
|
||||
await self.pullpoint_manager.async_stop()
|
||||
await self.webhook_manager.async_stop()
|
||||
|
||||
if not self._subscription:
|
||||
return
|
||||
|
||||
with suppress(*SUBSCRIPTION_ERRORS):
|
||||
await self._subscription.Unsubscribe()
|
||||
self._subscription = None
|
||||
|
||||
async def async_restart(self, _now: dt.datetime | None = None) -> None:
|
||||
"""Restart the subscription assuming the camera rebooted."""
|
||||
if not self.started:
|
||||
return
|
||||
|
||||
if self._subscription:
|
||||
# Suppressed. The subscription may no longer exist.
|
||||
try:
|
||||
await self._subscription.Unsubscribe()
|
||||
except (XMLParseError, *SUBSCRIPTION_ERRORS) as err:
|
||||
LOGGER.debug(
|
||||
(
|
||||
"Failed to unsubscribe ONVIF PullPoint subscription for '%s';"
|
||||
" This is normal if the device restarted: %s"
|
||||
),
|
||||
self.unique_id,
|
||||
err,
|
||||
)
|
||||
self._subscription = None
|
||||
|
||||
try:
|
||||
restarted = await self.async_start()
|
||||
except (XMLParseError, *SUBSCRIPTION_ERRORS) as err:
|
||||
restarted = False
|
||||
# Device may not support subscriptions so log at debug level
|
||||
# when we get an XMLParseError
|
||||
LOGGER.log(
|
||||
DEBUG if isinstance(err, XMLParseError) else WARNING,
|
||||
(
|
||||
"Failed to restart ONVIF PullPoint subscription for '%s'; "
|
||||
"Retrying later: %s"
|
||||
),
|
||||
self.unique_id,
|
||||
_stringify_onvif_error(err),
|
||||
)
|
||||
|
||||
if not restarted:
|
||||
# Try again in a minute
|
||||
self._unsub_refresh = async_call_later(self.hass, 60, self.async_restart)
|
||||
elif self._listeners:
|
||||
LOGGER.debug(
|
||||
"Restarted ONVIF PullPoint subscription for '%s'", self.unique_id
|
||||
)
|
||||
self.async_schedule_pull()
|
||||
|
||||
async def async_renew(self) -> None:
|
||||
"""Renew subscription."""
|
||||
if not self._subscription:
|
||||
return
|
||||
|
||||
with suppress(*SUBSCRIPTION_ERRORS):
|
||||
# The first time we renew, we may get a Fault error so we
|
||||
# suppress it. The subscription will be restarted in
|
||||
# async_restart later.
|
||||
await self._subscription.Renew(_get_next_termination_time())
|
||||
|
||||
def async_schedule_pull(self) -> None:
|
||||
"""Schedule async_pull_messages to run."""
|
||||
self._unsub_refresh = async_call_later(self.hass, 1, self.async_pull_messages)
|
||||
|
||||
async def async_pull_messages(self, _now: dt.datetime | None = None) -> None:
|
||||
"""Pull messages from device."""
|
||||
if self.hass.state == CoreState.running:
|
||||
try:
|
||||
pullpoint = self.device.create_pullpoint_service()
|
||||
response = await pullpoint.PullMessages(
|
||||
{"MessageLimit": 100, "Timeout": dt.timedelta(seconds=60)}
|
||||
)
|
||||
|
||||
# Renew subscription if less than two hours is left
|
||||
if (
|
||||
dt_util.as_utc(response.TerminationTime) - dt_util.utcnow()
|
||||
).total_seconds() < 7200:
|
||||
await self.async_renew()
|
||||
except RemoteProtocolError:
|
||||
# Likely a shutdown event, nothing to see here
|
||||
return
|
||||
except (XMLParseError, *SUBSCRIPTION_ERRORS) as err:
|
||||
# Device may not support subscriptions so log at debug level
|
||||
# when we get an XMLParseError
|
||||
LOGGER.log(
|
||||
DEBUG if isinstance(err, XMLParseError) else WARNING,
|
||||
(
|
||||
"Failed to fetch ONVIF PullPoint subscription messages for"
|
||||
" '%s': %s"
|
||||
),
|
||||
self.unique_id,
|
||||
_stringify_onvif_error(err),
|
||||
)
|
||||
# Treat errors as if the camera restarted. Assume that the pullpoint
|
||||
# subscription is no longer valid.
|
||||
self._unsub_refresh = None
|
||||
await self.async_restart()
|
||||
return
|
||||
|
||||
# Parse response
|
||||
await self.async_parse_messages(response.NotificationMessage)
|
||||
|
||||
# Update entities
|
||||
for update_callback in self._listeners:
|
||||
update_callback()
|
||||
|
||||
# Reschedule another pull
|
||||
if self._listeners:
|
||||
self.async_schedule_pull()
|
||||
@callback
|
||||
def async_callback_listeners(self) -> None:
|
||||
"""Update listeners."""
|
||||
for update_callback in self._listeners:
|
||||
update_callback()
|
||||
|
||||
# pylint: disable=protected-access
|
||||
async def async_parse_messages(self, messages) -> None:
|
||||
"""Parse notification message."""
|
||||
unique_id = self.unique_id
|
||||
assert unique_id is not None
|
||||
for msg in messages:
|
||||
# Guard against empty message
|
||||
if not msg.Topic:
|
||||
|
@ -250,25 +160,640 @@ class EventManager:
|
|||
if not (parser := PARSERS.get(topic)):
|
||||
if topic not in UNHANDLED_TOPICS:
|
||||
LOGGER.info(
|
||||
"No registered handler for event from %s: %s",
|
||||
self.unique_id,
|
||||
"%s: No registered handler for event from %s: %s",
|
||||
self.name,
|
||||
unique_id,
|
||||
msg,
|
||||
)
|
||||
UNHANDLED_TOPICS.add(topic)
|
||||
continue
|
||||
|
||||
event = await parser(self.unique_id, msg)
|
||||
event = await parser(unique_id, msg)
|
||||
|
||||
if not event:
|
||||
LOGGER.info("Unable to parse event from %s: %s", self.unique_id, msg)
|
||||
LOGGER.info(
|
||||
"%s: Unable to parse event from %s: %s", self.name, unique_id, msg
|
||||
)
|
||||
return
|
||||
|
||||
self.get_uids_by_platform(event.platform).add(event.uid)
|
||||
self._events[event.uid] = event
|
||||
|
||||
def get_uid(self, uid) -> Event | None:
|
||||
def get_uid(self, uid: str) -> Event | None:
|
||||
"""Retrieve event for given id."""
|
||||
return self._events.get(uid)
|
||||
|
||||
def get_platform(self, platform) -> list[Event]:
|
||||
"""Retrieve events for given platform."""
|
||||
return [event for event in self._events.values() if event.platform == platform]
|
||||
|
||||
def get_uids_by_platform(self, platform: str) -> set[str]:
|
||||
"""Retrieve uids for a given platform."""
|
||||
if (possible_uids := self._uid_by_platform.get(platform)) is None:
|
||||
uids: set[str] = set()
|
||||
self._uid_by_platform[platform] = uids
|
||||
return uids
|
||||
return possible_uids
|
||||
|
||||
@callback
|
||||
def async_webhook_failed(self) -> None:
|
||||
"""Mark webhook as failed."""
|
||||
if self.pullpoint_manager.state != PullPointManagerState.PAUSED:
|
||||
return
|
||||
LOGGER.debug("%s: Switching to PullPoint for events", self.name)
|
||||
self.pullpoint_manager.async_resume()
|
||||
|
||||
@callback
|
||||
def async_webhook_working(self) -> None:
|
||||
"""Mark webhook as working."""
|
||||
if self.pullpoint_manager.state != PullPointManagerState.STARTED:
|
||||
return
|
||||
LOGGER.debug("%s: Switching to webhook for events", self.name)
|
||||
self.pullpoint_manager.async_pause()
|
||||
|
||||
|
||||
class PullPointManager:
|
||||
"""ONVIF PullPoint Manager.
|
||||
|
||||
If the camera supports webhooks and the webhook is reachable, the pullpoint
|
||||
manager will keep the pull point subscription alive, but will not poll for
|
||||
messages unless the webhook fails.
|
||||
"""
|
||||
|
||||
def __init__(self, event_manager: EventManager) -> None:
|
||||
"""Initialize pullpoint manager."""
|
||||
self.state = PullPointManagerState.STOPPED
|
||||
|
||||
self._event_manager = event_manager
|
||||
self._device = event_manager.device
|
||||
self._hass = event_manager.hass
|
||||
self._name = event_manager.name
|
||||
|
||||
self._pullpoint_subscription: ONVIFService = None
|
||||
self._pullpoint_service: ONVIFService = None
|
||||
self._pull_lock: asyncio.Lock = asyncio.Lock()
|
||||
|
||||
self._cancel_pull_messages: CALLBACK_TYPE | None = None
|
||||
self._cancel_pullpoint_renew: CALLBACK_TYPE | None = None
|
||||
|
||||
self._renew_lock: asyncio.Lock = asyncio.Lock()
|
||||
self._renew_or_restart_job = HassJob(
|
||||
self._async_renew_or_restart_pullpoint,
|
||||
f"{self._name}: renew or restart pullpoint",
|
||||
)
|
||||
self._pull_messages_job = HassJob(
|
||||
self._async_background_pull_messages,
|
||||
f"{self._name}: pull messages",
|
||||
)
|
||||
|
||||
async def async_start(self) -> bool:
|
||||
"""Start pullpoint subscription."""
|
||||
assert (
|
||||
self.state == PullPointManagerState.STOPPED
|
||||
), "PullPoint manager already started"
|
||||
LOGGER.debug("%s: Starting PullPoint manager", self._name)
|
||||
if not await self._async_start_pullpoint():
|
||||
self.state = PullPointManagerState.FAILED
|
||||
return False
|
||||
self.state = PullPointManagerState.STARTED
|
||||
return True
|
||||
|
||||
@callback
|
||||
def async_pause(self) -> None:
|
||||
"""Pause pullpoint subscription."""
|
||||
LOGGER.debug("%s: Pausing PullPoint manager", self._name)
|
||||
self.state = PullPointManagerState.PAUSED
|
||||
self._hass.async_create_task(self._async_cancel_and_unsubscribe())
|
||||
|
||||
@callback
|
||||
def async_resume(self) -> None:
|
||||
"""Resume pullpoint subscription."""
|
||||
LOGGER.debug("%s: Resuming PullPoint manager", self._name)
|
||||
self.state = PullPointManagerState.STARTED
|
||||
self.async_schedule_pullpoint_renew(0.0)
|
||||
|
||||
@callback
|
||||
def async_schedule_pullpoint_renew(self, delay: float) -> None:
|
||||
"""Schedule PullPoint subscription renewal."""
|
||||
self._async_cancel_pullpoint_renew()
|
||||
self._cancel_pullpoint_renew = async_call_later(
|
||||
self._hass,
|
||||
delay,
|
||||
self._renew_or_restart_job,
|
||||
)
|
||||
|
||||
@callback
|
||||
def async_cancel_pull_messages(self) -> None:
|
||||
"""Cancel the PullPoint task."""
|
||||
if self._cancel_pull_messages:
|
||||
self._cancel_pull_messages()
|
||||
self._cancel_pull_messages = None
|
||||
|
||||
@callback
|
||||
def async_schedule_pull_messages(self, delay: float | None = None) -> None:
|
||||
"""Schedule async_pull_messages to run.
|
||||
|
||||
Used as fallback when webhook is not working.
|
||||
|
||||
Must not check if the webhook is working.
|
||||
"""
|
||||
self.async_cancel_pull_messages()
|
||||
if self.state != PullPointManagerState.STARTED:
|
||||
return
|
||||
if self._pullpoint_service:
|
||||
when = delay if delay is not None else PULLPOINT_COOLDOWN_TIME
|
||||
self._cancel_pull_messages = async_call_later(
|
||||
self._hass, when, self._pull_messages_job
|
||||
)
|
||||
|
||||
async def async_stop(self) -> None:
|
||||
"""Unsubscribe from PullPoint and cancel callbacks."""
|
||||
self.state = PullPointManagerState.STOPPED
|
||||
await self._async_cancel_and_unsubscribe()
|
||||
|
||||
async def _async_start_pullpoint(self) -> bool:
|
||||
"""Start pullpoint subscription."""
|
||||
try:
|
||||
try:
|
||||
started = await self._async_create_pullpoint_subscription()
|
||||
except RemoteProtocolError:
|
||||
# http://datatracker.ietf.org/doc/html/rfc2616#section-8.1.4 allows the server
|
||||
# to close the connection at any time, we treat this as a normal and try again
|
||||
# once since we do not want to declare the camera as not supporting PullPoint
|
||||
# if it just happened to close the connection at the wrong time.
|
||||
started = await self._async_create_pullpoint_subscription()
|
||||
except CREATE_ERRORS as err:
|
||||
LOGGER.debug(
|
||||
"%s: Device does not support PullPoint service or has too many subscriptions: %s",
|
||||
self._name,
|
||||
_stringify_onvif_error(err),
|
||||
)
|
||||
return False
|
||||
|
||||
if started:
|
||||
self.async_schedule_pullpoint_renew(SUBSCRIPTION_RENEW_INTERVAL)
|
||||
|
||||
return started
|
||||
|
||||
async def _async_cancel_and_unsubscribe(self) -> None:
|
||||
"""Cancel and unsubscribe from PullPoint."""
|
||||
self._async_cancel_pullpoint_renew()
|
||||
self.async_cancel_pull_messages()
|
||||
await self._async_unsubscribe_pullpoint()
|
||||
|
||||
async def _async_renew_or_restart_pullpoint(
|
||||
self, now: dt.datetime | None = None
|
||||
) -> None:
|
||||
"""Renew or start pullpoint subscription."""
|
||||
if self._hass.is_stopping or self.state != PullPointManagerState.STARTED:
|
||||
return
|
||||
if self._renew_lock.locked():
|
||||
LOGGER.debug("%s: PullPoint renew already in progress", self._name)
|
||||
# Renew is already running, another one will be
|
||||
# scheduled when the current one is done if needed.
|
||||
return
|
||||
async with self._renew_lock:
|
||||
next_attempt = SUBSCRIPTION_RENEW_INTERVAL_ON_ERROR
|
||||
try:
|
||||
if (
|
||||
await self._async_renew_pullpoint()
|
||||
or await self._async_restart_pullpoint()
|
||||
):
|
||||
next_attempt = SUBSCRIPTION_RENEW_INTERVAL
|
||||
finally:
|
||||
self.async_schedule_pullpoint_renew(next_attempt)
|
||||
|
||||
async def _async_create_pullpoint_subscription(self) -> bool:
|
||||
"""Create pullpoint subscription."""
|
||||
|
||||
if not await self._device.create_pullpoint_subscription(
|
||||
{"InitialTerminationTime": SUBSCRIPTION_RELATIVE_TIME}
|
||||
):
|
||||
LOGGER.debug("%s: Failed to create PullPoint subscription", self._name)
|
||||
return False
|
||||
|
||||
# Create subscription manager
|
||||
self._pullpoint_subscription = self._device.create_subscription_service(
|
||||
"PullPointSubscription"
|
||||
)
|
||||
|
||||
# Create the service that will be used to pull messages from the device.
|
||||
self._pullpoint_service = self._device.create_pullpoint_service()
|
||||
|
||||
# Initialize events
|
||||
with suppress(*SET_SYNCHRONIZATION_POINT_ERRORS):
|
||||
sync_result = await self._pullpoint_service.SetSynchronizationPoint()
|
||||
LOGGER.debug("%s: SetSynchronizationPoint: %s", self._name, sync_result)
|
||||
|
||||
# Always schedule an initial pull messages
|
||||
self.async_schedule_pull_messages(0.0)
|
||||
|
||||
return True
|
||||
|
||||
@callback
|
||||
def _async_cancel_pullpoint_renew(self) -> None:
|
||||
"""Cancel the pullpoint renew task."""
|
||||
if self._cancel_pullpoint_renew:
|
||||
self._cancel_pullpoint_renew()
|
||||
self._cancel_pullpoint_renew = None
|
||||
|
||||
async def _async_restart_pullpoint(self) -> bool:
|
||||
"""Restart the subscription assuming the camera rebooted."""
|
||||
self.async_cancel_pull_messages()
|
||||
await self._async_unsubscribe_pullpoint()
|
||||
restarted = await self._async_start_pullpoint()
|
||||
if restarted and self._event_manager.has_listeners:
|
||||
LOGGER.debug("%s: Restarted PullPoint subscription", self._name)
|
||||
self.async_schedule_pull_messages(0.0)
|
||||
return restarted
|
||||
|
||||
async def _async_unsubscribe_pullpoint(self) -> None:
|
||||
"""Unsubscribe the pullpoint subscription."""
|
||||
if not self._pullpoint_subscription:
|
||||
return
|
||||
LOGGER.debug("%s: Unsubscribing from PullPoint", self._name)
|
||||
try:
|
||||
await self._pullpoint_subscription.Unsubscribe()
|
||||
except UNSUBSCRIBE_ERRORS as err:
|
||||
LOGGER.debug(
|
||||
(
|
||||
"%s: Failed to unsubscribe PullPoint subscription;"
|
||||
" This is normal if the device restarted: %s"
|
||||
),
|
||||
self._name,
|
||||
_stringify_onvif_error(err),
|
||||
)
|
||||
self._pullpoint_subscription = None
|
||||
|
||||
async def _async_renew_pullpoint(self) -> bool:
|
||||
"""Renew the PullPoint subscription."""
|
||||
if not self._pullpoint_subscription:
|
||||
return False
|
||||
try:
|
||||
# The first time we renew, we may get a Fault error so we
|
||||
# suppress it. The subscription will be restarted in
|
||||
# async_restart later.
|
||||
await self._pullpoint_subscription.Renew(SUBSCRIPTION_RELATIVE_TIME)
|
||||
LOGGER.debug("%s: Renewed PullPoint subscription", self._name)
|
||||
return True
|
||||
except RENEW_ERRORS as err:
|
||||
LOGGER.debug(
|
||||
"%s: Failed to renew PullPoint subscription; %s",
|
||||
self._name,
|
||||
_stringify_onvif_error(err),
|
||||
)
|
||||
return False
|
||||
|
||||
async def _async_pull_messages_with_lock(self) -> bool:
|
||||
"""Pull messages from device while holding the lock.
|
||||
|
||||
This function must not be called directly, it should only
|
||||
be called from _async_pull_messages.
|
||||
|
||||
Returns True if the subscription is working.
|
||||
|
||||
Returns False if the subscription is not working and should be restarted.
|
||||
"""
|
||||
assert self._pull_lock.locked(), "Pull lock must be held"
|
||||
assert self._pullpoint_service is not None, "PullPoint service does not exist"
|
||||
event_manager = self._event_manager
|
||||
LOGGER.debug(
|
||||
"%s: Pulling PullPoint messages timeout=%s limit=%s",
|
||||
self._name,
|
||||
PULLPOINT_POLL_TIME,
|
||||
PULLPOINT_MESSAGE_LIMIT,
|
||||
)
|
||||
try:
|
||||
response = await self._pullpoint_service.PullMessages(
|
||||
{
|
||||
"MessageLimit": PULLPOINT_MESSAGE_LIMIT,
|
||||
"Timeout": PULLPOINT_POLL_TIME,
|
||||
}
|
||||
)
|
||||
except RemoteProtocolError as err:
|
||||
# Either a shutdown event or the camera closed the connection. Because
|
||||
# http://datatracker.ietf.org/doc/html/rfc2616#section-8.1.4 allows the server
|
||||
# to close the connection at any time, we treat this as a normal. Some
|
||||
# cameras may close the connection if there are no messages to pull.
|
||||
LOGGER.debug(
|
||||
"%s: PullPoint subscription encountered a remote protocol error "
|
||||
"(this is normal for some cameras): %s",
|
||||
self._name,
|
||||
_stringify_onvif_error(err),
|
||||
)
|
||||
return True
|
||||
except (XMLParseError, *SUBSCRIPTION_ERRORS) as err:
|
||||
# Device may not support subscriptions so log at debug level
|
||||
# when we get an XMLParseError
|
||||
LOGGER.debug(
|
||||
"%s: Failed to fetch PullPoint subscription messages: %s",
|
||||
self._name,
|
||||
_stringify_onvif_error(err),
|
||||
)
|
||||
# Treat errors as if the camera restarted. Assume that the pullpoint
|
||||
# subscription is no longer valid.
|
||||
return False
|
||||
|
||||
if self.state != PullPointManagerState.STARTED:
|
||||
# If the webhook became started working during the long poll,
|
||||
# and we got paused, our data is stale and we should not process it.
|
||||
LOGGER.debug(
|
||||
"%s: PullPoint is paused (likely due to working webhook), skipping PullPoint messages",
|
||||
self._name,
|
||||
)
|
||||
return True
|
||||
|
||||
# Parse response
|
||||
if (notification_message := response.NotificationMessage) and (
|
||||
number_of_events := len(notification_message)
|
||||
):
|
||||
LOGGER.debug(
|
||||
"%s: continuous PullMessages: %s event(s)",
|
||||
self._name,
|
||||
number_of_events,
|
||||
)
|
||||
await event_manager.async_parse_messages(notification_message)
|
||||
event_manager.async_callback_listeners()
|
||||
else:
|
||||
LOGGER.debug("%s: continuous PullMessages: no events", self._name)
|
||||
|
||||
return True
|
||||
|
||||
@callback
|
||||
def _async_background_pull_messages(self, _now: dt.datetime | None = None) -> None:
|
||||
"""Pull messages from device in the background."""
|
||||
self._cancel_pull_messages = None
|
||||
self._hass.async_create_background_task(
|
||||
self._async_pull_messages(),
|
||||
f"{self._name} background pull messages",
|
||||
)
|
||||
|
||||
async def _async_pull_messages(self) -> None:
|
||||
"""Pull messages from device."""
|
||||
event_manager = self._event_manager
|
||||
|
||||
if self._pull_lock.locked():
|
||||
# Pull messages if the lock is not already locked
|
||||
# any pull will do, so we don't need to wait for the lock
|
||||
LOGGER.debug(
|
||||
"%s: PullPoint subscription is already locked, skipping pull",
|
||||
self._name,
|
||||
)
|
||||
return
|
||||
|
||||
async with self._pull_lock:
|
||||
# Before we pop out of the lock we always need to schedule the next pull
|
||||
# or call async_schedule_pullpoint_renew if the pull fails so the pull
|
||||
# loop continues.
|
||||
try:
|
||||
if self._hass.state == CoreState.running:
|
||||
if not await self._async_pull_messages_with_lock():
|
||||
self.async_schedule_pullpoint_renew(0.0)
|
||||
return
|
||||
finally:
|
||||
if event_manager.has_listeners:
|
||||
self.async_schedule_pull_messages()
|
||||
|
||||
|
||||
class WebHookManager:
|
||||
"""Manage ONVIF webhook subscriptions.
|
||||
|
||||
If the camera supports webhooks, we will use that instead of
|
||||
pullpoint subscriptions as soon as we detect that the camera
|
||||
can reach our webhook.
|
||||
"""
|
||||
|
||||
def __init__(self, event_manager: EventManager) -> None:
|
||||
"""Initialize webhook manager."""
|
||||
self.state = WebHookManagerState.STOPPED
|
||||
|
||||
self._event_manager = event_manager
|
||||
self._device = event_manager.device
|
||||
self._hass = event_manager.hass
|
||||
self._webhook_unique_id = f"{DOMAIN}_{event_manager.config_entry.entry_id}"
|
||||
self._name = event_manager.name
|
||||
|
||||
self._webhook_url: str | None = None
|
||||
|
||||
self._webhook_subscription: ONVIFService | None = None
|
||||
self._notification_manager: NotificationManager | None = None
|
||||
|
||||
self._cancel_webhook_renew: CALLBACK_TYPE | None = None
|
||||
self._renew_lock = asyncio.Lock()
|
||||
self._renew_or_restart_job = HassJob(
|
||||
self._async_renew_or_restart_webhook,
|
||||
f"{self._name}: renew or restart webhook",
|
||||
)
|
||||
|
||||
async def async_start(self) -> bool:
|
||||
"""Start polling events."""
|
||||
LOGGER.debug("%s: Starting webhook manager", self._name)
|
||||
assert (
|
||||
self.state == WebHookManagerState.STOPPED
|
||||
), "Webhook manager already started"
|
||||
assert self._webhook_url is None, "Webhook already registered"
|
||||
self._async_register_webhook()
|
||||
if not await self._async_start_webhook():
|
||||
self.state = WebHookManagerState.FAILED
|
||||
return False
|
||||
self.state = WebHookManagerState.STARTED
|
||||
return True
|
||||
|
||||
async def async_stop(self) -> None:
|
||||
"""Unsubscribe from events."""
|
||||
self.state = WebHookManagerState.STOPPED
|
||||
self._async_cancel_webhook_renew()
|
||||
await self._async_unsubscribe_webhook()
|
||||
self._async_unregister_webhook()
|
||||
|
||||
@callback
|
||||
def _async_schedule_webhook_renew(self, delay: float) -> None:
|
||||
"""Schedule webhook subscription renewal."""
|
||||
self._async_cancel_webhook_renew()
|
||||
self._cancel_webhook_renew = async_call_later(
|
||||
self._hass,
|
||||
delay,
|
||||
self._renew_or_restart_job,
|
||||
)
|
||||
|
||||
async def _async_create_webhook_subscription(self) -> None:
|
||||
"""Create webhook subscription."""
|
||||
LOGGER.debug("%s: Creating webhook subscription", self._name)
|
||||
self._notification_manager = self._device.create_notification_manager(
|
||||
{
|
||||
"InitialTerminationTime": SUBSCRIPTION_RELATIVE_TIME,
|
||||
"ConsumerReference": {"Address": self._webhook_url},
|
||||
}
|
||||
)
|
||||
self._webhook_subscription = await self._notification_manager.setup()
|
||||
await self._notification_manager.start()
|
||||
LOGGER.debug("%s: Webhook subscription created", self._name)
|
||||
|
||||
async def _async_start_webhook(self) -> bool:
|
||||
"""Start webhook."""
|
||||
try:
|
||||
try:
|
||||
await self._async_create_webhook_subscription()
|
||||
except RemoteProtocolError:
|
||||
# http://datatracker.ietf.org/doc/html/rfc2616#section-8.1.4 allows the server
|
||||
# to close the connection at any time, we treat this as a normal and try again
|
||||
# once since we do not want to declare the camera as not supporting webhooks
|
||||
# if it just happened to close the connection at the wrong time.
|
||||
await self._async_create_webhook_subscription()
|
||||
except CREATE_ERRORS as err:
|
||||
self._event_manager.async_webhook_failed()
|
||||
LOGGER.debug(
|
||||
"%s: Device does not support notification service or too many subscriptions: %s",
|
||||
self._name,
|
||||
_stringify_onvif_error(err),
|
||||
)
|
||||
return False
|
||||
|
||||
self._async_schedule_webhook_renew(SUBSCRIPTION_RENEW_INTERVAL)
|
||||
return True
|
||||
|
||||
async def _async_restart_webhook(self) -> bool:
|
||||
"""Restart the webhook subscription assuming the camera rebooted."""
|
||||
await self._async_unsubscribe_webhook()
|
||||
return await self._async_start_webhook()
|
||||
|
||||
async def _async_renew_webhook(self) -> bool:
|
||||
"""Renew webhook subscription."""
|
||||
if not self._webhook_subscription:
|
||||
return False
|
||||
try:
|
||||
await self._webhook_subscription.Renew(SUBSCRIPTION_RELATIVE_TIME)
|
||||
LOGGER.debug("%s: Renewed Webhook subscription", self._name)
|
||||
return True
|
||||
except RENEW_ERRORS as err:
|
||||
LOGGER.debug(
|
||||
"%s: Failed to renew webhook subscription %s",
|
||||
self._name,
|
||||
_stringify_onvif_error(err),
|
||||
)
|
||||
return False
|
||||
|
||||
async def _async_renew_or_restart_webhook(
|
||||
self, now: dt.datetime | None = None
|
||||
) -> None:
|
||||
"""Renew or start webhook subscription."""
|
||||
if self._hass.is_stopping or self.state != WebHookManagerState.STARTED:
|
||||
return
|
||||
if self._renew_lock.locked():
|
||||
LOGGER.debug("%s: Webhook renew already in progress", self._name)
|
||||
# Renew is already running, another one will be
|
||||
# scheduled when the current one is done if needed.
|
||||
return
|
||||
async with self._renew_lock:
|
||||
next_attempt = SUBSCRIPTION_RENEW_INTERVAL_ON_ERROR
|
||||
try:
|
||||
if (
|
||||
await self._async_renew_webhook()
|
||||
or await self._async_restart_webhook()
|
||||
):
|
||||
next_attempt = SUBSCRIPTION_RENEW_INTERVAL
|
||||
finally:
|
||||
self._async_schedule_webhook_renew(next_attempt)
|
||||
|
||||
@callback
|
||||
def _async_register_webhook(self) -> None:
|
||||
"""Register the webhook for motion events."""
|
||||
LOGGER.debug("%s: Registering webhook: %s", self._name, self._webhook_unique_id)
|
||||
|
||||
try:
|
||||
base_url = get_url(self._hass, prefer_external=False)
|
||||
except NoURLAvailableError:
|
||||
try:
|
||||
base_url = get_url(self._hass, prefer_external=True)
|
||||
except NoURLAvailableError:
|
||||
return
|
||||
|
||||
webhook_id = self._webhook_unique_id
|
||||
webhook.async_register(
|
||||
self._hass, DOMAIN, webhook_id, webhook_id, self._async_handle_webhook
|
||||
)
|
||||
webhook_path = webhook.async_generate_path(webhook_id)
|
||||
self._webhook_url = f"{base_url}{webhook_path}"
|
||||
LOGGER.debug("%s: Registered webhook: %s", self._name, webhook_id)
|
||||
|
||||
@callback
|
||||
def _async_unregister_webhook(self):
|
||||
"""Unregister the webhook for motion events."""
|
||||
LOGGER.debug(
|
||||
"%s: Unregistering webhook %s", self._name, self._webhook_unique_id
|
||||
)
|
||||
webhook.async_unregister(self._hass, self._webhook_unique_id)
|
||||
self._webhook_url = None
|
||||
|
||||
async def _async_handle_webhook(
|
||||
self, hass: HomeAssistant, webhook_id: str, request: Request
|
||||
) -> None:
|
||||
"""Handle incoming webhook."""
|
||||
content: bytes | None = None
|
||||
try:
|
||||
content = await request.read()
|
||||
except ConnectionResetError as ex:
|
||||
LOGGER.error("Error reading webhook: %s", ex)
|
||||
return
|
||||
except asyncio.CancelledError as ex:
|
||||
LOGGER.error("Error reading webhook: %s", ex)
|
||||
raise
|
||||
finally:
|
||||
self._hass.async_create_background_task(
|
||||
self._async_process_webhook(hass, webhook_id, content),
|
||||
f"ONVIF event webhook for {self._name}",
|
||||
)
|
||||
|
||||
async def _async_process_webhook(
|
||||
self, hass: HomeAssistant, webhook_id: str, content: bytes | None
|
||||
) -> None:
|
||||
"""Process incoming webhook data in the background."""
|
||||
event_manager = self._event_manager
|
||||
if content is None:
|
||||
# webhook is marked as not working as something
|
||||
# went wrong. We will mark it as working again
|
||||
# when we receive a valid notification.
|
||||
event_manager.async_webhook_failed()
|
||||
return
|
||||
if not self._notification_manager:
|
||||
LOGGER.debug(
|
||||
"%s: Received webhook before notification manager is setup", self._name
|
||||
)
|
||||
return
|
||||
if not (result := self._notification_manager.process(content)):
|
||||
LOGGER.debug("%s: Failed to process webhook %s", self._name, webhook_id)
|
||||
return
|
||||
LOGGER.debug(
|
||||
"%s: Processed webhook %s with %s event(s)",
|
||||
self._name,
|
||||
webhook_id,
|
||||
len(result.NotificationMessage),
|
||||
)
|
||||
event_manager.async_webhook_working()
|
||||
await event_manager.async_parse_messages(result.NotificationMessage)
|
||||
event_manager.async_callback_listeners()
|
||||
|
||||
@callback
|
||||
def _async_cancel_webhook_renew(self) -> None:
|
||||
"""Cancel the webhook renew task."""
|
||||
if self._cancel_webhook_renew:
|
||||
self._cancel_webhook_renew()
|
||||
self._cancel_webhook_renew = None
|
||||
|
||||
async def _async_unsubscribe_webhook(self) -> None:
|
||||
"""Unsubscribe from the webhook."""
|
||||
if not self._webhook_subscription:
|
||||
return
|
||||
LOGGER.debug("%s: Unsubscribing from webhook", self._name)
|
||||
try:
|
||||
await self._webhook_subscription.Unsubscribe()
|
||||
except UNSUBSCRIBE_ERRORS as err:
|
||||
LOGGER.debug(
|
||||
(
|
||||
"%s: Failed to unsubscribe webhook subscription;"
|
||||
" This is normal if the device restarted: %s"
|
||||
),
|
||||
self._name,
|
||||
_stringify_onvif_error(err),
|
||||
)
|
||||
self._webhook_subscription = None
|
||||
|
|
|
@ -8,5 +8,5 @@
|
|||
"documentation": "https://www.home-assistant.io/integrations/onvif",
|
||||
"iot_class": "local_push",
|
||||
"loggers": ["onvif", "wsdiscovery", "zeep"],
|
||||
"requirements": ["onvif-zeep-async==1.2.11", "WSDiscovery==2.0.0"]
|
||||
"requirements": ["onvif-zeep-async==1.3.0", "WSDiscovery==2.0.0"]
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
from homeassistant.const import EntityCategory
|
||||
|
@ -78,3 +79,20 @@ class Event:
|
|||
value: Any = None
|
||||
entity_category: EntityCategory | None = None
|
||||
entity_enabled: bool = True
|
||||
|
||||
|
||||
class PullPointManagerState(Enum):
|
||||
"""States for the pullpoint manager."""
|
||||
|
||||
STOPPED = 0 # Not running or not supported
|
||||
STARTED = 1 # Running and renewing
|
||||
PAUSED = 2 # Switched to webhook, but can resume
|
||||
FAILED = 3 # Failed to do initial subscription
|
||||
|
||||
|
||||
class WebHookManagerState(Enum):
|
||||
"""States for the webhook manager."""
|
||||
|
||||
STOPPED = 0
|
||||
STARTED = 1
|
||||
FAILED = 2 # Failed to do initial subscription
|
||||
|
|
|
@ -23,7 +23,7 @@ async def async_setup_entry(
|
|||
async_add_entities: AddEntitiesCallback,
|
||||
) -> None:
|
||||
"""Set up a ONVIF binary sensor."""
|
||||
device = hass.data[DOMAIN][config_entry.unique_id]
|
||||
device: ONVIFDevice = hass.data[DOMAIN][config_entry.unique_id]
|
||||
|
||||
entities = {
|
||||
event.uid: ONVIFSensor(event.uid, device)
|
||||
|
@ -36,16 +36,20 @@ async def async_setup_entry(
|
|||
entities[entry.unique_id] = ONVIFSensor(entry.unique_id, device, entry)
|
||||
|
||||
async_add_entities(entities.values())
|
||||
uids_by_platform = device.events.get_uids_by_platform("sensor")
|
||||
|
||||
@callback
|
||||
def async_check_entities():
|
||||
def async_check_entities() -> None:
|
||||
"""Check if we have added an entity for the event."""
|
||||
new_entities = []
|
||||
for event in device.events.get_platform("sensor"):
|
||||
if event.uid not in entities:
|
||||
entities[event.uid] = ONVIFSensor(event.uid, device)
|
||||
new_entities.append(entities[event.uid])
|
||||
async_add_entities(new_entities)
|
||||
nonlocal uids_by_platform
|
||||
if not (missing := uids_by_platform.difference(entities)):
|
||||
return
|
||||
new_entities: dict[str, ONVIFSensor] = {
|
||||
uid: ONVIFSensor(uid, device) for uid in missing
|
||||
}
|
||||
if new_entities:
|
||||
entities.update(new_entities)
|
||||
async_add_entities(new_entities.values())
|
||||
|
||||
device.events.async_add_listener(async_check_entities)
|
||||
|
||||
|
@ -84,6 +88,7 @@ class ONVIFSensor(ONVIFBaseEntity, RestoreSensor):
|
|||
@property
|
||||
def native_value(self) -> StateType | date | datetime | Decimal:
|
||||
"""Return the value reported by the sensor."""
|
||||
assert self._attr_unique_id is not None
|
||||
if (event := self.device.events.get_uid(self._attr_unique_id)) is not None:
|
||||
return event.value
|
||||
return self._attr_native_value
|
||||
|
|
|
@ -1261,7 +1261,7 @@ ondilo==0.2.0
|
|||
onkyo-eiscp==1.2.7
|
||||
|
||||
# homeassistant.components.onvif
|
||||
onvif-zeep-async==1.2.11
|
||||
onvif-zeep-async==1.3.0
|
||||
|
||||
# homeassistant.components.opengarage
|
||||
open-garage==0.2.0
|
||||
|
|
|
@ -942,7 +942,7 @@ omnilogic==0.4.5
|
|||
ondilo==0.2.0
|
||||
|
||||
# homeassistant.components.onvif
|
||||
onvif-zeep-async==1.2.11
|
||||
onvif-zeep-async==1.3.0
|
||||
|
||||
# homeassistant.components.opengarage
|
||||
open-garage==0.2.0
|
||||
|
|
|
@ -10,8 +10,10 @@ from homeassistant.components.onvif.models import (
|
|||
Capabilities,
|
||||
DeviceInfo,
|
||||
Profile,
|
||||
PullPointManagerState,
|
||||
Resolution,
|
||||
Video,
|
||||
WebHookManagerState,
|
||||
)
|
||||
from homeassistant.const import HTTP_DIGEST_AUTHENTICATION
|
||||
|
||||
|
@ -111,6 +113,10 @@ def setup_mock_device(mock_device):
|
|||
video_source_token=None,
|
||||
)
|
||||
mock_device.profiles = [profile1]
|
||||
mock_device.events = MagicMock(
|
||||
webhook_manager=MagicMock(state=WebHookManagerState.STARTED),
|
||||
pullpoint_manager=MagicMock(state=PullPointManagerState.PAUSED),
|
||||
)
|
||||
|
||||
def mock_constructor(hass, config):
|
||||
"""Fake the controller constructor."""
|
||||
|
|
|
@ -72,4 +72,14 @@ async def test_diagnostics(
|
|||
}
|
||||
],
|
||||
},
|
||||
"events": {
|
||||
"pullpoint_manager_state": {
|
||||
"__type": "<enum " "'PullPointManagerState'>",
|
||||
"repr": "<PullPointManagerState.PAUSED: " "2>",
|
||||
},
|
||||
"webhook_manager_state": {
|
||||
"__type": "<enum 'WebHookManagerState'>",
|
||||
"repr": "<WebHookManagerState.STARTED: " "1>",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue