Implement local discovery of Smappee legacy devices (#37812)

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
This commit is contained in:
bsmappee 2020-08-10 13:34:18 +02:00 committed by GitHub
parent 65f1e0c71a
commit 07de9deab6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 708 additions and 60 deletions

View File

@ -5,7 +5,12 @@ from pysmappee import Smappee
import voluptuous as vol
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_CLIENT_ID, CONF_CLIENT_SECRET, CONF_PLATFORM
from homeassistant.const import (
CONF_CLIENT_ID,
CONF_CLIENT_SECRET,
CONF_IP_ADDRESS,
CONF_PLATFORM,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_entry_oauth2_flow, config_validation as cv
from homeassistant.util import Throttle
@ -13,7 +18,7 @@ from homeassistant.util import Throttle
from . import api, config_flow
from .const import (
AUTHORIZE_URL,
BASE,
CONF_SERIALNUMBER,
DOMAIN,
MIN_TIME_BETWEEN_UPDATES,
SMAPPEE_PLATFORMS,
@ -40,11 +45,14 @@ async def async_setup(hass: HomeAssistant, config: dict):
if DOMAIN not in config:
return True
client_id = config[DOMAIN][CONF_CLIENT_ID]
hass.data[DOMAIN][client_id] = {}
# decide platform
platform = "PRODUCTION"
if config[DOMAIN][CONF_CLIENT_ID] == "homeassistant_f2":
if client_id == "homeassistant_f2":
platform = "ACCEPTANCE"
elif config[DOMAIN][CONF_CLIENT_ID] == "homeassistant_f3":
elif client_id == "homeassistant_f3":
platform = "DEVELOPMENT"
hass.data[DOMAIN][CONF_PLATFORM] = platform
@ -65,17 +73,22 @@ async def async_setup(hass: HomeAssistant, config: dict):
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry):
"""Set up Smappee from a config entry."""
implementation = await config_entry_oauth2_flow.async_get_config_entry_implementation(
hass, entry
)
"""Set up Smappee from a zeroconf or config entry."""
if CONF_IP_ADDRESS in entry.data:
smappee_api = api.api.SmappeeLocalApi(ip=entry.data[CONF_IP_ADDRESS])
smappee = Smappee(api=smappee_api, serialnumber=entry.data[CONF_SERIALNUMBER])
await hass.async_add_executor_job(smappee.load_local_service_location)
else:
implementation = await config_entry_oauth2_flow.async_get_config_entry_implementation(
hass, entry
)
smappee_api = api.ConfigEntrySmappeeApi(hass, entry, implementation)
smappee_api = api.ConfigEntrySmappeeApi(hass, entry, implementation)
smappee = Smappee(smappee_api)
await hass.async_add_executor_job(smappee.load_service_locations)
smappee = Smappee(api=smappee_api)
await hass.async_add_executor_job(smappee.load_service_locations)
hass.data[DOMAIN][BASE] = SmappeeBase(hass, smappee)
hass.data[DOMAIN][entry.entry_id] = SmappeeBase(hass, smappee)
for component in SMAPPEE_PLATFORMS:
hass.async_create_task(
@ -97,8 +110,7 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry):
)
if unload_ok:
hass.data[DOMAIN].pop(BASE, None)
hass.data[DOMAIN].pop(CONF_PLATFORM, None)
hass.data[DOMAIN].pop(entry.entry_id, None)
return unload_ok

View File

@ -3,7 +3,7 @@ import logging
from homeassistant.components.binary_sensor import BinarySensorEntity
from .const import BASE, DOMAIN
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
@ -13,7 +13,7 @@ PRESENCE_PREFIX = "Presence"
async def async_setup_entry(hass, config_entry, async_add_entities):
"""Set up the Smappee binary sensor."""
smappee_base = hass.data[DOMAIN][BASE]
smappee_base = hass.data[DOMAIN][config_entry.entry_id]
entities = []
for service_location in smappee_base.smappee.service_locations.values():
@ -29,7 +29,9 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
)
)
entities.append(SmappeePresence(smappee_base, service_location))
if not smappee_base.smappee.local_polling:
# presence value only available in cloud env
entities.append(SmappeePresence(smappee_base, service_location))
async_add_entities(entities, True)

View File

@ -1,10 +1,14 @@
"""Config flow for Smappee."""
import logging
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import CONF_HOST, CONF_IP_ADDRESS
from homeassistant.helpers import config_entry_oauth2_flow
from .const import DOMAIN # pylint: disable=unused-import
from . import api
from .const import CONF_HOSTNAME, CONF_SERIALNUMBER, DOMAIN, ENV_CLOUD, ENV_LOCAL
_LOGGER = logging.getLogger(__name__)
@ -12,12 +16,160 @@ _LOGGER = logging.getLogger(__name__)
class SmappeeFlowHandler(
config_entry_oauth2_flow.AbstractOAuth2FlowHandler, domain=DOMAIN
):
"""Config flow to handle Smappee OAuth2 authentication."""
"""Config Smappee config flow."""
DOMAIN = DOMAIN
CONNECTION_CLASS = config_entries.CONN_CLASS_CLOUD_POLL
async def async_oauth_create_entry(self, data):
"""Create an entry for the flow."""
await self.async_set_unique_id(unique_id=f"{DOMAIN}Cloud")
return self.async_create_entry(title=f"{DOMAIN}Cloud", data=data)
@property
def logger(self) -> logging.Logger:
"""Return logger."""
return logging.getLogger(__name__)
async def async_step_zeroconf(self, discovery_info):
"""Handle zeroconf discovery."""
if not discovery_info[CONF_HOSTNAME].startswith("Smappee1"):
# We currently only support Energy and Solar models (legacy)
return self.async_abort(reason="invalid_mdns")
serial_number = (
discovery_info[CONF_HOSTNAME].replace(".local.", "").replace("Smappee", "")
)
# Check if already configured (local)
await self.async_set_unique_id(serial_number)
self._abort_if_unique_id_configured()
# Check if already configured (cloud)
if self.is_cloud_device_already_added():
return self.async_abort(reason="already_configured_device")
# pylint: disable=no-member # https://github.com/PyCQA/pylint/issues/3167
self.context.update(
{
CONF_IP_ADDRESS: discovery_info["host"],
CONF_SERIALNUMBER: serial_number,
"title_placeholders": {"name": serial_number},
}
)
return await self.async_step_zeroconf_confirm()
async def async_step_zeroconf_confirm(self, user_input=None):
"""Confirm zeroconf flow."""
errors = {}
# Check if already configured (cloud)
if self.is_cloud_device_already_added():
return self.async_abort(reason="already_configured_device")
if user_input is None:
# pylint: disable=no-member # https://github.com/PyCQA/pylint/issues/3167
serialnumber = self.context.get(CONF_SERIALNUMBER)
return self.async_show_form(
step_id="zeroconf_confirm",
description_placeholders={"serialnumber": serialnumber},
errors=errors,
)
# pylint: disable=no-member # https://github.com/PyCQA/pylint/issues/3167
ip_address = self.context.get(CONF_IP_ADDRESS)
serial_number = self.context.get(CONF_SERIALNUMBER)
# Attempt to make a connection to the local device
smappee_api = api.api.SmappeeLocalApi(ip=ip_address)
logon = await self.hass.async_add_executor_job(smappee_api.logon)
if logon is None:
return self.async_abort(reason="connection_error")
return self.async_create_entry(
title=f"{DOMAIN}{serial_number}",
data={CONF_IP_ADDRESS: ip_address, CONF_SERIALNUMBER: serial_number},
)
async def async_step_user(self, user_input=None):
"""Handle a flow initiated by the user."""
# If there is a CLOUD entry already, abort a new LOCAL entry
if self.is_cloud_device_already_added():
return self.async_abort(reason="already_configured_device")
return await self.async_step_environment()
async def async_step_environment(self, user_input=None):
"""Decide environment, cloud or local."""
if user_input is None:
return self.async_show_form(
step_id="environment",
data_schema=vol.Schema(
{
vol.Required("environment", default=ENV_CLOUD): vol.In(
[ENV_CLOUD, ENV_LOCAL]
)
}
),
errors={},
)
# Environment chosen, request additional host information for LOCAL or OAuth2 flow for CLOUD
# Ask for host detail
if user_input["environment"] == ENV_LOCAL:
return await self.async_step_local()
# Abort cloud option if a LOCAL entry has already been added
if user_input["environment"] == ENV_CLOUD and self._async_current_entries():
return self.async_abort(reason="already_configured_local_device")
return await self.async_step_pick_implementation()
async def async_step_local(self, user_input=None):
"""Handle local flow."""
if user_input is None:
return self.async_show_form(
step_id="local",
data_schema=vol.Schema({vol.Required(CONF_HOST): str}),
errors={},
)
# In a LOCAL setup we still need to resolve the host to serial number
ip_address = user_input["host"]
smappee_api = api.api.SmappeeLocalApi(ip=ip_address)
logon = await self.hass.async_add_executor_job(smappee_api.logon)
if logon is None:
return self.async_abort(reason="connection_error")
advanced_config = await self.hass.async_add_executor_job(
smappee_api.load_advanced_config
)
serial_number = None
for config_item in advanced_config:
if config_item["key"] == "mdnsHostName":
serial_number = config_item["value"]
if serial_number is None or not serial_number.startswith("Smappee1"):
# We currently only support Energy and Solar models (legacy)
return self.async_abort(reason="invalid_mdns")
serial_number = serial_number.replace("Smappee", "")
# Check if already configured (local)
await self.async_set_unique_id(serial_number, raise_on_progress=False)
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=f"{DOMAIN}{serial_number}",
data={CONF_IP_ADDRESS: ip_address, CONF_SERIALNUMBER: serial_number},
)
def is_cloud_device_already_added(self):
"""Check if a CLOUD device has already been added."""
for entry in self._async_current_entries():
if entry.unique_id is not None and entry.unique_id == f"{DOMAIN}Cloud":
return True
return False

View File

@ -5,11 +5,16 @@ from datetime import timedelta
DOMAIN = "smappee"
DATA_CLIENT = "smappee_data"
BASE = "BASE"
CONF_HOSTNAME = "hostname"
CONF_SERIALNUMBER = "serialnumber"
CONF_TITLE = "title"
ENV_CLOUD = "cloud"
ENV_LOCAL = "local"
SMAPPEE_PLATFORMS = ["binary_sensor", "sensor", "switch"]
MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=5)
MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=20)
AUTHORIZE_URL = {
"PRODUCTION": "https://app1pub.smappee.net/dev/v1/oauth2/authorize",

View File

@ -5,9 +5,12 @@
"documentation": "https://www.home-assistant.io/integrations/smappee",
"dependencies": ["http"],
"requirements": [
"pysmappee==0.1.5"
"pysmappee==0.2.9"
],
"codeowners": [
"@bsmappee"
],
"zeroconf": [
"_ssh._tcp.local."
]
}

View File

@ -4,7 +4,7 @@ import logging
from homeassistant.const import DEVICE_CLASS_POWER, ENERGY_WATT_HOUR, POWER_WATT, VOLT
from homeassistant.helpers.entity import Entity
from .const import BASE, DOMAIN
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
@ -15,6 +15,7 @@ TREND_SENSORS = {
POWER_WATT,
"total_power",
DEVICE_CLASS_POWER,
True, # both cloud and local
],
"alwayson": [
"Always on - Active power",
@ -22,6 +23,7 @@ TREND_SENSORS = {
POWER_WATT,
"alwayson",
DEVICE_CLASS_POWER,
False, # cloud only
],
"power_today": [
"Total consumption - Today",
@ -29,6 +31,7 @@ TREND_SENSORS = {
ENERGY_WATT_HOUR,
"power_today",
None,
False, # cloud only
],
"power_current_hour": [
"Total consumption - Current hour",
@ -36,6 +39,7 @@ TREND_SENSORS = {
ENERGY_WATT_HOUR,
"power_current_hour",
None,
False, # cloud only
],
"power_last_5_minutes": [
"Total consumption - Last 5 minutes",
@ -43,6 +47,7 @@ TREND_SENSORS = {
ENERGY_WATT_HOUR,
"power_last_5_minutes",
None,
False, # cloud only
],
"alwayson_today": [
"Always on - Today",
@ -50,6 +55,7 @@ TREND_SENSORS = {
ENERGY_WATT_HOUR,
"alwayson_today",
None,
False, # cloud only
],
}
REACTIVE_SENSORS = {
@ -68,6 +74,7 @@ SOLAR_SENSORS = {
POWER_WATT,
"solar_power",
DEVICE_CLASS_POWER,
True, # both cloud and local
],
"solar_today": [
"Total production - Today",
@ -75,6 +82,7 @@ SOLAR_SENSORS = {
ENERGY_WATT_HOUR,
"solar_today",
None,
False, # cloud only
],
"solar_current_hour": [
"Total production - Current hour",
@ -82,6 +90,7 @@ SOLAR_SENSORS = {
ENERGY_WATT_HOUR,
"solar_current_hour",
None,
False, # cloud only
],
}
VOLTAGE_SENSORS = {
@ -138,20 +147,22 @@ VOLTAGE_SENSORS = {
async def async_setup_entry(hass, config_entry, async_add_entities):
"""Set up the Smappee sensor."""
smappee_base = hass.data[DOMAIN][BASE]
smappee_base = hass.data[DOMAIN][config_entry.entry_id]
entities = []
for service_location in smappee_base.smappee.service_locations.values():
# Add all basic sensors (realtime values and aggregators)
# Some are available in local only env
for sensor in TREND_SENSORS:
entities.append(
SmappeeSensor(
smappee_base=smappee_base,
service_location=service_location,
sensor=sensor,
attributes=TREND_SENSORS[sensor],
if not service_location.local_polling or TREND_SENSORS[sensor][5]:
entities.append(
SmappeeSensor(
smappee_base=smappee_base,
service_location=service_location,
sensor=sensor,
attributes=TREND_SENSORS[sensor],
)
)
)
if service_location.has_reactive_value:
for reactive_sensor in REACTIVE_SENSORS:
@ -164,17 +175,18 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
)
)
# Add solar sensors
# Add solar sensors (some are available in local only env)
if service_location.has_solar_production:
for sensor in SOLAR_SENSORS:
entities.append(
SmappeeSensor(
smappee_base=smappee_base,
service_location=service_location,
sensor=sensor,
attributes=SOLAR_SENSORS[sensor],
if not service_location.local_polling or SOLAR_SENSORS[sensor][5]:
entities.append(
SmappeeSensor(
smappee_base=smappee_base,
service_location=service_location,
sensor=sensor,
attributes=SOLAR_SENSORS[sensor],
)
)
)
# Add all CT measurements
for measurement_id, measurement in service_location.measurements.items():

View File

@ -1,13 +1,34 @@
{
"config": {
"flow_title": "Smappee: {name}",
"step": {
"environment": {
"description": "Set up your Smappee to integrate with Home Assistant.",
"data": {
"environment": "Environment"
}
},
"local": {
"description": "Enter the host to initiate the Smappee local integration",
"data": {
"host": "[%key:common::config_flow::data::host%]"
}
},
"zeroconf_confirm": {
"description": "Do you want to add the Smappee device with serialnumber `{serialnumber}` to Home Assistant?",
"title": "Discovered Smappee device"
},
"pick_implementation": {
"title": "Pick Authentication Method"
}
},
"abort": {
"already_configured_device": "[%key:common::config_flow::abort::already_configured_device%]",
"already_configured_local_device": "Local device(s) is already configured. Please remove those first before configuring a cloud device.",
"authorize_url_timeout": "Timeout generating authorize url.",
"missing_configuration": "The component is not configured. Please follow the documentation."
"connection_error": "Failed to connect to Smappee device.",
"missing_configuration": "The component is not configured. Please follow the documentation.",
"invalid_mdns": "Unsupported device for the Smappee integration."
}
}
}

View File

@ -4,7 +4,7 @@ import logging
from homeassistant.components.switch import SwitchEntity
from .const import BASE, DOMAIN
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
@ -15,7 +15,7 @@ SCAN_INTERVAL = timedelta(seconds=5)
async def async_setup_entry(hass, config_entry, async_add_entities):
"""Set up the Smappee Comfort Plugs."""
smappee_base = hass.data[DOMAIN][BASE]
smappee_base = hass.data[DOMAIN][config_entry.entry_id]
entities = []
for service_location in smappee_base.smappee.service_locations.values():

View File

@ -1,13 +1,34 @@
{
"config": {
"abort": {
"authorize_url_timeout": "Timeout generating authorize url.",
"missing_configuration": "The component is not configured. Please follow the documentation."
},
"flow_title": "Smappee: {name}",
"step": {
"environment": {
"description": "Set up your Smappee to integrate with Home Assistant.",
"data": {
"environment": "Environment"
}
},
"local": {
"description": "Enter the host to initiate the Smappee local integration",
"data": {
"host": "Host"
}
},
"zeroconf_confirm": {
"description": "Do you want to add the Smappee device with serialnumber `{serialnumber}` to Home Assistant?",
"title": "Discovered Smappee device"
},
"pick_implementation": {
"title": "Pick Authentication Method"
}
},
"abort": {
"already_configured_device": "[%key:common::config_flow::abort::already_configured_device%]",
"already_configured_local_device": "Local device(s) is already configured. Please remove those first before configuring a cloud device.",
"authorize_url_timeout": "Timeout generating authorize url.",
"connection_error": "Failed to connect to Smappee device.",
"missing_configuration": "The component is not configured. Please follow the documentation.",
"invalid_mdns": "Unsupported device for the Smappee integration."
}
}
}
}

View File

@ -59,6 +59,9 @@ ZEROCONF = {
"_spotify-connect._tcp.local.": [
"spotify"
],
"_ssh._tcp.local.": [
"smappee"
],
"_viziocast._tcp.local.": [
"vizio"
],

View File

@ -1623,7 +1623,7 @@ pyskyqhub==0.1.1
pysma==0.3.5
# homeassistant.components.smappee
pysmappee==0.1.5
pysmappee==0.2.9
# homeassistant.components.smartthings
pysmartapp==0.3.2

View File

@ -752,7 +752,7 @@ pysignalclirestapi==0.3.4
pysma==0.3.5
# homeassistant.components.smappee
pysmappee==0.1.5
pysmappee==0.2.9
# homeassistant.components.smartthings
pysmartapp==0.3.2

View File

@ -1,16 +1,326 @@
"""Test the Smappee config flow."""
from homeassistant import config_entries, setup
from homeassistant.components.smappee.const import AUTHORIZE_URL, DOMAIN, TOKEN_URL
"""Test the Smappee component config flow module."""
from homeassistant import data_entry_flow, setup
from homeassistant.components.smappee.const import (
CONF_HOSTNAME,
CONF_SERIALNUMBER,
DOMAIN,
ENV_CLOUD,
ENV_LOCAL,
TOKEN_URL,
)
from homeassistant.config_entries import SOURCE_USER, SOURCE_ZEROCONF
from homeassistant.const import CONF_CLIENT_ID, CONF_CLIENT_SECRET
from homeassistant.helpers import config_entry_oauth2_flow
from tests.async_mock import patch
from tests.common import MockConfigEntry
CLIENT_ID = "1234"
CLIENT_SECRET = "5678"
async def test_full_flow(hass, aiohttp_client, aioclient_mock):
async def test_show_user_form(hass):
"""Test that the user set up form is served."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER},
)
assert result["step_id"] == "environment"
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
async def test_show_user_host_form(hass):
"""Test that the host form is served after choosing the local option."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER},
)
assert result["step_id"] == "environment"
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"environment": ENV_LOCAL}
)
assert result["step_id"] == ENV_LOCAL
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
async def test_show_zeroconf_connection_error_form(hass):
"""Test that the zeroconf confirmation form is served."""
with patch("pysmappee.api.SmappeeLocalApi.logon", return_value=None):
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_ZEROCONF},
data={
"host": "1.2.3.4",
"port": 22,
CONF_HOSTNAME: "Smappee1006000212.local.",
"type": "_ssh._tcp.local.",
"name": "Smappee1006000212._ssh._tcp.local.",
"properties": {"_raw": {}},
},
)
assert result["description_placeholders"] == {CONF_SERIALNUMBER: "1006000212"}
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "zeroconf_confirm"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"host": "1.2.3.4"}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "connection_error"
assert len(hass.config_entries.async_entries(DOMAIN)) == 0
async def test_connection_error(hass):
"""Test we show user form on Smappee connection error."""
with patch("pysmappee.api.SmappeeLocalApi.logon", return_value=None):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER},
)
assert result["step_id"] == "environment"
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"environment": ENV_LOCAL}
)
assert result["step_id"] == ENV_LOCAL
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"host": "1.2.3.4"}
)
assert result["reason"] == "connection_error"
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
async def test_zeroconf_wrong_mdns(hass):
"""Test we abort if unsupported mDNS name is discovered."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_ZEROCONF},
data={
"host": "1.2.3.4",
"port": 22,
CONF_HOSTNAME: "example.local.",
"type": "_ssh._tcp.local.",
"name": "example._ssh._tcp.local.",
"properties": {"_raw": {}},
},
)
assert result["reason"] == "invalid_mdns"
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
async def test_full_user_wrong_mdns(hass):
"""Test we abort user flow if unsupported mDNS name got resolved."""
with patch("pysmappee.api.SmappeeLocalApi.logon", return_value={}), patch(
"pysmappee.api.SmappeeLocalApi.load_advanced_config",
return_value=[{"key": "mdnsHostName", "value": "Smappee2006000212"}],
), patch(
"pysmappee.api.SmappeeLocalApi.load_command_control_config", return_value=[]
), patch(
"pysmappee.api.SmappeeLocalApi.load_instantaneous",
return_value=[{"key": "phase0ActivePower", "value": 0}],
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER},
)
assert result["step_id"] == "environment"
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"environment": ENV_LOCAL}
)
assert result["step_id"] == ENV_LOCAL
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"host": "1.2.3.4"}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "invalid_mdns"
async def test_user_device_exists_abort(hass):
"""Test we abort user flow if Smappee device already configured."""
with patch("pysmappee.api.SmappeeLocalApi.logon", return_value={}), patch(
"pysmappee.api.SmappeeLocalApi.load_advanced_config",
return_value=[{"key": "mdnsHostName", "value": "Smappee1006000212"}],
), patch(
"pysmappee.api.SmappeeLocalApi.load_command_control_config", return_value=[]
), patch(
"pysmappee.api.SmappeeLocalApi.load_instantaneous",
return_value=[{"key": "phase0ActivePower", "value": 0}],
):
config_entry = MockConfigEntry(
domain=DOMAIN,
data={"host": "1.2.3.4"},
unique_id="1006000212",
source=SOURCE_USER,
)
config_entry.add_to_hass(hass)
assert len(hass.config_entries.async_entries(DOMAIN)) == 1
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER},
)
assert result["step_id"] == "environment"
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"environment": ENV_LOCAL}
)
assert result["step_id"] == ENV_LOCAL
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"host": "1.2.3.4"}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_configured"
assert len(hass.config_entries.async_entries(DOMAIN)) == 1
async def test_zeroconf_device_exists_abort(hass):
"""Test we abort zeroconf flow if Smappee device already configured."""
with patch("pysmappee.api.SmappeeLocalApi.logon", return_value={}), patch(
"pysmappee.api.SmappeeLocalApi.load_advanced_config",
return_value=[{"key": "mdnsHostName", "value": "Smappee1006000212"}],
), patch(
"pysmappee.api.SmappeeLocalApi.load_command_control_config", return_value=[]
), patch(
"pysmappee.api.SmappeeLocalApi.load_instantaneous",
return_value=[{"key": "phase0ActivePower", "value": 0}],
):
config_entry = MockConfigEntry(
domain=DOMAIN,
data={"host": "1.2.3.4"},
unique_id="1006000212",
source=SOURCE_USER,
)
config_entry.add_to_hass(hass)
assert len(hass.config_entries.async_entries(DOMAIN)) == 1
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_ZEROCONF},
data={
"host": "1.2.3.4",
"port": 22,
CONF_HOSTNAME: "Smappee1006000212.local.",
"type": "_ssh._tcp.local.",
"name": "Smappee1006000212._ssh._tcp.local.",
"properties": {"_raw": {}},
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_configured"
assert len(hass.config_entries.async_entries(DOMAIN)) == 1
async def test_cloud_device_exists_abort(hass):
"""Test we abort cloud flow if Smappee Cloud device already configured."""
config_entry = MockConfigEntry(
domain=DOMAIN, unique_id="smappeeCloud", source=SOURCE_USER,
)
config_entry.add_to_hass(hass)
assert len(hass.config_entries.async_entries(DOMAIN)) == 1
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_configured_device"
assert len(hass.config_entries.async_entries(DOMAIN)) == 1
async def test_zeroconf_abort_if_cloud_device_exists(hass):
"""Test we abort zeroconf flow if Smappee Cloud device already configured."""
config_entry = MockConfigEntry(
domain=DOMAIN, unique_id="smappeeCloud", source=SOURCE_USER,
)
config_entry.add_to_hass(hass)
assert len(hass.config_entries.async_entries(DOMAIN)) == 1
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_ZEROCONF},
data={
"host": "1.2.3.4",
"port": 22,
CONF_HOSTNAME: "Smappee1006000212.local.",
"type": "_ssh._tcp.local.",
"name": "Smappee1006000212._ssh._tcp.local.",
"properties": {"_raw": {}},
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_configured_device"
assert len(hass.config_entries.async_entries(DOMAIN)) == 1
async def test_zeroconf_confirm_abort_if_cloud_device_exists(hass):
"""Test we abort zeroconf confirm flow if Smappee Cloud device already configured."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_ZEROCONF},
data={
"host": "1.2.3.4",
"port": 22,
CONF_HOSTNAME: "Smappee1006000212.local.",
"type": "_ssh._tcp.local.",
"name": "Smappee1006000212._ssh._tcp.local.",
"properties": {"_raw": {}},
},
)
config_entry = MockConfigEntry(
domain=DOMAIN, unique_id="smappeeCloud", source=SOURCE_USER,
)
config_entry.add_to_hass(hass)
assert len(hass.config_entries.async_entries(DOMAIN)) == 1
result = await hass.config_entries.flow.async_configure(result["flow_id"])
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_configured_device"
assert len(hass.config_entries.async_entries(DOMAIN)) == 1
async def test_abort_cloud_flow_if_local_device_exists(hass):
"""Test we abort the cloud flow if a Smappee local device already configured."""
config_entry = MockConfigEntry(
domain=DOMAIN,
data={"host": "1.2.3.4"},
unique_id="1006000212",
source=SOURCE_USER,
)
config_entry.add_to_hass(hass)
assert len(hass.config_entries.async_entries(DOMAIN)) == 1
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER},
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"environment": ENV_CLOUD}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_configured_local_device"
assert len(hass.config_entries.async_entries(DOMAIN)) == 1
async def test_full_user_flow(hass, aiohttp_client, aioclient_mock):
"""Check full flow."""
assert await setup.async_setup_component(
hass,
@ -22,16 +332,13 @@ async def test_full_flow(hass, aiohttp_client, aioclient_mock):
)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
DOMAIN, context={"source": SOURCE_USER},
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"environment": ENV_CLOUD}
)
state = config_entry_oauth2_flow._encode_jwt(hass, {"flow_id": result["flow_id"]})
assert result["url"] == (
f"{AUTHORIZE_URL['PRODUCTION']}?response_type=code&client_id={CLIENT_ID}"
"&redirect_uri=https://example.com/auth/external/callback"
f"&state={state}"
)
client = await aiohttp_client(hass.http.app)
resp = await client.get(f"/auth/external/callback?code=abcd&state={state}")
assert resp.status == 200
@ -54,3 +361,81 @@ async def test_full_flow(hass, aiohttp_client, aioclient_mock):
assert len(hass.config_entries.async_entries(DOMAIN)) == 1
assert len(mock_setup.mock_calls) == 1
async def test_full_zeroconf_flow(hass):
"""Test the full zeroconf flow."""
with patch("pysmappee.api.SmappeeLocalApi.logon", return_value={}), patch(
"pysmappee.api.SmappeeLocalApi.load_advanced_config",
return_value=[{"key": "mdnsHostName", "value": "Smappee1006000212"}],
), patch(
"pysmappee.api.SmappeeLocalApi.load_command_control_config", return_value=[]
), patch(
"pysmappee.api.SmappeeLocalApi.load_instantaneous",
return_value=[{"key": "phase0ActivePower", "value": 0}],
), patch(
"homeassistant.components.smappee.async_setup_entry", return_value=True
):
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_ZEROCONF},
data={
"host": "1.2.3.4",
"port": 22,
CONF_HOSTNAME: "Smappee1006000212.local.",
"type": "_ssh._tcp.local.",
"name": "Smappee1006000212._ssh._tcp.local.",
"properties": {"_raw": {}},
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "zeroconf_confirm"
assert result["description_placeholders"] == {CONF_SERIALNUMBER: "1006000212"}
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"host": "1.2.3.4"}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == "smappee1006000212"
assert len(hass.config_entries.async_entries(DOMAIN)) == 1
entry = hass.config_entries.async_entries(DOMAIN)[0]
assert entry.unique_id == "1006000212"
async def test_full_user_local_flow(hass):
"""Test the full zeroconf flow."""
with patch("pysmappee.api.SmappeeLocalApi.logon", return_value={}), patch(
"pysmappee.api.SmappeeLocalApi.load_advanced_config",
return_value=[{"key": "mdnsHostName", "value": "Smappee1006000212"}],
), patch(
"pysmappee.api.SmappeeLocalApi.load_command_control_config", return_value=[]
), patch(
"pysmappee.api.SmappeeLocalApi.load_instantaneous",
return_value=[{"key": "phase0ActivePower", "value": 0}],
), patch(
"homeassistant.components.smappee.async_setup_entry", return_value=True
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER},
)
assert result["step_id"] == "environment"
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["description_placeholders"] is None
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"environment": ENV_LOCAL},
)
assert result["step_id"] == ENV_LOCAL
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"host": "1.2.3.4"}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == "smappee1006000212"
assert len(hass.config_entries.async_entries(DOMAIN)) == 1
entry = hass.config_entries.async_entries(DOMAIN)[0]
assert entry.unique_id == "1006000212"

View File

@ -0,0 +1,32 @@
"""Tests for the Smappee component init module."""
from homeassistant.components.smappee.const import DOMAIN
from homeassistant.config_entries import SOURCE_ZEROCONF
from tests.async_mock import patch
from tests.common import MockConfigEntry
async def test_unload_config_entry(hass):
"""Test unload config entry flow."""
with patch("pysmappee.api.SmappeeLocalApi.logon", return_value={}), patch(
"pysmappee.api.SmappeeLocalApi.load_advanced_config",
return_value=[{"key": "mdnsHostName", "value": "Smappee1006000212"}],
), patch(
"pysmappee.api.SmappeeLocalApi.load_command_control_config", return_value=[]
), patch(
"pysmappee.api.SmappeeLocalApi.load_instantaneous",
return_value=[{"key": "phase0ActivePower", "value": 0}],
):
config_entry = MockConfigEntry(
domain=DOMAIN,
data={"host": "1.2.3.4"},
unique_id="smappee1006000212",
source=SOURCE_ZEROCONF,
)
config_entry.add_to_hass(hass)
assert len(hass.config_entries.async_entries(DOMAIN)) == 1
entry = hass.config_entries.async_entries(DOMAIN)[0]
await hass.config_entries.async_unload(entry.entry_id)
await hass.async_block_till_done()
assert not hass.data.get(DOMAIN)