From f53109f5135a87900e86e0e547c948ce09bc53a9 Mon Sep 17 00:00:00 2001 From: Matthias Alphart Date: Sun, 7 Jan 2024 23:26:46 +0100 Subject: [PATCH] Move KNX service registration to `async_setup` (#106635) --- homeassistant/components/knx/__init__.py | 248 +------------------- homeassistant/components/knx/const.py | 9 + homeassistant/components/knx/services.py | 284 +++++++++++++++++++++++ tests/components/knx/test_services.py | 16 ++ 4 files changed, 320 insertions(+), 237 deletions(-) create mode 100644 homeassistant/components/knx/services.py diff --git a/homeassistant/components/knx/__init__.py b/homeassistant/components/knx/__init__.py index 3444e9b002a2..c6869f34eeb6 100644 --- a/homeassistant/components/knx/__init__.py +++ b/homeassistant/components/knx/__init__.py @@ -5,23 +5,17 @@ import asyncio import contextlib import logging from pathlib import Path -from typing import Final import voluptuous as vol from xknx import XKNX from xknx.core import XknxConnectionState from xknx.core.telegram_queue import TelegramQueue -from xknx.dpt import DPTArray, DPTBase, DPTBinary +from xknx.dpt import DPTBase from xknx.exceptions import ConversionError, CouldNotParseTelegram, XKNXException from xknx.io import ConnectionConfig, ConnectionType, SecureConfig from xknx.telegram import AddressFilter, Telegram -from xknx.telegram.address import ( - DeviceGroupAddress, - GroupAddress, - InternalGroupAddress, - parse_device_group_address, -) -from xknx.telegram.apci import GroupValueRead, GroupValueResponse, GroupValueWrite +from xknx.telegram.address import DeviceGroupAddress, GroupAddress, InternalGroupAddress +from xknx.telegram.apci import GroupValueResponse, GroupValueWrite from homeassistant.config_entries import ConfigEntry from homeassistant.const import ( @@ -30,15 +24,13 @@ from homeassistant.const import ( CONF_PORT, CONF_TYPE, EVENT_HOMEASSISTANT_STOP, - SERVICE_RELOAD, Platform, ) -from homeassistant.core import Event, HomeAssistant, ServiceCall -from homeassistant.exceptions import ConfigEntryNotReady, HomeAssistantError +from homeassistant.core import Event, HomeAssistant +from homeassistant.exceptions import ConfigEntryNotReady from homeassistant.helpers import discovery import homeassistant.helpers.config_validation as cv from homeassistant.helpers.reload import async_integration_yaml_config -from homeassistant.helpers.service import async_register_admin_service from homeassistant.helpers.storage import STORAGE_DIR from homeassistant.helpers.typing import ConfigType @@ -95,24 +87,14 @@ from .schema import ( TextSchema, TimeSchema, WeatherSchema, - ga_validator, - sensor_type_validator, ) +from .services import register_knx_services from .telegrams import STORAGE_KEY as TELEGRAMS_STORAGE_KEY, Telegrams from .websocket import register_panel _LOGGER = logging.getLogger(__name__) -SERVICE_KNX_SEND: Final = "send" -SERVICE_KNX_ATTR_PAYLOAD: Final = "payload" -SERVICE_KNX_ATTR_TYPE: Final = "type" -SERVICE_KNX_ATTR_RESPONSE: Final = "response" -SERVICE_KNX_ATTR_REMOVE: Final = "remove" -SERVICE_KNX_EVENT_REGISTER: Final = "event_register" -SERVICE_KNX_EXPOSURE_REGISTER: Final = "exposure_register" -SERVICE_KNX_READ: Final = "read" - CONFIG_SCHEMA = vol.Schema( { DOMAIN: vol.All( @@ -158,69 +140,6 @@ CONFIG_SCHEMA = vol.Schema( extra=vol.ALLOW_EXTRA, ) -SERVICE_KNX_SEND_SCHEMA = vol.Any( - vol.Schema( - { - vol.Required(KNX_ADDRESS): vol.All( - cv.ensure_list, - [ga_validator], - ), - vol.Required(SERVICE_KNX_ATTR_PAYLOAD): cv.match_all, - vol.Required(SERVICE_KNX_ATTR_TYPE): sensor_type_validator, - vol.Optional(SERVICE_KNX_ATTR_RESPONSE, default=False): cv.boolean, - } - ), - vol.Schema( - # without type given payload is treated as raw bytes - { - vol.Required(KNX_ADDRESS): vol.All( - cv.ensure_list, - [ga_validator], - ), - vol.Required(SERVICE_KNX_ATTR_PAYLOAD): vol.Any( - cv.positive_int, [cv.positive_int] - ), - vol.Optional(SERVICE_KNX_ATTR_RESPONSE, default=False): cv.boolean, - } - ), -) - -SERVICE_KNX_READ_SCHEMA = vol.Schema( - { - vol.Required(KNX_ADDRESS): vol.All( - cv.ensure_list, - [ga_validator], - ) - } -) - -SERVICE_KNX_EVENT_REGISTER_SCHEMA = vol.Schema( - { - vol.Required(KNX_ADDRESS): vol.All( - cv.ensure_list, - [ga_validator], - ), - vol.Optional(CONF_TYPE): sensor_type_validator, - vol.Optional(SERVICE_KNX_ATTR_REMOVE, default=False): cv.boolean, - } -) - -SERVICE_KNX_EXPOSURE_REGISTER_SCHEMA = vol.Any( - ExposeSchema.EXPOSE_SENSOR_SCHEMA.extend( - { - vol.Optional(SERVICE_KNX_ATTR_REMOVE, default=False): cv.boolean, - } - ), - vol.Schema( - # for removing only `address` is required - { - vol.Required(KNX_ADDRESS): ga_validator, - vol.Required(SERVICE_KNX_ATTR_REMOVE): vol.All(cv.boolean, True), - }, - extra=vol.ALLOW_EXTRA, - ), -) - async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Start the KNX integration.""" @@ -235,6 +154,8 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: conf = dict(conf) hass.data[DATA_KNX_CONFIG] = conf + register_knx_services(hass) + return True @@ -287,43 +208,6 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: ) ) - hass.services.async_register( - DOMAIN, - SERVICE_KNX_SEND, - knx_module.service_send_to_knx_bus, - schema=SERVICE_KNX_SEND_SCHEMA, - ) - - hass.services.async_register( - DOMAIN, - SERVICE_KNX_READ, - knx_module.service_read_to_knx_bus, - schema=SERVICE_KNX_READ_SCHEMA, - ) - - async_register_admin_service( - hass, - DOMAIN, - SERVICE_KNX_EVENT_REGISTER, - knx_module.service_event_register_modify, - schema=SERVICE_KNX_EVENT_REGISTER_SCHEMA, - ) - - async_register_admin_service( - hass, - DOMAIN, - SERVICE_KNX_EXPOSURE_REGISTER, - knx_module.service_exposure_register_modify, - schema=SERVICE_KNX_EXPOSURE_REGISTER_SCHEMA, - ) - - async def _reload_integration(call: ServiceCall) -> None: - """Reload the integration.""" - await hass.config_entries.async_reload(entry.entry_id) - hass.bus.async_fire(f"event_{DOMAIN}_reloaded", context=call.context) - - async_register_admin_service(hass, DOMAIN, SERVICE_RELOAD, _reload_integration) - await register_panel(hass) return True @@ -419,10 +303,8 @@ class KNXModule: ) self._address_filter_transcoder: dict[AddressFilter, type[DPTBase]] = {} - self._group_address_transcoder: dict[DeviceGroupAddress, type[DPTBase]] = {} - self._knx_event_callback: TelegramQueue.Callback = ( - self.register_event_callback() - ) + self.group_address_transcoder: dict[DeviceGroupAddress, type[DPTBase]] = {} + self.knx_event_callback: TelegramQueue.Callback = self.register_event_callback() self.entry.async_on_unload( self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.stop) @@ -555,7 +437,7 @@ class KNXModule: ): data = telegram.payload.value.value if transcoder := ( - self._group_address_transcoder.get(telegram.destination_address) + self.group_address_transcoder.get(telegram.destination_address) or next( ( _transcoder @@ -612,111 +494,3 @@ class KNXModule: group_addresses=[], match_for_outgoing=True, ) - - async def service_event_register_modify(self, call: ServiceCall) -> None: - """Service for adding or removing a GroupAddress to the knx_event filter.""" - attr_address = call.data[KNX_ADDRESS] - group_addresses = list(map(parse_device_group_address, attr_address)) - - if call.data.get(SERVICE_KNX_ATTR_REMOVE): - for group_address in group_addresses: - try: - self._knx_event_callback.group_addresses.remove(group_address) - except ValueError: - _LOGGER.warning( - "Service event_register could not remove event for '%s'", - str(group_address), - ) - if group_address in self._group_address_transcoder: - del self._group_address_transcoder[group_address] - return - - if (dpt := call.data.get(CONF_TYPE)) and ( - transcoder := DPTBase.parse_transcoder(dpt) - ): - self._group_address_transcoder.update( - { - _address: transcoder # type: ignore[type-abstract] - for _address in group_addresses - } - ) - for group_address in group_addresses: - if group_address in self._knx_event_callback.group_addresses: - continue - self._knx_event_callback.group_addresses.append(group_address) - _LOGGER.debug( - "Service event_register registered event for '%s'", - str(group_address), - ) - - async def service_exposure_register_modify(self, call: ServiceCall) -> None: - """Service for adding or removing an exposure to KNX bus.""" - group_address = call.data[KNX_ADDRESS] - - if call.data.get(SERVICE_KNX_ATTR_REMOVE): - try: - removed_exposure = self.service_exposures.pop(group_address) - except KeyError as err: - raise HomeAssistantError( - f"Could not find exposure for '{group_address}' to remove." - ) from err - - removed_exposure.shutdown() - return - - if group_address in self.service_exposures: - replaced_exposure = self.service_exposures.pop(group_address) - _LOGGER.warning( - ( - "Service exposure_register replacing already registered exposure" - " for '%s' - %s" - ), - group_address, - replaced_exposure.device.name, - ) - replaced_exposure.shutdown() - exposure = create_knx_exposure(self.hass, self.xknx, call.data) - self.service_exposures[group_address] = exposure - _LOGGER.debug( - "Service exposure_register registered exposure for '%s' - %s", - group_address, - exposure.device.name, - ) - - async def service_send_to_knx_bus(self, call: ServiceCall) -> None: - """Service for sending an arbitrary KNX message to the KNX bus.""" - attr_address = call.data[KNX_ADDRESS] - attr_payload = call.data[SERVICE_KNX_ATTR_PAYLOAD] - attr_type = call.data.get(SERVICE_KNX_ATTR_TYPE) - attr_response = call.data[SERVICE_KNX_ATTR_RESPONSE] - - payload: DPTBinary | DPTArray - if attr_type is not None: - transcoder = DPTBase.parse_transcoder(attr_type) - if transcoder is None: - raise ValueError(f"Invalid type for knx.send service: {attr_type}") - payload = transcoder.to_knx(attr_payload) - elif isinstance(attr_payload, int): - payload = DPTBinary(attr_payload) - else: - payload = DPTArray(attr_payload) - - for address in attr_address: - telegram = Telegram( - destination_address=parse_device_group_address(address), - payload=GroupValueResponse(payload) - if attr_response - else GroupValueWrite(payload), - source_address=self.xknx.current_address, - ) - await self.xknx.telegrams.put(telegram) - - async def service_read_to_knx_bus(self, call: ServiceCall) -> None: - """Service for sending a GroupValueRead telegram to the KNX bus.""" - for address in call.data[KNX_ADDRESS]: - telegram = Telegram( - destination_address=parse_device_group_address(address), - payload=GroupValueRead(), - source_address=self.xknx.current_address, - ) - await self.xknx.telegrams.put(telegram) diff --git a/homeassistant/components/knx/const.py b/homeassistant/components/knx/const.py index aa48bcdf5574..8cb1986c5402 100644 --- a/homeassistant/components/knx/const.py +++ b/homeassistant/components/knx/const.py @@ -88,6 +88,15 @@ SIGNAL_KNX_TELEGRAM_DICT: Final = "knx_telegram_dict" AsyncMessageCallbackType = Callable[[Telegram], Awaitable[None]] MessageCallbackType = Callable[[Telegram], None] +SERVICE_KNX_SEND: Final = "send" +SERVICE_KNX_ATTR_PAYLOAD: Final = "payload" +SERVICE_KNX_ATTR_TYPE: Final = "type" +SERVICE_KNX_ATTR_RESPONSE: Final = "response" +SERVICE_KNX_ATTR_REMOVE: Final = "remove" +SERVICE_KNX_EVENT_REGISTER: Final = "event_register" +SERVICE_KNX_EXPOSURE_REGISTER: Final = "exposure_register" +SERVICE_KNX_READ: Final = "read" + class KNXConfigEntryData(TypedDict, total=False): """Config entry for the KNX integration.""" diff --git a/homeassistant/components/knx/services.py b/homeassistant/components/knx/services.py new file mode 100644 index 000000000000..99c44a5eee6f --- /dev/null +++ b/homeassistant/components/knx/services.py @@ -0,0 +1,284 @@ +"""KNX integration services.""" +from __future__ import annotations + +from functools import partial +import logging +from typing import TYPE_CHECKING + +import voluptuous as vol +from xknx.dpt import DPTArray, DPTBase, DPTBinary +from xknx.telegram import Telegram +from xknx.telegram.address import parse_device_group_address +from xknx.telegram.apci import GroupValueRead, GroupValueResponse, GroupValueWrite + +from homeassistant.const import CONF_TYPE, SERVICE_RELOAD +from homeassistant.core import HomeAssistant, ServiceCall, callback +from homeassistant.exceptions import HomeAssistantError, ServiceValidationError +import homeassistant.helpers.config_validation as cv +from homeassistant.helpers.service import async_register_admin_service + +from .const import ( + DOMAIN, + KNX_ADDRESS, + SERVICE_KNX_ATTR_PAYLOAD, + SERVICE_KNX_ATTR_REMOVE, + SERVICE_KNX_ATTR_RESPONSE, + SERVICE_KNX_ATTR_TYPE, + SERVICE_KNX_EVENT_REGISTER, + SERVICE_KNX_EXPOSURE_REGISTER, + SERVICE_KNX_READ, + SERVICE_KNX_SEND, +) +from .expose import create_knx_exposure +from .schema import ExposeSchema, ga_validator, sensor_type_validator + +if TYPE_CHECKING: + from . import KNXModule + +_LOGGER = logging.getLogger(__name__) + + +@callback +def register_knx_services(hass: HomeAssistant) -> None: + """Register KNX integration services.""" + hass.services.async_register( + DOMAIN, + SERVICE_KNX_SEND, + partial(service_send_to_knx_bus, hass), + schema=SERVICE_KNX_SEND_SCHEMA, + ) + + hass.services.async_register( + DOMAIN, + SERVICE_KNX_READ, + partial(service_read_to_knx_bus, hass), + schema=SERVICE_KNX_READ_SCHEMA, + ) + + async_register_admin_service( + hass, + DOMAIN, + SERVICE_KNX_EVENT_REGISTER, + partial(service_event_register_modify, hass), + schema=SERVICE_KNX_EVENT_REGISTER_SCHEMA, + ) + + async_register_admin_service( + hass, + DOMAIN, + SERVICE_KNX_EXPOSURE_REGISTER, + partial(service_exposure_register_modify, hass), + schema=SERVICE_KNX_EXPOSURE_REGISTER_SCHEMA, + ) + + async_register_admin_service( + hass, + DOMAIN, + SERVICE_RELOAD, + partial(service_reload_integration, hass), + ) + + +@callback +def get_knx_module(hass: HomeAssistant) -> KNXModule: + """Return KNXModule instance.""" + try: + return hass.data[DOMAIN] # type: ignore[no-any-return] + except KeyError as err: + raise HomeAssistantError("KNX entry not loaded") from err + + +SERVICE_KNX_EVENT_REGISTER_SCHEMA = vol.Schema( + { + vol.Required(KNX_ADDRESS): vol.All( + cv.ensure_list, + [ga_validator], + ), + vol.Optional(CONF_TYPE): sensor_type_validator, + vol.Optional(SERVICE_KNX_ATTR_REMOVE, default=False): cv.boolean, + } +) + + +async def service_event_register_modify(hass: HomeAssistant, call: ServiceCall) -> None: + """Service for adding or removing a GroupAddress to the knx_event filter.""" + knx_module = get_knx_module(hass) + + attr_address = call.data[KNX_ADDRESS] + group_addresses = list(map(parse_device_group_address, attr_address)) + + if call.data.get(SERVICE_KNX_ATTR_REMOVE): + for group_address in group_addresses: + try: + knx_module.knx_event_callback.group_addresses.remove(group_address) + except ValueError: + _LOGGER.warning( + "Service event_register could not remove event for '%s'", + str(group_address), + ) + if group_address in knx_module.group_address_transcoder: + del knx_module.group_address_transcoder[group_address] + return + + if (dpt := call.data.get(CONF_TYPE)) and ( + transcoder := DPTBase.parse_transcoder(dpt) + ): + knx_module.group_address_transcoder.update( + { + _address: transcoder # type: ignore[type-abstract] + for _address in group_addresses + } + ) + for group_address in group_addresses: + if group_address in knx_module.knx_event_callback.group_addresses: + continue + knx_module.knx_event_callback.group_addresses.append(group_address) + _LOGGER.debug( + "Service event_register registered event for '%s'", + str(group_address), + ) + + +SERVICE_KNX_EXPOSURE_REGISTER_SCHEMA = vol.Any( + ExposeSchema.EXPOSE_SENSOR_SCHEMA.extend( + { + vol.Optional(SERVICE_KNX_ATTR_REMOVE, default=False): cv.boolean, + } + ), + vol.Schema( + # for removing only `address` is required + { + vol.Required(KNX_ADDRESS): ga_validator, + vol.Required(SERVICE_KNX_ATTR_REMOVE): vol.All(cv.boolean, True), + }, + extra=vol.ALLOW_EXTRA, + ), +) + + +async def service_exposure_register_modify( + hass: HomeAssistant, call: ServiceCall +) -> None: + """Service for adding or removing an exposure to KNX bus.""" + knx_module = get_knx_module(hass) + + group_address = call.data[KNX_ADDRESS] + + if call.data.get(SERVICE_KNX_ATTR_REMOVE): + try: + removed_exposure = knx_module.service_exposures.pop(group_address) + except KeyError as err: + raise ServiceValidationError( + f"Could not find exposure for '{group_address}' to remove." + ) from err + + removed_exposure.shutdown() + return + + if group_address in knx_module.service_exposures: + replaced_exposure = knx_module.service_exposures.pop(group_address) + _LOGGER.warning( + ( + "Service exposure_register replacing already registered exposure" + " for '%s' - %s" + ), + group_address, + replaced_exposure.device.name, + ) + replaced_exposure.shutdown() + exposure = create_knx_exposure(knx_module.hass, knx_module.xknx, call.data) + knx_module.service_exposures[group_address] = exposure + _LOGGER.debug( + "Service exposure_register registered exposure for '%s' - %s", + group_address, + exposure.device.name, + ) + + +SERVICE_KNX_SEND_SCHEMA = vol.Any( + vol.Schema( + { + vol.Required(KNX_ADDRESS): vol.All( + cv.ensure_list, + [ga_validator], + ), + vol.Required(SERVICE_KNX_ATTR_PAYLOAD): cv.match_all, + vol.Required(SERVICE_KNX_ATTR_TYPE): sensor_type_validator, + vol.Optional(SERVICE_KNX_ATTR_RESPONSE, default=False): cv.boolean, + } + ), + vol.Schema( + # without type given payload is treated as raw bytes + { + vol.Required(KNX_ADDRESS): vol.All( + cv.ensure_list, + [ga_validator], + ), + vol.Required(SERVICE_KNX_ATTR_PAYLOAD): vol.Any( + cv.positive_int, [cv.positive_int] + ), + vol.Optional(SERVICE_KNX_ATTR_RESPONSE, default=False): cv.boolean, + } + ), +) + + +async def service_send_to_knx_bus(hass: HomeAssistant, call: ServiceCall) -> None: + """Service for sending an arbitrary KNX message to the KNX bus.""" + knx_module = get_knx_module(hass) + + attr_address = call.data[KNX_ADDRESS] + attr_payload = call.data[SERVICE_KNX_ATTR_PAYLOAD] + attr_type = call.data.get(SERVICE_KNX_ATTR_TYPE) + attr_response = call.data[SERVICE_KNX_ATTR_RESPONSE] + + payload: DPTBinary | DPTArray + if attr_type is not None: + transcoder = DPTBase.parse_transcoder(attr_type) + if transcoder is None: + raise ValueError(f"Invalid type for knx.send service: {attr_type}") + payload = transcoder.to_knx(attr_payload) + elif isinstance(attr_payload, int): + payload = DPTBinary(attr_payload) + else: + payload = DPTArray(attr_payload) + + for address in attr_address: + telegram = Telegram( + destination_address=parse_device_group_address(address), + payload=GroupValueResponse(payload) + if attr_response + else GroupValueWrite(payload), + source_address=knx_module.xknx.current_address, + ) + await knx_module.xknx.telegrams.put(telegram) + + +SERVICE_KNX_READ_SCHEMA = vol.Schema( + { + vol.Required(KNX_ADDRESS): vol.All( + cv.ensure_list, + [ga_validator], + ) + } +) + + +async def service_read_to_knx_bus(hass: HomeAssistant, call: ServiceCall) -> None: + """Service for sending a GroupValueRead telegram to the KNX bus.""" + knx_module = get_knx_module(hass) + + for address in call.data[KNX_ADDRESS]: + telegram = Telegram( + destination_address=parse_device_group_address(address), + payload=GroupValueRead(), + source_address=knx_module.xknx.current_address, + ) + await knx_module.xknx.telegrams.put(telegram) + + +async def service_reload_integration(hass: HomeAssistant, call: ServiceCall) -> None: + """Reload the integration.""" + knx_module = get_knx_module(hass) + await hass.config_entries.async_reload(knx_module.entry.entry_id) + hass.bus.async_fire(f"event_{DOMAIN}_reloaded", context=call.context) diff --git a/tests/components/knx/test_services.py b/tests/components/knx/test_services.py index 5796eae8393e..30b297218cc1 100644 --- a/tests/components/knx/test_services.py +++ b/tests/components/knx/test_services.py @@ -7,6 +7,7 @@ from xknx.telegram.apci import GroupValueResponse, GroupValueWrite from homeassistant.components.knx import async_unload_entry as knx_async_unload_entry from homeassistant.const import STATE_OFF, STATE_ON from homeassistant.core import HomeAssistant +from homeassistant.exceptions import HomeAssistantError from .conftest import KNXTestKit @@ -274,3 +275,18 @@ async def test_reload_service( ) mock_unload_entry.assert_called_once() mock_setup_entry.assert_called_once() + + +async def test_service_setup_failed(hass: HomeAssistant, knx: KNXTestKit) -> None: + """Test service setup failed.""" + await knx.setup_integration({}) + await knx.mock_config_entry.async_unload(hass) + + with pytest.raises(HomeAssistantError) as exc_info: + await hass.services.async_call( + "knx", + "send", + {"address": "1/2/3", "payload": True, "response": False}, + blocking=True, + ) + assert str(exc_info.value) == "KNX entry not loaded"