1
mirror of https://github.com/home-assistant/core synced 2024-07-15 09:42:11 +02:00

Move KNX service registration to async_setup (#106635)

This commit is contained in:
Matthias Alphart 2024-01-07 23:26:46 +01:00 committed by GitHub
parent 426a1511d5
commit f53109f513
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 320 additions and 237 deletions

View File

@ -5,23 +5,17 @@ import asyncio
import contextlib
import logging
from pathlib import Path
from typing import Final
import voluptuous as vol
from xknx import XKNX
from xknx.core import XknxConnectionState
from xknx.core.telegram_queue import TelegramQueue
from xknx.dpt import DPTArray, DPTBase, DPTBinary
from xknx.dpt import DPTBase
from xknx.exceptions import ConversionError, CouldNotParseTelegram, XKNXException
from xknx.io import ConnectionConfig, ConnectionType, SecureConfig
from xknx.telegram import AddressFilter, Telegram
from xknx.telegram.address import (
DeviceGroupAddress,
GroupAddress,
InternalGroupAddress,
parse_device_group_address,
)
from xknx.telegram.apci import GroupValueRead, GroupValueResponse, GroupValueWrite
from xknx.telegram.address import DeviceGroupAddress, GroupAddress, InternalGroupAddress
from xknx.telegram.apci import GroupValueResponse, GroupValueWrite
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
@ -30,15 +24,13 @@ from homeassistant.const import (
CONF_PORT,
CONF_TYPE,
EVENT_HOMEASSISTANT_STOP,
SERVICE_RELOAD,
Platform,
)
from homeassistant.core import Event, HomeAssistant, ServiceCall
from homeassistant.exceptions import ConfigEntryNotReady, HomeAssistantError
from homeassistant.core import Event, HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import discovery
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.reload import async_integration_yaml_config
from homeassistant.helpers.service import async_register_admin_service
from homeassistant.helpers.storage import STORAGE_DIR
from homeassistant.helpers.typing import ConfigType
@ -95,24 +87,14 @@ from .schema import (
TextSchema,
TimeSchema,
WeatherSchema,
ga_validator,
sensor_type_validator,
)
from .services import register_knx_services
from .telegrams import STORAGE_KEY as TELEGRAMS_STORAGE_KEY, Telegrams
from .websocket import register_panel
_LOGGER = logging.getLogger(__name__)
SERVICE_KNX_SEND: Final = "send"
SERVICE_KNX_ATTR_PAYLOAD: Final = "payload"
SERVICE_KNX_ATTR_TYPE: Final = "type"
SERVICE_KNX_ATTR_RESPONSE: Final = "response"
SERVICE_KNX_ATTR_REMOVE: Final = "remove"
SERVICE_KNX_EVENT_REGISTER: Final = "event_register"
SERVICE_KNX_EXPOSURE_REGISTER: Final = "exposure_register"
SERVICE_KNX_READ: Final = "read"
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.All(
@ -158,69 +140,6 @@ CONFIG_SCHEMA = vol.Schema(
extra=vol.ALLOW_EXTRA,
)
SERVICE_KNX_SEND_SCHEMA = vol.Any(
vol.Schema(
{
vol.Required(KNX_ADDRESS): vol.All(
cv.ensure_list,
[ga_validator],
),
vol.Required(SERVICE_KNX_ATTR_PAYLOAD): cv.match_all,
vol.Required(SERVICE_KNX_ATTR_TYPE): sensor_type_validator,
vol.Optional(SERVICE_KNX_ATTR_RESPONSE, default=False): cv.boolean,
}
),
vol.Schema(
# without type given payload is treated as raw bytes
{
vol.Required(KNX_ADDRESS): vol.All(
cv.ensure_list,
[ga_validator],
),
vol.Required(SERVICE_KNX_ATTR_PAYLOAD): vol.Any(
cv.positive_int, [cv.positive_int]
),
vol.Optional(SERVICE_KNX_ATTR_RESPONSE, default=False): cv.boolean,
}
),
)
SERVICE_KNX_READ_SCHEMA = vol.Schema(
{
vol.Required(KNX_ADDRESS): vol.All(
cv.ensure_list,
[ga_validator],
)
}
)
SERVICE_KNX_EVENT_REGISTER_SCHEMA = vol.Schema(
{
vol.Required(KNX_ADDRESS): vol.All(
cv.ensure_list,
[ga_validator],
),
vol.Optional(CONF_TYPE): sensor_type_validator,
vol.Optional(SERVICE_KNX_ATTR_REMOVE, default=False): cv.boolean,
}
)
SERVICE_KNX_EXPOSURE_REGISTER_SCHEMA = vol.Any(
ExposeSchema.EXPOSE_SENSOR_SCHEMA.extend(
{
vol.Optional(SERVICE_KNX_ATTR_REMOVE, default=False): cv.boolean,
}
),
vol.Schema(
# for removing only `address` is required
{
vol.Required(KNX_ADDRESS): ga_validator,
vol.Required(SERVICE_KNX_ATTR_REMOVE): vol.All(cv.boolean, True),
},
extra=vol.ALLOW_EXTRA,
),
)
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Start the KNX integration."""
@ -235,6 +154,8 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
conf = dict(conf)
hass.data[DATA_KNX_CONFIG] = conf
register_knx_services(hass)
return True
@ -287,43 +208,6 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
)
)
hass.services.async_register(
DOMAIN,
SERVICE_KNX_SEND,
knx_module.service_send_to_knx_bus,
schema=SERVICE_KNX_SEND_SCHEMA,
)
hass.services.async_register(
DOMAIN,
SERVICE_KNX_READ,
knx_module.service_read_to_knx_bus,
schema=SERVICE_KNX_READ_SCHEMA,
)
async_register_admin_service(
hass,
DOMAIN,
SERVICE_KNX_EVENT_REGISTER,
knx_module.service_event_register_modify,
schema=SERVICE_KNX_EVENT_REGISTER_SCHEMA,
)
async_register_admin_service(
hass,
DOMAIN,
SERVICE_KNX_EXPOSURE_REGISTER,
knx_module.service_exposure_register_modify,
schema=SERVICE_KNX_EXPOSURE_REGISTER_SCHEMA,
)
async def _reload_integration(call: ServiceCall) -> None:
"""Reload the integration."""
await hass.config_entries.async_reload(entry.entry_id)
hass.bus.async_fire(f"event_{DOMAIN}_reloaded", context=call.context)
async_register_admin_service(hass, DOMAIN, SERVICE_RELOAD, _reload_integration)
await register_panel(hass)
return True
@ -419,10 +303,8 @@ class KNXModule:
)
self._address_filter_transcoder: dict[AddressFilter, type[DPTBase]] = {}
self._group_address_transcoder: dict[DeviceGroupAddress, type[DPTBase]] = {}
self._knx_event_callback: TelegramQueue.Callback = (
self.register_event_callback()
)
self.group_address_transcoder: dict[DeviceGroupAddress, type[DPTBase]] = {}
self.knx_event_callback: TelegramQueue.Callback = self.register_event_callback()
self.entry.async_on_unload(
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.stop)
@ -555,7 +437,7 @@ class KNXModule:
):
data = telegram.payload.value.value
if transcoder := (
self._group_address_transcoder.get(telegram.destination_address)
self.group_address_transcoder.get(telegram.destination_address)
or next(
(
_transcoder
@ -612,111 +494,3 @@ class KNXModule:
group_addresses=[],
match_for_outgoing=True,
)
async def service_event_register_modify(self, call: ServiceCall) -> None:
"""Service for adding or removing a GroupAddress to the knx_event filter."""
attr_address = call.data[KNX_ADDRESS]
group_addresses = list(map(parse_device_group_address, attr_address))
if call.data.get(SERVICE_KNX_ATTR_REMOVE):
for group_address in group_addresses:
try:
self._knx_event_callback.group_addresses.remove(group_address)
except ValueError:
_LOGGER.warning(
"Service event_register could not remove event for '%s'",
str(group_address),
)
if group_address in self._group_address_transcoder:
del self._group_address_transcoder[group_address]
return
if (dpt := call.data.get(CONF_TYPE)) and (
transcoder := DPTBase.parse_transcoder(dpt)
):
self._group_address_transcoder.update(
{
_address: transcoder # type: ignore[type-abstract]
for _address in group_addresses
}
)
for group_address in group_addresses:
if group_address in self._knx_event_callback.group_addresses:
continue
self._knx_event_callback.group_addresses.append(group_address)
_LOGGER.debug(
"Service event_register registered event for '%s'",
str(group_address),
)
async def service_exposure_register_modify(self, call: ServiceCall) -> None:
"""Service for adding or removing an exposure to KNX bus."""
group_address = call.data[KNX_ADDRESS]
if call.data.get(SERVICE_KNX_ATTR_REMOVE):
try:
removed_exposure = self.service_exposures.pop(group_address)
except KeyError as err:
raise HomeAssistantError(
f"Could not find exposure for '{group_address}' to remove."
) from err
removed_exposure.shutdown()
return
if group_address in self.service_exposures:
replaced_exposure = self.service_exposures.pop(group_address)
_LOGGER.warning(
(
"Service exposure_register replacing already registered exposure"
" for '%s' - %s"
),
group_address,
replaced_exposure.device.name,
)
replaced_exposure.shutdown()
exposure = create_knx_exposure(self.hass, self.xknx, call.data)
self.service_exposures[group_address] = exposure
_LOGGER.debug(
"Service exposure_register registered exposure for '%s' - %s",
group_address,
exposure.device.name,
)
async def service_send_to_knx_bus(self, call: ServiceCall) -> None:
"""Service for sending an arbitrary KNX message to the KNX bus."""
attr_address = call.data[KNX_ADDRESS]
attr_payload = call.data[SERVICE_KNX_ATTR_PAYLOAD]
attr_type = call.data.get(SERVICE_KNX_ATTR_TYPE)
attr_response = call.data[SERVICE_KNX_ATTR_RESPONSE]
payload: DPTBinary | DPTArray
if attr_type is not None:
transcoder = DPTBase.parse_transcoder(attr_type)
if transcoder is None:
raise ValueError(f"Invalid type for knx.send service: {attr_type}")
payload = transcoder.to_knx(attr_payload)
elif isinstance(attr_payload, int):
payload = DPTBinary(attr_payload)
else:
payload = DPTArray(attr_payload)
for address in attr_address:
telegram = Telegram(
destination_address=parse_device_group_address(address),
payload=GroupValueResponse(payload)
if attr_response
else GroupValueWrite(payload),
source_address=self.xknx.current_address,
)
await self.xknx.telegrams.put(telegram)
async def service_read_to_knx_bus(self, call: ServiceCall) -> None:
"""Service for sending a GroupValueRead telegram to the KNX bus."""
for address in call.data[KNX_ADDRESS]:
telegram = Telegram(
destination_address=parse_device_group_address(address),
payload=GroupValueRead(),
source_address=self.xknx.current_address,
)
await self.xknx.telegrams.put(telegram)

View File

@ -88,6 +88,15 @@ SIGNAL_KNX_TELEGRAM_DICT: Final = "knx_telegram_dict"
AsyncMessageCallbackType = Callable[[Telegram], Awaitable[None]]
MessageCallbackType = Callable[[Telegram], None]
SERVICE_KNX_SEND: Final = "send"
SERVICE_KNX_ATTR_PAYLOAD: Final = "payload"
SERVICE_KNX_ATTR_TYPE: Final = "type"
SERVICE_KNX_ATTR_RESPONSE: Final = "response"
SERVICE_KNX_ATTR_REMOVE: Final = "remove"
SERVICE_KNX_EVENT_REGISTER: Final = "event_register"
SERVICE_KNX_EXPOSURE_REGISTER: Final = "exposure_register"
SERVICE_KNX_READ: Final = "read"
class KNXConfigEntryData(TypedDict, total=False):
"""Config entry for the KNX integration."""

View File

@ -0,0 +1,284 @@
"""KNX integration services."""
from __future__ import annotations
from functools import partial
import logging
from typing import TYPE_CHECKING
import voluptuous as vol
from xknx.dpt import DPTArray, DPTBase, DPTBinary
from xknx.telegram import Telegram
from xknx.telegram.address import parse_device_group_address
from xknx.telegram.apci import GroupValueRead, GroupValueResponse, GroupValueWrite
from homeassistant.const import CONF_TYPE, SERVICE_RELOAD
from homeassistant.core import HomeAssistant, ServiceCall, callback
from homeassistant.exceptions import HomeAssistantError, ServiceValidationError
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.service import async_register_admin_service
from .const import (
DOMAIN,
KNX_ADDRESS,
SERVICE_KNX_ATTR_PAYLOAD,
SERVICE_KNX_ATTR_REMOVE,
SERVICE_KNX_ATTR_RESPONSE,
SERVICE_KNX_ATTR_TYPE,
SERVICE_KNX_EVENT_REGISTER,
SERVICE_KNX_EXPOSURE_REGISTER,
SERVICE_KNX_READ,
SERVICE_KNX_SEND,
)
from .expose import create_knx_exposure
from .schema import ExposeSchema, ga_validator, sensor_type_validator
if TYPE_CHECKING:
from . import KNXModule
_LOGGER = logging.getLogger(__name__)
@callback
def register_knx_services(hass: HomeAssistant) -> None:
"""Register KNX integration services."""
hass.services.async_register(
DOMAIN,
SERVICE_KNX_SEND,
partial(service_send_to_knx_bus, hass),
schema=SERVICE_KNX_SEND_SCHEMA,
)
hass.services.async_register(
DOMAIN,
SERVICE_KNX_READ,
partial(service_read_to_knx_bus, hass),
schema=SERVICE_KNX_READ_SCHEMA,
)
async_register_admin_service(
hass,
DOMAIN,
SERVICE_KNX_EVENT_REGISTER,
partial(service_event_register_modify, hass),
schema=SERVICE_KNX_EVENT_REGISTER_SCHEMA,
)
async_register_admin_service(
hass,
DOMAIN,
SERVICE_KNX_EXPOSURE_REGISTER,
partial(service_exposure_register_modify, hass),
schema=SERVICE_KNX_EXPOSURE_REGISTER_SCHEMA,
)
async_register_admin_service(
hass,
DOMAIN,
SERVICE_RELOAD,
partial(service_reload_integration, hass),
)
@callback
def get_knx_module(hass: HomeAssistant) -> KNXModule:
"""Return KNXModule instance."""
try:
return hass.data[DOMAIN] # type: ignore[no-any-return]
except KeyError as err:
raise HomeAssistantError("KNX entry not loaded") from err
SERVICE_KNX_EVENT_REGISTER_SCHEMA = vol.Schema(
{
vol.Required(KNX_ADDRESS): vol.All(
cv.ensure_list,
[ga_validator],
),
vol.Optional(CONF_TYPE): sensor_type_validator,
vol.Optional(SERVICE_KNX_ATTR_REMOVE, default=False): cv.boolean,
}
)
async def service_event_register_modify(hass: HomeAssistant, call: ServiceCall) -> None:
"""Service for adding or removing a GroupAddress to the knx_event filter."""
knx_module = get_knx_module(hass)
attr_address = call.data[KNX_ADDRESS]
group_addresses = list(map(parse_device_group_address, attr_address))
if call.data.get(SERVICE_KNX_ATTR_REMOVE):
for group_address in group_addresses:
try:
knx_module.knx_event_callback.group_addresses.remove(group_address)
except ValueError:
_LOGGER.warning(
"Service event_register could not remove event for '%s'",
str(group_address),
)
if group_address in knx_module.group_address_transcoder:
del knx_module.group_address_transcoder[group_address]
return
if (dpt := call.data.get(CONF_TYPE)) and (
transcoder := DPTBase.parse_transcoder(dpt)
):
knx_module.group_address_transcoder.update(
{
_address: transcoder # type: ignore[type-abstract]
for _address in group_addresses
}
)
for group_address in group_addresses:
if group_address in knx_module.knx_event_callback.group_addresses:
continue
knx_module.knx_event_callback.group_addresses.append(group_address)
_LOGGER.debug(
"Service event_register registered event for '%s'",
str(group_address),
)
SERVICE_KNX_EXPOSURE_REGISTER_SCHEMA = vol.Any(
ExposeSchema.EXPOSE_SENSOR_SCHEMA.extend(
{
vol.Optional(SERVICE_KNX_ATTR_REMOVE, default=False): cv.boolean,
}
),
vol.Schema(
# for removing only `address` is required
{
vol.Required(KNX_ADDRESS): ga_validator,
vol.Required(SERVICE_KNX_ATTR_REMOVE): vol.All(cv.boolean, True),
},
extra=vol.ALLOW_EXTRA,
),
)
async def service_exposure_register_modify(
hass: HomeAssistant, call: ServiceCall
) -> None:
"""Service for adding or removing an exposure to KNX bus."""
knx_module = get_knx_module(hass)
group_address = call.data[KNX_ADDRESS]
if call.data.get(SERVICE_KNX_ATTR_REMOVE):
try:
removed_exposure = knx_module.service_exposures.pop(group_address)
except KeyError as err:
raise ServiceValidationError(
f"Could not find exposure for '{group_address}' to remove."
) from err
removed_exposure.shutdown()
return
if group_address in knx_module.service_exposures:
replaced_exposure = knx_module.service_exposures.pop(group_address)
_LOGGER.warning(
(
"Service exposure_register replacing already registered exposure"
" for '%s' - %s"
),
group_address,
replaced_exposure.device.name,
)
replaced_exposure.shutdown()
exposure = create_knx_exposure(knx_module.hass, knx_module.xknx, call.data)
knx_module.service_exposures[group_address] = exposure
_LOGGER.debug(
"Service exposure_register registered exposure for '%s' - %s",
group_address,
exposure.device.name,
)
SERVICE_KNX_SEND_SCHEMA = vol.Any(
vol.Schema(
{
vol.Required(KNX_ADDRESS): vol.All(
cv.ensure_list,
[ga_validator],
),
vol.Required(SERVICE_KNX_ATTR_PAYLOAD): cv.match_all,
vol.Required(SERVICE_KNX_ATTR_TYPE): sensor_type_validator,
vol.Optional(SERVICE_KNX_ATTR_RESPONSE, default=False): cv.boolean,
}
),
vol.Schema(
# without type given payload is treated as raw bytes
{
vol.Required(KNX_ADDRESS): vol.All(
cv.ensure_list,
[ga_validator],
),
vol.Required(SERVICE_KNX_ATTR_PAYLOAD): vol.Any(
cv.positive_int, [cv.positive_int]
),
vol.Optional(SERVICE_KNX_ATTR_RESPONSE, default=False): cv.boolean,
}
),
)
async def service_send_to_knx_bus(hass: HomeAssistant, call: ServiceCall) -> None:
"""Service for sending an arbitrary KNX message to the KNX bus."""
knx_module = get_knx_module(hass)
attr_address = call.data[KNX_ADDRESS]
attr_payload = call.data[SERVICE_KNX_ATTR_PAYLOAD]
attr_type = call.data.get(SERVICE_KNX_ATTR_TYPE)
attr_response = call.data[SERVICE_KNX_ATTR_RESPONSE]
payload: DPTBinary | DPTArray
if attr_type is not None:
transcoder = DPTBase.parse_transcoder(attr_type)
if transcoder is None:
raise ValueError(f"Invalid type for knx.send service: {attr_type}")
payload = transcoder.to_knx(attr_payload)
elif isinstance(attr_payload, int):
payload = DPTBinary(attr_payload)
else:
payload = DPTArray(attr_payload)
for address in attr_address:
telegram = Telegram(
destination_address=parse_device_group_address(address),
payload=GroupValueResponse(payload)
if attr_response
else GroupValueWrite(payload),
source_address=knx_module.xknx.current_address,
)
await knx_module.xknx.telegrams.put(telegram)
SERVICE_KNX_READ_SCHEMA = vol.Schema(
{
vol.Required(KNX_ADDRESS): vol.All(
cv.ensure_list,
[ga_validator],
)
}
)
async def service_read_to_knx_bus(hass: HomeAssistant, call: ServiceCall) -> None:
"""Service for sending a GroupValueRead telegram to the KNX bus."""
knx_module = get_knx_module(hass)
for address in call.data[KNX_ADDRESS]:
telegram = Telegram(
destination_address=parse_device_group_address(address),
payload=GroupValueRead(),
source_address=knx_module.xknx.current_address,
)
await knx_module.xknx.telegrams.put(telegram)
async def service_reload_integration(hass: HomeAssistant, call: ServiceCall) -> None:
"""Reload the integration."""
knx_module = get_knx_module(hass)
await hass.config_entries.async_reload(knx_module.entry.entry_id)
hass.bus.async_fire(f"event_{DOMAIN}_reloaded", context=call.context)

View File

@ -7,6 +7,7 @@ from xknx.telegram.apci import GroupValueResponse, GroupValueWrite
from homeassistant.components.knx import async_unload_entry as knx_async_unload_entry
from homeassistant.const import STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from .conftest import KNXTestKit
@ -274,3 +275,18 @@ async def test_reload_service(
)
mock_unload_entry.assert_called_once()
mock_setup_entry.assert_called_once()
async def test_service_setup_failed(hass: HomeAssistant, knx: KNXTestKit) -> None:
"""Test service setup failed."""
await knx.setup_integration({})
await knx.mock_config_entry.async_unload(hass)
with pytest.raises(HomeAssistantError) as exc_info:
await hass.services.async_call(
"knx",
"send",
{"address": "1/2/3", "payload": True, "response": False},
blocking=True,
)
assert str(exc_info.value) == "KNX entry not loaded"