Migrate amcrest integration to new async API (#56294)

This commit is contained in:
Sean Vig 2022-01-22 20:49:45 -05:00 committed by GitHub
parent a9cefec1db
commit 7781e308cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 216 additions and 208 deletions

View File

@ -1,16 +1,17 @@
"""Support for Amcrest IP cameras."""
from __future__ import annotations
from collections.abc import Callable
from contextlib import suppress
import asyncio
from collections.abc import AsyncIterator, Callable
from contextlib import asynccontextmanager, suppress
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging
import threading
from typing import Any
import aiohttp
from amcrest import AmcrestError, ApiWrapper, LoginError
import httpx
import voluptuous as vol
from homeassistant.auth.models import User
@ -36,8 +37,8 @@ from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.exceptions import Unauthorized, UnknownUser
from homeassistant.helpers import discovery
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_send, dispatcher_send
from homeassistant.helpers.event import track_time_interval
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.service import async_extract_entity_ids
from homeassistant.helpers.typing import ConfigType
@ -143,9 +144,9 @@ class AmcrestChecker(ApiWrapper):
self._hass = hass
self._wrap_name = name
self._wrap_errors = 0
self._wrap_lock = threading.Lock()
self._wrap_lock = asyncio.Lock()
self._wrap_login_err = False
self._wrap_event_flag = threading.Event()
self._wrap_event_flag = asyncio.Event()
self._wrap_event_flag.set()
self._unsub_recheck: Callable[[], None] | None = None
super().__init__(
@ -163,23 +164,40 @@ class AmcrestChecker(ApiWrapper):
return self._wrap_errors <= MAX_ERRORS and not self._wrap_login_err
@property
def available_flag(self) -> threading.Event:
"""Return threading event flag that indicates if camera's API is responding."""
def available_flag(self) -> asyncio.Event:
"""Return event flag that indicates if camera's API is responding."""
return self._wrap_event_flag
def _start_recovery(self) -> None:
self._wrap_event_flag.clear()
dispatcher_send(self._hass, service_signal(SERVICE_UPDATE, self._wrap_name))
self._unsub_recheck = track_time_interval(
async_dispatcher_send(
self._hass, service_signal(SERVICE_UPDATE, self._wrap_name)
)
self._unsub_recheck = async_track_time_interval(
self._hass, self._wrap_test_online, RECHECK_INTERVAL
)
def command(self, *args: Any, **kwargs: Any) -> Any:
async def async_command(self, *args: Any, **kwargs: Any) -> httpx.Response:
"""amcrest.ApiWrapper.command wrapper to catch errors."""
async with self._command_wrapper():
ret = await super().async_command(*args, **kwargs)
return ret
@asynccontextmanager
async def async_stream_command(
self, *args: Any, **kwargs: Any
) -> AsyncIterator[httpx.Response]:
"""amcrest.ApiWrapper.command wrapper to catch errors."""
async with self._command_wrapper():
async with super().async_stream_command(*args, **kwargs) as ret:
yield ret
@asynccontextmanager
async def _command_wrapper(self) -> AsyncIterator[None]:
try:
ret = super().command(*args, **kwargs)
yield
except LoginError as ex:
with self._wrap_lock:
async with self._wrap_lock:
was_online = self.available
was_login_err = self._wrap_login_err
self._wrap_login_err = True
@ -189,7 +207,7 @@ class AmcrestChecker(ApiWrapper):
self._start_recovery()
raise
except AmcrestError:
with self._wrap_lock:
async with self._wrap_lock:
was_online = self.available
errs = self._wrap_errors = self._wrap_errors + 1
offline = not self.available
@ -198,7 +216,7 @@ class AmcrestChecker(ApiWrapper):
_LOGGER.error("%s camera offline: Too many errors", self._wrap_name)
self._start_recovery()
raise
with self._wrap_lock:
async with self._wrap_lock:
was_offline = not self.available
self._wrap_errors = 0
self._wrap_login_err = False
@ -208,28 +226,29 @@ class AmcrestChecker(ApiWrapper):
self._unsub_recheck = None
_LOGGER.error("%s camera back online", self._wrap_name)
self._wrap_event_flag.set()
dispatcher_send(self._hass, service_signal(SERVICE_UPDATE, self._wrap_name))
return ret
async_dispatcher_send(
self._hass, service_signal(SERVICE_UPDATE, self._wrap_name)
)
def _wrap_test_online(self, now: datetime) -> None:
async def _wrap_test_online(self, now: datetime) -> None:
"""Test if camera is back online."""
_LOGGER.debug("Testing if %s back online", self._wrap_name)
with suppress(AmcrestError):
self.current_time # pylint: disable=pointless-statement
await self.async_current_time
def _monitor_events(
async def _monitor_events(
hass: HomeAssistant,
name: str,
api: AmcrestChecker,
event_codes: set[str],
) -> None:
while True:
api.available_flag.wait()
await api.available_flag.wait()
try:
for code, payload in api.event_actions("All", retries=5):
async for code, payload in api.async_event_actions("All"):
event_data = {"camera": name, "event": code, "payload": payload}
hass.bus.fire("amcrest", event_data)
hass.bus.async_fire("amcrest", event_data)
if code in event_codes:
signal = service_signal(SERVICE_EVENT, name, code)
start = any(
@ -237,29 +256,14 @@ def _monitor_events(
for key, val in payload.items()
)
_LOGGER.debug("Sending signal: '%s': %s", signal, start)
dispatcher_send(hass, signal, start)
async_dispatcher_send(hass, signal, start)
except AmcrestError as error:
_LOGGER.warning(
"Error while processing events from %s camera: %r", name, error
)
def _start_event_monitor(
hass: HomeAssistant,
name: str,
api: AmcrestChecker,
event_codes: set[str],
) -> None:
thread = threading.Thread(
target=_monitor_events,
name=f"Amcrest {name}",
args=(hass, name, api, event_codes),
daemon=True,
)
thread.start()
def setup(hass: HomeAssistant, config: ConfigType) -> bool:
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Amcrest IP Camera component."""
hass.data.setdefault(DATA_AMCREST, {DEVICES: {}, CAMERAS: []})
@ -298,13 +302,13 @@ def setup(hass: HomeAssistant, config: ConfigType) -> bool:
control_light,
)
discovery.load_platform(
await discovery.async_load_platform(
hass, Platform.CAMERA, DOMAIN, {CONF_NAME: name}, config
)
event_codes = set()
if binary_sensors:
discovery.load_platform(
await discovery.async_load_platform(
hass,
Platform.BINARY_SENSOR,
DOMAIN,
@ -319,10 +323,10 @@ def setup(hass: HomeAssistant, config: ConfigType) -> bool:
and sensor.event_code is not None
}
_start_event_monitor(hass, name, api, event_codes)
asyncio.create_task(_monitor_events(hass, name, api, event_codes))
if sensors:
discovery.load_platform(
await discovery.async_load_platform(
hass,
Platform.SENSOR,
DOMAIN,
@ -331,7 +335,7 @@ def setup(hass: HomeAssistant, config: ConfigType) -> bool:
)
if switches:
discovery.load_platform(
await discovery.async_load_platform(
hass,
Platform.SWITCH,
DOMAIN,
@ -384,7 +388,7 @@ def setup(hass: HomeAssistant, config: ConfigType) -> bool:
async_dispatcher_send(hass, service_signal(call.service, entity_id), *args)
for service, params in CAMERA_SERVICES.items():
hass.services.register(DOMAIN, service, async_service_handler, params[0])
hass.services.async_register(DOMAIN, service, async_service_handler, params[0])
return True

View File

@ -1,7 +1,6 @@
"""Support for Amcrest IP camera binary sensors."""
from __future__ import annotations
from collections.abc import Callable
from contextlib import suppress
from dataclasses import dataclass
from datetime import timedelta
@ -173,42 +172,41 @@ class AmcrestBinarySensor(BinarySensorEntity):
self._attr_name = f"{name} {entity_description.name}"
self._attr_should_poll = entity_description.should_poll
self._unsub_dispatcher: list[Callable[[], None]] = []
@property
def available(self) -> bool:
"""Return True if entity is available."""
return self.entity_description.key == _ONLINE_KEY or self._api.available
def update(self) -> None:
async def async_update(self) -> None:
"""Update entity."""
if self.entity_description.key == _ONLINE_KEY:
self._update_online()
await self._async_update_online()
else:
self._update_others()
await self._async_update_others()
@Throttle(_ONLINE_SCAN_INTERVAL)
def _update_online(self) -> None:
async def _async_update_online(self) -> None:
if not (self._api.available or self.is_on):
return
_LOGGER.debug(_UPDATE_MSG, self.name)
if self._api.available:
# Send a command to the camera to test if we can still communicate with it.
# Override of Http.command() in __init__.py will set self._api.available
# Override of Http.async_command() in __init__.py will set self._api.available
# accordingly.
with suppress(AmcrestError):
self._api.current_time # pylint: disable=pointless-statement
self._update_unique_id()
await self._api.async_current_time
await self._async_update_unique_id()
self._attr_is_on = self._api.available
def _update_others(self) -> None:
async def _async_update_others(self) -> None:
if not self.available:
return
_LOGGER.debug(_UPDATE_MSG, self.name)
try:
self._update_unique_id()
await self._async_update_unique_id()
except AmcrestError as error:
log_update_error(_LOGGER, "update", self.name, "binary sensor", error)
return
@ -218,26 +216,28 @@ class AmcrestBinarySensor(BinarySensorEntity):
return
try:
self._attr_is_on = len(self._api.event_channels_happened(event_code)) > 0
self._attr_is_on = (
len(await self._api.async_event_channels_happened(event_code)) > 0
)
except AmcrestError as error:
log_update_error(_LOGGER, "update", self.name, "binary sensor", error)
return
def _update_unique_id(self) -> None:
async def _async_update_unique_id(self) -> None:
"""Set the unique id."""
if self._attr_unique_id is None and (serial_number := self._api.serial_number):
if self._attr_unique_id is None and (
serial_number := await self._api.async_serial_number
):
self._attr_unique_id = (
f"{serial_number}-{self.entity_description.key}-{self._channel}"
)
async def async_on_demand_update(self) -> None:
@callback
def async_on_demand_update_online(self) -> None:
"""Update state."""
if self.entity_description.key == _ONLINE_KEY:
_LOGGER.debug(_UPDATE_MSG, self.name)
self._attr_is_on = self._api.available
self.async_write_ha_state()
else:
self.async_schedule_update_ha_state(True)
_LOGGER.debug(_UPDATE_MSG, self.name)
self._attr_is_on = self._api.available
self.async_write_ha_state()
@callback
def async_event_received(self, state: bool) -> None:
@ -248,18 +248,28 @@ class AmcrestBinarySensor(BinarySensorEntity):
async def async_added_to_hass(self) -> None:
"""Subscribe to signals."""
self._unsub_dispatcher.append(
async_dispatcher_connect(
self.hass,
service_signal(SERVICE_UPDATE, self._signal_name),
self.async_on_demand_update,
if self.entity_description.key == _ONLINE_KEY:
self.async_on_remove(
async_dispatcher_connect(
self.hass,
service_signal(SERVICE_UPDATE, self._signal_name),
self.async_on_demand_update_online,
)
)
)
else:
self.async_on_remove(
async_dispatcher_connect(
self.hass,
service_signal(SERVICE_UPDATE, self._signal_name),
self.async_write_ha_state,
)
)
if (
self.entity_description.event_code
and not self.entity_description.should_poll
):
self._unsub_dispatcher.append(
self.async_on_remove(
async_dispatcher_connect(
self.hass,
service_signal(
@ -270,8 +280,3 @@ class AmcrestBinarySensor(BinarySensorEntity):
self.async_event_received,
)
)
async def async_will_remove_from_hass(self) -> None:
"""Disconnect from update signal."""
for unsub_dispatcher in self._unsub_dispatcher:
unsub_dispatcher()

View File

@ -4,7 +4,6 @@ from __future__ import annotations
import asyncio
from collections.abc import Callable
from datetime import timedelta
from functools import partial
import logging
from typing import TYPE_CHECKING, Any
@ -141,7 +140,7 @@ async def async_setup_platform(
# 2021.9.0 introduced unique id's for the camera entity, but these were not
# unique for different resolution streams. If any cameras were configured
# with this version, update the old entity with the new unique id.
serial_number = await hass.async_add_executor_job(lambda: device.api.serial_number) # type: ignore[no-any-return]
serial_number = await device.api.async_serial_number
serial_number = serial_number.strip()
registry = entity_registry.async_get(hass)
entity_id = registry.async_get_entity_id(CAMERA_DOMAIN, DOMAIN, serial_number)
@ -198,20 +197,16 @@ class AmcrestCam(Camera):
)
raise CannotSnapshot
async def _async_get_image(self) -> None:
async def _async_get_image(self) -> bytes | None:
try:
# Send the request to snap a picture and return raw jpg data
# Snapshot command needs a much longer read timeout than other commands.
return await self.hass.async_add_executor_job(
partial(
self._api.snapshot,
timeout=(COMM_TIMEOUT, SNAPSHOT_TIMEOUT),
stream=False,
)
return await self._api.async_snapshot(
timeout=(COMM_TIMEOUT, SNAPSHOT_TIMEOUT)
)
except AmcrestError as error:
log_update_error(_LOGGER, "get image from", self.name, "camera", error)
return
return None
finally:
self._snapshot_task = None
@ -383,7 +378,7 @@ class AmcrestCam(Camera):
for unsub_dispatcher in self._unsub_dispatcher:
unsub_dispatcher()
def update(self) -> None:
async def async_update(self) -> None:
"""Update entity status."""
if not self.available or self._update_succeeded:
if not self.available:
@ -392,33 +387,44 @@ class AmcrestCam(Camera):
_LOGGER.debug("Updating %s camera", self.name)
try:
if self._brand is None:
resp = self._api.vendor_information.strip()
resp = await self._api.async_vendor_information
_LOGGER.debug("Assigned brand=%s", resp)
if resp:
self._brand = resp
else:
self._brand = "unknown"
if self._model is None:
resp = self._api.device_type.strip()
resp = await self._api.async_device_type
_LOGGER.debug("Assigned model=%s", resp)
if resp:
self._model = resp
else:
self._model = "unknown"
if self._attr_unique_id is None:
serial_number = self._api.serial_number.strip()
serial_number = (await self._api.async_serial_number).strip()
if serial_number:
self._attr_unique_id = (
f"{serial_number}-{self._resolution}-{self._channel}"
)
_LOGGER.debug("Assigned unique_id=%s", self._attr_unique_id)
self._attr_is_streaming = self._get_video()
self._is_recording = self._get_recording()
self._motion_detection_enabled = self._get_motion_detection()
self._audio_enabled = self._get_audio()
self._motion_recording_enabled = self._get_motion_recording()
self._color_bw = self._get_color_mode()
self._rtsp_url = self._api.rtsp_url(typeno=self._resolution)
if self._rtsp_url is None:
self._rtsp_url = await self._api.async_rtsp_url(typeno=self._resolution)
(
self._attr_is_streaming,
self._is_recording,
self._motion_detection_enabled,
self._audio_enabled,
self._motion_recording_enabled,
self._color_bw,
) = await asyncio.gather(
self._async_get_video(),
self._async_get_recording(),
self._async_get_motion_detection(),
self._async_get_audio(),
self._async_get_motion_recording(),
self._async_get_color_mode(),
)
except AmcrestError as error:
log_update_error(_LOGGER, "get", self.name, "camera attributes", error)
self._update_succeeded = False
@ -427,63 +433,63 @@ class AmcrestCam(Camera):
# Other Camera method overrides
def turn_off(self) -> None:
async def async_turn_off(self) -> None:
"""Turn off camera."""
self._enable_video(False)
await self._async_enable_video(False)
def turn_on(self) -> None:
async def async_turn_on(self) -> None:
"""Turn on camera."""
self._enable_video(True)
await self._async_enable_video(True)
def enable_motion_detection(self) -> None:
async def async_enable_motion_detection(self) -> None:
"""Enable motion detection in the camera."""
self._enable_motion_detection(True)
await self._async_enable_motion_detection(True)
def disable_motion_detection(self) -> None:
async def async_disable_motion_detection(self) -> None:
"""Disable motion detection in camera."""
self._enable_motion_detection(False)
await self._async_enable_motion_detection(False)
# Additional Amcrest Camera service methods
async def async_enable_recording(self) -> None:
"""Call the job and enable recording."""
await self.hass.async_add_executor_job(self._enable_recording, True)
await self._async_enable_recording(True)
async def async_disable_recording(self) -> None:
"""Call the job and disable recording."""
await self.hass.async_add_executor_job(self._enable_recording, False)
await self._async_enable_recording(False)
async def async_enable_audio(self) -> None:
"""Call the job and enable audio."""
await self.hass.async_add_executor_job(self._enable_audio, True)
await self._async_enable_audio(True)
async def async_disable_audio(self) -> None:
"""Call the job and disable audio."""
await self.hass.async_add_executor_job(self._enable_audio, False)
await self._async_enable_audio(False)
async def async_enable_motion_recording(self) -> None:
"""Call the job and enable motion recording."""
await self.hass.async_add_executor_job(self._enable_motion_recording, True)
await self._async_enable_motion_recording(True)
async def async_disable_motion_recording(self) -> None:
"""Call the job and disable motion recording."""
await self.hass.async_add_executor_job(self._enable_motion_recording, False)
await self._async_enable_motion_recording(False)
async def async_goto_preset(self, preset: int) -> None:
"""Call the job and move camera to preset position."""
await self.hass.async_add_executor_job(self._goto_preset, preset)
await self._async_goto_preset(preset)
async def async_set_color_bw(self, color_bw: str) -> None:
"""Call the job and set camera color mode."""
await self.hass.async_add_executor_job(self._set_color_bw, color_bw)
await self._async_set_color_bw(color_bw)
async def async_start_tour(self) -> None:
"""Call the job and start camera tour."""
await self.hass.async_add_executor_job(self._start_tour, True)
await self._async_start_tour(True)
async def async_stop_tour(self) -> None:
"""Call the job and stop camera tour."""
await self.hass.async_add_executor_job(self._start_tour, False)
await self._async_start_tour(False)
async def async_ptz_control(self, movement: str, travel_time: float) -> None:
"""Move or zoom camera in specified direction."""
@ -496,13 +502,9 @@ class AmcrestCam(Camera):
kwargs["arg1"] = kwargs["arg2"] = 1
try:
await self.hass.async_add_executor_job(
partial(self._api.ptz_control_command, action="start", **kwargs)
)
await self._api.async_ptz_control_command(action="start", **kwargs) # type: ignore[arg-type]
await asyncio.sleep(travel_time)
await self.hass.async_add_executor_job(
partial(self._api.ptz_control_command, action="stop", **kwargs)
)
await self._api.async_ptz_control_command(action="stop", **kwargs) # type: ignore[arg-type]
except AmcrestError as error:
log_update_error(
_LOGGER, "move", self.name, f"camera PTZ {movement}", error
@ -510,7 +512,7 @@ class AmcrestCam(Camera):
# Methods to send commands to Amcrest camera and handle errors
def _change_setting(
async def _async_change_setting(
self, value: str | bool, description: str, attr: str | None = None
) -> None:
func = description.replace(" ", "_")
@ -519,8 +521,8 @@ class AmcrestCam(Camera):
max_tries = 3
for tries in range(max_tries, 0, -1):
try:
getattr(self, f"_set_{func}")(value)
new_value = getattr(self, f"_get_{func}")()
await getattr(self, f"_set_{func}")(value)
new_value = await getattr(self, f"_get_{func}")()
if new_value != value:
raise AmcrestCommandFailed
except (AmcrestError, AmcrestCommandFailed) as error:
@ -536,115 +538,123 @@ class AmcrestCam(Camera):
self.schedule_update_ha_state()
return
def _get_video(self) -> bool:
return self._api.video_enabled
async def _async_get_video(self) -> bool:
return await self._api.async_video_enabled
def _set_video(self, enable: bool) -> None:
self._api.video_enabled = enable
async def _async_set_video(self, enable: bool) -> None:
await self._api.async_set_video_enabled(enable, channel=0)
def _enable_video(self, enable: bool) -> None:
async def _async_enable_video(self, enable: bool) -> None:
"""Enable or disable camera video stream."""
# Given the way the camera's state is determined by
# is_streaming and is_recording, we can't leave
# recording on if video stream is being turned off.
if self.is_recording and not enable:
self._enable_recording(False)
self._change_setting(enable, "video", "is_streaming")
await self._async_enable_recording(False)
await self._async_change_setting(enable, "video", "is_streaming")
if self._control_light:
self._change_light()
await self._async_change_light()
def _get_recording(self) -> bool:
return self._api.record_mode == "Manual"
async def _async_get_recording(self) -> bool:
return (await self._api.async_record_mode) == "Manual"
def _set_recording(self, enable: bool) -> None:
async def _async_set_recording(self, enable: bool) -> None:
rec_mode = {"Automatic": 0, "Manual": 1}
# The property has a str type, but setter has int type, which causes mypy confusion
self._api.record_mode = rec_mode["Manual" if enable else "Automatic"] # type: ignore[assignment]
await self._api.async_set_record_mode(
rec_mode["Manual" if enable else "Automatic"]
)
def _enable_recording(self, enable: bool) -> None:
async def _async_enable_recording(self, enable: bool) -> None:
"""Turn recording on or off."""
# Given the way the camera's state is determined by
# is_streaming and is_recording, we can't leave
# video stream off if recording is being turned on.
if not self.is_streaming and enable:
self._enable_video(True)
self._change_setting(enable, "recording", "_is_recording")
await self._async_enable_video(True)
await self._async_change_setting(enable, "recording", "_is_recording")
def _get_motion_detection(self) -> bool:
return self._api.is_motion_detector_on()
async def _async_get_motion_detection(self) -> bool:
return await self._api.async_is_motion_detector_on()
def _set_motion_detection(self, enable: bool) -> None:
async def _async_set_motion_detection(self, enable: bool) -> None:
# The property has a str type, but setter has bool type, which causes mypy confusion
self._api.motion_detection = enable # type: ignore[assignment]
await self._api.async_set_motion_detection(enable)
def _enable_motion_detection(self, enable: bool) -> None:
async def _async_enable_motion_detection(self, enable: bool) -> None:
"""Enable or disable motion detection."""
self._change_setting(enable, "motion detection", "_motion_detection_enabled")
await self._async_change_setting(
enable, "motion detection", "_motion_detection_enabled"
)
def _get_audio(self) -> bool:
return self._api.audio_enabled
async def _async_get_audio(self) -> bool:
return await self._api.async_audio_enabled
def _set_audio(self, enable: bool) -> None:
self._api.audio_enabled = enable
async def _async_set_audio(self, enable: bool) -> None:
await self._api.async_set_audio_enabled(enable)
def _enable_audio(self, enable: bool) -> None:
async def _async_enable_audio(self, enable: bool) -> None:
"""Enable or disable audio stream."""
self._change_setting(enable, "audio", "_audio_enabled")
await self._async_change_setting(enable, "audio", "_audio_enabled")
if self._control_light:
self._change_light()
await self._async_change_light()
def _get_indicator_light(self) -> bool:
async def _async_get_indicator_light(self) -> bool:
return (
"true"
in self._api.command(
"configManager.cgi?action=getConfig&name=LightGlobal"
in (
await self._api.async_command(
"configManager.cgi?action=getConfig&name=LightGlobal"
)
).content.decode()
)
def _set_indicator_light(self, enable: bool) -> None:
self._api.command(
async def _async_set_indicator_light(self, enable: bool) -> None:
await self._api.async_command(
f"configManager.cgi?action=setConfig&LightGlobal[0].Enable={str(enable).lower()}"
)
def _change_light(self) -> None:
async def _async_change_light(self) -> None:
"""Enable or disable indicator light."""
self._change_setting(
await self._async_change_setting(
self._audio_enabled or self.is_streaming, "indicator light"
)
def _get_motion_recording(self) -> bool:
return self._api.is_record_on_motion_detection()
async def _async_get_motion_recording(self) -> bool:
return await self._api.async_is_record_on_motion_detection()
def _set_motion_recording(self, enable: bool) -> None:
self._api.motion_recording = enable
async def _async_set_motion_recording(self, enable: bool) -> None:
await self._api.async_set_motion_recording(enable)
def _enable_motion_recording(self, enable: bool) -> None:
async def _async_enable_motion_recording(self, enable: bool) -> None:
"""Enable or disable motion recording."""
self._change_setting(enable, "motion recording", "_motion_recording_enabled")
await self._async_change_setting(
enable, "motion recording", "_motion_recording_enabled"
)
def _goto_preset(self, preset: int) -> None:
async def _async_goto_preset(self, preset: int) -> None:
"""Move camera position and zoom to preset."""
try:
self._api.go_to_preset(preset_point_number=preset)
await self._api.async_go_to_preset(preset_point_number=preset)
except AmcrestError as error:
log_update_error(
_LOGGER, "move", self.name, f"camera to preset {preset}", error
)
def _get_color_mode(self) -> str:
return _CBW[self._api.day_night_color]
async def _async_get_color_mode(self) -> str:
return _CBW[await self._api.async_day_night_color]
def _set_color_mode(self, cbw: str) -> None:
self._api.day_night_color = _CBW.index(cbw)
async def _async_set_color_mode(self, cbw: str) -> None:
await self._api.async_set_day_night_color(_CBW.index(cbw), channel=0)
def _set_color_bw(self, cbw: str) -> None:
async def _async_set_color_bw(self, cbw: str) -> None:
"""Set camera color mode."""
self._change_setting(cbw, "color mode", "_color_bw")
await self._async_change_setting(cbw, "color mode", "_color_bw")
def _start_tour(self, start: bool) -> None:
async def _async_start_tour(self, start: bool) -> None:
"""Start camera tour."""
try:
self._api.tour(start=start)
await self._api.async_tour(start=start)
except AmcrestError as error:
log_update_error(
_LOGGER, "start" if start else "stop", self.name, "camera tour", error

View File

@ -1,7 +1,6 @@
"""Support for Amcrest IP camera sensors."""
from __future__ import annotations
from collections.abc import Callable
from datetime import timedelta
import logging
from typing import TYPE_CHECKING
@ -79,7 +78,6 @@ class AmcrestSensor(SensorEntity):
self._signal_name = name
self._api = device.api
self._channel = device.channel
self._unsub_dispatcher: Callable[[], None] | None = None
self._attr_name = f"{name} {description.name}"
self._attr_extra_state_attributes = {}
@ -89,27 +87,25 @@ class AmcrestSensor(SensorEntity):
"""Return True if entity is available."""
return self._api.available
def update(self) -> None:
async def async_update(self) -> None:
"""Get the latest data and updates the state."""
if not self.available:
return
_LOGGER.debug("Updating %s sensor", self.name)
sensor_type = self.entity_description.key
if self._attr_unique_id is None and (serial_number := self._api.serial_number):
self._attr_unique_id = f"{serial_number}-{sensor_type}-{self._channel}"
try:
if self._attr_unique_id is None and (
serial_number := self._api.serial_number
serial_number := (await self._api.async_serial_number)
):
self._attr_unique_id = f"{serial_number}-{sensor_type}-{self._channel}"
if sensor_type == SENSOR_PTZ_PRESET:
self._attr_native_value = self._api.ptz_presets_count
self._attr_native_value = await self._api.async_ptz_presets_count
elif sensor_type == SENSOR_SDCARD:
storage = self._api.storage_all
storage = await self._api.async_storage_all
try:
self._attr_extra_state_attributes[
"Total"
@ -133,19 +129,12 @@ class AmcrestSensor(SensorEntity):
except AmcrestError as error:
log_update_error(_LOGGER, "update", self.name, "sensor", error)
async def async_on_demand_update(self) -> None:
"""Update state."""
self.async_schedule_update_ha_state(True)
async def async_added_to_hass(self) -> None:
"""Subscribe to update signal."""
self._unsub_dispatcher = async_dispatcher_connect(
self.hass,
service_signal(SERVICE_UPDATE, self._signal_name),
self.async_on_demand_update,
self.async_on_remove(
async_dispatcher_connect(
self.hass,
service_signal(SERVICE_UPDATE, self._signal_name),
self.async_write_ha_state,
)
)
async def async_will_remove_from_hass(self) -> None:
"""Disconnect from update signal."""
assert self._unsub_dispatcher is not None
self._unsub_dispatcher()

View File

@ -69,22 +69,22 @@ class AmcrestSwitch(SwitchEntity):
"""Return True if entity is available."""
return self._api.available
def turn_on(self, **kwargs: Any) -> None:
async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn the switch on."""
self._turn_switch(True)
await self._async_turn_switch(True)
def turn_off(self, **kwargs: Any) -> None:
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the switch off."""
self._turn_switch(False)
await self._async_turn_switch(False)
def _turn_switch(self, mode: bool) -> None:
async def _async_turn_switch(self, mode: bool) -> None:
"""Set privacy mode."""
lower_str = str(mode).lower()
self._api.command(
await self._api.async_command(
f"configManager.cgi?action=setConfig&LeLensMask[0].Enable={lower_str}"
)
def update(self) -> None:
async def async_update(self) -> None:
"""Update switch."""
io_res = self._api.privacy_config().splitlines()[0].split("=")[1]
io_res = (await self._api.async_privacy_config()).splitlines()[0].split("=")[1]
self._attr_is_on = io_res == "true"