1
mirror of https://github.com/home-assistant/core synced 2024-09-03 08:14:07 +02:00

Add support for IP secure to KNX config flow (#68906)

* Add support for TCP Secure in KNX config flow

* Add support for TCP Secure in KNX config flow

* Fix typing

* Fix import

* Move assert up to cover all possible cases
This commit is contained in:
Marvin Wichmann 2022-03-30 21:10:47 +02:00 committed by GitHub
parent 3ccec8f051
commit 9b05a1264a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 636 additions and 41 deletions

View File

@ -11,7 +11,7 @@ from xknx.core import XknxConnectionState
from xknx.core.telegram_queue import TelegramQueue
from xknx.dpt import DPTArray, DPTBase, DPTBinary
from xknx.exceptions import ConversionError, XKNXException
from xknx.io import ConnectionConfig, ConnectionType
from xknx.io import ConnectionConfig, ConnectionType, SecureConfig
from xknx.telegram import AddressFilter, Telegram
from xknx.telegram.address import (
DeviceGroupAddress,
@ -36,21 +36,28 @@ from homeassistant.helpers import discovery
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.reload import async_integration_yaml_config
from homeassistant.helpers.service import async_register_admin_service
from homeassistant.helpers.storage import STORAGE_DIR
from homeassistant.helpers.typing import ConfigType
from .const import (
CONF_KNX_CONNECTION_TYPE,
CONF_KNX_EXPOSE,
CONF_KNX_INDIVIDUAL_ADDRESS,
CONF_KNX_KNXKEY_FILENAME,
CONF_KNX_KNXKEY_PASSWORD,
CONF_KNX_LOCAL_IP,
CONF_KNX_MCAST_GRP,
CONF_KNX_MCAST_PORT,
CONF_KNX_RATE_LIMIT,
CONF_KNX_ROUTE_BACK,
CONF_KNX_ROUTING,
CONF_KNX_SECURE_DEVICE_AUTHENTICATION,
CONF_KNX_SECURE_USER_ID,
CONF_KNX_SECURE_USER_PASSWORD,
CONF_KNX_STATE_UPDATER,
CONF_KNX_TUNNELING,
CONF_KNX_TUNNELING_TCP,
CONF_KNX_TUNNELING_TCP_SECURE,
DATA_HASS_CONFIG,
DATA_KNX_CONFIG,
DOMAIN,
@ -399,6 +406,31 @@ class KNXModule:
auto_reconnect=True,
threaded=True,
)
if _conn_type == CONF_KNX_TUNNELING_TCP_SECURE:
knxkeys_file: str | None = (
self.hass.config.path(
STORAGE_DIR,
self.entry.data[CONF_KNX_KNXKEY_FILENAME],
)
if self.entry.data.get(CONF_KNX_KNXKEY_FILENAME) is not None
else None
)
return ConnectionConfig(
connection_type=ConnectionType.TUNNELING_TCP_SECURE,
gateway_ip=self.entry.data[CONF_HOST],
gateway_port=self.entry.data[CONF_PORT],
secure_config=SecureConfig(
user_id=self.entry.data.get(CONF_KNX_SECURE_USER_ID),
user_password=self.entry.data.get(CONF_KNX_SECURE_USER_PASSWORD),
device_authentication_password=self.entry.data.get(
CONF_KNX_SECURE_DEVICE_AUTHENTICATION
),
knxkeys_password=self.entry.data.get(CONF_KNX_KNXKEY_PASSWORD),
knxkeys_file_path=knxkeys_file,
),
auto_reconnect=True,
threaded=True,
)
return ConnectionConfig(
auto_reconnect=True,
threaded=True,

View File

@ -5,8 +5,10 @@ from typing import Any, Final
import voluptuous as vol
from xknx import XKNX
from xknx.exceptions.exception import InvalidSignature
from xknx.io import DEFAULT_MCAST_GRP, DEFAULT_MCAST_PORT
from xknx.io.gateway_scanner import GatewayDescriptor, GatewayScanner
from xknx.secure import load_key_ring
from homeassistant import config_entries
from homeassistant.config_entries import ConfigEntry, OptionsFlow
@ -14,6 +16,7 @@ from homeassistant.const import CONF_HOST, CONF_PORT
from homeassistant.core import callback
from homeassistant.data_entry_flow import FlowResult
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.storage import STORAGE_DIR
from .const import (
CONF_KNX_AUTOMATIC,
@ -22,23 +25,31 @@ from .const import (
CONF_KNX_DEFAULT_STATE_UPDATER,
CONF_KNX_INDIVIDUAL_ADDRESS,
CONF_KNX_INITIAL_CONNECTION_TYPES,
CONF_KNX_KNXKEY_FILENAME,
CONF_KNX_KNXKEY_PASSWORD,
CONF_KNX_LOCAL_IP,
CONF_KNX_MCAST_GRP,
CONF_KNX_MCAST_PORT,
CONF_KNX_RATE_LIMIT,
CONF_KNX_ROUTE_BACK,
CONF_KNX_ROUTING,
CONF_KNX_SECURE_DEVICE_AUTHENTICATION,
CONF_KNX_SECURE_USER_ID,
CONF_KNX_SECURE_USER_PASSWORD,
CONF_KNX_STATE_UPDATER,
CONF_KNX_TUNNELING,
CONF_KNX_TUNNELING_TCP,
CONF_KNX_TUNNELING_TCP_SECURE,
CONST_KNX_STORAGE_KEY,
DOMAIN,
KNXConfigEntryData,
)
CONF_KNX_GATEWAY: Final = "gateway"
CONF_MAX_RATE_LIMIT: Final = 60
CONF_DEFAULT_LOCAL_IP: Final = "0.0.0.0"
DEFAULT_ENTRY_DATA: Final = {
DEFAULT_ENTRY_DATA: KNXConfigEntryData = {
CONF_KNX_STATE_UPDATER: CONF_KNX_DEFAULT_STATE_UPDATER,
CONF_KNX_RATE_LIMIT: CONF_KNX_DEFAULT_RATE_LIMIT,
CONF_KNX_INDIVIDUAL_ADDRESS: XKNX.DEFAULT_ADDRESS,
@ -48,6 +59,7 @@ DEFAULT_ENTRY_DATA: Final = {
CONF_KNX_TUNNELING_TYPE: Final = "tunneling_type"
CONF_KNX_LABEL_TUNNELING_TCP: Final = "TCP"
CONF_KNX_LABEL_TUNNELING_TCP_SECURE: Final = "TCP with IP Secure"
CONF_KNX_LABEL_TUNNELING_UDP: Final = "UDP"
CONF_KNX_LABEL_TUNNELING_UDP_ROUTE_BACK: Final = "UDP with route back / NAT mode"
@ -59,6 +71,7 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
_found_tunnels: list[GatewayDescriptor]
_selected_tunnel: GatewayDescriptor | None
_tunneling_config: KNXConfigEntryData | None
@staticmethod
@callback
@ -73,6 +86,7 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
self._found_tunnels = []
self._selected_tunnel = None
self._tunneling_config = None
return await self.async_step_type()
async def async_step_type(self, user_input: dict | None = None) -> FlowResult:
@ -80,9 +94,13 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
if user_input is not None:
connection_type = user_input[CONF_KNX_CONNECTION_TYPE]
if connection_type == CONF_KNX_AUTOMATIC:
entry_data: KNXConfigEntryData = {
**DEFAULT_ENTRY_DATA, # type: ignore[misc]
CONF_KNX_CONNECTION_TYPE: user_input[CONF_KNX_CONNECTION_TYPE],
}
return self.async_create_entry(
title=CONF_KNX_AUTOMATIC.capitalize(),
data={**DEFAULT_ENTRY_DATA, **user_input},
data=entry_data,
)
if connection_type == CONF_KNX_ROUTING:
@ -95,7 +113,6 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
errors: dict = {}
supported_connection_types = CONF_KNX_INITIAL_CONNECTION_TYPES.copy()
fields = {}
gateways = await scan_for_gateways()
if gateways:
@ -142,31 +159,40 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
"""Manually configure tunnel connection parameters. Fields default to preselected gateway if one was found."""
if user_input is not None:
connection_type = user_input[CONF_KNX_TUNNELING_TYPE]
entry_data: KNXConfigEntryData = {
**DEFAULT_ENTRY_DATA, # type: ignore[misc]
CONF_HOST: user_input[CONF_HOST],
CONF_PORT: user_input[CONF_PORT],
CONF_KNX_INDIVIDUAL_ADDRESS: user_input[CONF_KNX_INDIVIDUAL_ADDRESS],
CONF_KNX_ROUTE_BACK: (
connection_type == CONF_KNX_LABEL_TUNNELING_UDP_ROUTE_BACK
),
CONF_KNX_LOCAL_IP: user_input.get(CONF_KNX_LOCAL_IP),
CONF_KNX_CONNECTION_TYPE: (
CONF_KNX_TUNNELING_TCP
if connection_type == CONF_KNX_LABEL_TUNNELING_TCP
else CONF_KNX_TUNNELING
),
}
if connection_type == CONF_KNX_LABEL_TUNNELING_TCP_SECURE:
self._tunneling_config = entry_data
return self.async_show_menu(
step_id="secure_tunneling",
menu_options=["secure_knxkeys", "secure_manual"],
)
return self.async_create_entry(
title=f"{CONF_KNX_TUNNELING.capitalize()} @ {user_input[CONF_HOST]}",
data={
**DEFAULT_ENTRY_DATA,
CONF_HOST: user_input[CONF_HOST],
CONF_PORT: user_input[CONF_PORT],
CONF_KNX_INDIVIDUAL_ADDRESS: user_input[
CONF_KNX_INDIVIDUAL_ADDRESS
],
CONF_KNX_ROUTE_BACK: (
connection_type == CONF_KNX_LABEL_TUNNELING_UDP_ROUTE_BACK
),
CONF_KNX_LOCAL_IP: user_input.get(CONF_KNX_LOCAL_IP),
CONF_KNX_CONNECTION_TYPE: (
CONF_KNX_TUNNELING_TCP
if connection_type == CONF_KNX_LABEL_TUNNELING_TCP
else CONF_KNX_TUNNELING
),
},
data=entry_data,
)
errors: dict = {}
connection_methods: list[str] = [
CONF_KNX_LABEL_TUNNELING_TCP,
CONF_KNX_LABEL_TUNNELING_UDP,
CONF_KNX_LABEL_TUNNELING_TCP_SECURE,
CONF_KNX_LABEL_TUNNELING_UDP_ROUTE_BACK,
]
ip_address = ""
@ -193,6 +219,85 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
step_id="manual_tunnel", data_schema=vol.Schema(fields), errors=errors
)
async def async_step_secure_manual(
self, user_input: dict | None = None
) -> FlowResult:
"""Configure ip secure manually."""
errors: dict = {}
if user_input is not None:
assert self._tunneling_config
entry_data: KNXConfigEntryData = {
**self._tunneling_config, # type: ignore[misc]
CONF_KNX_SECURE_USER_ID: user_input[CONF_KNX_SECURE_USER_ID],
CONF_KNX_SECURE_USER_PASSWORD: user_input[
CONF_KNX_SECURE_USER_PASSWORD
],
CONF_KNX_SECURE_DEVICE_AUTHENTICATION: user_input[
CONF_KNX_SECURE_DEVICE_AUTHENTICATION
],
CONF_KNX_CONNECTION_TYPE: CONF_KNX_TUNNELING_TCP_SECURE,
}
return self.async_create_entry(
title=f"Secure {CONF_KNX_TUNNELING.capitalize()} @ {self._tunneling_config[CONF_HOST]}",
data=entry_data,
)
fields = {
vol.Required(CONF_KNX_SECURE_USER_ID): int,
vol.Required(CONF_KNX_SECURE_USER_PASSWORD): str,
vol.Required(CONF_KNX_SECURE_DEVICE_AUTHENTICATION): str,
}
return self.async_show_form(
step_id="secure_manual", data_schema=vol.Schema(fields), errors=errors
)
async def async_step_secure_knxkeys(
self, user_input: dict | None = None
) -> FlowResult:
"""Configure secure knxkeys used to authenticate."""
errors = {}
if user_input is not None:
try:
assert self._tunneling_config
storage_key: str = (
CONST_KNX_STORAGE_KEY + user_input[CONF_KNX_KNXKEY_FILENAME]
)
load_key_ring(
self.hass.config.path(
STORAGE_DIR,
storage_key,
),
user_input[CONF_KNX_KNXKEY_PASSWORD],
)
entry_data: KNXConfigEntryData = {
**self._tunneling_config, # type: ignore[misc]
CONF_KNX_KNXKEY_FILENAME: storage_key,
CONF_KNX_KNXKEY_PASSWORD: user_input[CONF_KNX_KNXKEY_PASSWORD],
CONF_KNX_CONNECTION_TYPE: CONF_KNX_TUNNELING_TCP_SECURE,
}
return self.async_create_entry(
title=f"Secure {CONF_KNX_TUNNELING.capitalize()} @ {self._tunneling_config[CONF_HOST]}",
data=entry_data,
)
except InvalidSignature:
errors["base"] = "invalid_signature"
except FileNotFoundError:
errors["base"] = "file_not_found"
fields = {
vol.Required(CONF_KNX_KNXKEY_FILENAME): str,
vol.Required(CONF_KNX_KNXKEY_PASSWORD): str,
}
return self.async_show_form(
step_id="secure_knxkeys", data_schema=vol.Schema(fields), errors=errors
)
async def async_step_routing(self, user_input: dict | None = None) -> FlowResult:
"""Routing setup."""
if user_input is not None:

View File

@ -1,6 +1,6 @@
"""Constants for the KNX integration."""
from enum import Enum
from typing import Final
from typing import Final, TypedDict
from homeassistant.components.climate.const import (
CURRENT_HVAC_COOL,
@ -39,15 +39,27 @@ CONF_KNX_AUTOMATIC: Final = "automatic"
CONF_KNX_ROUTING: Final = "routing"
CONF_KNX_TUNNELING: Final = "tunneling"
CONF_KNX_TUNNELING_TCP: Final = "tunneling_tcp"
CONF_KNX_LOCAL_IP = "local_ip"
CONF_KNX_MCAST_GRP = "multicast_group"
CONF_KNX_MCAST_PORT = "multicast_port"
CONF_KNX_TUNNELING_TCP_SECURE: Final = "tunneling_tcp_secure"
CONF_KNX_LOCAL_IP: Final = "local_ip"
CONF_KNX_MCAST_GRP: Final = "multicast_group"
CONF_KNX_MCAST_PORT: Final = "multicast_port"
CONF_KNX_RATE_LIMIT = "rate_limit"
CONF_KNX_ROUTE_BACK = "route_back"
CONF_KNX_STATE_UPDATER = "state_updater"
CONF_KNX_DEFAULT_STATE_UPDATER = True
CONF_KNX_DEFAULT_RATE_LIMIT = 20
CONF_KNX_RATE_LIMIT: Final = "rate_limit"
CONF_KNX_ROUTE_BACK: Final = "route_back"
CONF_KNX_STATE_UPDATER: Final = "state_updater"
CONF_KNX_DEFAULT_STATE_UPDATER: Final = True
CONF_KNX_DEFAULT_RATE_LIMIT: Final = 20
##
# Secure constants
##
CONST_KNX_STORAGE_KEY: Final = "knx/"
CONF_KNX_KNXKEY_FILENAME: Final = "knxkeys_filename"
CONF_KNX_KNXKEY_PASSWORD: Final = "knxkeys_password"
CONF_KNX_SECURE_USER_ID: Final = "user_id"
CONF_KNX_SECURE_USER_PASSWORD: Final = "user_password"
CONF_KNX_SECURE_DEVICE_AUTHENTICATION: Final = "device_authentication"
CONF_PAYLOAD: Final = "payload"
@ -67,6 +79,27 @@ ATTR_COUNTER: Final = "counter"
ATTR_SOURCE: Final = "source"
class KNXConfigEntryData(TypedDict, total=False):
"""Config entry for the KNX integration."""
connection_type: str
individual_address: str
local_ip: str
multicast_group: str
multicast_port: int
route_back: bool
state_updater: bool
rate_limit: int
host: str
port: int
user_id: int
user_password: str
device_authentication: str
knxkeys_filename: str
knxkeys_password: str
class ColorTempModes(Enum):
"""Color temperature modes for config validation."""

View File

@ -6,11 +6,23 @@ from typing import Any
import voluptuous as vol
from homeassistant import config as conf_util
from homeassistant.components.diagnostics import async_redact_data
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from . import CONFIG_SCHEMA
from .const import DOMAIN
from .const import (
CONF_KNX_KNXKEY_PASSWORD,
CONF_KNX_SECURE_DEVICE_AUTHENTICATION,
CONF_KNX_SECURE_USER_PASSWORD,
DOMAIN,
)
TO_REDACT = {
CONF_KNX_KNXKEY_PASSWORD,
CONF_KNX_SECURE_USER_PASSWORD,
CONF_KNX_SECURE_DEVICE_AUTHENTICATION,
}
async def async_get_config_entry_diagnostics(
@ -24,7 +36,7 @@ async def async_get_config_entry_diagnostics(
"current_address": str(knx_module.xknx.current_address),
}
diag["config_entry_data"] = dict(config_entry.data)
diag["config_entry_data"] = async_redact_data(dict(config_entry.data), TO_REDACT)
raw_config = await conf_util.async_hass_config_yaml(hass)
diag["configuration_yaml"] = raw_config.get(DOMAIN)

View File

@ -23,6 +23,28 @@
"local_ip": "Local IP of Home Assistant (leave empty for automatic detection)"
}
},
"secure_tunneling": {
"description": "Select how you want to configure IP Secure.",
"menu_options": {
"secure_knxkeys": "Configure a knxkeys file containing IP secure information",
"secure_manual": "Configure IP secure manually"
}
},
"secure_knxkeys": {
"description": "Please enter the information for your knxkeys file.",
"data": {
"knxkeys_filename": "The full name of your knxkeys file",
"knxkeys_password": "The password to decrypt the knxkeys file"
}
},
"secure_manual": {
"description": "Please enter the IP secure information.",
"data": {
"user_id": "User ID",
"user_password": "User password",
"device_authentication": "Device authentication password"
}
},
"routing": {
"description": "Please configure the routing options.",
"data": {
@ -38,7 +60,9 @@
"single_instance_allowed": "[%key:common::config_flow::abort::single_instance_allowed%]"
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]"
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_signature": "The password to decrypt the knxkeys file is wrong.",
"file_not_found": "The specified knxkeys file was not found in the path config/.storage/knx/"
}
},
"options": {

View File

@ -5,7 +5,9 @@
"single_instance_allowed": "Already configured. Only a single configuration possible."
},
"error": {
"cannot_connect": "Failed to connect"
"cannot_connect": "Failed to connect",
"file_not_found": "The specified knxkeys file was not found in the path config/.storage/knx/",
"invalid_signature": "The password to decrypt the knxkeys file is wrong."
},
"step": {
"manual_tunnel": {
@ -14,7 +16,6 @@
"individual_address": "Individual address for the connection",
"local_ip": "Local IP of Home Assistant (leave empty for automatic detection)",
"port": "Port",
"route_back": "Route Back / NAT Mode",
"tunneling_type": "KNX Tunneling Type"
},
"description": "Please enter the connection information of your tunneling device."
@ -28,6 +29,28 @@
},
"description": "Please configure the routing options."
},
"secure_knxkeys": {
"data": {
"knxkeys_filename": "The full name of your knxkeys file",
"knxkeys_password": "The password to decrypt the knxkeys file."
},
"description": "Please enter the information for your knxkeys file."
},
"secure_tunneling": {
"description": "Select how you want to configure IP Secure.",
"menu_options": {
"secure_knxkeys": "Configure a knxkeys file containing IP secure information",
"secure_manual": "Configure IP secure manually"
}
},
"secure_manual": {
"description": "Please enter the IP secure information.",
"data": {
"user_id": "User ID",
"user_password": "User password",
"device_authentication": "Device authentication password"
}
},
"tunnel": {
"data": {
"gateway": "KNX Tunnel Connection"
@ -58,9 +81,7 @@
"tunnel": {
"data": {
"host": "Host",
"local_ip": "Local IP (leave empty if unsure)",
"port": "Port",
"route_back": "Route Back / NAT Mode",
"tunneling_type": "KNX Tunneling Type"
}
}

View File

@ -2,6 +2,7 @@
from unittest.mock import patch
import pytest
from xknx.exceptions.exception import InvalidSignature
from xknx.io import DEFAULT_MCAST_GRP
from xknx.io.gateway_scanner import GatewayDescriptor
@ -10,6 +11,7 @@ from homeassistant.components.knx.config_flow import (
CONF_DEFAULT_LOCAL_IP,
CONF_KNX_GATEWAY,
CONF_KNX_LABEL_TUNNELING_TCP,
CONF_KNX_LABEL_TUNNELING_TCP_SECURE,
CONF_KNX_LABEL_TUNNELING_UDP,
CONF_KNX_LABEL_TUNNELING_UDP_ROUTE_BACK,
CONF_KNX_TUNNELING_TYPE,
@ -20,20 +22,30 @@ from homeassistant.components.knx.const import (
CONF_KNX_AUTOMATIC,
CONF_KNX_CONNECTION_TYPE,
CONF_KNX_INDIVIDUAL_ADDRESS,
CONF_KNX_KNXKEY_FILENAME,
CONF_KNX_KNXKEY_PASSWORD,
CONF_KNX_LOCAL_IP,
CONF_KNX_MCAST_GRP,
CONF_KNX_MCAST_PORT,
CONF_KNX_RATE_LIMIT,
CONF_KNX_ROUTE_BACK,
CONF_KNX_ROUTING,
CONF_KNX_SECURE_DEVICE_AUTHENTICATION,
CONF_KNX_SECURE_USER_ID,
CONF_KNX_SECURE_USER_PASSWORD,
CONF_KNX_STATE_UPDATER,
CONF_KNX_TUNNELING,
CONF_KNX_TUNNELING_TCP,
CONF_KNX_TUNNELING_TCP_SECURE,
DOMAIN,
)
from homeassistant.const import CONF_HOST, CONF_PORT
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import RESULT_TYPE_CREATE_ENTRY, RESULT_TYPE_FORM
from homeassistant.data_entry_flow import (
RESULT_TYPE_CREATE_ENTRY,
RESULT_TYPE_FORM,
RESULT_TYPE_MENU,
)
from tests.common import MockConfigEntry
@ -426,6 +438,184 @@ async def test_form_with_automatic_connection_handling(hass: HomeAssistant) -> N
assert len(mock_setup_entry.mock_calls) == 1
async def _get_menu_step(hass: HomeAssistant) -> None:
"""Test ip secure manuel."""
gateway = _gateway_descriptor("192.168.0.1", 3675, True)
with patch("xknx.io.gateway_scanner.GatewayScanner.scan") as gateways:
gateways.return_value = [gateway]
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == RESULT_TYPE_FORM
assert not result["errors"]
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_KNX_CONNECTION_TYPE: CONF_KNX_TUNNELING,
},
)
await hass.async_block_till_done()
assert result2["type"] == RESULT_TYPE_FORM
assert result2["step_id"] == "manual_tunnel"
assert not result2["errors"]
result3 = await hass.config_entries.flow.async_configure(
result2["flow_id"],
{
CONF_KNX_TUNNELING_TYPE: CONF_KNX_LABEL_TUNNELING_TCP_SECURE,
CONF_HOST: "192.168.0.1",
CONF_PORT: 3675,
},
)
await hass.async_block_till_done()
assert result3["type"] == RESULT_TYPE_MENU
assert result3["step_id"] == "secure_tunneling"
return result3
async def test_configure_secure_manual(hass: HomeAssistant):
"""Test configure secure manual."""
menu_step = await _get_menu_step(hass)
result = await hass.config_entries.flow.async_configure(
menu_step["flow_id"],
{"next_step_id": "secure_manual"},
)
assert result["type"] == RESULT_TYPE_FORM
assert result["step_id"] == "secure_manual"
assert not result["errors"]
with patch(
"homeassistant.components.knx.async_setup_entry",
return_value=True,
) as mock_setup_entry:
secure_manual = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_KNX_SECURE_USER_ID: 2,
CONF_KNX_SECURE_USER_PASSWORD: "password",
CONF_KNX_SECURE_DEVICE_AUTHENTICATION: "device_auth",
},
)
await hass.async_block_till_done()
assert secure_manual["type"] == RESULT_TYPE_CREATE_ENTRY
assert secure_manual["data"] == {
**DEFAULT_ENTRY_DATA,
CONF_KNX_CONNECTION_TYPE: CONF_KNX_TUNNELING_TCP_SECURE,
CONF_KNX_SECURE_USER_ID: 2,
CONF_KNX_SECURE_USER_PASSWORD: "password",
CONF_KNX_SECURE_DEVICE_AUTHENTICATION: "device_auth",
CONF_HOST: "192.168.0.1",
CONF_PORT: 3675,
CONF_KNX_INDIVIDUAL_ADDRESS: "15.15.250",
CONF_KNX_ROUTE_BACK: False,
CONF_KNX_LOCAL_IP: None,
}
assert len(mock_setup_entry.mock_calls) == 1
async def test_configure_secure_knxkeys(hass: HomeAssistant):
"""Test configure secure knxkeys."""
menu_step = await _get_menu_step(hass)
result = await hass.config_entries.flow.async_configure(
menu_step["flow_id"],
{"next_step_id": "secure_knxkeys"},
)
assert result["type"] == RESULT_TYPE_FORM
assert result["step_id"] == "secure_knxkeys"
assert not result["errors"]
with patch(
"homeassistant.components.knx.async_setup_entry",
return_value=True,
) as mock_setup_entry, patch(
"homeassistant.components.knx.config_flow.load_key_ring", return_value=True
):
secure_knxkeys = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_KNX_KNXKEY_FILENAME: "testcase.knxkeys",
CONF_KNX_KNXKEY_PASSWORD: "password",
},
)
await hass.async_block_till_done()
assert secure_knxkeys["type"] == RESULT_TYPE_CREATE_ENTRY
assert secure_knxkeys["data"] == {
**DEFAULT_ENTRY_DATA,
CONF_KNX_CONNECTION_TYPE: CONF_KNX_TUNNELING_TCP_SECURE,
CONF_KNX_KNXKEY_FILENAME: "knx/testcase.knxkeys",
CONF_KNX_KNXKEY_PASSWORD: "password",
CONF_HOST: "192.168.0.1",
CONF_PORT: 3675,
CONF_KNX_INDIVIDUAL_ADDRESS: "15.15.250",
CONF_KNX_ROUTE_BACK: False,
CONF_KNX_LOCAL_IP: None,
}
assert len(mock_setup_entry.mock_calls) == 1
async def test_configure_secure_knxkeys_file_not_found(hass: HomeAssistant):
"""Test configure secure knxkeys but file was not found."""
menu_step = await _get_menu_step(hass)
result = await hass.config_entries.flow.async_configure(
menu_step["flow_id"],
{"next_step_id": "secure_knxkeys"},
)
assert result["type"] == RESULT_TYPE_FORM
assert result["step_id"] == "secure_knxkeys"
assert not result["errors"]
with patch(
"homeassistant.components.knx.config_flow.load_key_ring",
side_effect=FileNotFoundError(),
):
secure_knxkeys = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_KNX_KNXKEY_FILENAME: "testcase.knxkeys",
CONF_KNX_KNXKEY_PASSWORD: "password",
},
)
await hass.async_block_till_done()
assert secure_knxkeys["type"] == RESULT_TYPE_FORM
assert secure_knxkeys["errors"]
assert secure_knxkeys["errors"]["base"] == "file_not_found"
async def test_configure_secure_knxkeys_invalid_signature(hass: HomeAssistant):
"""Test configure secure knxkeys but file was not found."""
menu_step = await _get_menu_step(hass)
result = await hass.config_entries.flow.async_configure(
menu_step["flow_id"],
{"next_step_id": "secure_knxkeys"},
)
assert result["type"] == RESULT_TYPE_FORM
assert result["step_id"] == "secure_knxkeys"
assert not result["errors"]
with patch(
"homeassistant.components.knx.config_flow.load_key_ring",
side_effect=InvalidSignature(),
):
secure_knxkeys = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_KNX_KNXKEY_FILENAME: "testcase.knxkeys",
CONF_KNX_KNXKEY_PASSWORD: "password",
},
)
await hass.async_block_till_done()
assert secure_knxkeys["type"] == RESULT_TYPE_FORM
assert secure_knxkeys["errors"]
assert secure_knxkeys["errors"]["base"] == "invalid_signature"
async def test_options_flow(
hass: HomeAssistant, mock_config_entry: MockConfigEntry
) -> None:

View File

@ -2,7 +2,24 @@
from unittest.mock import patch
from aiohttp import ClientSession
from xknx import XKNX
from xknx.io import DEFAULT_MCAST_GRP, DEFAULT_MCAST_PORT
from homeassistant.components.knx.const import (
CONF_KNX_AUTOMATIC,
CONF_KNX_CONNECTION_TYPE,
CONF_KNX_DEFAULT_RATE_LIMIT,
CONF_KNX_DEFAULT_STATE_UPDATER,
CONF_KNX_INDIVIDUAL_ADDRESS,
CONF_KNX_KNXKEY_PASSWORD,
CONF_KNX_MCAST_GRP,
CONF_KNX_MCAST_PORT,
CONF_KNX_RATE_LIMIT,
CONF_KNX_SECURE_DEVICE_AUTHENTICATION,
CONF_KNX_SECURE_USER_PASSWORD,
CONF_KNX_STATE_UPDATER,
DOMAIN as KNX_DOMAIN,
)
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry
@ -69,3 +86,49 @@ async def test_diagnostic_config_error(
"configuration_yaml": {"wrong_key": {}},
"xknx": {"current_address": "0.0.0", "version": "1.0.0"},
}
async def test_diagnostic_redact(
hass: HomeAssistant,
hass_client: ClientSession,
):
"""Test diagnostics redacting data."""
mock_config_entry: MockConfigEntry = MockConfigEntry(
title="KNX",
domain=KNX_DOMAIN,
data={
CONF_KNX_CONNECTION_TYPE: CONF_KNX_AUTOMATIC,
CONF_KNX_RATE_LIMIT: CONF_KNX_DEFAULT_RATE_LIMIT,
CONF_KNX_STATE_UPDATER: CONF_KNX_DEFAULT_STATE_UPDATER,
CONF_KNX_MCAST_PORT: DEFAULT_MCAST_PORT,
CONF_KNX_MCAST_GRP: DEFAULT_MCAST_GRP,
CONF_KNX_INDIVIDUAL_ADDRESS: XKNX.DEFAULT_ADDRESS,
CONF_KNX_KNXKEY_PASSWORD: "password",
CONF_KNX_SECURE_USER_PASSWORD: "user_password",
CONF_KNX_SECURE_DEVICE_AUTHENTICATION: "device_authentication",
},
)
knx: KNXTestKit = KNXTestKit(hass, mock_config_entry)
await knx.setup_integration({})
with patch("homeassistant.config.async_hass_config_yaml", return_value={}):
# Overwrite the version for this test since we don't want to change this with every library bump
knx.xknx.version = "1.0.0"
assert await get_diagnostics_for_config_entry(
hass, hass_client, mock_config_entry
) == {
"config_entry_data": {
"connection_type": "automatic",
"individual_address": "15.15.250",
"multicast_group": "224.0.23.12",
"multicast_port": 3671,
"rate_limit": 20,
"state_updater": True,
"knxkeys_password": "**REDACTED**",
"user_password": "**REDACTED**",
"device_authentication": "**REDACTED**",
},
"configuration_error": None,
"configuration_yaml": None,
"xknx": {"current_address": "0.0.0", "version": "1.0.0"},
}

View File

@ -6,6 +6,7 @@ from xknx.io import (
DEFAULT_MCAST_PORT,
ConnectionConfig,
ConnectionType,
SecureConfig,
)
from homeassistant.components.knx.const import (
@ -14,15 +15,23 @@ from homeassistant.components.knx.const import (
CONF_KNX_DEFAULT_RATE_LIMIT,
CONF_KNX_DEFAULT_STATE_UPDATER,
CONF_KNX_INDIVIDUAL_ADDRESS,
CONF_KNX_KNXKEY_FILENAME,
CONF_KNX_KNXKEY_PASSWORD,
CONF_KNX_LOCAL_IP,
CONF_KNX_MCAST_GRP,
CONF_KNX_MCAST_PORT,
CONF_KNX_RATE_LIMIT,
CONF_KNX_ROUTE_BACK,
CONF_KNX_ROUTING,
CONF_KNX_SECURE_DEVICE_AUTHENTICATION,
CONF_KNX_SECURE_USER_ID,
CONF_KNX_SECURE_USER_PASSWORD,
CONF_KNX_STATE_UPDATER,
CONF_KNX_TUNNELING,
CONF_KNX_TUNNELING_TCP,
CONF_KNX_TUNNELING_TCP_SECURE,
DOMAIN as KNX_DOMAIN,
KNXConfigEntryData,
)
from homeassistant.const import CONF_HOST, CONF_PORT
from homeassistant.core import HomeAssistant
@ -85,10 +94,83 @@ from tests.common import MockConfigEntry
threaded=True,
),
),
(
{
CONF_KNX_CONNECTION_TYPE: CONF_KNX_TUNNELING_TCP,
CONF_HOST: "192.168.0.2",
CONF_PORT: 3675,
CONF_KNX_RATE_LIMIT: CONF_KNX_DEFAULT_RATE_LIMIT,
CONF_KNX_STATE_UPDATER: CONF_KNX_DEFAULT_STATE_UPDATER,
CONF_KNX_MCAST_PORT: DEFAULT_MCAST_PORT,
CONF_KNX_MCAST_GRP: DEFAULT_MCAST_GRP,
CONF_KNX_INDIVIDUAL_ADDRESS: XKNX.DEFAULT_ADDRESS,
},
ConnectionConfig(
connection_type=ConnectionType.TUNNELING_TCP,
gateway_ip="192.168.0.2",
gateway_port=3675,
auto_reconnect=True,
threaded=True,
),
),
(
{
CONF_KNX_CONNECTION_TYPE: CONF_KNX_TUNNELING_TCP_SECURE,
CONF_HOST: "192.168.0.2",
CONF_PORT: 3675,
CONF_KNX_RATE_LIMIT: CONF_KNX_DEFAULT_RATE_LIMIT,
CONF_KNX_STATE_UPDATER: CONF_KNX_DEFAULT_STATE_UPDATER,
CONF_KNX_MCAST_PORT: DEFAULT_MCAST_PORT,
CONF_KNX_MCAST_GRP: DEFAULT_MCAST_GRP,
CONF_KNX_INDIVIDUAL_ADDRESS: XKNX.DEFAULT_ADDRESS,
CONF_KNX_KNXKEY_FILENAME: "knx/testcase.knxkeys",
CONF_KNX_KNXKEY_PASSWORD: "password",
},
ConnectionConfig(
connection_type=ConnectionType.TUNNELING_TCP_SECURE,
gateway_ip="192.168.0.2",
gateway_port=3675,
secure_config=SecureConfig(
knxkeys_file_path="testcase.knxkeys", knxkeys_password="password"
),
auto_reconnect=True,
threaded=True,
),
),
(
{
CONF_KNX_CONNECTION_TYPE: CONF_KNX_TUNNELING_TCP_SECURE,
CONF_HOST: "192.168.0.2",
CONF_PORT: 3675,
CONF_KNX_RATE_LIMIT: CONF_KNX_DEFAULT_RATE_LIMIT,
CONF_KNX_STATE_UPDATER: CONF_KNX_DEFAULT_STATE_UPDATER,
CONF_KNX_MCAST_PORT: DEFAULT_MCAST_PORT,
CONF_KNX_MCAST_GRP: DEFAULT_MCAST_GRP,
CONF_KNX_INDIVIDUAL_ADDRESS: XKNX.DEFAULT_ADDRESS,
CONF_KNX_SECURE_USER_ID: 2,
CONF_KNX_SECURE_USER_PASSWORD: "password",
CONF_KNX_SECURE_DEVICE_AUTHENTICATION: "device_auth",
},
ConnectionConfig(
connection_type=ConnectionType.TUNNELING_TCP_SECURE,
gateway_ip="192.168.0.2",
gateway_port=3675,
secure_config=SecureConfig(
device_authentication_password="device_auth",
user_password="password",
user_id=2,
),
auto_reconnect=True,
threaded=True,
),
),
],
)
async def test_init_connection_handling(
hass: HomeAssistant, knx: KNXTestKit, config_entry_data, connection_config
hass: HomeAssistant,
knx: KNXTestKit,
config_entry_data: KNXConfigEntryData,
connection_config: ConnectionConfig,
):
"""Test correctly generating connection config."""
@ -102,6 +184,39 @@ async def test_init_connection_handling(
assert hass.data.get(KNX_DOMAIN) is not None
assert (
hass.data[KNX_DOMAIN].connection_config().__dict__ == connection_config.__dict__
original_connection_config = (
hass.data[KNX_DOMAIN].connection_config().__dict__.copy()
)
del original_connection_config["secure_config"]
connection_config_dict = connection_config.__dict__.copy()
del connection_config_dict["secure_config"]
assert original_connection_config == connection_config_dict
if connection_config.secure_config is not None:
assert (
hass.data[KNX_DOMAIN].connection_config().secure_config.knxkeys_password
== connection_config.secure_config.knxkeys_password
)
assert (
hass.data[KNX_DOMAIN].connection_config().secure_config.user_password
== connection_config.secure_config.user_password
)
assert (
hass.data[KNX_DOMAIN].connection_config().secure_config.user_id
== connection_config.secure_config.user_id
)
assert (
hass.data[KNX_DOMAIN]
.connection_config()
.secure_config.device_authentication_password
== connection_config.secure_config.device_authentication_password
)
if connection_config.secure_config.knxkeys_file_path is not None:
assert (
connection_config.secure_config.knxkeys_file_path
in hass.data[KNX_DOMAIN]
.connection_config()
.secure_config.knxkeys_file_path
)