Rework TPLink integration to use python-kasa (#56701)

Co-authored-by: J. Nick Koston <nick@koston.org>
Co-authored-by: Teemu R. <tpr@iki.fi>
Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
This commit is contained in:
Teemu R 2021-09-27 21:11:55 +02:00 committed by GitHub
parent 7a2bc130b7
commit b40d229369
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 2140 additions and 2218 deletions

View File

@ -1096,8 +1096,6 @@ omit =
homeassistant/components/totalconnect/binary_sensor.py
homeassistant/components/totalconnect/const.py
homeassistant/components/touchline/climate.py
homeassistant/components/tplink/common.py
homeassistant/components/tplink/switch.py
homeassistant/components/tplink_lte/*
homeassistant/components/traccar/device_tracker.py
homeassistant/components/traccar/const.py

View File

@ -107,6 +107,7 @@ homeassistant.components.tag.*
homeassistant.components.tautulli.*
homeassistant.components.tcp.*
homeassistant.components.tile.*
homeassistant.components.tplink.*
homeassistant.components.tradfri.*
homeassistant.components.tts.*
homeassistant.components.upcloud.*

View File

@ -1,198 +1,136 @@
"""Component to embed TP-Link smart home devices."""
from __future__ import annotations
from datetime import timedelta
import logging
import time
from typing import Any
from pyHS100.smartdevice import SmartDevice, SmartDeviceException
from pyHS100.smartplug import SmartPlug
from kasa import SmartDevice, SmartDeviceException
from kasa.discover import Discover
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components.switch import ATTR_CURRENT_POWER_W, ATTR_TODAY_ENERGY_KWH
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
ATTR_VOLTAGE,
CONF_ALIAS,
CONF_DEVICE_ID,
CONF_HOST,
CONF_MAC,
CONF_STATE,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.config_entries import ConfigEntry, ConfigEntryNotReady
from homeassistant.const import CONF_HOST, CONF_MAC, CONF_NAME
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import config_validation as cv, device_registry as dr
from homeassistant.helpers.typing import ConfigType
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .common import SmartDevices, async_discover_devices, get_static_devices
from .const import (
ATTR_CONFIG,
ATTR_CURRENT_A,
ATTR_TOTAL_ENERGY_KWH,
CONF_DIMMER,
CONF_DISCOVERY,
CONF_EMETER_PARAMS,
CONF_LIGHT,
CONF_MODEL,
CONF_STRIP,
CONF_SW_VERSION,
CONF_SWITCH,
COORDINATORS,
DOMAIN,
PLATFORMS,
UNAVAILABLE_DEVICES,
UNAVAILABLE_RETRY_DELAY,
)
_LOGGER = logging.getLogger(__name__)
DOMAIN = "tplink"
from .coordinator import TPLinkDataUpdateCoordinator
from .migration import (
async_migrate_entities_devices,
async_migrate_legacy_entries,
async_migrate_yaml_entries,
)
TPLINK_HOST_SCHEMA = vol.Schema({vol.Required(CONF_HOST): cv.string})
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.Schema(
{
vol.Optional(CONF_LIGHT, default=[]): vol.All(
cv.ensure_list, [TPLINK_HOST_SCHEMA]
),
vol.Optional(CONF_SWITCH, default=[]): vol.All(
cv.ensure_list, [TPLINK_HOST_SCHEMA]
),
vol.Optional(CONF_STRIP, default=[]): vol.All(
cv.ensure_list, [TPLINK_HOST_SCHEMA]
),
vol.Optional(CONF_DIMMER, default=[]): vol.All(
cv.ensure_list, [TPLINK_HOST_SCHEMA]
),
vol.Optional(CONF_DISCOVERY, default=True): cv.boolean,
}
)
},
vol.All(
cv.deprecated(DOMAIN),
{
DOMAIN: vol.Schema(
{
vol.Optional(CONF_LIGHT, default=[]): vol.All(
cv.ensure_list, [TPLINK_HOST_SCHEMA]
),
vol.Optional(CONF_SWITCH, default=[]): vol.All(
cv.ensure_list, [TPLINK_HOST_SCHEMA]
),
vol.Optional(CONF_STRIP, default=[]): vol.All(
cv.ensure_list, [TPLINK_HOST_SCHEMA]
),
vol.Optional(CONF_DIMMER, default=[]): vol.All(
cv.ensure_list, [TPLINK_HOST_SCHEMA]
),
vol.Optional(CONF_DISCOVERY, default=True): cv.boolean,
}
)
},
),
extra=vol.ALLOW_EXTRA,
)
@callback
def async_trigger_discovery(
hass: HomeAssistant,
discovered_devices: dict[str, SmartDevice],
) -> None:
"""Trigger config flows for discovered devices."""
for formatted_mac, device in discovered_devices.items():
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_DISCOVERY},
data={
CONF_NAME: device.alias,
CONF_HOST: device.host,
CONF_MAC: formatted_mac,
},
)
)
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the TP-Link component."""
conf = config.get(DOMAIN)
hass.data[DOMAIN] = {}
hass.data[DOMAIN][ATTR_CONFIG] = conf
legacy_entry = None
config_entries_by_mac = {}
for entry in hass.config_entries.async_entries(DOMAIN):
if async_entry_is_legacy(entry):
legacy_entry = entry
elif entry.unique_id:
config_entries_by_mac[entry.unique_id] = entry
discovered_devices = {
dr.format_mac(device.mac): device
for device in (await Discover.discover()).values()
}
hosts_by_mac = {mac: device.host for mac, device in discovered_devices.items()}
if legacy_entry:
async_migrate_legacy_entries(
hass, hosts_by_mac, config_entries_by_mac, legacy_entry
)
if conf is not None:
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_IMPORT}
)
)
async_migrate_yaml_entries(hass, conf)
if discovered_devices:
async_trigger_discovery(hass, discovered_devices)
return True
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up TPLink from a config entry."""
config_data = hass.data[DOMAIN].get(ATTR_CONFIG)
if config_data is None and entry.data:
config_data = entry.data
elif config_data is not None:
hass.config_entries.async_update_entry(entry, data=config_data)
if async_entry_is_legacy(entry):
return True
device_registry = dr.async_get(hass)
tplink_devices = dr.async_entries_for_config_entry(device_registry, entry.entry_id)
device_count = len(tplink_devices)
hass_data: dict[str, Any] = hass.data[DOMAIN]
# These will contain the initialized devices
hass_data[CONF_LIGHT] = []
hass_data[CONF_SWITCH] = []
hass_data[UNAVAILABLE_DEVICES] = []
lights: list[SmartDevice] = hass_data[CONF_LIGHT]
switches: list[SmartPlug] = hass_data[CONF_SWITCH]
unavailable_devices: list[SmartDevice] = hass_data[UNAVAILABLE_DEVICES]
# Add static devices
static_devices = SmartDevices()
if config_data is not None:
static_devices = get_static_devices(config_data)
lights.extend(static_devices.lights)
switches.extend(static_devices.switches)
# Add discovered devices
if config_data is None or config_data[CONF_DISCOVERY]:
discovered_devices = await async_discover_devices(
hass, static_devices, device_count
)
lights.extend(discovered_devices.lights)
switches.extend(discovered_devices.switches)
if lights:
_LOGGER.debug(
"Got %s lights: %s", len(lights), ", ".join(d.host for d in lights)
)
if switches:
_LOGGER.debug(
"Got %s switches: %s",
len(switches),
", ".join(d.host for d in switches),
)
async def async_retry_devices(self) -> None:
"""Retry unavailable devices."""
unavailable_devices: list[SmartDevice] = hass_data[UNAVAILABLE_DEVICES]
_LOGGER.debug(
"retry during setup unavailable devices: %s",
[d.host for d in unavailable_devices],
)
for device in unavailable_devices:
try:
await hass.async_add_executor_job(device.get_sysinfo)
except SmartDeviceException:
continue
_LOGGER.debug(
"at least one device is available again, so reload integration"
)
await hass.config_entries.async_reload(entry.entry_id)
legacy_entry: ConfigEntry | None = None
for config_entry in hass.config_entries.async_entries(DOMAIN):
if async_entry_is_legacy(config_entry):
legacy_entry = config_entry
break
# prepare DataUpdateCoordinators
hass_data[COORDINATORS] = {}
for switch in switches:
if legacy_entry is not None:
await async_migrate_entities_devices(hass, legacy_entry.entry_id, entry)
try:
info = await hass.async_add_executor_job(switch.get_sysinfo)
except SmartDeviceException:
_LOGGER.warning(
"Device at '%s' not reachable during setup, will retry later",
switch.host,
)
unavailable_devices.append(switch)
continue
hass_data[COORDINATORS][
switch.context or switch.mac
] = coordinator = SmartPlugDataUpdateCoordinator(hass, switch, info["alias"])
await coordinator.async_config_entry_first_refresh()
if unavailable_devices:
entry.async_on_unload(
async_track_time_interval(
hass, async_retry_devices, UNAVAILABLE_RETRY_DELAY
)
)
unavailable_devices_hosts = [d.host for d in unavailable_devices]
hass_data[CONF_SWITCH] = [
s for s in switches if s.host not in unavailable_devices_hosts
]
try:
device: SmartDevice = await Discover.discover_single(entry.data[CONF_HOST])
except SmartDeviceException as ex:
raise ConfigEntryNotReady from ex
hass.data[DOMAIN][entry.entry_id] = TPLinkDataUpdateCoordinator(hass, device)
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
return True
@ -200,81 +138,25 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
hass_data: dict[str, Any] = hass.data[DOMAIN]
if unload_ok:
hass_data.clear()
if entry.entry_id not in hass_data:
return True
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
hass_data.pop(entry.entry_id)
return unload_ok
class SmartPlugDataUpdateCoordinator(DataUpdateCoordinator):
"""DataUpdateCoordinator to gather data for specific SmartPlug."""
@callback
def async_entry_is_legacy(entry: ConfigEntry) -> bool:
"""Check if a config entry is the legacy shared one."""
return entry.unique_id is None or entry.unique_id == DOMAIN
def __init__(
self,
hass: HomeAssistant,
smartplug: SmartPlug,
alias: str,
) -> None:
"""Initialize DataUpdateCoordinator to gather data for specific SmartPlug."""
self.smartplug = smartplug
update_interval = timedelta(seconds=30)
super().__init__(
hass,
_LOGGER,
name=alias,
update_interval=update_interval,
)
def _update_data(self) -> dict:
"""Fetch all device and sensor data from api."""
try:
info = self.smartplug.sys_info
data = {
CONF_HOST: self.smartplug.host,
CONF_MAC: info["mac"],
CONF_MODEL: info["model"],
CONF_SW_VERSION: info["sw_ver"],
}
if self.smartplug.context is None:
data[CONF_ALIAS] = info["alias"]
data[CONF_DEVICE_ID] = info["mac"]
data[CONF_STATE] = bool(info["relay_state"])
else:
plug_from_context = next(
c
for c in self.smartplug.sys_info["children"]
if c["id"] == self.smartplug.context
)
data[CONF_ALIAS] = plug_from_context["alias"]
data[CONF_DEVICE_ID] = self.smartplug.context
data[CONF_STATE] = plug_from_context["state"] == 1
# Check if the device has emeter
if "ENE" in info["feature"]:
emeter_readings = self.smartplug.get_emeter_realtime()
data[CONF_EMETER_PARAMS] = {
ATTR_CURRENT_POWER_W: round(float(emeter_readings["power"]), 2),
ATTR_TOTAL_ENERGY_KWH: round(float(emeter_readings["total"]), 3),
ATTR_VOLTAGE: round(float(emeter_readings["voltage"]), 1),
ATTR_CURRENT_A: round(float(emeter_readings["current"]), 2),
}
emeter_statics = self.smartplug.get_emeter_daily()
if emeter_statics.get(int(time.strftime("%e"))):
data[CONF_EMETER_PARAMS][ATTR_TODAY_ENERGY_KWH] = round(
float(emeter_statics[int(time.strftime("%e"))]), 3
)
else:
# today's consumption not available, when device was off all the day
data[CONF_EMETER_PARAMS][ATTR_TODAY_ENERGY_KWH] = 0.0
except SmartDeviceException as ex:
raise UpdateFailed(ex) from ex
self.name = data[CONF_ALIAS]
return data
async def _async_update_data(self) -> dict:
"""Fetch all device and sensor data from api."""
return await self.hass.async_add_executor_job(self._update_data)
def legacy_device_id(device: SmartDevice) -> str:
"""Convert the device id so it matches what was used in the original version."""
device_id: str = device.device_id
# Plugs are prefixed with the mac in python-kasa but not
# in pyHS100 so we need to strip off the mac
if "_" not in device_id:
return device_id
return device_id.split("_")[1]

View File

@ -1,186 +0,0 @@
"""Common code for tplink."""
from __future__ import annotations
import logging
from typing import Callable
from pyHS100 import (
Discover,
SmartBulb,
SmartDevice,
SmartDeviceException,
SmartPlug,
SmartStrip,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import Entity
from .const import (
CONF_DIMMER,
CONF_LIGHT,
CONF_STRIP,
CONF_SWITCH,
DOMAIN as TPLINK_DOMAIN,
MAX_DISCOVERY_RETRIES,
)
_LOGGER = logging.getLogger(__name__)
class SmartDevices:
"""Hold different kinds of devices."""
def __init__(
self, lights: list[SmartDevice] = None, switches: list[SmartDevice] = None
) -> None:
"""Initialize device holder."""
self._lights = lights or []
self._switches = switches or []
@property
def lights(self) -> list[SmartDevice]:
"""Get the lights."""
return self._lights
@property
def switches(self) -> list[SmartDevice]:
"""Get the switches."""
return self._switches
def has_device_with_host(self, host: str) -> bool:
"""Check if a devices exists with a specific host."""
for device in self.lights + self.switches:
if device.host == host:
return True
return False
async def async_get_discoverable_devices(hass: HomeAssistant) -> dict[str, SmartDevice]:
"""Return if there are devices that can be discovered."""
def discover() -> dict[str, SmartDevice]:
return Discover.discover()
return await hass.async_add_executor_job(discover)
async def async_discover_devices(
hass: HomeAssistant, existing_devices: SmartDevices, target_device_count: int
) -> SmartDevices:
"""Get devices through discovery."""
lights = []
switches = []
def process_devices() -> None:
for dev in devices.values():
# If this device already exists, ignore dynamic setup.
if existing_devices.has_device_with_host(dev.host):
continue
if isinstance(dev, SmartStrip):
for plug in dev.plugs.values():
switches.append(plug)
elif isinstance(dev, SmartPlug):
try:
if dev.is_dimmable: # Dimmers act as lights
lights.append(dev)
else:
switches.append(dev)
except SmartDeviceException as ex:
_LOGGER.error("Unable to connect to device %s: %s", dev.host, ex)
elif isinstance(dev, SmartBulb):
lights.append(dev)
else:
_LOGGER.error("Unknown smart device type: %s", type(dev))
devices: dict[str, SmartDevice] = {}
for attempt in range(1, MAX_DISCOVERY_RETRIES + 1):
_LOGGER.debug(
"Discovering tplink devices, attempt %s of %s",
attempt,
MAX_DISCOVERY_RETRIES,
)
discovered_devices = await async_get_discoverable_devices(hass)
_LOGGER.info(
"Discovered %s TP-Link of expected %s smart home device(s)",
len(discovered_devices),
target_device_count,
)
for device_ip in discovered_devices:
devices[device_ip] = discovered_devices[device_ip]
if len(discovered_devices) >= target_device_count:
_LOGGER.info(
"Discovered at least as many devices on the network as exist in our device registry, no need to retry"
)
break
_LOGGER.info(
"Found %s unique TP-Link smart home device(s) after %s discovery attempts",
len(devices),
attempt,
)
await hass.async_add_executor_job(process_devices)
return SmartDevices(lights, switches)
def get_static_devices(config_data) -> SmartDevices:
"""Get statically defined devices in the config."""
_LOGGER.debug("Getting static devices")
lights = []
switches = []
for type_ in (CONF_LIGHT, CONF_SWITCH, CONF_STRIP, CONF_DIMMER):
for entry in config_data[type_]:
host = entry["host"]
try:
if type_ == CONF_LIGHT:
lights.append(SmartBulb(host))
elif type_ == CONF_SWITCH:
switches.append(SmartPlug(host))
elif type_ == CONF_STRIP:
for plug in SmartStrip(host).plugs.values():
switches.append(plug)
# Dimmers need to be defined as smart plugs to work correctly.
elif type_ == CONF_DIMMER:
lights.append(SmartPlug(host))
except SmartDeviceException as sde:
_LOGGER.error(
"Failed to setup device %s due to %s; not retrying", host, sde
)
return SmartDevices(lights, switches)
def add_available_devices(
hass: HomeAssistant, device_type: str, device_class: Callable
) -> list[Entity]:
"""Get sysinfo for all devices."""
devices: list[SmartDevice] = hass.data[TPLINK_DOMAIN][device_type]
if f"{device_type}_remaining" in hass.data[TPLINK_DOMAIN]:
devices: list[SmartDevice] = hass.data[TPLINK_DOMAIN][
f"{device_type}_remaining"
]
entities_ready: list[Entity] = []
devices_unavailable: list[SmartDevice] = []
for device in devices:
try:
device.get_sysinfo()
entities_ready.append(device_class(device))
except SmartDeviceException as ex:
devices_unavailable.append(device)
_LOGGER.warning(
"Unable to communicate with device %s: %s",
device.host,
ex,
)
hass.data[TPLINK_DOMAIN][f"{device_type}_remaining"] = devices_unavailable
return entities_ready

View File

@ -1,11 +1,181 @@
"""Config flow for TP-Link."""
from homeassistant.helpers import config_entry_flow
from __future__ import annotations
from .common import async_get_discoverable_devices
import logging
from typing import Any
from kasa import SmartDevice, SmartDeviceException
from kasa.discover import Discover
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components.dhcp import IP_ADDRESS, MAC_ADDRESS
from homeassistant.const import CONF_DEVICE, CONF_HOST, CONF_MAC, CONF_NAME
from homeassistant.core import callback
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.typing import DiscoveryInfoType
from . import async_entry_is_legacy
from .const import DOMAIN
config_entry_flow.register_discovery_flow(
DOMAIN,
"TP-Link Smart Home",
async_get_discoverable_devices,
)
_LOGGER = logging.getLogger(__name__)
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for tplink."""
VERSION = 1
def __init__(self) -> None:
"""Initialize the config flow."""
self._discovered_devices: dict[str, SmartDevice] = {}
self._discovered_device: SmartDevice | None = None
async def async_step_dhcp(self, discovery_info: DiscoveryInfoType) -> FlowResult:
"""Handle discovery via dhcp."""
return await self._async_handle_discovery(
discovery_info[IP_ADDRESS], discovery_info[MAC_ADDRESS]
)
async def async_step_discovery(
self, discovery_info: DiscoveryInfoType
) -> FlowResult:
"""Handle discovery."""
return await self._async_handle_discovery(
discovery_info[CONF_HOST], discovery_info[CONF_MAC]
)
async def _async_handle_discovery(self, host: str, mac: str) -> FlowResult:
"""Handle any discovery."""
await self.async_set_unique_id(dr.format_mac(mac))
self._abort_if_unique_id_configured(updates={CONF_HOST: host})
self._async_abort_entries_match({CONF_HOST: host})
self.context[CONF_HOST] = host
for progress in self._async_in_progress():
if progress.get("context", {}).get(CONF_HOST) == host:
return self.async_abort(reason="already_in_progress")
try:
self._discovered_device = await self._async_try_connect(
host, raise_on_progress=True
)
except SmartDeviceException:
return self.async_abort(reason="cannot_connect")
return await self.async_step_discovery_confirm()
async def async_step_discovery_confirm(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Confirm discovery."""
assert self._discovered_device is not None
if user_input is not None:
return self._async_create_entry_from_device(self._discovered_device)
self._set_confirm_only()
placeholders = {
"name": self._discovered_device.alias,
"model": self._discovered_device.model,
"host": self._discovered_device.host,
}
self.context["title_placeholders"] = placeholders
return self.async_show_form(
step_id="discovery_confirm", description_placeholders=placeholders
)
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle the initial step."""
errors = {}
if user_input is not None:
host = user_input[CONF_HOST]
if not host:
return await self.async_step_pick_device()
try:
device = await self._async_try_connect(host, raise_on_progress=False)
except SmartDeviceException:
errors["base"] = "cannot_connect"
else:
return self._async_create_entry_from_device(device)
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({vol.Optional(CONF_HOST, default=""): str}),
errors=errors,
)
async def async_step_pick_device(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle the step to pick discovered device."""
if user_input is not None:
mac = user_input[CONF_DEVICE]
await self.async_set_unique_id(mac, raise_on_progress=False)
return self._async_create_entry_from_device(self._discovered_devices[mac])
configured_devices = {
entry.unique_id
for entry in self._async_current_entries()
if not async_entry_is_legacy(entry)
}
self._discovered_devices = {
dr.format_mac(device.mac): device
for device in (await Discover.discover()).values()
}
devices_name = {
formatted_mac: f"{device.alias} {device.model} ({device.host}) {formatted_mac}"
for formatted_mac, device in self._discovered_devices.items()
if formatted_mac not in configured_devices
}
# Check if there is at least one device
if not devices_name:
return self.async_abort(reason="no_devices_found")
return self.async_show_form(
step_id="pick_device",
data_schema=vol.Schema({vol.Required(CONF_DEVICE): vol.In(devices_name)}),
)
async def async_step_migration(self, migration_input: dict[str, Any]) -> FlowResult:
"""Handle migration from legacy config entry to per device config entry."""
mac = migration_input[CONF_MAC]
await self.async_set_unique_id(dr.format_mac(mac), raise_on_progress=False)
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=migration_input[CONF_NAME],
data={
CONF_HOST: migration_input[CONF_HOST],
},
)
@callback
def _async_create_entry_from_device(self, device: SmartDevice) -> FlowResult:
"""Create a config entry from a smart device."""
self._abort_if_unique_id_configured(updates={CONF_HOST: device.host})
return self.async_create_entry(
title=f"{device.alias} {device.model}",
data={
CONF_HOST: device.host,
},
)
async def async_step_import(self, user_input: dict[str, Any]) -> FlowResult:
"""Handle import step."""
host = user_input[CONF_HOST]
try:
device = await self._async_try_connect(host, raise_on_progress=False)
except SmartDeviceException:
_LOGGER.error("Failed to import %s: cannot connect", host)
return self.async_abort(reason="cannot_connect")
return self._async_create_entry_from_device(device)
async def _async_try_connect(
self, host: str, raise_on_progress: bool = True
) -> SmartDevice:
"""Try to connect."""
self._async_abort_entries_match({CONF_HOST: host})
device: SmartDevice = await Discover.discover_single(host)
await self.async_set_unique_id(
dr.format_mac(device.mac), raise_on_progress=raise_on_progress
)
return device

View File

@ -1,28 +1,20 @@
"""Const for TP-Link."""
from __future__ import annotations
import datetime
from typing import Final
DOMAIN = "tplink"
COORDINATORS = "coordinators"
UNAVAILABLE_DEVICES = "unavailable_devices"
UNAVAILABLE_RETRY_DELAY = datetime.timedelta(seconds=300)
MIN_TIME_BETWEEN_UPDATES = datetime.timedelta(seconds=8)
MAX_DISCOVERY_RETRIES = 4
ATTR_CURRENT_A: Final = "current_a"
ATTR_CURRENT_POWER_W: Final = "current_power_w"
ATTR_TODAY_ENERGY_KWH: Final = "today_energy_kwh"
ATTR_TOTAL_ENERGY_KWH: Final = "total_energy_kwh"
ATTR_CONFIG = "config"
ATTR_TOTAL_ENERGY_KWH = "total_energy_kwh"
ATTR_CURRENT_A = "current_a"
CONF_DIMMER: Final = "dimmer"
CONF_DISCOVERY: Final = "discovery"
CONF_LIGHT: Final = "light"
CONF_STRIP: Final = "strip"
CONF_SWITCH: Final = "switch"
CONF_SENSOR: Final = "sensor"
CONF_MODEL = "model"
CONF_SW_VERSION = "sw_ver"
CONF_EMETER_PARAMS = "emeter_params"
CONF_DIMMER = "dimmer"
CONF_DISCOVERY = "discovery"
CONF_LIGHT = "light"
CONF_STRIP = "strip"
CONF_SWITCH = "switch"
CONF_SENSOR = "sensor"
PLATFORMS = [CONF_LIGHT, CONF_SENSOR, CONF_SWITCH]
PLATFORMS: Final = [CONF_LIGHT, CONF_SENSOR, CONF_SWITCH]

View File

@ -0,0 +1,57 @@
"""Component to embed TP-Link smart home devices."""
from __future__ import annotations
from datetime import timedelta
import logging
from kasa import SmartDevice, SmartDeviceException
from homeassistant.core import HomeAssistant
from homeassistant.helpers.debounce import Debouncer
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
_LOGGER = logging.getLogger(__name__)
REQUEST_REFRESH_DELAY = 0.35
class TPLinkDataUpdateCoordinator(DataUpdateCoordinator):
"""DataUpdateCoordinator to gather data for a specific TPLink device."""
def __init__(
self,
hass: HomeAssistant,
device: SmartDevice,
) -> None:
"""Initialize DataUpdateCoordinator to gather data for specific SmartPlug."""
self.device = device
self.update_children = True
update_interval = timedelta(seconds=10)
super().__init__(
hass,
_LOGGER,
name=device.host,
update_interval=update_interval,
# We don't want an immediate refresh since the device
# takes a moment to reflect the state change
request_refresh_debouncer=Debouncer(
hass, _LOGGER, cooldown=REQUEST_REFRESH_DELAY, immediate=False
),
)
async def async_request_refresh_without_children(self) -> None:
"""Request a refresh without the children."""
# If the children do get updated this is ok as this is an
# optimization to reduce the number of requests on the device
# when we do not need it.
self.update_children = False
await self.async_request_refresh()
async def _async_update_data(self) -> None:
"""Fetch all device and sensor data from api."""
try:
await self.device.update(update_children=self.update_children)
except SmartDeviceException as ex:
raise UpdateFailed(ex) from ex
finally:
self.update_children = True

View File

@ -0,0 +1,63 @@
"""Common code for tplink."""
from __future__ import annotations
from typing import Any, Callable, TypeVar, cast
from kasa import SmartDevice
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN
from .coordinator import TPLinkDataUpdateCoordinator
WrapFuncType = TypeVar("WrapFuncType", bound=Callable[..., Any])
def async_refresh_after(func: WrapFuncType) -> WrapFuncType:
"""Define a wrapper to refresh after."""
async def _async_wrap(
self: CoordinatedTPLinkEntity, *args: Any, **kwargs: Any
) -> None:
await func(self, *args, **kwargs)
await self.coordinator.async_request_refresh_without_children()
return cast(WrapFuncType, _async_wrap)
class CoordinatedTPLinkEntity(CoordinatorEntity):
"""Common base class for all coordinated tplink entities."""
coordinator: TPLinkDataUpdateCoordinator
def __init__(
self, device: SmartDevice, coordinator: TPLinkDataUpdateCoordinator
) -> None:
"""Initialize the switch."""
super().__init__(coordinator)
self.device: SmartDevice = device
self._attr_unique_id = self.device.device_id
@property
def name(self) -> str:
"""Return the name of the Smart Plug."""
return cast(str, self.device.alias)
@property
def device_info(self) -> DeviceInfo:
"""Return information about the device."""
return {
"name": self.device.alias,
"model": self.device.model,
"manufacturer": "TP-Link",
"identifiers": {(DOMAIN, str(self.device.device_id))},
"connections": {(dr.CONNECTION_NETWORK_MAC, self.device.mac)},
"sw_version": self.device.hw_info["sw_ver"],
}
@property
def is_on(self) -> bool:
"""Return true if switch is on."""
return bool(self.device.is_on)

View File

@ -1,552 +1,154 @@
"""Support for TPLink lights."""
from __future__ import annotations
import asyncio
from collections.abc import Mapping
from datetime import timedelta
import logging
import re
import time
from typing import Any, NamedTuple, cast
from typing import Any
from pyHS100 import SmartBulb, SmartDeviceException
from kasa import SmartDevice
from homeassistant.components.light import (
ATTR_BRIGHTNESS,
ATTR_COLOR_TEMP,
ATTR_HS_COLOR,
SUPPORT_BRIGHTNESS,
SUPPORT_COLOR,
SUPPORT_COLOR_TEMP,
ATTR_TRANSITION,
COLOR_MODE_BRIGHTNESS,
COLOR_MODE_COLOR_TEMP,
COLOR_MODE_HS,
COLOR_MODE_ONOFF,
SUPPORT_TRANSITION,
LightEntity,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError, PlatformNotReady
import homeassistant.helpers.device_registry as dr
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.util.color import (
color_temperature_kelvin_to_mired as kelvin_to_mired,
color_temperature_mired_to_kelvin as mired_to_kelvin,
)
import homeassistant.util.dt as dt_util
from . import CONF_LIGHT, DOMAIN as TPLINK_DOMAIN
from .common import add_available_devices
PARALLEL_UPDATES = 0
SCAN_INTERVAL = timedelta(seconds=5)
CURRENT_POWER_UPDATE_INTERVAL = timedelta(seconds=60)
HISTORICAL_POWER_UPDATE_INTERVAL = timedelta(minutes=60)
from .const import DOMAIN
from .coordinator import TPLinkDataUpdateCoordinator
from .entity import CoordinatedTPLinkEntity, async_refresh_after
_LOGGER = logging.getLogger(__name__)
ATTR_CURRENT_POWER_W = "current_power_w"
ATTR_DAILY_ENERGY_KWH = "daily_energy_kwh"
ATTR_MONTHLY_ENERGY_KWH = "monthly_energy_kwh"
LIGHT_STATE_DFT_ON = "dft_on_state"
LIGHT_STATE_DFT_IGNORE = "ignore_default"
LIGHT_STATE_ON_OFF = "on_off"
LIGHT_STATE_RELAY_STATE = "relay_state"
LIGHT_STATE_BRIGHTNESS = "brightness"
LIGHT_STATE_COLOR_TEMP = "color_temp"
LIGHT_STATE_HUE = "hue"
LIGHT_STATE_SATURATION = "saturation"
LIGHT_STATE_ERROR_MSG = "err_msg"
LIGHT_SYSINFO_MAC = "mac"
LIGHT_SYSINFO_ALIAS = "alias"
LIGHT_SYSINFO_MODEL = "model"
LIGHT_SYSINFO_IS_DIMMABLE = "is_dimmable"
LIGHT_SYSINFO_IS_VARIABLE_COLOR_TEMP = "is_variable_color_temp"
LIGHT_SYSINFO_IS_COLOR = "is_color"
MAX_ATTEMPTS = 300
SLEEP_TIME = 2
class ColorTempRange(NamedTuple):
"""Color temperature range (in Kelvin)."""
min: int
max: int
TPLINK_KELVIN: dict[str, ColorTempRange] = {
"LB130": ColorTempRange(2500, 9000),
"LB120": ColorTempRange(2700, 6500),
"LB230": ColorTempRange(2500, 9000),
"KB130": ColorTempRange(2500, 9000),
"KL130": ColorTempRange(2500, 9000),
"KL125": ColorTempRange(2500, 6500),
r"KL120\(EU\)": ColorTempRange(2700, 6500),
r"KL120\(US\)": ColorTempRange(2700, 5000),
r"KL430\(US\)": ColorTempRange(2500, 9000),
}
FALLBACK_MIN_COLOR = 2700
FALLBACK_MAX_COLOR = 5000
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up lights."""
entities = await hass.async_add_executor_job(
add_available_devices, hass, CONF_LIGHT, TPLinkSmartBulb
)
if entities:
async_add_entities(entities, update_before_add=True)
if hass.data[TPLINK_DOMAIN][f"{CONF_LIGHT}_remaining"]:
raise PlatformNotReady
"""Set up switches."""
coordinator: TPLinkDataUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id]
device = coordinator.device
if device.is_bulb or device.is_light_strip or device.is_dimmer:
async_add_entities([TPLinkSmartBulb(device, coordinator)])
def brightness_to_percentage(byt):
"""Convert brightness from absolute 0..255 to percentage."""
return round((byt * 100.0) / 255.0)
def brightness_from_percentage(percent):
"""Convert percentage to absolute value 0..255."""
return round((percent * 255.0) / 100.0)
class LightState(NamedTuple):
"""Light state."""
state: bool
brightness: int
color_temp: float
hs: tuple[int, int]
def to_param(self):
"""Return a version that we can send to the bulb."""
color_temp = None
if self.color_temp:
color_temp = mired_to_kelvin(self.color_temp)
return {
LIGHT_STATE_ON_OFF: 1 if self.state else 0,
LIGHT_STATE_DFT_IGNORE: 1 if self.state else 0,
LIGHT_STATE_BRIGHTNESS: brightness_to_percentage(self.brightness),
LIGHT_STATE_COLOR_TEMP: color_temp,
LIGHT_STATE_HUE: self.hs[0] if self.hs else 0,
LIGHT_STATE_SATURATION: self.hs[1] if self.hs else 0,
}
class LightFeatures(NamedTuple):
"""Light features."""
sysinfo: dict[str, Any]
mac: str
alias: str
model: str
supported_features: int
min_mireds: float
max_mireds: float
has_emeter: bool
class TPLinkSmartBulb(LightEntity):
class TPLinkSmartBulb(CoordinatedTPLinkEntity, LightEntity):
"""Representation of a TPLink Smart Bulb."""
def __init__(self, smartbulb: SmartBulb) -> None:
"""Initialize the bulb."""
self.smartbulb = smartbulb
self._light_features = cast(LightFeatures, None)
self._light_state = cast(LightState, None)
self._is_available = True
self._is_setting_light_state = False
self._last_current_power_update = None
self._last_historical_power_update = None
self._emeter_params = {}
coordinator: TPLinkDataUpdateCoordinator
self._host = None
self._alias = None
@property
def unique_id(self) -> str | None:
"""Return a unique ID."""
return self._light_features.mac
@property
def name(self) -> str | None:
"""Return the name of the Smart Bulb."""
return self._light_features.alias
@property
def device_info(self) -> DeviceInfo:
"""Return information about the device."""
return {
"name": self._light_features.alias,
"model": self._light_features.model,
"manufacturer": "TP-Link",
"connections": {(dr.CONNECTION_NETWORK_MAC, self._light_features.mac)},
"sw_version": self._light_features.sysinfo["sw_ver"],
}
@property
def available(self) -> bool:
"""Return if bulb is available."""
return self._is_available
@property
def extra_state_attributes(self) -> Mapping[str, Any] | None:
"""Return the state attributes of the device."""
return self._emeter_params
def __init__(
self,
device: SmartDevice,
coordinator: TPLinkDataUpdateCoordinator,
) -> None:
"""Initialize the switch."""
super().__init__(device, coordinator)
# For backwards compat with pyHS100
self._attr_unique_id = self.device.mac.replace(":", "").upper()
@async_refresh_after
async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn the light on."""
if ATTR_BRIGHTNESS in kwargs:
brightness = int(kwargs[ATTR_BRIGHTNESS])
elif self._light_state.brightness is not None:
brightness = self._light_state.brightness
else:
brightness = 255
transition = kwargs.get(ATTR_TRANSITION)
if (brightness := kwargs.get(ATTR_BRIGHTNESS)) is not None:
brightness = round((brightness * 100.0) / 255.0)
# Handle turning to temp mode
if ATTR_COLOR_TEMP in kwargs:
color_tmp = int(kwargs[ATTR_COLOR_TEMP])
else:
color_tmp = self._light_state.color_temp
color_tmp = mired_to_kelvin(int(kwargs[ATTR_COLOR_TEMP]))
_LOGGER.debug("Changing color temp to %s", color_tmp)
await self.device.set_color_temp(
color_tmp, brightness=brightness, transition=transition
)
return
# Handling turning to hs color mode
if ATTR_HS_COLOR in kwargs:
# TP-Link requires integers.
hue_sat = tuple(int(val) for val in kwargs[ATTR_HS_COLOR])
hue, sat = tuple(int(val) for val in kwargs[ATTR_HS_COLOR])
await self.device.set_hsv(hue, sat, brightness, transition=transition)
return
# TP-Link cannot have both color temp and hue_sat
color_tmp = 0
# Fallback to adjusting brightness or turning the bulb on
if brightness is not None:
await self.device.set_brightness(brightness, transition=transition)
else:
hue_sat = self._light_state.hs
await self._async_set_light_state_retry(
self._light_state,
self._light_state._replace(
state=True,
brightness=brightness,
color_temp=color_tmp,
hs=hue_sat,
),
)
await self.device.turn_on(transition=transition)
@async_refresh_after
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the light off."""
await self._async_set_light_state_retry(
self._light_state,
self._light_state._replace(state=False),
)
await self.device.turn_off(transition=kwargs.get(ATTR_TRANSITION))
@property
def min_mireds(self) -> int:
"""Return minimum supported color temperature."""
return self._light_features.min_mireds
return kelvin_to_mired(self.device.valid_temperature_range.max)
@property
def max_mireds(self) -> int:
"""Return maximum supported color temperature."""
return self._light_features.max_mireds
return kelvin_to_mired(self.device.valid_temperature_range.min)
@property
def color_temp(self) -> int | None:
"""Return the color temperature of this light in mireds for HA."""
return self._light_state.color_temp
return kelvin_to_mired(self.device.color_temp)
@property
def brightness(self) -> int | None:
"""Return the brightness of this light between 0..255."""
return self._light_state.brightness
return round((self.device.brightness * 255.0) / 100.0)
@property
def hs_color(self) -> tuple[float, float] | None:
def hs_color(self) -> tuple[int, int] | None:
"""Return the color."""
return self._light_state.hs
@property
def is_on(self) -> bool:
"""Return True if device is on."""
return self._light_state.state
def attempt_update(self, update_attempt: int) -> bool:
"""Attempt to get details the TP-Link bulb."""
# State is currently being set, ignore.
if self._is_setting_light_state:
return False
try:
if not self._light_features:
self._light_features = self._get_light_features()
self._alias = self._light_features.alias
self._host = self.smartbulb.host
self._light_state = self._get_light_state()
return True
except (SmartDeviceException, OSError) as ex:
if update_attempt == 0:
_LOGGER.debug(
"Retrying in %s seconds for %s|%s due to: %s",
SLEEP_TIME,
self._host,
self._alias,
ex,
)
return False
hue, saturation, _ = self.device.hsv
return hue, saturation
@property
def supported_features(self) -> int:
"""Flag supported features."""
return self._light_features.supported_features
return SUPPORT_TRANSITION
def _get_valid_temperature_range(self) -> ColorTempRange:
"""Return the device-specific white temperature range (in Kelvin).
@property
def supported_color_modes(self) -> set[str] | None:
"""Return list of available color modes."""
modes = set()
if self.device.is_variable_color_temp:
modes.add(COLOR_MODE_COLOR_TEMP)
if self.device.is_color:
modes.add(COLOR_MODE_HS)
if self.device.is_dimmable:
modes.add(COLOR_MODE_BRIGHTNESS)
:return: White temperature range in Kelvin (minimum, maximum)
"""
model = self.smartbulb.sys_info[LIGHT_SYSINFO_MODEL]
for obj, temp_range in TPLINK_KELVIN.items():
if re.match(obj, model):
return temp_range
# pyHS100 is abandoned, but some bulb definitions aren't present
# use "safe" values for something that advertises color temperature
return ColorTempRange(FALLBACK_MIN_COLOR, FALLBACK_MAX_COLOR)
if not modes:
modes.add(COLOR_MODE_ONOFF)
def _get_light_features(self) -> LightFeatures:
"""Determine all supported features in one go."""
sysinfo = self.smartbulb.sys_info
supported_features = 0
# Calling api here as it reformats
mac = self.smartbulb.mac
alias = sysinfo[LIGHT_SYSINFO_ALIAS]
model = sysinfo[LIGHT_SYSINFO_MODEL]
min_mireds = None
max_mireds = None
has_emeter = self.smartbulb.has_emeter
return modes
if sysinfo.get(LIGHT_SYSINFO_IS_DIMMABLE) or LIGHT_STATE_BRIGHTNESS in sysinfo:
supported_features += SUPPORT_BRIGHTNESS
if sysinfo.get(LIGHT_SYSINFO_IS_VARIABLE_COLOR_TEMP):
supported_features += SUPPORT_COLOR_TEMP
color_temp_range = self._get_valid_temperature_range()
min_mireds = kelvin_to_mired(color_temp_range.max)
max_mireds = kelvin_to_mired(color_temp_range.min)
if sysinfo.get(LIGHT_SYSINFO_IS_COLOR):
supported_features += SUPPORT_COLOR
@property
def color_mode(self) -> str | None:
"""Return the active color mode."""
if self.device.is_color:
if self.device.color_temp:
return COLOR_MODE_COLOR_TEMP
return COLOR_MODE_HS
if self.device.is_variable_color_temp:
return COLOR_MODE_COLOR_TEMP
return LightFeatures(
sysinfo=sysinfo,
mac=mac,
alias=alias,
model=model,
supported_features=supported_features,
min_mireds=min_mireds,
max_mireds=max_mireds,
has_emeter=has_emeter,
)
def _light_state_from_params(self, light_state_params: Any) -> LightState:
brightness = None
color_temp = None
hue_saturation = None
light_features = self._light_features
state = bool(light_state_params[LIGHT_STATE_ON_OFF])
if not state and LIGHT_STATE_DFT_ON in light_state_params:
light_state_params = light_state_params[LIGHT_STATE_DFT_ON]
if light_features.supported_features & SUPPORT_BRIGHTNESS:
brightness = brightness_from_percentage(
light_state_params[LIGHT_STATE_BRIGHTNESS]
)
if (
light_features.supported_features & SUPPORT_COLOR_TEMP
and light_state_params.get(LIGHT_STATE_COLOR_TEMP) is not None
and light_state_params[LIGHT_STATE_COLOR_TEMP] != 0
):
color_temp = kelvin_to_mired(light_state_params[LIGHT_STATE_COLOR_TEMP])
if color_temp is None and light_features.supported_features & SUPPORT_COLOR:
hue_saturation = (
light_state_params[LIGHT_STATE_HUE],
light_state_params[LIGHT_STATE_SATURATION],
)
return LightState(
state=state,
brightness=brightness,
color_temp=color_temp,
hs=hue_saturation,
)
def _get_light_state(self) -> LightState:
"""Get the light state."""
self._update_emeter()
return self._light_state_from_params(self._get_device_state())
def _update_emeter(self) -> None:
if not self._light_features.has_emeter:
return
now = dt_util.utcnow()
if (
not self._last_current_power_update
or self._last_current_power_update + CURRENT_POWER_UPDATE_INTERVAL < now
):
self._last_current_power_update = now
self._emeter_params[ATTR_CURRENT_POWER_W] = round(
float(self.smartbulb.current_consumption()), 1
)
if (
not self._last_historical_power_update
or self._last_historical_power_update + HISTORICAL_POWER_UPDATE_INTERVAL
< now
):
self._last_historical_power_update = now
daily_statistics = self.smartbulb.get_emeter_daily()
monthly_statistics = self.smartbulb.get_emeter_monthly()
try:
self._emeter_params[ATTR_DAILY_ENERGY_KWH] = round(
float(daily_statistics[int(time.strftime("%d"))]), 3
)
self._emeter_params[ATTR_MONTHLY_ENERGY_KWH] = round(
float(monthly_statistics[int(time.strftime("%m"))]), 3
)
except KeyError:
# device returned no daily/monthly history
pass
async def _async_set_light_state_retry(
self, old_light_state: LightState, new_light_state: LightState
) -> None:
"""Set the light state with retry."""
# Tell the device to set the states.
if not _light_state_diff(old_light_state, new_light_state):
# Nothing to do, avoid the executor
return
self._is_setting_light_state = True
try:
light_state_params = await self.hass.async_add_executor_job(
self._set_light_state, old_light_state, new_light_state
)
self._is_available = True
self._is_setting_light_state = False
if LIGHT_STATE_ERROR_MSG in light_state_params:
raise HomeAssistantError(light_state_params[LIGHT_STATE_ERROR_MSG])
# Some devices do not report the new state in their responses, so we skip
# set here and wait for the next poll to update the values. See #47600
if LIGHT_STATE_ON_OFF in light_state_params:
self._light_state = self._light_state_from_params(light_state_params)
return
except (SmartDeviceException, OSError):
pass
try:
_LOGGER.debug("Retrying setting light state")
light_state_params = await self.hass.async_add_executor_job(
self._set_light_state, old_light_state, new_light_state
)
self._is_available = True
if LIGHT_STATE_ERROR_MSG in light_state_params:
raise HomeAssistantError(light_state_params[LIGHT_STATE_ERROR_MSG])
self._light_state = self._light_state_from_params(light_state_params)
except (SmartDeviceException, OSError) as ex:
self._is_available = False
_LOGGER.warning("Could not set data for %s: %s", self.smartbulb.host, ex)
self._is_setting_light_state = False
def _set_light_state(
self, old_light_state: LightState, new_light_state: LightState
) -> None:
"""Set the light state."""
diff = _light_state_diff(old_light_state, new_light_state)
if not diff:
return
return self._set_device_state(diff)
def _get_device_state(self) -> dict:
"""State of the bulb or smart dimmer switch."""
if isinstance(self.smartbulb, SmartBulb):
return self.smartbulb.get_light_state()
sysinfo = self.smartbulb.sys_info
# Its not really a bulb, its a dimmable SmartPlug (aka Wall Switch)
return {
LIGHT_STATE_ON_OFF: sysinfo[LIGHT_STATE_RELAY_STATE],
LIGHT_STATE_BRIGHTNESS: sysinfo.get(LIGHT_STATE_BRIGHTNESS, 0),
LIGHT_STATE_COLOR_TEMP: 0,
LIGHT_STATE_HUE: 0,
LIGHT_STATE_SATURATION: 0,
}
def _set_device_state(self, state):
"""Set state of the bulb or smart dimmer switch."""
if isinstance(self.smartbulb, SmartBulb):
return self.smartbulb.set_light_state(state)
# Its not really a bulb, its a dimmable SmartPlug (aka Wall Switch)
if LIGHT_STATE_BRIGHTNESS in state:
# Brightness of 0 is accepted by the
# device but the underlying library rejects it
# so we turn off instead.
if state[LIGHT_STATE_BRIGHTNESS]:
self.smartbulb.brightness = state[LIGHT_STATE_BRIGHTNESS]
else:
self.smartbulb.state = self.smartbulb.SWITCH_STATE_OFF
elif LIGHT_STATE_ON_OFF in state:
if state[LIGHT_STATE_ON_OFF]:
self.smartbulb.state = self.smartbulb.SWITCH_STATE_ON
else:
self.smartbulb.state = self.smartbulb.SWITCH_STATE_OFF
return self._get_device_state()
async def async_update(self) -> None:
"""Update the TP-Link bulb's state."""
for update_attempt in range(MAX_ATTEMPTS):
is_ready = await self.hass.async_add_executor_job(
self.attempt_update, update_attempt
)
if is_ready:
self._is_available = True
if update_attempt > 0:
_LOGGER.debug(
"Device %s|%s responded after %s attempts",
self._host,
self._alias,
update_attempt,
)
break
await asyncio.sleep(SLEEP_TIME)
else:
if self._is_available:
_LOGGER.warning(
"Could not read state for %s|%s",
self._host,
self._alias,
)
self._is_available = False
def _light_state_diff(
old_light_state: LightState, new_light_state: LightState
) -> dict[str, Any]:
old_state_param = old_light_state.to_param()
new_state_param = new_light_state.to_param()
return {
key: value
for key, value in new_state_param.items()
if new_state_param.get(key) != old_state_param.get(key)
}
return COLOR_MODE_BRIGHTNESS

View File

@ -3,8 +3,9 @@
"name": "TP-Link Kasa Smart",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/tplink",
"requirements": ["pyHS100==0.3.5.2"],
"requirements": ["python-kasa==0.4.0"],
"codeowners": ["@rytilahti", "@thegardenmonkey"],
"quality_scale": "platinum",
"iot_class": "local_polling",
"dhcp": [
{
@ -27,6 +28,10 @@
"hostname": "hs*",
"macaddress": "B09575*"
},
{
"hostname": "hs*",
"macaddress": "C006C3*"
},
{
"hostname": "k[lp]*",
"macaddress": "1C3BF3*"
@ -47,6 +52,10 @@
"hostname": "k[lp]*",
"macaddress": "B09575*"
},
{
"hostname": "k[lp]*",
"macaddress": "C006C3*"
},
{
"hostname": "lb*",
"macaddress": "1C3BF3*"

View File

@ -0,0 +1,109 @@
"""Component to embed TP-Link smart home devices."""
from __future__ import annotations
from datetime import datetime
from homeassistant import config_entries
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_HOST,
CONF_MAC,
CONF_NAME,
EVENT_HOMEASSISTANT_STARTED,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers.typing import ConfigType
from .const import CONF_DIMMER, CONF_LIGHT, CONF_STRIP, CONF_SWITCH, DOMAIN
async def async_cleanup_legacy_entry(
hass: HomeAssistant,
legacy_entry_id: str,
) -> None:
"""Cleanup the legacy entry if the migration is successful."""
entity_registry = er.async_get(hass)
if not er.async_entries_for_config_entry(entity_registry, legacy_entry_id):
await hass.config_entries.async_remove(legacy_entry_id)
@callback
def async_migrate_legacy_entries(
hass: HomeAssistant,
hosts_by_mac: dict[str, str],
config_entries_by_mac: dict[str, ConfigEntry],
legacy_entry: ConfigEntry,
) -> None:
"""Migrate the legacy config entries to have an entry per device."""
device_registry = dr.async_get(hass)
for dev_entry in dr.async_entries_for_config_entry(
device_registry, legacy_entry.entry_id
):
for connection_type, mac in dev_entry.connections:
if (
connection_type != dr.CONNECTION_NETWORK_MAC
or mac in config_entries_by_mac
):
continue
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN,
context={"source": "migration"},
data={
CONF_HOST: hosts_by_mac.get(mac),
CONF_MAC: mac,
CONF_NAME: dev_entry.name or f"TP-Link device {mac}",
},
)
)
async def _async_cleanup_legacy_entry(_now: datetime) -> None:
await async_cleanup_legacy_entry(hass, legacy_entry.entry_id)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, _async_cleanup_legacy_entry)
@callback
def async_migrate_yaml_entries(hass: HomeAssistant, conf: ConfigType) -> None:
"""Migrate yaml to config entries."""
for device_type in (CONF_LIGHT, CONF_SWITCH, CONF_STRIP, CONF_DIMMER):
for device in conf.get(device_type, []):
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={
CONF_HOST: device[CONF_HOST],
},
)
)
async def async_migrate_entities_devices(
hass: HomeAssistant, legacy_entry_id: str, new_entry: ConfigEntry
) -> None:
"""Move entities and devices to the new config entry."""
migrated_devices = []
device_registry = dr.async_get(hass)
for dev_entry in dr.async_entries_for_config_entry(
device_registry, legacy_entry_id
):
for connection_type, value in dev_entry.connections:
if (
connection_type == dr.CONNECTION_NETWORK_MAC
and value == new_entry.unique_id
):
migrated_devices.append(dev_entry.id)
device_registry.async_update_device(
dev_entry.id, add_config_entry_id=new_entry.entry_id
)
entity_registry = er.async_get(hass)
for reg_entity in er.async_entries_for_config_entry(
entity_registry, legacy_entry_id
):
if reg_entity.device_id in migrated_devices:
entity_registry.async_update_entity(
reg_entity.entity_id, config_entry_id=new_entry.entry_id
)

View File

@ -1,9 +1,10 @@
"""Support for TPLink HS100/HS110/HS200 smart switch energy sensors."""
from __future__ import annotations
from typing import Any, Final
from dataclasses import dataclass
from typing import cast
from pyHS100 import SmartPlug
from kasa import SmartDevice
from homeassistant.components.sensor import (
STATE_CLASS_MEASUREMENT,
@ -11,13 +12,9 @@ from homeassistant.components.sensor import (
SensorEntity,
SensorEntityDescription,
)
from homeassistant.components.tplink import SmartPlugDataUpdateCoordinator
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
ATTR_VOLTAGE,
CONF_ALIAS,
CONF_DEVICE_ID,
CONF_MAC,
DEVICE_CLASS_CURRENT,
DEVICE_CLASS_ENERGY,
DEVICE_CLASS_POWER,
@ -28,65 +25,86 @@ from homeassistant.const import (
POWER_WATT,
)
from homeassistant.core import HomeAssistant
import homeassistant.helpers.device_registry as dr
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
)
from . import legacy_device_id
from .const import (
CONF_EMETER_PARAMS,
CONF_MODEL,
CONF_SW_VERSION,
CONF_SWITCH,
COORDINATORS,
DOMAIN as TPLINK_DOMAIN,
ATTR_CURRENT_A,
ATTR_CURRENT_POWER_W,
ATTR_TODAY_ENERGY_KWH,
ATTR_TOTAL_ENERGY_KWH,
DOMAIN,
)
from .coordinator import TPLinkDataUpdateCoordinator
from .entity import CoordinatedTPLinkEntity
ATTR_CURRENT_A = "current_a"
ATTR_CURRENT_POWER_W = "current_power_w"
ATTR_TODAY_ENERGY_KWH = "today_energy_kwh"
ATTR_TOTAL_ENERGY_KWH = "total_energy_kwh"
ENERGY_SENSORS: Final[list[SensorEntityDescription]] = [
SensorEntityDescription(
@dataclass
class TPLinkSensorEntityDescription(SensorEntityDescription):
"""Describes TPLink sensor entity."""
emeter_attr: str | None = None
ENERGY_SENSORS: tuple[TPLinkSensorEntityDescription, ...] = (
TPLinkSensorEntityDescription(
key=ATTR_CURRENT_POWER_W,
native_unit_of_measurement=POWER_WATT,
device_class=DEVICE_CLASS_POWER,
state_class=STATE_CLASS_MEASUREMENT,
name="Current Consumption",
emeter_attr="power",
),
SensorEntityDescription(
TPLinkSensorEntityDescription(
key=ATTR_TOTAL_ENERGY_KWH,
native_unit_of_measurement=ENERGY_KILO_WATT_HOUR,
device_class=DEVICE_CLASS_ENERGY,
state_class=STATE_CLASS_TOTAL_INCREASING,
name="Total Consumption",
emeter_attr="total",
),
SensorEntityDescription(
TPLinkSensorEntityDescription(
key=ATTR_TODAY_ENERGY_KWH,
native_unit_of_measurement=ENERGY_KILO_WATT_HOUR,
device_class=DEVICE_CLASS_ENERGY,
state_class=STATE_CLASS_TOTAL_INCREASING,
name="Today's Consumption",
),
SensorEntityDescription(
TPLinkSensorEntityDescription(
key=ATTR_VOLTAGE,
native_unit_of_measurement=ELECTRIC_POTENTIAL_VOLT,
device_class=DEVICE_CLASS_VOLTAGE,
state_class=STATE_CLASS_MEASUREMENT,
name="Voltage",
emeter_attr="voltage",
),
SensorEntityDescription(
TPLinkSensorEntityDescription(
key=ATTR_CURRENT_A,
native_unit_of_measurement=ELECTRIC_CURRENT_AMPERE,
device_class=DEVICE_CLASS_CURRENT,
state_class=STATE_CLASS_MEASUREMENT,
name="Current",
emeter_attr="current",
),
]
)
def async_emeter_from_device(
device: SmartDevice, description: TPLinkSensorEntityDescription
) -> float | None:
"""Map a sensor key to the device attribute."""
if attr := description.emeter_attr:
val = getattr(device.emeter_realtime, attr)
if val is None:
return None
return cast(float, val)
# ATTR_TODAY_ENERGY_KWH
if (emeter_today := device.emeter_today) is not None:
return cast(float, emeter_today)
# today's consumption not available, when device was off all the day
# bulb's do not report this information, so filter it out
return None if device.is_bulb else 0.0
async def async_setup_entry(
@ -94,62 +112,58 @@ async def async_setup_entry(
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up switches."""
"""Set up sensors."""
coordinator: TPLinkDataUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id]
entities: list[SmartPlugSensor] = []
coordinators: list[SmartPlugDataUpdateCoordinator] = hass.data[TPLINK_DOMAIN][
COORDINATORS
]
switches: list[SmartPlug] = hass.data[TPLINK_DOMAIN][CONF_SWITCH]
for switch in switches:
coordinator: SmartPlugDataUpdateCoordinator = coordinators[
switch.context or switch.mac
parent = coordinator.device
if not parent.has_emeter:
return
def _async_sensors_for_device(device: SmartDevice) -> list[SmartPlugSensor]:
return [
SmartPlugSensor(device, coordinator, description)
for description in ENERGY_SENSORS
if async_emeter_from_device(device, description) is not None
]
if not switch.has_emeter and coordinator.data.get(CONF_EMETER_PARAMS) is None:
continue
for description in ENERGY_SENSORS:
if coordinator.data[CONF_EMETER_PARAMS].get(description.key) is not None:
entities.append(SmartPlugSensor(switch, coordinator, description))
if parent.is_strip:
# Historically we only add the children if the device is a strip
for child in parent.children:
entities.extend(_async_sensors_for_device(child))
else:
entities.extend(_async_sensors_for_device(parent))
async_add_entities(entities)
class SmartPlugSensor(CoordinatorEntity, SensorEntity):
class SmartPlugSensor(CoordinatedTPLinkEntity, SensorEntity):
"""Representation of a TPLink Smart Plug energy sensor."""
coordinator: TPLinkDataUpdateCoordinator
entity_description: TPLinkSensorEntityDescription
def __init__(
self,
smartplug: SmartPlug,
coordinator: DataUpdateCoordinator,
description: SensorEntityDescription,
device: SmartDevice,
coordinator: TPLinkDataUpdateCoordinator,
description: TPLinkSensorEntityDescription,
) -> None:
"""Initialize the switch."""
super().__init__(coordinator)
self.smartplug = smartplug
super().__init__(device, coordinator)
self.entity_description = description
self._attr_name = f"{coordinator.data[CONF_ALIAS]} {description.name}"
self._attr_unique_id = (
f"{legacy_device_id(self.device)}_{self.entity_description.key}"
)
@property
def data(self) -> dict[str, Any]:
"""Return data from DataUpdateCoordinator."""
return self.coordinator.data
def name(self) -> str:
"""Return the name of the Smart Plug.
Overridden to include the description.
"""
return f"{self.device.alias} {self.entity_description.name}"
@property
def native_value(self) -> float | None:
"""Return the sensors state."""
return self.data[CONF_EMETER_PARAMS][self.entity_description.key]
@property
def unique_id(self) -> str | None:
"""Return a unique ID."""
return f"{self.data[CONF_DEVICE_ID]}_{self.entity_description.key}"
@property
def device_info(self) -> DeviceInfo:
"""Return information about the device."""
return {
"name": self.data[CONF_ALIAS],
"model": self.data[CONF_MODEL],
"manufacturer": "TP-Link",
"connections": {(dr.CONNECTION_NETWORK_MAC, self.data[CONF_MAC])},
"sw_version": self.data[CONF_SW_VERSION],
}
return async_emeter_from_device(self.device, self.entity_description)

View File

@ -1,12 +1,27 @@
{
"config": {
"flow_title": "{name} {model} ({host})",
"step": {
"confirm": {
"description": "Do you want to setup TP-Link smart devices?"
"user": {
"description": "If you leave the host empty, discovery will be used to find devices.",
"data": {
"host": "[%key:common::config_flow::data::host%]"
}
},
"pick_device": {
"data": {
"device": "Device"
}
},
"discovery_confirm": {
"description": "Do you want to setup {name} {model} ({host})?"
}
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]"
},
"abort": {
"single_instance_allowed": "[%key:common::config_flow::abort::single_instance_allowed%]",
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"no_devices_found": "[%key:common::config_flow::abort::no_devices_found%]"
}
}

View File

@ -1,31 +1,22 @@
"""Support for TPLink HS100/HS110/HS200 smart switch."""
from __future__ import annotations
from asyncio import sleep
import logging
from typing import Any
from pyHS100 import SmartPlug
from kasa import SmartDevice
from homeassistant.components.switch import SwitchEntity
from homeassistant.components.tplink import SmartPlugDataUpdateCoordinator
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_ALIAS, CONF_DEVICE_ID, CONF_MAC, CONF_STATE
from homeassistant.core import HomeAssistant
import homeassistant.helpers.device_registry as dr
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
)
from .const import (
CONF_MODEL,
CONF_SW_VERSION,
CONF_SWITCH,
COORDINATORS,
DOMAIN as TPLINK_DOMAIN,
)
from . import legacy_device_id
from .const import DOMAIN
from .coordinator import TPLinkDataUpdateCoordinator
from .entity import CoordinatedTPLinkEntity, async_refresh_after
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(
@ -34,71 +25,43 @@ async def async_setup_entry(
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up switches."""
entities: list[SmartPlugSwitch] = []
coordinators: list[SmartPlugDataUpdateCoordinator] = hass.data[TPLINK_DOMAIN][
COORDINATORS
]
switches: list[SmartPlug] = hass.data[TPLINK_DOMAIN][CONF_SWITCH]
for switch in switches:
coordinator = coordinators[switch.context or switch.mac]
entities.append(SmartPlugSwitch(switch, coordinator))
coordinator: TPLinkDataUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id]
device = coordinator.device
if not device.is_plug and not device.is_strip:
return
entities = []
if device.is_strip:
# Historically we only add the children if the device is a strip
_LOGGER.debug("Initializing strip with %s sockets", len(device.children))
for child in device.children:
entities.append(SmartPlugSwitch(child, coordinator))
else:
entities.append(SmartPlugSwitch(device, coordinator))
async_add_entities(entities)
class SmartPlugSwitch(CoordinatorEntity, SwitchEntity):
class SmartPlugSwitch(CoordinatedTPLinkEntity, SwitchEntity):
"""Representation of a TPLink Smart Plug switch."""
coordinator: TPLinkDataUpdateCoordinator
def __init__(
self, smartplug: SmartPlug, coordinator: DataUpdateCoordinator
self,
device: SmartDevice,
coordinator: TPLinkDataUpdateCoordinator,
) -> None:
"""Initialize the switch."""
super().__init__(coordinator)
self.smartplug = smartplug
@property
def data(self) -> dict[str, Any]:
"""Return data from DataUpdateCoordinator."""
return self.coordinator.data
@property
def unique_id(self) -> str | None:
"""Return a unique ID."""
return self.data[CONF_DEVICE_ID]
@property
def name(self) -> str | None:
"""Return the name of the Smart Plug."""
return self.data[CONF_ALIAS]
@property
def device_info(self) -> DeviceInfo:
"""Return information about the device."""
return {
"name": self.data[CONF_ALIAS],
"model": self.data[CONF_MODEL],
"manufacturer": "TP-Link",
"connections": {(dr.CONNECTION_NETWORK_MAC, self.data[CONF_MAC])},
"sw_version": self.data[CONF_SW_VERSION],
}
@property
def is_on(self) -> bool | None:
"""Return true if switch is on."""
return self.data[CONF_STATE]
super().__init__(device, coordinator)
# For backwards compat with pyHS100
self._attr_unique_id = legacy_device_id(device)
@async_refresh_after
async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn the switch on."""
await self.hass.async_add_executor_job(self.smartplug.turn_on)
# Workaround for delayed device state update on HS210: #55190
if "HS210" in self.device_info["model"]:
await sleep(0.5)
await self.coordinator.async_refresh()
await self.device.turn_on()
@async_refresh_after
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the switch off."""
await self.hass.async_add_executor_job(self.smartplug.turn_off)
# Workaround for delayed device state update on HS210: #55190
if "HS210" in self.device_info["model"]:
await sleep(0.5)
await self.coordinator.async_refresh()
await self.device.turn_off()

View File

@ -1,12 +1,27 @@
{
"config": {
"abort": {
"no_devices_found": "No devices found on the network",
"single_instance_allowed": "Already configured. Only a single configuration possible."
"already_configured": "Device is already configured",
"no_devices_found": "No devices found on the network"
},
"error": {
"cannot_connect": "Failed to connect"
},
"flow_title": "{name} {model} ({host})",
"step": {
"confirm": {
"description": "Do you want to setup TP-Link smart devices?"
"discovery_confirm": {
"description": "Do you want to setup {name} {model} ({host})?"
},
"pick_device": {
"data": {
"device": "Device"
}
},
"user": {
"data": {
"host": "Host"
},
"description": "If you leave the host empty, discovery will be used to find devices."
}
}
}

View File

@ -289,6 +289,11 @@ DHCP = [
"hostname": "hs*",
"macaddress": "B09575*"
},
{
"domain": "tplink",
"hostname": "hs*",
"macaddress": "C006C3*"
},
{
"domain": "tplink",
"hostname": "k[lp]*",
@ -314,6 +319,11 @@ DHCP = [
"hostname": "k[lp]*",
"macaddress": "B09575*"
},
{
"domain": "tplink",
"hostname": "k[lp]*",
"macaddress": "C006C3*"
},
{
"domain": "tplink",
"hostname": "lb*",

View File

@ -1188,6 +1188,17 @@ no_implicit_optional = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.tplink.*]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
no_implicit_optional = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.tradfri.*]
check_untyped_defs = true
disallow_incomplete_defs = true
@ -1689,9 +1700,6 @@ ignore_errors = true
[mypy-homeassistant.components.toon.*]
ignore_errors = true
[mypy-homeassistant.components.tplink.*]
ignore_errors = true
[mypy-homeassistant.components.unifi.*]
ignore_errors = true

View File

@ -1307,9 +1307,6 @@ pyCEC==0.5.1
# homeassistant.components.control4
pyControl4==0.0.6
# homeassistant.components.tplink
pyHS100==0.3.5.2
# homeassistant.components.met_eireann
pyMetEireann==2021.8.0
@ -1894,6 +1891,9 @@ python-join-api==0.0.6
# homeassistant.components.juicenet
python-juicenet==1.0.2
# homeassistant.components.tplink
python-kasa==0.4.0
# homeassistant.components.lirc
# python-lirc==1.2.3

View File

@ -758,9 +758,6 @@ py17track==3.2.1
# homeassistant.components.control4
pyControl4==0.0.6
# homeassistant.components.tplink
pyHS100==0.3.5.2
# homeassistant.components.met_eireann
pyMetEireann==2021.8.0
@ -1093,6 +1090,9 @@ python-izone==1.1.6
# homeassistant.components.juicenet
python-juicenet==1.0.2
# homeassistant.components.tplink
python-kasa==0.4.0
# homeassistant.components.xiaomi_miio
python-miio==0.5.8

View File

@ -126,7 +126,6 @@ IGNORED_MODULES: Final[list[str]] = [
"homeassistant.components.telegram_bot.*",
"homeassistant.components.template.*",
"homeassistant.components.toon.*",
"homeassistant.components.tplink.*",
"homeassistant.components.unifi.*",
"homeassistant.components.upnp.*",
"homeassistant.components.vera.*",

View File

@ -1 +1,106 @@
"""Tests for the TP-Link component."""
from unittest.mock import AsyncMock, MagicMock, patch
from kasa import SmartBulb, SmartPlug, SmartStrip
from kasa.exceptions import SmartDeviceException
MODULE = "homeassistant.components.tplink"
MODULE_CONFIG_FLOW = "homeassistant.components.tplink.config_flow"
IP_ADDRESS = "127.0.0.1"
ALIAS = "My Bulb"
MODEL = "HS100"
MAC_ADDRESS = "aa:bb:cc:dd:ee:ff"
DEFAULT_ENTRY_TITLE = f"{ALIAS} {MODEL}"
def _mocked_bulb() -> SmartBulb:
bulb = MagicMock(auto_spec=SmartBulb)
bulb.update = AsyncMock()
bulb.mac = MAC_ADDRESS
bulb.alias = ALIAS
bulb.model = MODEL
bulb.host = IP_ADDRESS
bulb.brightness = 50
bulb.color_temp = 4000
bulb.is_color = True
bulb.is_strip = False
bulb.is_plug = False
bulb.hsv = (10, 30, 5)
bulb.device_id = MAC_ADDRESS
bulb.valid_temperature_range.min = 4000
bulb.valid_temperature_range.max = 9000
bulb.hw_info = {"sw_ver": "1.0.0"}
bulb.turn_off = AsyncMock()
bulb.turn_on = AsyncMock()
bulb.set_brightness = AsyncMock()
bulb.set_hsv = AsyncMock()
bulb.set_color_temp = AsyncMock()
return bulb
def _mocked_plug() -> SmartPlug:
plug = MagicMock(auto_spec=SmartPlug)
plug.update = AsyncMock()
plug.mac = MAC_ADDRESS
plug.alias = "My Plug"
plug.model = MODEL
plug.host = IP_ADDRESS
plug.is_light_strip = False
plug.is_bulb = False
plug.is_dimmer = False
plug.is_strip = False
plug.is_plug = True
plug.device_id = MAC_ADDRESS
plug.hw_info = {"sw_ver": "1.0.0"}
plug.turn_off = AsyncMock()
plug.turn_on = AsyncMock()
return plug
def _mocked_strip() -> SmartStrip:
strip = MagicMock(auto_spec=SmartStrip)
strip.update = AsyncMock()
strip.mac = MAC_ADDRESS
strip.alias = "My Strip"
strip.model = MODEL
strip.host = IP_ADDRESS
strip.is_light_strip = False
strip.is_bulb = False
strip.is_dimmer = False
strip.is_strip = True
strip.is_plug = True
strip.device_id = MAC_ADDRESS
strip.hw_info = {"sw_ver": "1.0.0"}
strip.turn_off = AsyncMock()
strip.turn_on = AsyncMock()
plug0 = _mocked_plug()
plug0.alias = "Plug0"
plug0.device_id = "bb:bb:cc:dd:ee:ff_PLUG0DEVICEID"
plug0.mac = "bb:bb:cc:dd:ee:ff"
plug1 = _mocked_plug()
plug1.device_id = "cc:bb:cc:dd:ee:ff_PLUG1DEVICEID"
plug1.mac = "cc:bb:cc:dd:ee:ff"
plug1.alias = "Plug1"
strip.children = [plug0, plug1]
return strip
def _patch_discovery(device=None, no_device=False):
async def _discovery(*_):
if no_device:
return {}
return {IP_ADDRESS: _mocked_bulb()}
return patch("homeassistant.components.tplink.Discover.discover", new=_discovery)
def _patch_single_discovery(device=None, no_device=False):
async def _discover_single(*_):
if no_device:
raise SmartDeviceException
return device if device else _mocked_bulb()
return patch(
"homeassistant.components.tplink.Discover.discover_single", new=_discover_single
)

View File

@ -1,2 +1,27 @@
"""tplink conftest."""
from tests.components.light.conftest import mock_light_profiles # noqa: F401
import pytest
from . import _patch_discovery
from tests.common import mock_device_registry, mock_registry
@pytest.fixture
def mock_discovery():
"""Mock python-kasa discovery."""
with _patch_discovery() as mock_discover:
mock_discover.return_value = {}
yield mock_discover
@pytest.fixture(name="device_reg")
def device_reg_fixture(hass):
"""Return an empty, loaded, registry."""
return mock_device_registry(hass)
@pytest.fixture(name="entity_reg")
def entity_reg_fixture(hass):
"""Return an empty, loaded, registry."""
return mock_registry(hass)

View File

@ -0,0 +1,477 @@
"""Test the tplink config flow."""
from unittest.mock import patch
import pytest
from homeassistant import config_entries, setup
from homeassistant.components.tplink import DOMAIN
from homeassistant.const import CONF_DEVICE, CONF_HOST, CONF_MAC, CONF_NAME
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import RESULT_TYPE_ABORT, RESULT_TYPE_FORM
from . import (
ALIAS,
DEFAULT_ENTRY_TITLE,
IP_ADDRESS,
MAC_ADDRESS,
MODULE,
_patch_discovery,
_patch_single_discovery,
)
from tests.common import MockConfigEntry
async def test_discovery(hass: HomeAssistant):
"""Test setting up discovery."""
with _patch_discovery(), _patch_single_discovery():
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await hass.async_block_till_done()
assert result["type"] == "form"
assert result["step_id"] == "user"
assert not result["errors"]
result2 = await hass.config_entries.flow.async_configure(result["flow_id"], {})
await hass.async_block_till_done()
assert result2["type"] == "form"
assert result2["step_id"] == "pick_device"
assert not result2["errors"]
# test we can try again
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
assert not result["errors"]
result2 = await hass.config_entries.flow.async_configure(result["flow_id"], {})
await hass.async_block_till_done()
assert result2["type"] == "form"
assert result2["step_id"] == "pick_device"
assert not result2["errors"]
with _patch_discovery(), _patch_single_discovery(), patch(
f"{MODULE}.async_setup", return_value=True
) as mock_setup, patch(
f"{MODULE}.async_setup_entry", return_value=True
) as mock_setup_entry:
result3 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_DEVICE: MAC_ADDRESS},
)
await hass.async_block_till_done()
assert result3["type"] == "create_entry"
assert result3["title"] == DEFAULT_ENTRY_TITLE
assert result3["data"] == {CONF_HOST: IP_ADDRESS}
mock_setup.assert_called_once()
mock_setup_entry.assert_called_once()
# ignore configured devices
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
assert not result["errors"]
with _patch_discovery(), _patch_single_discovery():
result2 = await hass.config_entries.flow.async_configure(result["flow_id"], {})
await hass.async_block_till_done()
assert result2["type"] == "abort"
assert result2["reason"] == "no_devices_found"
async def test_discovery_with_existing_device_present(hass: HomeAssistant):
"""Test setting up discovery."""
config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: "127.0.0.2"}, unique_id="dd:dd:dd:dd:dd:dd"
)
config_entry.add_to_hass(hass)
with _patch_discovery(), _patch_single_discovery(no_device=True):
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
assert not result["errors"]
with _patch_discovery(), _patch_single_discovery():
result2 = await hass.config_entries.flow.async_configure(result["flow_id"], {})
await hass.async_block_till_done()
assert result2["type"] == "form"
assert result2["step_id"] == "pick_device"
assert not result2["errors"]
# Now abort and make sure we can start over
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
assert not result["errors"]
with _patch_discovery(), _patch_single_discovery():
result2 = await hass.config_entries.flow.async_configure(result["flow_id"], {})
await hass.async_block_till_done()
assert result2["type"] == "form"
assert result2["step_id"] == "pick_device"
assert not result2["errors"]
with _patch_discovery(), _patch_single_discovery(), patch(
f"{MODULE}.async_setup_entry", return_value=True
) as mock_setup_entry:
result3 = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_DEVICE: MAC_ADDRESS}
)
assert result3["type"] == "create_entry"
assert result3["title"] == DEFAULT_ENTRY_TITLE
assert result3["data"] == {
CONF_HOST: IP_ADDRESS,
}
await hass.async_block_till_done()
mock_setup_entry.assert_called_once()
# ignore configured devices
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
assert not result["errors"]
with _patch_discovery(), _patch_single_discovery():
result2 = await hass.config_entries.flow.async_configure(result["flow_id"], {})
await hass.async_block_till_done()
assert result2["type"] == "abort"
assert result2["reason"] == "no_devices_found"
async def test_discovery_no_device(hass: HomeAssistant):
"""Test discovery without device."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
with _patch_discovery(no_device=True), _patch_single_discovery():
result2 = await hass.config_entries.flow.async_configure(result["flow_id"], {})
await hass.async_block_till_done()
assert result2["type"] == "abort"
assert result2["reason"] == "no_devices_found"
async def test_import(hass: HomeAssistant):
"""Test import from yaml."""
config = {
CONF_HOST: IP_ADDRESS,
}
# Cannot connect
with _patch_discovery(no_device=True), _patch_single_discovery(no_device=True):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_IMPORT}, data=config
)
await hass.async_block_till_done()
assert result["type"] == "abort"
assert result["reason"] == "cannot_connect"
# Success
with _patch_discovery(), _patch_single_discovery(), patch(
f"{MODULE}.async_setup", return_value=True
) as mock_setup, patch(
f"{MODULE}.async_setup_entry", return_value=True
) as mock_setup_entry:
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_IMPORT}, data=config
)
await hass.async_block_till_done()
assert result["type"] == "create_entry"
assert result["title"] == DEFAULT_ENTRY_TITLE
assert result["data"] == {
CONF_HOST: IP_ADDRESS,
}
mock_setup.assert_called_once()
mock_setup_entry.assert_called_once()
# Duplicate
with _patch_discovery(), _patch_single_discovery():
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_IMPORT}, data=config
)
await hass.async_block_till_done()
assert result["type"] == "abort"
assert result["reason"] == "already_configured"
async def test_manual(hass: HomeAssistant):
"""Test manually setup."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
assert not result["errors"]
# Cannot connect (timeout)
with _patch_discovery(no_device=True), _patch_single_discovery(no_device=True):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_HOST: IP_ADDRESS}
)
await hass.async_block_till_done()
assert result2["type"] == "form"
assert result2["step_id"] == "user"
assert result2["errors"] == {"base": "cannot_connect"}
# Success
with _patch_discovery(), _patch_single_discovery(), patch(
f"{MODULE}.async_setup", return_value=True
), patch(f"{MODULE}.async_setup_entry", return_value=True):
result4 = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_HOST: IP_ADDRESS}
)
await hass.async_block_till_done()
assert result4["type"] == "create_entry"
assert result4["title"] == DEFAULT_ENTRY_TITLE
assert result4["data"] == {
CONF_HOST: IP_ADDRESS,
}
# Duplicate
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
with _patch_discovery(no_device=True), _patch_single_discovery(no_device=True):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_HOST: IP_ADDRESS}
)
await hass.async_block_till_done()
assert result2["type"] == "abort"
assert result2["reason"] == "already_configured"
async def test_manual_no_capabilities(hass: HomeAssistant):
"""Test manually setup without successful get_capabilities."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
assert not result["errors"]
with _patch_discovery(no_device=True), _patch_single_discovery(), patch(
f"{MODULE}.async_setup", return_value=True
), patch(f"{MODULE}.async_setup_entry", return_value=True):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_HOST: IP_ADDRESS}
)
await hass.async_block_till_done()
assert result["type"] == "create_entry"
assert result["data"] == {
CONF_HOST: IP_ADDRESS,
}
async def test_discovered_by_discovery_and_dhcp(hass):
"""Test we get the form with discovery and abort for dhcp source when we get both."""
await setup.async_setup_component(hass, "persistent_notification", {})
with _patch_discovery(), _patch_single_discovery():
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_DISCOVERY},
data={CONF_HOST: IP_ADDRESS, CONF_MAC: MAC_ADDRESS, CONF_NAME: ALIAS},
)
await hass.async_block_till_done()
assert result["type"] == RESULT_TYPE_FORM
assert result["errors"] is None
with _patch_discovery(), _patch_single_discovery():
result2 = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_DHCP},
data={"ip": IP_ADDRESS, "macaddress": MAC_ADDRESS, "hostname": ALIAS},
)
await hass.async_block_till_done()
assert result2["type"] == RESULT_TYPE_ABORT
assert result2["reason"] == "already_in_progress"
with _patch_discovery(), _patch_single_discovery():
result3 = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_DHCP},
data={"ip": IP_ADDRESS, "macaddress": "00:00:00:00:00:00"},
)
await hass.async_block_till_done()
assert result3["type"] == RESULT_TYPE_ABORT
assert result3["reason"] == "already_in_progress"
with _patch_discovery(no_device=True), _patch_single_discovery(no_device=True):
result3 = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_DHCP},
data={"ip": "1.2.3.5", "macaddress": "00:00:00:00:00:01"},
)
await hass.async_block_till_done()
assert result3["type"] == RESULT_TYPE_ABORT
assert result3["reason"] == "cannot_connect"
@pytest.mark.parametrize(
"source, data",
[
(
config_entries.SOURCE_DHCP,
{"ip": IP_ADDRESS, "macaddress": MAC_ADDRESS, "hostname": ALIAS},
),
(
config_entries.SOURCE_DISCOVERY,
{CONF_HOST: IP_ADDRESS, CONF_MAC: MAC_ADDRESS, CONF_NAME: ALIAS},
),
],
)
async def test_discovered_by_dhcp_or_discovery(hass, source, data):
"""Test we can setup when discovered from dhcp or discovery."""
await setup.async_setup_component(hass, "persistent_notification", {})
with _patch_discovery(), _patch_single_discovery():
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": source}, data=data
)
await hass.async_block_till_done()
assert result["type"] == RESULT_TYPE_FORM
assert result["errors"] is None
with _patch_discovery(), _patch_single_discovery(), patch(
f"{MODULE}.async_setup", return_value=True
) as mock_async_setup, patch(
f"{MODULE}.async_setup_entry", return_value=True
) as mock_async_setup_entry:
result2 = await hass.config_entries.flow.async_configure(result["flow_id"], {})
await hass.async_block_till_done()
assert result2["type"] == "create_entry"
assert result2["data"] == {
CONF_HOST: IP_ADDRESS,
}
assert mock_async_setup.called
assert mock_async_setup_entry.called
@pytest.mark.parametrize(
"source, data",
[
(
config_entries.SOURCE_DHCP,
{"ip": IP_ADDRESS, "macaddress": MAC_ADDRESS, "hostname": ALIAS},
),
(
config_entries.SOURCE_DISCOVERY,
{CONF_HOST: IP_ADDRESS, CONF_MAC: MAC_ADDRESS, CONF_NAME: ALIAS},
),
],
)
async def test_discovered_by_dhcp_or_discovery_failed_to_get_device(hass, source, data):
"""Test we abort if we cannot get the unique id when discovered from dhcp."""
await setup.async_setup_component(hass, "persistent_notification", {})
with _patch_discovery(no_device=True), _patch_single_discovery(no_device=True):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": source}, data=data
)
await hass.async_block_till_done()
assert result["type"] == RESULT_TYPE_ABORT
assert result["reason"] == "cannot_connect"
async def test_migration_device_online(hass: HomeAssistant):
"""Test migration from single config entry."""
config_entry = MockConfigEntry(domain=DOMAIN, data={}, unique_id=DOMAIN)
config_entry.add_to_hass(hass)
config = {CONF_MAC: MAC_ADDRESS, CONF_NAME: ALIAS, CONF_HOST: IP_ADDRESS}
with _patch_discovery(), _patch_single_discovery(), patch(
f"{MODULE}.async_setup_entry", return_value=True
) as mock_setup_entry:
await setup.async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": "migration"}, data=config
)
await hass.async_block_till_done()
assert result["type"] == "create_entry"
assert result["title"] == ALIAS
assert result["data"] == {
CONF_HOST: IP_ADDRESS,
}
assert len(mock_setup_entry.mock_calls) == 2
# Duplicate
with _patch_discovery(), _patch_single_discovery():
await setup.async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": "migration"}, data=config
)
await hass.async_block_till_done()
assert result["type"] == "abort"
assert result["reason"] == "already_configured"
async def test_migration_device_offline(hass: HomeAssistant):
"""Test migration from single config entry."""
config_entry = MockConfigEntry(domain=DOMAIN, data={}, unique_id=DOMAIN)
config_entry.add_to_hass(hass)
config = {CONF_MAC: MAC_ADDRESS, CONF_NAME: ALIAS, CONF_HOST: None}
with _patch_discovery(no_device=True), _patch_single_discovery(
no_device=True
), patch(f"{MODULE}.async_setup_entry", return_value=True) as mock_setup_entry:
await setup.async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": "migration"}, data=config
)
await hass.async_block_till_done()
assert result["type"] == "create_entry"
assert result["title"] == ALIAS
new_entry = result["result"]
assert result["data"] == {
CONF_HOST: None,
}
assert len(mock_setup_entry.mock_calls) == 2
# Ensure a manual import updates the missing host
config = {CONF_HOST: IP_ADDRESS}
with _patch_discovery(no_device=True), _patch_single_discovery():
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_IMPORT}, data=config
)
await hass.async_block_till_done()
assert result["type"] == "abort"
assert result["reason"] == "already_configured"
assert new_entry.data[CONF_HOST] == IP_ADDRESS

View File

@ -1,69 +1,22 @@
"""Tests for the TP-Link component."""
from __future__ import annotations
import time
from typing import Any
from unittest.mock import MagicMock, patch
from unittest.mock import patch
from pyHS100 import SmartBulb, SmartDevice, SmartDeviceException, SmartPlug, smartstrip
from pyHS100.smartdevice import EmeterStatus
import pytest
from homeassistant import config_entries, data_entry_flow
from homeassistant.components import tplink
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
from homeassistant.components.switch import DOMAIN as SWITCH_DOMAIN
from homeassistant.components.tplink.common import SmartDevices
from homeassistant.components.tplink.const import (
CONF_DIMMER,
CONF_DISCOVERY,
CONF_LIGHT,
CONF_SW_VERSION,
CONF_SWITCH,
UNAVAILABLE_RETRY_DELAY,
)
from homeassistant.components.tplink.sensor import ENERGY_SENSORS
from homeassistant.components.tplink.const import DOMAIN
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import CONF_HOST
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr
from homeassistant.setup import async_setup_component
from homeassistant.util import dt, slugify
from tests.common import MockConfigEntry, async_fire_time_changed, mock_coro
from tests.components.tplink.consts import (
SMARTPLUG_HS100_DATA,
SMARTPLUG_HS110_DATA,
SMARTSTRIP_KP303_DATA,
)
from . import IP_ADDRESS, MAC_ADDRESS, _patch_discovery, _patch_single_discovery
async def test_creating_entry_tries_discover(hass):
"""Test setting up does discovery."""
with patch(
"homeassistant.components.tplink.async_setup_entry",
return_value=mock_coro(True),
) as mock_setup, patch(
"homeassistant.components.tplink.common.Discover.discover",
return_value={"host": 1234},
):
result = await hass.config_entries.flow.async_init(
tplink.DOMAIN, context={"source": config_entries.SOURCE_USER}
)
# Confirmation form
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.config_entries.flow.async_configure(result["flow_id"], {})
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
await hass.async_block_till_done()
assert len(mock_setup.mock_calls) == 1
from tests.common import MockConfigEntry
async def test_configuring_tplink_causes_discovery(hass):
"""Test that specifying empty config does discovery."""
with patch("homeassistant.components.tplink.common.Discover.discover") as discover:
with patch("homeassistant.components.tplink.Discover.discover") as discover:
discover.return_value = {"host": 1234}
await async_setup_component(hass, tplink.DOMAIN, {tplink.DOMAIN: {}})
await hass.async_block_till_done()
@ -71,371 +24,28 @@ async def test_configuring_tplink_causes_discovery(hass):
assert len(discover.mock_calls) == 1
@pytest.mark.parametrize(
"name,cls,platform",
[
("pyHS100.SmartPlug", SmartPlug, "switch"),
("pyHS100.SmartBulb", SmartBulb, "light"),
],
)
@pytest.mark.parametrize("count", [1, 2, 3])
async def test_configuring_device_types(hass, name, cls, platform, count):
"""Test that light or switch platform list is filled correctly."""
with patch(
"homeassistant.components.tplink.common.Discover.discover"
) as discover, patch(
"homeassistant.components.tplink.common.SmartDevice._query_helper"
), patch(
"homeassistant.components.tplink.light.async_setup_entry",
return_value=True,
):
discovery_data = {
f"123.123.123.{c}": cls("123.123.123.123") for c in range(count)
}
discover.return_value = discovery_data
async def test_config_entry_reload(hass):
"""Test that a config entry can be reloaded."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={}, unique_id=MAC_ADDRESS
)
already_migrated_config_entry.add_to_hass(hass)
with _patch_discovery(), _patch_single_discovery():
await async_setup_component(hass, tplink.DOMAIN, {tplink.DOMAIN: {}})
await hass.async_block_till_done()
assert len(discover.mock_calls) == 1
assert len(hass.data[tplink.DOMAIN][platform]) == count
class UnknownSmartDevice(SmartDevice):
"""Dummy class for testing."""
@property
def has_emeter(self) -> bool:
"""Do nothing."""
def turn_off(self) -> None:
"""Do nothing."""
def turn_on(self) -> None:
"""Do nothing."""
@property
def is_on(self) -> bool:
"""Do nothing."""
@property
def state_information(self) -> dict[str, Any]:
"""Do nothing."""
async def test_configuring_devices_from_multiple_sources(hass):
"""Test static and discover devices are not duplicated."""
with patch(
"homeassistant.components.tplink.common.Discover.discover"
) as discover, patch(
"homeassistant.components.tplink.common.SmartDevice._query_helper"
), patch(
"homeassistant.config_entries.ConfigEntries.async_forward_entry_setup"
):
discover_device_fail = SmartPlug("123.123.123.123")
discover_device_fail.get_sysinfo = MagicMock(side_effect=SmartDeviceException())
discover.return_value = {
"123.123.123.1": SmartBulb("123.123.123.1"),
"123.123.123.2": SmartPlug("123.123.123.2"),
"123.123.123.3": SmartBulb("123.123.123.3"),
"123.123.123.4": SmartPlug("123.123.123.4"),
"123.123.123.123": discover_device_fail,
"123.123.123.124": UnknownSmartDevice("123.123.123.124"),
}
await async_setup_component(
hass,
tplink.DOMAIN,
{
tplink.DOMAIN: {
CONF_LIGHT: [{CONF_HOST: "123.123.123.1"}],
CONF_SWITCH: [{CONF_HOST: "123.123.123.2"}],
CONF_DIMMER: [{CONF_HOST: "123.123.123.22"}],
}
},
)
assert already_migrated_config_entry.state == ConfigEntryState.LOADED
await hass.config_entries.async_unload(already_migrated_config_entry.entry_id)
await hass.async_block_till_done()
assert len(discover.mock_calls) == 1
assert len(hass.data[tplink.DOMAIN][CONF_LIGHT]) == 3
assert len(hass.data[tplink.DOMAIN][CONF_SWITCH]) == 2
assert already_migrated_config_entry.state == ConfigEntryState.NOT_LOADED
async def test_is_dimmable(hass):
"""Test that is_dimmable switches are correctly added as lights."""
with patch(
"homeassistant.components.tplink.common.Discover.discover"
) as discover, patch(
"homeassistant.components.tplink.light.async_setup_entry",
return_value=mock_coro(True),
) as setup, patch(
"homeassistant.components.tplink.common.SmartDevice._query_helper"
), patch(
"homeassistant.components.tplink.common.SmartPlug.is_dimmable", True
):
dimmable_switch = SmartPlug("123.123.123.123")
discover.return_value = {"host": dimmable_switch}
async def test_config_entry_retry(hass):
"""Test that a config entry can be retried."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: IP_ADDRESS}, unique_id=MAC_ADDRESS
)
already_migrated_config_entry.add_to_hass(hass)
with _patch_discovery(no_device=True), _patch_single_discovery(no_device=True):
await async_setup_component(hass, tplink.DOMAIN, {tplink.DOMAIN: {}})
await hass.async_block_till_done()
assert len(discover.mock_calls) == 1
assert len(setup.mock_calls) == 1
assert len(hass.data[tplink.DOMAIN][CONF_LIGHT]) == 1
assert not hass.data[tplink.DOMAIN][CONF_SWITCH]
async def test_configuring_discovery_disabled(hass):
"""Test that discover does not get called when disabled."""
with patch(
"homeassistant.components.tplink.async_setup_entry",
return_value=mock_coro(True),
) as mock_setup, patch(
"homeassistant.components.tplink.common.Discover.discover", return_value=[]
) as discover:
await async_setup_component(
hass, tplink.DOMAIN, {tplink.DOMAIN: {tplink.CONF_DISCOVERY: False}}
)
await hass.async_block_till_done()
assert discover.call_count == 0
assert mock_setup.call_count == 1
async def test_platforms_are_initialized(hass: HomeAssistant):
"""Test that platforms are initialized per configuration array."""
config = {
tplink.DOMAIN: {
CONF_DISCOVERY: False,
CONF_LIGHT: [{CONF_HOST: "123.123.123.123"}],
CONF_SWITCH: [{CONF_HOST: "321.321.321.321"}],
}
}
with patch("homeassistant.components.tplink.common.Discover.discover"), patch(
"homeassistant.components.tplink.get_static_devices"
) as get_static_devices, patch(
"homeassistant.components.tplink.common.SmartDevice._query_helper"
), patch(
"homeassistant.components.tplink.light.async_setup_entry",
return_value=mock_coro(True),
), patch(
"homeassistant.components.tplink.common.SmartPlug.is_dimmable",
False,
):
light = SmartBulb("123.123.123.123")
switch = SmartPlug("321.321.321.321")
switch.get_sysinfo = MagicMock(return_value=SMARTPLUG_HS110_DATA["sysinfo"])
switch.get_emeter_realtime = MagicMock(
return_value=EmeterStatus(SMARTPLUG_HS110_DATA["realtime"])
)
switch.get_emeter_daily = MagicMock(
return_value={int(time.strftime("%e")): 1.123}
)
get_static_devices.return_value = SmartDevices([light], [switch])
# patching is_dimmable is necessray to avoid misdetection as light.
await async_setup_component(hass, tplink.DOMAIN, config)
await hass.async_block_till_done()
state = hass.states.get(f"switch.{switch.alias}")
assert state
assert state.name == switch.alias
for description in ENERGY_SENSORS:
state = hass.states.get(
f"sensor.{switch.alias}_{slugify(description.name)}"
)
assert state
assert state.state is not None
assert state.name == f"{switch.alias} {description.name}"
device_registry = dr.async_get(hass)
assert len(device_registry.devices) == 1
device = next(iter(device_registry.devices.values()))
assert device.name == switch.alias
assert device.model == switch.model
assert device.connections == {(dr.CONNECTION_NETWORK_MAC, switch.mac.lower())}
assert device.sw_version == switch.sys_info[CONF_SW_VERSION]
async def test_smartplug_without_consumption_sensors(hass: HomeAssistant):
"""Test that platforms are initialized per configuration array."""
config = {
tplink.DOMAIN: {
CONF_DISCOVERY: False,
CONF_SWITCH: [{CONF_HOST: "321.321.321.321"}],
}
}
with patch("homeassistant.components.tplink.common.Discover.discover"), patch(
"homeassistant.components.tplink.get_static_devices"
) as get_static_devices, patch(
"homeassistant.components.tplink.common.SmartDevice._query_helper"
), patch(
"homeassistant.components.tplink.light.async_setup_entry",
return_value=mock_coro(True),
), patch(
"homeassistant.components.tplink.common.SmartPlug.is_dimmable", False
):
switch = SmartPlug("321.321.321.321")
switch.get_sysinfo = MagicMock(return_value=SMARTPLUG_HS100_DATA["sysinfo"])
get_static_devices.return_value = SmartDevices([], [switch])
await async_setup_component(hass, tplink.DOMAIN, config)
await hass.async_block_till_done()
entities = hass.states.async_entity_ids(SWITCH_DOMAIN)
assert len(entities) == 1
entities = hass.states.async_entity_ids(SENSOR_DOMAIN)
assert len(entities) == 0
async def test_smartstrip_device(hass: HomeAssistant):
"""Test discover a SmartStrip devices."""
config = {
tplink.DOMAIN: {
CONF_DISCOVERY: True,
}
}
class SmartStrip(smartstrip.SmartStrip):
"""Moked SmartStrip class."""
def get_sysinfo(self):
return SMARTSTRIP_KP303_DATA["sysinfo"]
with patch(
"homeassistant.components.tplink.common.Discover.discover"
) as discover, patch(
"homeassistant.components.tplink.common.SmartDevice._query_helper"
), patch(
"homeassistant.components.tplink.common.SmartPlug.get_sysinfo",
return_value=SMARTSTRIP_KP303_DATA["sysinfo"],
):
strip = SmartStrip("123.123.123.123")
discover.return_value = {"123.123.123.123": strip}
assert await async_setup_component(hass, tplink.DOMAIN, config)
await hass.async_block_till_done()
entities = hass.states.async_entity_ids(SWITCH_DOMAIN)
assert len(entities) == 3
async def test_no_config_creates_no_entry(hass):
"""Test for when there is no tplink in config."""
with patch(
"homeassistant.components.tplink.async_setup_entry",
return_value=mock_coro(True),
) as mock_setup:
await async_setup_component(hass, tplink.DOMAIN, {})
await hass.async_block_till_done()
assert mock_setup.call_count == 0
async def test_not_available_at_startup(hass: HomeAssistant):
"""Test when configured devices are not available."""
config = {
tplink.DOMAIN: {
CONF_DISCOVERY: False,
CONF_SWITCH: [{CONF_HOST: "321.321.321.321"}],
}
}
with patch("homeassistant.components.tplink.common.Discover.discover"), patch(
"homeassistant.components.tplink.get_static_devices"
) as get_static_devices, patch(
"homeassistant.components.tplink.common.SmartDevice._query_helper"
), patch(
"homeassistant.components.tplink.light.async_setup_entry",
return_value=mock_coro(True),
), patch(
"homeassistant.components.tplink.common.SmartPlug.is_dimmable", False
):
switch = SmartPlug("321.321.321.321")
switch.get_sysinfo = MagicMock(side_effect=SmartDeviceException())
get_static_devices.return_value = SmartDevices([], [switch])
# run setup while device unreachable
await async_setup_component(hass, tplink.DOMAIN, config)
await hass.async_block_till_done()
entries = hass.config_entries.async_entries(tplink.DOMAIN)
assert len(entries) == 1
assert entries[0].state is config_entries.ConfigEntryState.LOADED
entities = hass.states.async_entity_ids(SWITCH_DOMAIN)
assert len(entities) == 0
# retrying with still unreachable device
async_fire_time_changed(hass, dt.utcnow() + UNAVAILABLE_RETRY_DELAY)
await hass.async_block_till_done()
entries = hass.config_entries.async_entries(tplink.DOMAIN)
assert len(entries) == 1
assert entries[0].state is config_entries.ConfigEntryState.LOADED
entities = hass.states.async_entity_ids(SWITCH_DOMAIN)
assert len(entities) == 0
# retrying with now reachable device
switch.get_sysinfo = MagicMock(return_value=SMARTPLUG_HS100_DATA["sysinfo"])
async_fire_time_changed(hass, dt.utcnow() + UNAVAILABLE_RETRY_DELAY)
await hass.async_block_till_done()
entries = hass.config_entries.async_entries(tplink.DOMAIN)
assert len(entries) == 1
assert entries[0].state is config_entries.ConfigEntryState.LOADED
entities = hass.states.async_entity_ids(SWITCH_DOMAIN)
assert len(entities) == 1
@pytest.mark.parametrize("platform", ["switch", "light"])
async def test_unload(hass, platform):
"""Test that the async_unload_entry works."""
# As we have currently no configuration, we just to pass the domain here.
entry = MockConfigEntry(domain=tplink.DOMAIN)
entry.add_to_hass(hass)
with patch(
"homeassistant.components.tplink.get_static_devices"
) as get_static_devices, patch(
"homeassistant.components.tplink.common.SmartDevice._query_helper"
), patch(
f"homeassistant.components.tplink.{platform}.async_setup_entry",
return_value=mock_coro(True),
) as async_setup_entry:
config = {
tplink.DOMAIN: {
platform: [{CONF_HOST: "123.123.123.123"}],
CONF_DISCOVERY: False,
}
}
light = SmartBulb("123.123.123.123")
switch = SmartPlug("321.321.321.321")
switch.get_sysinfo = MagicMock(return_value=SMARTPLUG_HS110_DATA["sysinfo"])
switch.get_emeter_realtime = MagicMock(
return_value=EmeterStatus(SMARTPLUG_HS110_DATA["realtime"])
)
if platform == "light":
get_static_devices.return_value = SmartDevices([light], [])
elif platform == "switch":
get_static_devices.return_value = SmartDevices([], [switch])
assert await async_setup_component(hass, tplink.DOMAIN, config)
await hass.async_block_till_done()
assert len(async_setup_entry.mock_calls) == 1
assert tplink.DOMAIN in hass.data
assert await tplink.async_unload_entry(hass, entry)
assert not hass.data[tplink.DOMAIN]
assert already_migrated_config_entry.state == ConfigEntryState.SETUP_RETRY

View File

@ -1,752 +1,266 @@
"""Tests for light platform."""
from datetime import timedelta
import logging
from typing import Callable, NamedTuple
from unittest.mock import Mock, PropertyMock, patch
from pyHS100 import SmartDeviceException
import pytest
from homeassistant.components import tplink
from homeassistant.components.homeassistant import (
DOMAIN as HA_DOMAIN,
SERVICE_UPDATE_ENTITY,
)
from homeassistant.components.light import (
ATTR_BRIGHTNESS,
ATTR_COLOR_MODE,
ATTR_COLOR_TEMP,
ATTR_HS_COLOR,
ATTR_MAX_MIREDS,
ATTR_MIN_MIREDS,
ATTR_RGB_COLOR,
ATTR_SUPPORTED_COLOR_MODES,
ATTR_XY_COLOR,
DOMAIN as LIGHT_DOMAIN,
)
from homeassistant.components.tplink.const import (
CONF_DIMMER,
CONF_DISCOVERY,
CONF_LIGHT,
)
from homeassistant.components.tplink.light import SLEEP_TIME
from homeassistant.const import (
ATTR_ENTITY_ID,
CONF_HOST,
SERVICE_TURN_OFF,
SERVICE_TURN_ON,
)
from homeassistant.components.tplink.const import DOMAIN
from homeassistant.const import ATTR_ENTITY_ID
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
from homeassistant.util.dt import utcnow
from tests.common import async_fire_time_changed
from . import MAC_ADDRESS, _mocked_bulb, _patch_discovery, _patch_single_discovery
from tests.common import MockConfigEntry
class LightMockData(NamedTuple):
"""Mock light data."""
sys_info: dict
light_state: dict
set_light_state: Callable[[dict], None]
set_light_state_mock: Mock
get_light_state_mock: Mock
current_consumption_mock: Mock
get_sysinfo_mock: Mock
get_emeter_daily_mock: Mock
get_emeter_monthly_mock: Mock
class SmartSwitchMockData(NamedTuple):
"""Mock smart switch data."""
sys_info: dict
state_mock: Mock
brightness_mock: Mock
get_sysinfo_mock: Mock
@pytest.fixture(name="unknown_light_mock_data")
def unknown_light_mock_data_fixture() -> None:
"""Create light mock data."""
sys_info = {
"sw_ver": "1.2.3",
"hw_ver": "2.3.4",
"mac": "aa:bb:cc:dd:ee:ff",
"mic_mac": "00:11:22:33:44",
"type": "light",
"hwId": "1234",
"fwId": "4567",
"oemId": "891011",
"dev_name": "light1",
"rssi": 11,
"latitude": "0",
"longitude": "0",
"is_color": True,
"is_dimmable": True,
"is_variable_color_temp": True,
"model": "Foo",
"alias": "light1",
}
light_state = {
"on_off": True,
"dft_on_state": {
"brightness": 12,
"color_temp": 3200,
"hue": 110,
"saturation": 90,
},
"brightness": 13,
"color_temp": 3300,
"hue": 110,
"saturation": 90,
}
def set_light_state(state) -> None:
nonlocal light_state
drt_on_state = light_state["dft_on_state"]
drt_on_state.update(state.get("dft_on_state", {}))
light_state.update(state)
light_state["dft_on_state"] = drt_on_state
return light_state
set_light_state_patch = patch(
"homeassistant.components.tplink.common.SmartBulb.set_light_state",
side_effect=set_light_state,
async def test_color_light(hass: HomeAssistant) -> None:
"""Test a light."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={}, unique_id=MAC_ADDRESS
)
get_light_state_patch = patch(
"homeassistant.components.tplink.common.SmartBulb.get_light_state",
return_value=light_state,
)
current_consumption_patch = patch(
"homeassistant.components.tplink.common.SmartDevice.current_consumption",
return_value=3.23,
)
get_sysinfo_patch = patch(
"homeassistant.components.tplink.common.SmartDevice.get_sysinfo",
return_value=sys_info,
)
get_emeter_daily_patch = patch(
"homeassistant.components.tplink.common.SmartDevice.get_emeter_daily",
return_value={
1: 1.01,
2: 1.02,
3: 1.03,
4: 1.04,
5: 1.05,
6: 1.06,
7: 1.07,
8: 1.08,
9: 1.09,
10: 1.10,
11: 1.11,
12: 1.12,
},
)
get_emeter_monthly_patch = patch(
"homeassistant.components.tplink.common.SmartDevice.get_emeter_monthly",
return_value={
1: 2.01,
2: 2.02,
3: 2.03,
4: 2.04,
5: 2.05,
6: 2.06,
7: 2.07,
8: 2.08,
9: 2.09,
10: 2.10,
11: 2.11,
12: 2.12,
},
)
with set_light_state_patch as set_light_state_mock, get_light_state_patch as get_light_state_mock, current_consumption_patch as current_consumption_mock, get_sysinfo_patch as get_sysinfo_mock, get_emeter_daily_patch as get_emeter_daily_mock, get_emeter_monthly_patch as get_emeter_monthly_mock:
yield LightMockData(
sys_info=sys_info,
light_state=light_state,
set_light_state=set_light_state,
set_light_state_mock=set_light_state_mock,
get_light_state_mock=get_light_state_mock,
current_consumption_mock=current_consumption_mock,
get_sysinfo_mock=get_sysinfo_mock,
get_emeter_daily_mock=get_emeter_daily_mock,
get_emeter_monthly_mock=get_emeter_monthly_mock,
)
@pytest.fixture(name="light_mock_data")
def light_mock_data_fixture() -> None:
"""Create light mock data."""
sys_info = {
"sw_ver": "1.2.3",
"hw_ver": "2.3.4",
"mac": "aa:bb:cc:dd:ee:ff",
"mic_mac": "00:11:22:33:44",
"type": "light",
"hwId": "1234",
"fwId": "4567",
"oemId": "891011",
"dev_name": "light1",
"rssi": 11,
"latitude": "0",
"longitude": "0",
"is_color": True,
"is_dimmable": True,
"is_variable_color_temp": True,
"model": "LB120",
"alias": "light1",
}
light_state = {
"on_off": True,
"dft_on_state": {
"brightness": 12,
"color_temp": 3200,
"hue": 110,
"saturation": 90,
},
"brightness": 13,
"color_temp": 3300,
"hue": 110,
"saturation": 90,
}
def set_light_state(state) -> None:
nonlocal light_state
drt_on_state = light_state["dft_on_state"]
drt_on_state.update(state.get("dft_on_state", {}))
light_state.update(state)
light_state["dft_on_state"] = drt_on_state
return light_state
set_light_state_patch = patch(
"homeassistant.components.tplink.common.SmartBulb.set_light_state",
side_effect=set_light_state,
)
get_light_state_patch = patch(
"homeassistant.components.tplink.common.SmartBulb.get_light_state",
return_value=light_state,
)
current_consumption_patch = patch(
"homeassistant.components.tplink.common.SmartDevice.current_consumption",
return_value=3.23,
)
get_sysinfo_patch = patch(
"homeassistant.components.tplink.common.SmartDevice.get_sysinfo",
return_value=sys_info,
)
get_emeter_daily_patch = patch(
"homeassistant.components.tplink.common.SmartDevice.get_emeter_daily",
return_value={
1: 1.01,
2: 1.02,
3: 1.03,
4: 1.04,
5: 1.05,
6: 1.06,
7: 1.07,
8: 1.08,
9: 1.09,
10: 1.10,
11: 1.11,
12: 1.12,
},
)
get_emeter_monthly_patch = patch(
"homeassistant.components.tplink.common.SmartDevice.get_emeter_monthly",
return_value={
1: 2.01,
2: 2.02,
3: 2.03,
4: 2.04,
5: 2.05,
6: 2.06,
7: 2.07,
8: 2.08,
9: 2.09,
10: 2.10,
11: 2.11,
12: 2.12,
},
)
with set_light_state_patch as set_light_state_mock, get_light_state_patch as get_light_state_mock, current_consumption_patch as current_consumption_mock, get_sysinfo_patch as get_sysinfo_mock, get_emeter_daily_patch as get_emeter_daily_mock, get_emeter_monthly_patch as get_emeter_monthly_mock:
yield LightMockData(
sys_info=sys_info,
light_state=light_state,
set_light_state=set_light_state,
set_light_state_mock=set_light_state_mock,
get_light_state_mock=get_light_state_mock,
current_consumption_mock=current_consumption_mock,
get_sysinfo_mock=get_sysinfo_mock,
get_emeter_daily_mock=get_emeter_daily_mock,
get_emeter_monthly_mock=get_emeter_monthly_mock,
)
@pytest.fixture(name="dimmer_switch_mock_data")
def dimmer_switch_mock_data_fixture() -> None:
"""Create dimmer switch mock data."""
sys_info = {
"sw_ver": "1.2.3",
"hw_ver": "2.3.4",
"mac": "aa:bb:cc:dd:ee:ff",
"mic_mac": "00:11:22:33:44",
"type": "switch",
"hwId": "1234",
"fwId": "4567",
"oemId": "891011",
"dev_name": "dimmer1",
"rssi": 11,
"latitude": "0",
"longitude": "0",
"is_color": False,
"is_dimmable": True,
"is_variable_color_temp": False,
"model": "HS220",
"alias": "dimmer1",
"feature": ":",
"relay_state": 1,
"brightness": 13,
}
def state(*args, **kwargs):
nonlocal sys_info
if len(args) == 0:
return sys_info["relay_state"]
if args[0] == "ON":
sys_info["relay_state"] = 1
else:
sys_info["relay_state"] = 0
def brightness(*args, **kwargs):
nonlocal sys_info
if len(args) == 0:
return sys_info["brightness"]
if sys_info["brightness"] == 0:
sys_info["relay_state"] = 0
else:
sys_info["relay_state"] = 1
sys_info["brightness"] = args[0]
get_sysinfo_patch = patch(
"homeassistant.components.tplink.common.SmartDevice.get_sysinfo",
return_value=sys_info,
)
state_patch = patch(
"homeassistant.components.tplink.common.SmartPlug.state",
new_callable=PropertyMock,
side_effect=state,
)
brightness_patch = patch(
"homeassistant.components.tplink.common.SmartPlug.brightness",
new_callable=PropertyMock,
side_effect=brightness,
)
with brightness_patch as brightness_mock, state_patch as state_mock, get_sysinfo_patch as get_sysinfo_mock:
yield SmartSwitchMockData(
sys_info=sys_info,
brightness_mock=brightness_mock,
state_mock=state_mock,
get_sysinfo_mock=get_sysinfo_mock,
)
async def update_entity(hass: HomeAssistant, entity_id: str) -> None:
"""Run an update action for an entity."""
await hass.services.async_call(
HA_DOMAIN,
SERVICE_UPDATE_ENTITY,
{ATTR_ENTITY_ID: entity_id},
blocking=True,
)
await hass.async_block_till_done()
async def test_smartswitch(
hass: HomeAssistant, dimmer_switch_mock_data: SmartSwitchMockData
) -> None:
"""Test function."""
sys_info = dimmer_switch_mock_data.sys_info
await async_setup_component(hass, HA_DOMAIN, {})
await hass.async_block_till_done()
await async_setup_component(
hass,
tplink.DOMAIN,
{
tplink.DOMAIN: {
CONF_DISCOVERY: False,
CONF_DIMMER: [{CONF_HOST: "123.123.123.123"}],
}
},
)
await hass.async_block_till_done()
assert hass.states.get("light.dimmer1")
await hass.services.async_call(
LIGHT_DOMAIN,
SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: "light.dimmer1"},
blocking=True,
)
await hass.async_block_till_done()
await update_entity(hass, "light.dimmer1")
assert hass.states.get("light.dimmer1").state == "off"
assert sys_info["relay_state"] == 0
await hass.services.async_call(
LIGHT_DOMAIN,
SERVICE_TURN_ON,
{ATTR_ENTITY_ID: "light.dimmer1", ATTR_BRIGHTNESS: 50},
blocking=True,
)
await hass.async_block_till_done()
await update_entity(hass, "light.dimmer1")
state = hass.states.get("light.dimmer1")
assert state.state == "on"
assert state.attributes["brightness"] == 51
assert sys_info["relay_state"] == 1
await hass.services.async_call(
LIGHT_DOMAIN,
SERVICE_TURN_ON,
{ATTR_ENTITY_ID: "light.dimmer1", ATTR_BRIGHTNESS: 55},
blocking=True,
)
await hass.async_block_till_done()
await update_entity(hass, "light.dimmer1")
state = hass.states.get("light.dimmer1")
assert state.state == "on"
assert state.attributes["brightness"] == 56
assert sys_info["brightness"] == 22
sys_info["relay_state"] = 0
sys_info["brightness"] = 66
await hass.services.async_call(
LIGHT_DOMAIN,
SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: "light.dimmer1"},
blocking=True,
)
await hass.async_block_till_done()
await update_entity(hass, "light.dimmer1")
state = hass.states.get("light.dimmer1")
assert state.state == "off"
await hass.services.async_call(
LIGHT_DOMAIN,
SERVICE_TURN_ON,
{ATTR_ENTITY_ID: "light.dimmer1"},
blocking=True,
)
await hass.async_block_till_done()
await update_entity(hass, "light.dimmer1")
state = hass.states.get("light.dimmer1")
assert state.state == "on"
assert state.attributes["brightness"] == 168
assert sys_info["brightness"] == 66
async def test_unknown_light(
hass: HomeAssistant, unknown_light_mock_data: LightMockData
) -> None:
"""Test function."""
await async_setup_component(hass, HA_DOMAIN, {})
await hass.async_block_till_done()
await async_setup_component(
hass,
tplink.DOMAIN,
{
tplink.DOMAIN: {
CONF_DISCOVERY: False,
CONF_LIGHT: [{CONF_HOST: "123.123.123.123"}],
}
},
)
await hass.async_block_till_done()
state = hass.states.get("light.light1")
assert state.state == "on"
assert state.attributes["min_mireds"] == 200
assert state.attributes["max_mireds"] == 370
async def test_light(hass: HomeAssistant, light_mock_data: LightMockData) -> None:
"""Test function."""
light_state = light_mock_data.light_state
set_light_state = light_mock_data.set_light_state
await async_setup_component(hass, HA_DOMAIN, {})
await hass.async_block_till_done()
await async_setup_component(
hass,
tplink.DOMAIN,
{
tplink.DOMAIN: {
CONF_DISCOVERY: False,
CONF_LIGHT: [{CONF_HOST: "123.123.123.123"}],
}
},
)
await hass.async_block_till_done()
assert hass.states.get("light.light1")
await hass.services.async_call(
LIGHT_DOMAIN,
SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: "light.light1"},
blocking=True,
)
await hass.async_block_till_done()
await update_entity(hass, "light.light1")
assert hass.states.get("light.light1").state == "off"
assert light_state["on_off"] == 0
await hass.services.async_call(
LIGHT_DOMAIN,
SERVICE_TURN_ON,
{ATTR_ENTITY_ID: "light.light1", ATTR_COLOR_TEMP: 222, ATTR_BRIGHTNESS: 50},
blocking=True,
)
await hass.async_block_till_done()
await update_entity(hass, "light.light1")
state = hass.states.get("light.light1")
assert state.state == "on"
assert state.attributes["brightness"] == 51
assert state.attributes["color_temp"] == 222
assert "hs_color" in state.attributes
assert light_state["on_off"] == 1
await hass.services.async_call(
LIGHT_DOMAIN,
SERVICE_TURN_ON,
{ATTR_ENTITY_ID: "light.light1", ATTR_BRIGHTNESS: 55, ATTR_HS_COLOR: (23, 27)},
blocking=True,
)
await hass.async_block_till_done()
await update_entity(hass, "light.light1")
state = hass.states.get("light.light1")
assert state.state == "on"
assert state.attributes["brightness"] == 56
assert state.attributes["hs_color"] == (23, 27)
assert "color_temp" not in state.attributes
assert light_state["brightness"] == 22
assert light_state["hue"] == 23
assert light_state["saturation"] == 27
light_state["on_off"] = 0
light_state["dft_on_state"]["on_off"] = 0
light_state["brightness"] = 66
light_state["dft_on_state"]["brightness"] = 66
light_state["color_temp"] = 6400
light_state["dft_on_state"]["color_temp"] = 123
light_state["hue"] = 77
light_state["dft_on_state"]["hue"] = 77
light_state["saturation"] = 78
light_state["dft_on_state"]["saturation"] = 78
await hass.services.async_call(
LIGHT_DOMAIN,
SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: "light.light1"},
blocking=True,
)
await hass.async_block_till_done()
await update_entity(hass, "light.light1")
state = hass.states.get("light.light1")
assert state.state == "off"
await hass.services.async_call(
LIGHT_DOMAIN,
SERVICE_TURN_ON,
{ATTR_ENTITY_ID: "light.light1"},
blocking=True,
)
await hass.async_block_till_done()
await update_entity(hass, "light.light1")
state = hass.states.get("light.light1")
assert state.state == "on"
assert state.attributes["brightness"] == 168
assert state.attributes["color_temp"] == 156
assert "hs_color" in state.attributes
assert light_state["brightness"] == 66
assert light_state["hue"] == 77
assert light_state["saturation"] == 78
set_light_state({"brightness": 91, "dft_on_state": {"brightness": 91}})
await update_entity(hass, "light.light1")
state = hass.states.get("light.light1")
assert state.attributes["brightness"] == 232
async def test_get_light_state_retry(
hass: HomeAssistant, light_mock_data: LightMockData
) -> None:
"""Test function."""
# Setup test for retries for sysinfo.
get_sysinfo_call_count = 0
def get_sysinfo_side_effect():
nonlocal get_sysinfo_call_count
get_sysinfo_call_count += 1
# Need to fail on the 2nd call because the first call is used to
# determine if the device is online during the light platform's
# setup hook.
if get_sysinfo_call_count == 2:
raise SmartDeviceException()
return light_mock_data.sys_info
light_mock_data.get_sysinfo_mock.side_effect = get_sysinfo_side_effect
# Setup test for retries of setting state information.
set_state_call_count = 0
def set_light_state_side_effect(state_data: dict):
nonlocal set_state_call_count, light_mock_data
set_state_call_count += 1
if set_state_call_count == 1:
raise SmartDeviceException()
return light_mock_data.set_light_state(state_data)
light_mock_data.set_light_state_mock.side_effect = set_light_state_side_effect
# Setup component.
await async_setup_component(hass, HA_DOMAIN, {})
await hass.async_block_till_done()
await async_setup_component(
hass,
tplink.DOMAIN,
{
tplink.DOMAIN: {
CONF_DISCOVERY: False,
CONF_LIGHT: [{CONF_HOST: "123.123.123.123"}],
}
},
)
await hass.async_block_till_done()
await hass.services.async_call(
LIGHT_DOMAIN,
SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: "light.light1"},
blocking=True,
)
await hass.async_block_till_done()
await update_entity(hass, "light.light1")
assert light_mock_data.get_sysinfo_mock.call_count > 1
assert light_mock_data.get_light_state_mock.call_count > 1
assert light_mock_data.set_light_state_mock.call_count > 1
assert light_mock_data.get_sysinfo_mock.call_count < 40
assert light_mock_data.get_light_state_mock.call_count < 40
assert light_mock_data.set_light_state_mock.call_count < 10
async def test_update_failure(
hass: HomeAssistant, light_mock_data: LightMockData, caplog
):
"""Test that update failures are logged."""
await async_setup_component(hass, HA_DOMAIN, {})
await hass.async_block_till_done()
await async_setup_component(
hass,
tplink.DOMAIN,
{
tplink.DOMAIN: {
CONF_DISCOVERY: False,
CONF_LIGHT: [{CONF_HOST: "123.123.123.123"}],
}
},
)
await hass.async_block_till_done()
caplog.clear()
caplog.set_level(logging.WARNING)
await hass.helpers.entity_component.async_update_entity("light.light1")
assert caplog.text == ""
with patch("homeassistant.components.tplink.light.MAX_ATTEMPTS", 0):
caplog.clear()
caplog.set_level(logging.WARNING)
await hass.helpers.entity_component.async_update_entity("light.light1")
assert "Could not read state for 123.123.123.123|light1" in caplog.text
get_state_call_count = 0
def get_light_state_side_effect():
nonlocal get_state_call_count
get_state_call_count += 1
if get_state_call_count == 1:
raise SmartDeviceException()
return light_mock_data.light_state
light_mock_data.get_light_state_mock.side_effect = get_light_state_side_effect
with patch("homeassistant.components.tplink.light", MAX_ATTEMPTS=2, SLEEP_TIME=0):
caplog.clear()
caplog.set_level(logging.DEBUG)
await update_entity(hass, "light.light1")
assert (
f"Retrying in {SLEEP_TIME} seconds for 123.123.123.123|light1"
in caplog.text
)
assert "Device 123.123.123.123|light1 responded after " in caplog.text
async def test_async_setup_entry_unavailable(
hass: HomeAssistant, light_mock_data: LightMockData, caplog
):
"""Test unavailable devices trigger a later retry."""
caplog.clear()
caplog.set_level(logging.WARNING)
with patch(
"homeassistant.components.tplink.common.SmartDevice.get_sysinfo",
side_effect=SmartDeviceException,
):
await async_setup_component(hass, HA_DOMAIN, {})
already_migrated_config_entry.add_to_hass(hass)
bulb = _mocked_bulb()
bulb.color_temp = None
with _patch_discovery(device=bulb), _patch_single_discovery(device=bulb):
await async_setup_component(hass, tplink.DOMAIN, {tplink.DOMAIN: {}})
await hass.async_block_till_done()
await async_setup_component(
hass,
tplink.DOMAIN,
{
tplink.DOMAIN: {
CONF_DISCOVERY: False,
CONF_LIGHT: [{CONF_HOST: "123.123.123.123"}],
}
},
)
entity_id = "light.my_bulb"
state = hass.states.get(entity_id)
assert state.state == "on"
attributes = state.attributes
assert attributes[ATTR_BRIGHTNESS] == 128
assert attributes[ATTR_COLOR_MODE] == "hs"
assert attributes[ATTR_SUPPORTED_COLOR_MODES] == ["brightness", "color_temp", "hs"]
assert attributes[ATTR_MIN_MIREDS] == 111
assert attributes[ATTR_MAX_MIREDS] == 250
assert attributes[ATTR_HS_COLOR] == (10, 30)
assert attributes[ATTR_RGB_COLOR] == (255, 191, 178)
assert attributes[ATTR_XY_COLOR] == (0.42, 0.336)
await hass.services.async_call(
LIGHT_DOMAIN, "turn_off", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
bulb.turn_off.assert_called_once()
await hass.services.async_call(
LIGHT_DOMAIN, "turn_on", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
bulb.turn_on.assert_called_once()
bulb.turn_on.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{ATTR_ENTITY_ID: entity_id, ATTR_BRIGHTNESS: 100},
blocking=True,
)
bulb.set_brightness.assert_called_with(39, transition=None)
bulb.set_brightness.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{ATTR_ENTITY_ID: entity_id, ATTR_COLOR_TEMP: 150},
blocking=True,
)
bulb.set_color_temp.assert_called_with(6666, brightness=None, transition=None)
bulb.set_color_temp.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{ATTR_ENTITY_ID: entity_id, ATTR_COLOR_TEMP: 150},
blocking=True,
)
bulb.set_color_temp.assert_called_with(6666, brightness=None, transition=None)
bulb.set_color_temp.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{ATTR_ENTITY_ID: entity_id, ATTR_HS_COLOR: (10, 30)},
blocking=True,
)
bulb.set_hsv.assert_called_with(10, 30, None, transition=None)
bulb.set_hsv.reset_mock()
@pytest.mark.parametrize("is_color", [True, False])
async def test_color_temp_light(hass: HomeAssistant, is_color: bool) -> None:
"""Test a light."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={}, unique_id=MAC_ADDRESS
)
already_migrated_config_entry.add_to_hass(hass)
bulb = _mocked_bulb()
bulb.is_color = is_color
bulb.color_temp = 4000
bulb.is_variable_color_temp = True
with _patch_discovery(device=bulb), _patch_single_discovery(device=bulb):
await async_setup_component(hass, tplink.DOMAIN, {tplink.DOMAIN: {}})
await hass.async_block_till_done()
assert not hass.states.get("light.light1")
future = utcnow() + timedelta(seconds=30)
async_fire_time_changed(hass, future)
await hass.async_block_till_done()
assert hass.states.get("light.light1")
entity_id = "light.my_bulb"
state = hass.states.get(entity_id)
assert state.state == "on"
attributes = state.attributes
assert attributes[ATTR_BRIGHTNESS] == 128
assert attributes[ATTR_COLOR_MODE] == "color_temp"
if bulb.is_color:
assert attributes[ATTR_SUPPORTED_COLOR_MODES] == [
"brightness",
"color_temp",
"hs",
]
else:
assert attributes[ATTR_SUPPORTED_COLOR_MODES] == ["brightness", "color_temp"]
assert attributes[ATTR_MIN_MIREDS] == 111
assert attributes[ATTR_MAX_MIREDS] == 250
assert attributes[ATTR_COLOR_TEMP] == 250
await hass.services.async_call(
LIGHT_DOMAIN, "turn_off", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
bulb.turn_off.assert_called_once()
await hass.services.async_call(
LIGHT_DOMAIN, "turn_on", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
bulb.turn_on.assert_called_once()
bulb.turn_on.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{ATTR_ENTITY_ID: entity_id, ATTR_BRIGHTNESS: 100},
blocking=True,
)
bulb.set_brightness.assert_called_with(39, transition=None)
bulb.set_brightness.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{ATTR_ENTITY_ID: entity_id, ATTR_COLOR_TEMP: 150},
blocking=True,
)
bulb.set_color_temp.assert_called_with(6666, brightness=None, transition=None)
bulb.set_color_temp.reset_mock()
async def test_brightness_only_light(hass: HomeAssistant) -> None:
"""Test a light."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={}, unique_id=MAC_ADDRESS
)
already_migrated_config_entry.add_to_hass(hass)
bulb = _mocked_bulb()
bulb.is_color = False
bulb.is_variable_color_temp = False
with _patch_discovery(device=bulb), _patch_single_discovery(device=bulb):
await async_setup_component(hass, tplink.DOMAIN, {tplink.DOMAIN: {}})
await hass.async_block_till_done()
entity_id = "light.my_bulb"
state = hass.states.get(entity_id)
assert state.state == "on"
attributes = state.attributes
assert attributes[ATTR_BRIGHTNESS] == 128
assert attributes[ATTR_COLOR_MODE] == "brightness"
assert attributes[ATTR_SUPPORTED_COLOR_MODES] == ["brightness"]
await hass.services.async_call(
LIGHT_DOMAIN, "turn_off", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
bulb.turn_off.assert_called_once()
await hass.services.async_call(
LIGHT_DOMAIN, "turn_on", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
bulb.turn_on.assert_called_once()
bulb.turn_on.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{ATTR_ENTITY_ID: entity_id, ATTR_BRIGHTNESS: 100},
blocking=True,
)
bulb.set_brightness.assert_called_with(39, transition=None)
bulb.set_brightness.reset_mock()
async def test_on_off_light(hass: HomeAssistant) -> None:
"""Test a light."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={}, unique_id=MAC_ADDRESS
)
already_migrated_config_entry.add_to_hass(hass)
bulb = _mocked_bulb()
bulb.is_color = False
bulb.is_variable_color_temp = False
bulb.is_dimmable = False
with _patch_discovery(device=bulb), _patch_single_discovery(device=bulb):
await async_setup_component(hass, tplink.DOMAIN, {tplink.DOMAIN: {}})
await hass.async_block_till_done()
entity_id = "light.my_bulb"
state = hass.states.get(entity_id)
assert state.state == "on"
attributes = state.attributes
assert attributes[ATTR_SUPPORTED_COLOR_MODES] == ["onoff"]
await hass.services.async_call(
LIGHT_DOMAIN, "turn_off", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
bulb.turn_off.assert_called_once()
await hass.services.async_call(
LIGHT_DOMAIN, "turn_on", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
bulb.turn_on.assert_called_once()
bulb.turn_on.reset_mock()
async def test_off_at_start_light(hass: HomeAssistant) -> None:
"""Test a light."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={}, unique_id=MAC_ADDRESS
)
already_migrated_config_entry.add_to_hass(hass)
bulb = _mocked_bulb()
bulb.is_color = False
bulb.is_variable_color_temp = False
bulb.is_dimmable = False
bulb.is_on = False
with _patch_discovery(device=bulb), _patch_single_discovery(device=bulb):
await async_setup_component(hass, tplink.DOMAIN, {tplink.DOMAIN: {}})
await hass.async_block_till_done()
entity_id = "light.my_bulb"
state = hass.states.get(entity_id)
assert state.state == "off"
attributes = state.attributes
assert attributes[ATTR_SUPPORTED_COLOR_MODES] == ["onoff"]

View File

@ -0,0 +1,241 @@
"""Test the tplink config flow."""
from homeassistant import setup
from homeassistant.components.tplink import CONF_DISCOVERY, CONF_SWITCH, DOMAIN
from homeassistant.const import CONF_HOST, EVENT_HOMEASSISTANT_STARTED
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers.device_registry import DeviceRegistry
from homeassistant.helpers.entity_registry import EntityRegistry
from . import ALIAS, IP_ADDRESS, MAC_ADDRESS, _patch_discovery, _patch_single_discovery
from tests.common import MockConfigEntry
async def test_migration_device_online_end_to_end(
hass: HomeAssistant, device_reg: DeviceRegistry, entity_reg: EntityRegistry
):
"""Test migration from single config entry."""
config_entry = MockConfigEntry(domain=DOMAIN, data={}, unique_id=DOMAIN)
config_entry.add_to_hass(hass)
device = device_reg.async_get_or_create(
config_entry_id=config_entry.entry_id,
connections={(dr.CONNECTION_NETWORK_MAC, MAC_ADDRESS)},
name=ALIAS,
)
switch_entity_reg = entity_reg.async_get_or_create(
config_entry=config_entry,
platform=DOMAIN,
domain="switch",
unique_id=MAC_ADDRESS,
original_name=ALIAS,
device_id=device.id,
)
light_entity_reg = entity_reg.async_get_or_create(
config_entry=config_entry,
platform=DOMAIN,
domain="light",
unique_id=dr.format_mac(MAC_ADDRESS),
original_name=ALIAS,
device_id=device.id,
)
power_sensor_entity_reg = entity_reg.async_get_or_create(
config_entry=config_entry,
platform=DOMAIN,
domain="sensor",
unique_id=f"{MAC_ADDRESS}_sensor",
original_name=ALIAS,
device_id=device.id,
)
with _patch_discovery(), _patch_single_discovery():
await setup.async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
migrated_entry = None
for entry in hass.config_entries.async_entries(DOMAIN):
if entry.unique_id == DOMAIN:
migrated_entry = entry
break
assert migrated_entry is not None
assert device.config_entries == {migrated_entry.entry_id}
assert light_entity_reg.config_entry_id == migrated_entry.entry_id
assert switch_entity_reg.config_entry_id == migrated_entry.entry_id
assert power_sensor_entity_reg.config_entry_id == migrated_entry.entry_id
assert er.async_entries_for_config_entry(entity_reg, config_entry) == []
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
legacy_entry = None
for entry in hass.config_entries.async_entries(DOMAIN):
if entry.unique_id == DOMAIN:
legacy_entry = entry
break
assert legacy_entry is None
async def test_migration_device_online_end_to_end_after_downgrade(
hass: HomeAssistant, device_reg: DeviceRegistry, entity_reg: EntityRegistry
):
"""Test migration from single config entry can happen again after a downgrade."""
config_entry = MockConfigEntry(domain=DOMAIN, data={}, unique_id=DOMAIN)
config_entry.add_to_hass(hass)
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: IP_ADDRESS}, unique_id=MAC_ADDRESS
)
already_migrated_config_entry.add_to_hass(hass)
device = device_reg.async_get_or_create(
config_entry_id=config_entry.entry_id,
connections={(dr.CONNECTION_NETWORK_MAC, MAC_ADDRESS)},
name=ALIAS,
)
light_entity_reg = entity_reg.async_get_or_create(
config_entry=config_entry,
platform=DOMAIN,
domain="light",
unique_id=MAC_ADDRESS,
original_name=ALIAS,
device_id=device.id,
)
power_sensor_entity_reg = entity_reg.async_get_or_create(
config_entry=config_entry,
platform=DOMAIN,
domain="sensor",
unique_id=f"{MAC_ADDRESS}_sensor",
original_name=ALIAS,
device_id=device.id,
)
with _patch_discovery(), _patch_single_discovery():
await setup.async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
assert device.config_entries == {config_entry.entry_id}
assert light_entity_reg.config_entry_id == config_entry.entry_id
assert power_sensor_entity_reg.config_entry_id == config_entry.entry_id
assert er.async_entries_for_config_entry(entity_reg, config_entry) == []
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
legacy_entry = None
for entry in hass.config_entries.async_entries(DOMAIN):
if entry.unique_id == DOMAIN:
legacy_entry = entry
break
assert legacy_entry is None
async def test_migration_device_online_end_to_end_ignores_other_devices(
hass: HomeAssistant, device_reg: DeviceRegistry, entity_reg: EntityRegistry
):
"""Test migration from single config entry."""
config_entry = MockConfigEntry(domain=DOMAIN, data={}, unique_id=DOMAIN)
config_entry.add_to_hass(hass)
other_domain_config_entry = MockConfigEntry(
domain="other_domain", data={}, unique_id="other_domain"
)
other_domain_config_entry.add_to_hass(hass)
device = device_reg.async_get_or_create(
config_entry_id=config_entry.entry_id,
connections={(dr.CONNECTION_NETWORK_MAC, MAC_ADDRESS)},
name=ALIAS,
)
other_device = device_reg.async_get_or_create(
config_entry_id=other_domain_config_entry.entry_id,
connections={(dr.CONNECTION_NETWORK_MAC, "556655665566")},
name=ALIAS,
)
light_entity_reg = entity_reg.async_get_or_create(
config_entry=config_entry,
platform=DOMAIN,
domain="light",
unique_id=MAC_ADDRESS,
original_name=ALIAS,
device_id=device.id,
)
power_sensor_entity_reg = entity_reg.async_get_or_create(
config_entry=config_entry,
platform=DOMAIN,
domain="sensor",
unique_id=f"{MAC_ADDRESS}_sensor",
original_name=ALIAS,
device_id=device.id,
)
ignored_entity_reg = entity_reg.async_get_or_create(
config_entry=other_domain_config_entry,
platform=DOMAIN,
domain="sensor",
unique_id="00:00:00:00:00:00_sensor",
original_name=ALIAS,
device_id=device.id,
)
garbage_entity_reg = entity_reg.async_get_or_create(
config_entry=config_entry,
platform=DOMAIN,
domain="sensor",
unique_id="garbage",
original_name=ALIAS,
device_id=other_device.id,
)
with _patch_discovery(), _patch_single_discovery():
await setup.async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
migrated_entry = None
for entry in hass.config_entries.async_entries(DOMAIN):
if entry.unique_id == DOMAIN:
migrated_entry = entry
break
assert migrated_entry is not None
assert device.config_entries == {migrated_entry.entry_id}
assert light_entity_reg.config_entry_id == migrated_entry.entry_id
assert power_sensor_entity_reg.config_entry_id == migrated_entry.entry_id
assert ignored_entity_reg.config_entry_id == other_domain_config_entry.entry_id
assert garbage_entity_reg.config_entry_id == config_entry.entry_id
assert er.async_entries_for_config_entry(entity_reg, config_entry) == []
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
legacy_entry = None
for entry in hass.config_entries.async_entries(DOMAIN):
if entry.unique_id == DOMAIN:
legacy_entry = entry
break
assert legacy_entry is not None
async def test_migrate_from_yaml(hass: HomeAssistant):
"""Test migrate from yaml."""
config = {
DOMAIN: {
CONF_DISCOVERY: False,
CONF_SWITCH: [{CONF_HOST: IP_ADDRESS}],
}
}
with _patch_discovery(), _patch_single_discovery():
await setup.async_setup_component(hass, DOMAIN, config)
await hass.async_block_till_done()
migrated_entry = None
for entry in hass.config_entries.async_entries(DOMAIN):
if entry.unique_id == MAC_ADDRESS:
migrated_entry = entry
break
assert migrated_entry is not None
assert migrated_entry.data[CONF_HOST] == IP_ADDRESS

View File

@ -0,0 +1,122 @@
"""Tests for light platform."""
from unittest.mock import Mock
from homeassistant.components import tplink
from homeassistant.components.tplink.const import DOMAIN
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
from . import (
MAC_ADDRESS,
_mocked_bulb,
_mocked_plug,
_patch_discovery,
_patch_single_discovery,
)
from tests.common import MockConfigEntry
async def test_color_light_with_an_emeter(hass: HomeAssistant) -> None:
"""Test a light with an emeter."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={}, unique_id=MAC_ADDRESS
)
already_migrated_config_entry.add_to_hass(hass)
bulb = _mocked_bulb()
bulb.color_temp = None
bulb.has_emeter = True
bulb.emeter_realtime = Mock(
power=None,
total=None,
voltage=None,
current=5,
)
bulb.emeter_today = 5000
with _patch_discovery(device=bulb), _patch_single_discovery(device=bulb):
await async_setup_component(hass, tplink.DOMAIN, {tplink.DOMAIN: {}})
await hass.async_block_till_done()
await hass.async_block_till_done()
expected = {
"sensor.my_bulb_today_s_consumption": 5000,
"sensor.my_bulb_current": 5,
}
entity_id = "light.my_bulb"
state = hass.states.get(entity_id)
assert state.state == "on"
for sensor_entity_id, value in expected.items():
assert hass.states.get(sensor_entity_id).state == str(value)
not_expected = {
"sensor.my_bulb_current_consumption",
"sensor.my_bulb_total_consumption",
"sensor.my_bulb_voltage",
}
for sensor_entity_id in not_expected:
assert hass.states.get(sensor_entity_id) is None
async def test_plug_with_an_emeter(hass: HomeAssistant) -> None:
"""Test a plug with an emeter."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={}, unique_id=MAC_ADDRESS
)
already_migrated_config_entry.add_to_hass(hass)
plug = _mocked_plug()
plug.color_temp = None
plug.has_emeter = True
plug.emeter_realtime = Mock(
power=100,
total=30,
voltage=121,
current=5,
)
plug.emeter_today = None
with _patch_discovery(device=plug), _patch_single_discovery(device=plug):
await async_setup_component(hass, tplink.DOMAIN, {tplink.DOMAIN: {}})
await hass.async_block_till_done()
await hass.async_block_till_done()
expected = {
"sensor.my_plug_current_consumption": 100,
"sensor.my_plug_total_consumption": 30,
"sensor.my_plug_today_s_consumption": 0.0,
"sensor.my_plug_voltage": 121,
"sensor.my_plug_current": 5,
}
entity_id = "switch.my_plug"
state = hass.states.get(entity_id)
assert state.state == "on"
for sensor_entity_id, value in expected.items():
assert hass.states.get(sensor_entity_id).state == str(value)
async def test_color_light_no_emeter(hass: HomeAssistant) -> None:
"""Test a light without an emeter."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={}, unique_id=MAC_ADDRESS
)
already_migrated_config_entry.add_to_hass(hass)
bulb = _mocked_bulb()
bulb.color_temp = None
bulb.has_emeter = False
with _patch_discovery(device=bulb), _patch_single_discovery(device=bulb):
await async_setup_component(hass, tplink.DOMAIN, {tplink.DOMAIN: {}})
await hass.async_block_till_done()
await hass.async_block_till_done()
entity_id = "light.my_bulb"
state = hass.states.get(entity_id)
assert state.state == "on"
not_expected = [
"sensor.my_bulb_current_consumption"
"sensor.my_bulb_total_consumption"
"sensor.my_bulb_today_s_consumption"
"sensor.my_bulb_voltage"
"sensor.my_bulb_current"
]
for sensor_entity_id in not_expected:
assert hass.states.get(sensor_entity_id) is None

View File

@ -0,0 +1,107 @@
"""Tests for switch platform."""
from datetime import timedelta
from unittest.mock import AsyncMock
from kasa import SmartDeviceException
from homeassistant.components import tplink
from homeassistant.components.switch import DOMAIN as SWITCH_DOMAIN
from homeassistant.components.tplink.const import DOMAIN
from homeassistant.const import ATTR_ENTITY_ID, STATE_ON, STATE_UNAVAILABLE
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
from homeassistant.util import dt as dt_util
from . import (
MAC_ADDRESS,
_mocked_plug,
_mocked_strip,
_patch_discovery,
_patch_single_discovery,
)
from tests.common import MockConfigEntry, async_fire_time_changed
async def test_plug(hass: HomeAssistant) -> None:
"""Test a smart plug."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={}, unique_id=MAC_ADDRESS
)
already_migrated_config_entry.add_to_hass(hass)
plug = _mocked_plug()
with _patch_discovery(device=plug), _patch_single_discovery(device=plug):
await async_setup_component(hass, tplink.DOMAIN, {tplink.DOMAIN: {}})
await hass.async_block_till_done()
entity_id = "switch.my_plug"
state = hass.states.get(entity_id)
assert state.state == STATE_ON
await hass.services.async_call(
SWITCH_DOMAIN, "turn_off", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
plug.turn_off.assert_called_once()
plug.turn_off.reset_mock()
await hass.services.async_call(
SWITCH_DOMAIN, "turn_on", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
plug.turn_on.assert_called_once()
plug.turn_on.reset_mock()
async def test_plug_update_fails(hass: HomeAssistant) -> None:
"""Test a smart plug update failure."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={}, unique_id=MAC_ADDRESS
)
already_migrated_config_entry.add_to_hass(hass)
plug = _mocked_plug()
with _patch_discovery(device=plug), _patch_single_discovery(device=plug):
await async_setup_component(hass, tplink.DOMAIN, {tplink.DOMAIN: {}})
await hass.async_block_till_done()
entity_id = "switch.my_plug"
state = hass.states.get(entity_id)
assert state.state == STATE_ON
plug.update = AsyncMock(side_effect=SmartDeviceException)
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=30))
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state.state == STATE_UNAVAILABLE
async def test_strip(hass: HomeAssistant) -> None:
"""Test a smart strip."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={}, unique_id=MAC_ADDRESS
)
already_migrated_config_entry.add_to_hass(hass)
plug = _mocked_strip()
with _patch_discovery(device=plug), _patch_single_discovery(device=plug):
await async_setup_component(hass, tplink.DOMAIN, {tplink.DOMAIN: {}})
await hass.async_block_till_done()
# Verify we only create entities for the children
# since this is what the previous version did
assert hass.states.get("switch.my_strip") is None
for plug_id in range(2):
entity_id = f"switch.plug{plug_id}"
state = hass.states.get(entity_id)
assert state.state == STATE_ON
await hass.services.async_call(
SWITCH_DOMAIN, "turn_off", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
plug.children[plug_id].turn_off.assert_called_once()
plug.children[plug_id].turn_off.reset_mock()
await hass.services.async_call(
SWITCH_DOMAIN, "turn_on", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
plug.children[plug_id].turn_on.assert_called_once()
plug.children[plug_id].turn_on.reset_mock()