Migrate LIFX to config entry per device (#74316)

This commit is contained in:
J. Nick Koston 2022-07-18 17:56:34 -05:00 committed by GitHub
parent 983bcfa935
commit 1354952977
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 3601 additions and 759 deletions

View File

@ -647,9 +647,6 @@ omit =
homeassistant/components/life360/const.py
homeassistant/components/life360/coordinator.py
homeassistant/components/life360/device_tracker.py
homeassistant/components/lifx/__init__.py
homeassistant/components/lifx/const.py
homeassistant/components/lifx/light.py
homeassistant/components/lifx_cloud/scene.py
homeassistant/components/lightwave/*
homeassistant/components/limitlessled/light.py

View File

@ -146,6 +146,7 @@ homeassistant.components.lametric.*
homeassistant.components.laundrify.*
homeassistant.components.lcn.*
homeassistant.components.light.*
homeassistant.components.lifx.*
homeassistant.components.local_ip.*
homeassistant.components.lock.*
homeassistant.components.logbook.*

View File

@ -577,7 +577,8 @@ build.json @home-assistant/supervisor
/homeassistant/components/lg_netcast/ @Drafteed
/homeassistant/components/life360/ @pnbruckner
/tests/components/life360/ @pnbruckner
/homeassistant/components/lifx/ @Djelibeybi
/homeassistant/components/lifx/ @bdraco @Djelibeybi
/tests/components/lifx/ @bdraco @Djelibeybi
/homeassistant/components/light/ @home-assistant/core
/tests/components/light/ @home-assistant/core
/homeassistant/components/linux_battery/ @fabaff

View File

@ -1,19 +1,41 @@
"""Support for LIFX."""
from __future__ import annotations
import asyncio
from collections.abc import Iterable
from datetime import datetime, timedelta
import socket
from typing import Any
from aiolifx.aiolifx import Light
from aiolifx_connection import LIFXConnection
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components.light import DOMAIN as LIGHT_DOMAIN
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_PORT, Platform
from homeassistant.core import HomeAssistant
from homeassistant.const import (
CONF_HOST,
CONF_PORT,
EVENT_HOMEASSISTANT_STARTED,
Platform,
)
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.exceptions import ConfigEntryNotReady
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.event import async_call_later, async_track_time_interval
from homeassistant.helpers.typing import ConfigType
from .const import DOMAIN
from .const import _LOGGER, DATA_LIFX_MANAGER, DOMAIN, TARGET_ANY
from .coordinator import LIFXUpdateCoordinator
from .discovery import async_discover_devices, async_trigger_discovery
from .manager import LIFXManager
from .migration import async_migrate_entities_devices, async_migrate_legacy_entries
from .util import async_entry_is_legacy, async_get_legacy_entry
CONF_SERVER = "server"
CONF_BROADCAST = "broadcast"
INTERFACE_SCHEMA = vol.Schema(
{
vol.Optional(CONF_SERVER): cv.string,
@ -22,39 +44,176 @@ INTERFACE_SCHEMA = vol.Schema(
}
)
CONFIG_SCHEMA = vol.Schema(
{DOMAIN: {LIGHT_DOMAIN: vol.Schema(vol.All(cv.ensure_list, [INTERFACE_SCHEMA]))}},
extra=vol.ALLOW_EXTRA,
CONFIG_SCHEMA = vol.All(
cv.deprecated(DOMAIN),
vol.Schema(
{
DOMAIN: {
LIGHT_DOMAIN: vol.Schema(vol.All(cv.ensure_list, [INTERFACE_SCHEMA]))
}
},
extra=vol.ALLOW_EXTRA,
),
)
DATA_LIFX_MANAGER = "lifx_manager"
PLATFORMS = [Platform.LIGHT]
DISCOVERY_INTERVAL = timedelta(minutes=15)
MIGRATION_INTERVAL = timedelta(minutes=5)
DISCOVERY_COOLDOWN = 5
async def async_legacy_migration(
hass: HomeAssistant,
legacy_entry: ConfigEntry,
discovered_devices: Iterable[Light],
) -> bool:
"""Migrate config entries."""
existing_serials = {
entry.unique_id
for entry in hass.config_entries.async_entries(DOMAIN)
if entry.unique_id and not async_entry_is_legacy(entry)
}
# device.mac_addr is not the mac_address, its the serial number
hosts_by_serial = {device.mac_addr: device.ip_addr for device in discovered_devices}
missing_discovery_count = await async_migrate_legacy_entries(
hass, hosts_by_serial, existing_serials, legacy_entry
)
if missing_discovery_count:
_LOGGER.info(
"Migration in progress, waiting to discover %s device(s)",
missing_discovery_count,
)
return False
_LOGGER.debug(
"Migration successful, removing legacy entry %s", legacy_entry.entry_id
)
await hass.config_entries.async_remove(legacy_entry.entry_id)
return True
class LIFXDiscoveryManager:
"""Manage discovery and migration."""
def __init__(self, hass: HomeAssistant, migrating: bool) -> None:
"""Init the manager."""
self.hass = hass
self.lock = asyncio.Lock()
self.migrating = migrating
self._cancel_discovery: CALLBACK_TYPE | None = None
@callback
def async_setup_discovery_interval(self) -> None:
"""Set up discovery at an interval."""
if self._cancel_discovery:
self._cancel_discovery()
self._cancel_discovery = None
discovery_interval = (
MIGRATION_INTERVAL if self.migrating else DISCOVERY_INTERVAL
)
_LOGGER.debug(
"LIFX starting discovery with interval: %s and migrating: %s",
discovery_interval,
self.migrating,
)
self._cancel_discovery = async_track_time_interval(
self.hass, self.async_discovery, discovery_interval
)
async def async_discovery(self, *_: Any) -> None:
"""Discovery and migrate LIFX devics."""
migrating_was_in_progress = self.migrating
async with self.lock:
discovered = await async_discover_devices(self.hass)
if legacy_entry := async_get_legacy_entry(self.hass):
migration_complete = await async_legacy_migration(
self.hass, legacy_entry, discovered
)
if migration_complete and migrating_was_in_progress:
self.migrating = False
_LOGGER.debug(
"LIFX migration complete, switching to normal discovery interval: %s",
DISCOVERY_INTERVAL,
)
self.async_setup_discovery_interval()
if discovered:
async_trigger_discovery(self.hass, discovered)
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the LIFX component."""
conf = config.get(DOMAIN)
hass.data[DOMAIN] = {}
migrating = bool(async_get_legacy_entry(hass))
discovery_manager = LIFXDiscoveryManager(hass, migrating)
hass.data[DOMAIN] = conf or {}
@callback
def _async_delayed_discovery(now: datetime) -> None:
"""Start an untracked task to discover devices.
if conf is not None:
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_IMPORT}
)
)
We do not want the discovery task to block startup.
"""
asyncio.create_task(discovery_manager.async_discovery())
# Let the system settle a bit before starting discovery
# to reduce the risk we miss devices because the event
# loop is blocked at startup.
discovery_manager.async_setup_discovery_interval()
async_call_later(hass, DISCOVERY_COOLDOWN, _async_delayed_discovery)
hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_STARTED, discovery_manager.async_discovery
)
return True
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up LIFX from a config entry."""
if async_entry_is_legacy(entry):
return True
if legacy_entry := async_get_legacy_entry(hass):
# If the legacy entry still exists, harvest the entities
# that are moving to this config entry.
await async_migrate_entities_devices(hass, legacy_entry.entry_id, entry)
assert entry.unique_id is not None
domain_data = hass.data[DOMAIN]
if DATA_LIFX_MANAGER not in domain_data:
manager = LIFXManager(hass)
domain_data[DATA_LIFX_MANAGER] = manager
manager.async_setup()
host = entry.data[CONF_HOST]
connection = LIFXConnection(host, TARGET_ANY)
try:
await connection.async_setup()
except socket.gaierror as ex:
raise ConfigEntryNotReady(f"Could not resolve {host}: {ex}") from ex
coordinator = LIFXUpdateCoordinator(hass, connection, entry.title)
coordinator.async_setup()
await coordinator.async_config_entry_first_refresh()
domain_data[entry.entry_id] = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
hass.data.pop(DATA_LIFX_MANAGER).cleanup()
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if async_entry_is_legacy(entry):
return True
domain_data = hass.data[DOMAIN]
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
coordinator: LIFXUpdateCoordinator = domain_data.pop(entry.entry_id)
coordinator.connection.async_stop()
# Only the DATA_LIFX_MANAGER left, remove it.
if len(domain_data) == 1:
manager: LIFXManager = domain_data.pop(DATA_LIFX_MANAGER)
manager.async_unload()
return unload_ok

View File

@ -1,16 +1,240 @@
"""Config flow flow LIFX."""
import aiolifx
from __future__ import annotations
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_entry_flow
import asyncio
import socket
from typing import Any
from .const import DOMAIN
from aiolifx.aiolifx import Light
from aiolifx_connection import LIFXConnection
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components import zeroconf
from homeassistant.components.dhcp import DhcpServiceInfo
from homeassistant.const import CONF_DEVICE, CONF_HOST
from homeassistant.core import callback
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.typing import DiscoveryInfoType
from .const import _LOGGER, CONF_SERIAL, DOMAIN, TARGET_ANY
from .discovery import async_discover_devices
from .util import (
async_entry_is_legacy,
async_execute_lifx,
async_get_legacy_entry,
formatted_serial,
lifx_features,
mac_matches_serial_number,
)
async def _async_has_devices(hass: HomeAssistant) -> bool:
"""Return if there are devices that can be discovered."""
lifx_ip_addresses = await aiolifx.LifxScan(hass.loop).scan()
return len(lifx_ip_addresses) > 0
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for tplink."""
VERSION = 1
config_entry_flow.register_discovery_flow(DOMAIN, "LIFX", _async_has_devices)
def __init__(self) -> None:
"""Initialize the config flow."""
self._discovered_devices: dict[str, Light] = {}
self._discovered_device: Light | None = None
async def async_step_dhcp(self, discovery_info: DhcpServiceInfo) -> FlowResult:
"""Handle discovery via dhcp."""
mac = discovery_info.macaddress
host = discovery_info.ip
hass = self.hass
for entry in self._async_current_entries():
if (
entry.unique_id
and not async_entry_is_legacy(entry)
and mac_matches_serial_number(mac, entry.unique_id)
):
if entry.data[CONF_HOST] != host:
hass.config_entries.async_update_entry(
entry, data={**entry.data, CONF_HOST: host}
)
hass.async_create_task(
hass.config_entries.async_reload(entry.entry_id)
)
return self.async_abort(reason="already_configured")
return await self._async_handle_discovery(host)
async def async_step_homekit(
self, discovery_info: zeroconf.ZeroconfServiceInfo
) -> FlowResult:
"""Handle HomeKit discovery."""
return await self._async_handle_discovery(host=discovery_info.host)
async def async_step_integration_discovery(
self, discovery_info: DiscoveryInfoType
) -> FlowResult:
"""Handle discovery."""
_LOGGER.debug("async_step_integration_discovery %s", discovery_info)
serial = discovery_info[CONF_SERIAL]
host = discovery_info[CONF_HOST]
await self.async_set_unique_id(formatted_serial(serial))
self._abort_if_unique_id_configured(updates={CONF_HOST: host})
return await self._async_handle_discovery(host, serial)
async def _async_handle_discovery(
self, host: str, serial: str | None = None
) -> FlowResult:
"""Handle any discovery."""
_LOGGER.debug("Discovery %s %s", host, serial)
self._async_abort_entries_match({CONF_HOST: host})
self.context[CONF_HOST] = host
if any(
progress.get("context", {}).get(CONF_HOST) == host
for progress in self._async_in_progress()
):
return self.async_abort(reason="already_in_progress")
if not (
device := await self._async_try_connect(
host, serial=serial, raise_on_progress=True
)
):
return self.async_abort(reason="cannot_connect")
self._discovered_device = device
return await self.async_step_discovery_confirm()
@callback
def _async_discovered_pending_migration(self) -> bool:
"""Check if a discovered device is pending migration."""
assert self.unique_id is not None
if not (legacy_entry := async_get_legacy_entry(self.hass)):
return False
device_registry = dr.async_get(self.hass)
existing_device = device_registry.async_get_device(
identifiers={(DOMAIN, self.unique_id)}
)
return bool(
existing_device is not None
and legacy_entry.entry_id in existing_device.config_entries
)
async def async_step_discovery_confirm(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Confirm discovery."""
assert self._discovered_device is not None
_LOGGER.debug(
"Confirming discovery: %s with serial %s",
self._discovered_device.label,
self.unique_id,
)
if user_input is not None or self._async_discovered_pending_migration():
return self._async_create_entry_from_device(self._discovered_device)
self._set_confirm_only()
placeholders = {
"label": self._discovered_device.label,
"host": self._discovered_device.ip_addr,
"serial": self.unique_id,
}
self.context["title_placeholders"] = placeholders
return self.async_show_form(
step_id="discovery_confirm", description_placeholders=placeholders
)
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle the initial step."""
errors = {}
if user_input is not None:
host = user_input[CONF_HOST]
if not host:
return await self.async_step_pick_device()
if (
device := await self._async_try_connect(host, raise_on_progress=False)
) is None:
errors["base"] = "cannot_connect"
else:
return self._async_create_entry_from_device(device)
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({vol.Optional(CONF_HOST, default=""): str}),
errors=errors,
)
async def async_step_pick_device(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle the step to pick discovered device."""
if user_input is not None:
serial = user_input[CONF_DEVICE]
await self.async_set_unique_id(serial, raise_on_progress=False)
device_without_label = self._discovered_devices[serial]
device = await self._async_try_connect(
device_without_label.ip_addr, raise_on_progress=False
)
if not device:
return self.async_abort(reason="cannot_connect")
return self._async_create_entry_from_device(device)
configured_serials: set[str] = set()
configured_hosts: set[str] = set()
for entry in self._async_current_entries():
if entry.unique_id and not async_entry_is_legacy(entry):
configured_serials.add(entry.unique_id)
configured_hosts.add(entry.data[CONF_HOST])
self._discovered_devices = {
# device.mac_addr is not the mac_address, its the serial number
device.mac_addr: device
for device in await async_discover_devices(self.hass)
}
devices_name = {
serial: f"{serial} ({device.ip_addr})"
for serial, device in self._discovered_devices.items()
if serial not in configured_serials
and device.ip_addr not in configured_hosts
}
# Check if there is at least one device
if not devices_name:
return self.async_abort(reason="no_devices_found")
return self.async_show_form(
step_id="pick_device",
data_schema=vol.Schema({vol.Required(CONF_DEVICE): vol.In(devices_name)}),
)
@callback
def _async_create_entry_from_device(self, device: Light) -> FlowResult:
"""Create a config entry from a smart device."""
self._abort_if_unique_id_configured(updates={CONF_HOST: device.ip_addr})
return self.async_create_entry(
title=device.label,
data={CONF_HOST: device.ip_addr},
)
async def _async_try_connect(
self, host: str, serial: str | None = None, raise_on_progress: bool = True
) -> Light | None:
"""Try to connect."""
self._async_abort_entries_match({CONF_HOST: host})
connection = LIFXConnection(host, TARGET_ANY)
try:
await connection.async_setup()
except socket.gaierror:
return None
device: Light = connection.device
device.get_hostfirmware()
try:
message = await async_execute_lifx(device.get_color)
except asyncio.TimeoutError:
return None
finally:
connection.async_stop()
if (
lifx_features(device)["relays"] is True
or device.host_firmware_version is None
):
return None # relays not supported
# device.mac_addr is not the mac_address, its the serial number
device.mac_addr = serial or message.target_addr
await self.async_set_unique_id(
formatted_serial(device.mac_addr), raise_on_progress=raise_on_progress
)
return device

View File

@ -1,3 +1,19 @@
"""Const for LIFX."""
import logging
DOMAIN = "lifx"
TARGET_ANY = "00:00:00:00:00:00"
DISCOVERY_INTERVAL = 10
MESSAGE_TIMEOUT = 1.65
MESSAGE_RETRIES = 5
OVERALL_TIMEOUT = 9
UNAVAILABLE_GRACE = 90
CONF_SERIAL = "serial"
DATA_LIFX_MANAGER = "lifx_manager"
_LOGGER = logging.getLogger(__name__)

View File

@ -0,0 +1,158 @@
"""Coordinator for lifx."""
from __future__ import annotations
import asyncio
from datetime import timedelta
from functools import partial
from typing import cast
from aiolifx.aiolifx import Light
from aiolifx_connection import LIFXConnection
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.debounce import Debouncer
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import (
_LOGGER,
MESSAGE_RETRIES,
MESSAGE_TIMEOUT,
TARGET_ANY,
UNAVAILABLE_GRACE,
)
from .util import async_execute_lifx, get_real_mac_addr, lifx_features
REQUEST_REFRESH_DELAY = 0.35
class LIFXUpdateCoordinator(DataUpdateCoordinator):
"""DataUpdateCoordinator to gather data for a specific lifx device."""
def __init__(
self,
hass: HomeAssistant,
connection: LIFXConnection,
title: str,
) -> None:
"""Initialize DataUpdateCoordinator."""
assert connection.device is not None
self.connection = connection
self.device: Light = connection.device
self.lock = asyncio.Lock()
update_interval = timedelta(seconds=10)
super().__init__(
hass,
_LOGGER,
name=f"{title} ({self.device.ip_addr})",
update_interval=update_interval,
# We don't want an immediate refresh since the device
# takes a moment to reflect the state change
request_refresh_debouncer=Debouncer(
hass, _LOGGER, cooldown=REQUEST_REFRESH_DELAY, immediate=False
),
)
@callback
def async_setup(self) -> None:
"""Change timeouts."""
self.device.timeout = MESSAGE_TIMEOUT
self.device.retry_count = MESSAGE_RETRIES
self.device.unregister_timeout = UNAVAILABLE_GRACE
@property
def serial_number(self) -> str:
"""Return the internal mac address."""
return cast(
str, self.device.mac_addr
) # device.mac_addr is not the mac_address, its the serial number
@property
def mac_address(self) -> str:
"""Return the physical mac address."""
return get_real_mac_addr(
# device.mac_addr is not the mac_address, its the serial number
self.device.mac_addr,
self.device.host_firmware_version,
)
async def _async_update_data(self) -> None:
"""Fetch all device data from the api."""
async with self.lock:
if self.device.host_firmware_version is None:
self.device.get_hostfirmware()
if self.device.product is None:
self.device.get_version()
try:
response = await async_execute_lifx(self.device.get_color)
except asyncio.TimeoutError as ex:
raise UpdateFailed(
f"Failed to fetch state from device: {self.device.ip_addr}"
) from ex
if self.device.product is None:
raise UpdateFailed(
f"Failed to fetch get version from device: {self.device.ip_addr}"
)
# device.mac_addr is not the mac_address, its the serial number
if self.device.mac_addr == TARGET_ANY:
self.device.mac_addr = response.target_addr
if lifx_features(self.device)["multizone"]:
try:
await self.async_update_color_zones()
except asyncio.TimeoutError as ex:
raise UpdateFailed(
f"Failed to fetch zones from device: {self.device.ip_addr}"
) from ex
async def async_update_color_zones(self) -> None:
"""Get updated color information for each zone."""
zone = 0
top = 1
while zone < top:
# Each get_color_zones can update 8 zones at once
resp = await async_execute_lifx(
partial(self.device.get_color_zones, start_index=zone)
)
zone += 8
top = resp.count
# We only await multizone responses so don't ask for just one
if zone == top - 1:
zone -= 1
async def async_get_color(self) -> None:
"""Send a get color message to the device."""
await async_execute_lifx(self.device.get_color)
async def async_set_power(self, state: bool, duration: int | None) -> None:
"""Send a set power message to the device."""
await async_execute_lifx(
partial(self.device.set_power, state, duration=duration)
)
async def async_set_color(
self, hsbk: list[float | int | None], duration: int | None
) -> None:
"""Send a set color message to the device."""
await async_execute_lifx(
partial(self.device.set_color, hsbk, duration=duration)
)
async def async_set_color_zones(
self,
start_index: int,
end_index: int,
hsbk: list[float | int | None],
duration: int | None,
apply: int,
) -> None:
"""Send a set color zones message to the device."""
await async_execute_lifx(
partial(
self.device.set_color_zones,
start_index=start_index,
end_index=end_index,
color=hsbk,
duration=duration,
apply=apply,
)
)

View File

@ -0,0 +1,58 @@
"""The lifx integration discovery."""
from __future__ import annotations
import asyncio
from collections.abc import Iterable
from aiolifx.aiolifx import LifxDiscovery, Light, ScanManager
from homeassistant import config_entries
from homeassistant.components import network
from homeassistant.const import CONF_HOST
from homeassistant.core import HomeAssistant, callback
from .const import CONF_SERIAL, DOMAIN
DEFAULT_TIMEOUT = 8.5
async def async_discover_devices(hass: HomeAssistant) -> Iterable[Light]:
"""Discover lifx devices."""
all_lights: dict[str, Light] = {}
broadcast_addrs = await network.async_get_ipv4_broadcast_addresses(hass)
discoveries = []
for address in broadcast_addrs:
manager = ScanManager(str(address))
lifx_discovery = LifxDiscovery(hass.loop, manager, broadcast_ip=str(address))
discoveries.append(lifx_discovery)
lifx_discovery.start()
await asyncio.sleep(DEFAULT_TIMEOUT)
for discovery in discoveries:
all_lights.update(discovery.lights)
discovery.cleanup()
return all_lights.values()
@callback
def async_init_discovery_flow(hass: HomeAssistant, host: str, serial: str) -> None:
"""Start discovery of devices."""
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_INTEGRATION_DISCOVERY},
data={CONF_HOST: host, CONF_SERIAL: serial},
)
)
@callback
def async_trigger_discovery(
hass: HomeAssistant,
discovered_devices: Iterable[Light],
) -> None:
"""Trigger config flows for discovered devices."""
for device in discovered_devices:
# device.mac_addr is not the mac_address, its the serial number
async_init_discovery_flow(hass, device.ip_addr, device.mac_addr)

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,216 @@
"""Support for LIFX lights."""
from __future__ import annotations
from collections.abc import Callable
from datetime import timedelta
from typing import Any
import aiolifx_effects
import voluptuous as vol
from homeassistant.components.light import (
ATTR_BRIGHTNESS,
ATTR_BRIGHTNESS_PCT,
ATTR_COLOR_NAME,
ATTR_COLOR_TEMP,
ATTR_HS_COLOR,
ATTR_KELVIN,
ATTR_RGB_COLOR,
ATTR_TRANSITION,
ATTR_XY_COLOR,
COLOR_GROUP,
VALID_BRIGHTNESS,
VALID_BRIGHTNESS_PCT,
preprocess_turn_on_alternatives,
)
from homeassistant.const import ATTR_MODE
from homeassistant.core import HomeAssistant, ServiceCall, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.service import async_extract_referenced_entity_ids
from .const import _LOGGER, DATA_LIFX_MANAGER, DOMAIN
from .util import convert_8_to_16, find_hsbk
SCAN_INTERVAL = timedelta(seconds=10)
SERVICE_EFFECT_PULSE = "effect_pulse"
SERVICE_EFFECT_COLORLOOP = "effect_colorloop"
SERVICE_EFFECT_STOP = "effect_stop"
ATTR_POWER_ON = "power_on"
ATTR_PERIOD = "period"
ATTR_CYCLES = "cycles"
ATTR_SPREAD = "spread"
ATTR_CHANGE = "change"
PULSE_MODE_BLINK = "blink"
PULSE_MODE_BREATHE = "breathe"
PULSE_MODE_PING = "ping"
PULSE_MODE_STROBE = "strobe"
PULSE_MODE_SOLID = "solid"
PULSE_MODES = [
PULSE_MODE_BLINK,
PULSE_MODE_BREATHE,
PULSE_MODE_PING,
PULSE_MODE_STROBE,
PULSE_MODE_SOLID,
]
LIFX_EFFECT_SCHEMA = {
vol.Optional(ATTR_POWER_ON, default=True): cv.boolean,
}
LIFX_EFFECT_PULSE_SCHEMA = cv.make_entity_service_schema(
{
**LIFX_EFFECT_SCHEMA,
ATTR_BRIGHTNESS: VALID_BRIGHTNESS,
ATTR_BRIGHTNESS_PCT: VALID_BRIGHTNESS_PCT,
vol.Exclusive(ATTR_COLOR_NAME, COLOR_GROUP): cv.string,
vol.Exclusive(ATTR_RGB_COLOR, COLOR_GROUP): vol.All(
vol.Coerce(tuple), vol.ExactSequence((cv.byte, cv.byte, cv.byte))
),
vol.Exclusive(ATTR_XY_COLOR, COLOR_GROUP): vol.All(
vol.Coerce(tuple), vol.ExactSequence((cv.small_float, cv.small_float))
),
vol.Exclusive(ATTR_HS_COLOR, COLOR_GROUP): vol.All(
vol.Coerce(tuple),
vol.ExactSequence(
(
vol.All(vol.Coerce(float), vol.Range(min=0, max=360)),
vol.All(vol.Coerce(float), vol.Range(min=0, max=100)),
)
),
),
vol.Exclusive(ATTR_COLOR_TEMP, COLOR_GROUP): vol.All(
vol.Coerce(int), vol.Range(min=1)
),
vol.Exclusive(ATTR_KELVIN, COLOR_GROUP): cv.positive_int,
ATTR_PERIOD: vol.All(vol.Coerce(float), vol.Range(min=0.05)),
ATTR_CYCLES: vol.All(vol.Coerce(float), vol.Range(min=1)),
ATTR_MODE: vol.In(PULSE_MODES),
}
)
LIFX_EFFECT_COLORLOOP_SCHEMA = cv.make_entity_service_schema(
{
**LIFX_EFFECT_SCHEMA,
ATTR_BRIGHTNESS: VALID_BRIGHTNESS,
ATTR_BRIGHTNESS_PCT: VALID_BRIGHTNESS_PCT,
ATTR_PERIOD: vol.All(vol.Coerce(float), vol.Clamp(min=0.05)),
ATTR_CHANGE: vol.All(vol.Coerce(float), vol.Clamp(min=0, max=360)),
ATTR_SPREAD: vol.All(vol.Coerce(float), vol.Clamp(min=0, max=360)),
ATTR_TRANSITION: cv.positive_float,
}
)
LIFX_EFFECT_STOP_SCHEMA = cv.make_entity_service_schema({})
SERVICES = (
SERVICE_EFFECT_STOP,
SERVICE_EFFECT_PULSE,
SERVICE_EFFECT_COLORLOOP,
)
class LIFXManager:
"""Representation of all known LIFX entities."""
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize the manager."""
self.hass = hass
self.effects_conductor = aiolifx_effects.Conductor(hass.loop)
self.entry_id_to_entity_id: dict[str, str] = {}
@callback
def async_unload(self) -> None:
"""Release resources."""
for service in SERVICES:
self.hass.services.async_remove(DOMAIN, service)
@callback
def async_register_entity(
self, entity_id: str, entry_id: str
) -> Callable[[], None]:
"""Register an entity to the config entry id."""
self.entry_id_to_entity_id[entry_id] = entity_id
@callback
def unregister_entity() -> None:
"""Unregister entity when it is being destroyed."""
self.entry_id_to_entity_id.pop(entry_id)
return unregister_entity
@callback
def async_setup(self) -> None:
"""Register the LIFX effects as hass service calls."""
async def service_handler(service: ServiceCall) -> None:
"""Apply a service, i.e. start an effect."""
referenced = async_extract_referenced_entity_ids(self.hass, service)
all_referenced = referenced.referenced | referenced.indirectly_referenced
if all_referenced:
await self.start_effect(all_referenced, service.service, **service.data)
self.hass.services.async_register(
DOMAIN,
SERVICE_EFFECT_PULSE,
service_handler,
schema=LIFX_EFFECT_PULSE_SCHEMA,
)
self.hass.services.async_register(
DOMAIN,
SERVICE_EFFECT_COLORLOOP,
service_handler,
schema=LIFX_EFFECT_COLORLOOP_SCHEMA,
)
self.hass.services.async_register(
DOMAIN,
SERVICE_EFFECT_STOP,
service_handler,
schema=LIFX_EFFECT_STOP_SCHEMA,
)
async def start_effect(
self, entity_ids: set[str], service: str, **kwargs: Any
) -> None:
"""Start a light effect on entities."""
bulbs = [
coordinator.device
for entry_id, coordinator in self.hass.data[DOMAIN].items()
if entry_id != DATA_LIFX_MANAGER
and self.entry_id_to_entity_id[entry_id] in entity_ids
]
_LOGGER.debug("Starting effect %s on %s", service, bulbs)
if service == SERVICE_EFFECT_PULSE:
effect = aiolifx_effects.EffectPulse(
power_on=kwargs.get(ATTR_POWER_ON),
period=kwargs.get(ATTR_PERIOD),
cycles=kwargs.get(ATTR_CYCLES),
mode=kwargs.get(ATTR_MODE),
hsbk=find_hsbk(self.hass, **kwargs),
)
await self.effects_conductor.start(effect, bulbs)
elif service == SERVICE_EFFECT_COLORLOOP:
preprocess_turn_on_alternatives(self.hass, kwargs) # type: ignore[no-untyped-call]
brightness = None
if ATTR_BRIGHTNESS in kwargs:
brightness = convert_8_to_16(kwargs[ATTR_BRIGHTNESS])
effect = aiolifx_effects.EffectColorloop(
power_on=kwargs.get(ATTR_POWER_ON),
period=kwargs.get(ATTR_PERIOD),
change=kwargs.get(ATTR_CHANGE),
spread=kwargs.get(ATTR_SPREAD),
transition=kwargs.get(ATTR_TRANSITION),
brightness=brightness,
)
await self.effects_conductor.start(effect, bulbs)
elif service == SERVICE_EFFECT_STOP:
await self.effects_conductor.stop(bulbs)

View File

@ -3,7 +3,12 @@
"name": "LIFX",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/lifx",
"requirements": ["aiolifx==0.8.1", "aiolifx_effects==0.2.2"],
"requirements": [
"aiolifx==0.8.1",
"aiolifx_effects==0.2.2",
"aiolifx-connection==1.0.0"
],
"quality_scale": "platinum",
"dependencies": ["network"],
"homekit": {
"models": [
@ -29,7 +34,8 @@
"LIFX Z"
]
},
"codeowners": ["@Djelibeybi"],
"dhcp": [{ "macaddress": "D073D5*" }, { "registered_devices": true }],
"codeowners": ["@bdraco", "@Djelibeybi"],
"iot_class": "local_polling",
"loggers": ["aiolifx", "aiolifx_effects", "bitstring"]
}

View File

@ -0,0 +1,74 @@
"""Migrate lifx devices to their own config entry."""
from __future__ import annotations
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr, entity_registry as er
from .const import _LOGGER, DOMAIN
from .discovery import async_init_discovery_flow
async def async_migrate_legacy_entries(
hass: HomeAssistant,
discovered_hosts_by_serial: dict[str, str],
existing_serials: set[str],
legacy_entry: ConfigEntry,
) -> int:
"""Migrate the legacy config entries to have an entry per device."""
_LOGGER.debug(
"Migrating legacy entries: discovered_hosts_by_serial=%s, existing_serials=%s",
discovered_hosts_by_serial,
existing_serials,
)
device_registry = dr.async_get(hass)
for dev_entry in dr.async_entries_for_config_entry(
device_registry, legacy_entry.entry_id
):
for domain, serial in dev_entry.identifiers:
if (
domain == DOMAIN
and serial not in existing_serials
and (host := discovered_hosts_by_serial.get(serial))
):
async_init_discovery_flow(hass, host, serial)
remaining_devices = dr.async_entries_for_config_entry(
dr.async_get(hass), legacy_entry.entry_id
)
_LOGGER.debug("The following devices remain: %s", remaining_devices)
return len(remaining_devices)
async def async_migrate_entities_devices(
hass: HomeAssistant, legacy_entry_id: str, new_entry: ConfigEntry
) -> None:
"""Move entities and devices to the new config entry."""
migrated_devices = []
device_registry = dr.async_get(hass)
for dev_entry in dr.async_entries_for_config_entry(
device_registry, legacy_entry_id
):
for domain, value in dev_entry.identifiers:
if domain == DOMAIN and value == new_entry.unique_id:
_LOGGER.debug(
"Migrating device with %s to %s",
dev_entry.identifiers,
new_entry.unique_id,
)
migrated_devices.append(dev_entry.id)
device_registry.async_update_device(
dev_entry.id,
add_config_entry_id=new_entry.entry_id,
remove_config_entry_id=legacy_entry_id,
)
entity_registry = er.async_get(hass)
for reg_entity in er.async_entries_for_config_entry(
entity_registry, legacy_entry_id
):
if reg_entity.device_id in migrated_devices:
entity_registry.async_update_entity(
reg_entity.entity_id, config_entry_id=new_entry.entry_id
)

View File

@ -1,12 +1,28 @@
{
"config": {
"flow_title": "{label} ({host}) {serial}",
"step": {
"confirm": {
"description": "Do you want to set up LIFX?"
"user": {
"description": "If you leave the host empty, discovery will be used to find devices.",
"data": {
"host": "[%key:common::config_flow::data::host%]"
}
},
"pick_device": {
"data": {
"device": "Device"
}
},
"discovery_confirm": {
"description": "Do you want to setup {label} ({host}) {serial}?"
}
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]"
},
"abort": {
"single_instance_allowed": "[%key:common::config_flow::abort::single_instance_allowed%]",
"already_in_progress": "[%key:common::config_flow::abort::already_in_progress%]",
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"no_devices_found": "[%key:common::config_flow::abort::no_devices_found%]"
}
}

View File

@ -1,12 +1,28 @@
{
"config": {
"abort": {
"no_devices_found": "No devices found on the network",
"single_instance_allowed": "Already configured. Only a single configuration possible."
"already_configured": "Device is already configured",
"already_in_progress": "Configuration flow is already in progress",
"no_devices_found": "No devices found on the network"
},
"error": {
"cannot_connect": "Failed to connect"
},
"flow_title": "{label} ({host}) {serial}",
"step": {
"confirm": {
"description": "Do you want to set up LIFX?"
"discovery_confirm": {
"description": "Do you want to setup {label} ({host}) {serial}?"
},
"pick_device": {
"data": {
"device": "Device"
}
},
"user": {
"data": {
"host": "Host"
},
"description": "If you leave the host empty, discovery will be used to find devices."
}
}
}

View File

@ -0,0 +1,161 @@
"""Support for LIFX."""
from __future__ import annotations
import asyncio
from collections.abc import Callable
from typing import Any
from aiolifx import products
from aiolifx.aiolifx import Light
from aiolifx.message import Message
import async_timeout
from awesomeversion import AwesomeVersion
from homeassistant.components.light import (
ATTR_BRIGHTNESS,
ATTR_COLOR_TEMP,
ATTR_HS_COLOR,
ATTR_RGB_COLOR,
ATTR_XY_COLOR,
preprocess_turn_on_alternatives,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import device_registry as dr
import homeassistant.util.color as color_util
from .const import _LOGGER, DOMAIN, OVERALL_TIMEOUT
FIX_MAC_FW = AwesomeVersion("3.70")
@callback
def async_entry_is_legacy(entry: ConfigEntry) -> bool:
"""Check if a config entry is the legacy shared one."""
return entry.unique_id is None or entry.unique_id == DOMAIN
@callback
def async_get_legacy_entry(hass: HomeAssistant) -> ConfigEntry | None:
"""Get the legacy config entry."""
for entry in hass.config_entries.async_entries(DOMAIN):
if async_entry_is_legacy(entry):
return entry
return None
def convert_8_to_16(value: int) -> int:
"""Scale an 8 bit level into 16 bits."""
return (value << 8) | value
def convert_16_to_8(value: int) -> int:
"""Scale a 16 bit level into 8 bits."""
return value >> 8
def lifx_features(bulb: Light) -> dict[str, Any]:
"""Return a feature map for this bulb, or a default map if unknown."""
features: dict[str, Any] = (
products.features_map.get(bulb.product) or products.features_map[1]
)
return features
def find_hsbk(hass: HomeAssistant, **kwargs: Any) -> list[float | int | None] | None:
"""Find the desired color from a number of possible inputs.
Hue, Saturation, Brightness, Kelvin
"""
hue, saturation, brightness, kelvin = [None] * 4
preprocess_turn_on_alternatives(hass, kwargs) # type: ignore[no-untyped-call]
if ATTR_HS_COLOR in kwargs:
hue, saturation = kwargs[ATTR_HS_COLOR]
elif ATTR_RGB_COLOR in kwargs:
hue, saturation = color_util.color_RGB_to_hs(*kwargs[ATTR_RGB_COLOR])
elif ATTR_XY_COLOR in kwargs:
hue, saturation = color_util.color_xy_to_hs(*kwargs[ATTR_XY_COLOR])
if hue is not None:
assert saturation is not None
hue = int(hue / 360 * 65535)
saturation = int(saturation / 100 * 65535)
kelvin = 3500
if ATTR_COLOR_TEMP in kwargs:
kelvin = int(
color_util.color_temperature_mired_to_kelvin(kwargs[ATTR_COLOR_TEMP])
)
saturation = 0
if ATTR_BRIGHTNESS in kwargs:
brightness = convert_8_to_16(kwargs[ATTR_BRIGHTNESS])
hsbk = [hue, saturation, brightness, kelvin]
return None if hsbk == [None] * 4 else hsbk
def merge_hsbk(
base: list[float | int | None], change: list[float | int | None]
) -> list[float | int | None]:
"""Copy change on top of base, except when None.
Hue, Saturation, Brightness, Kelvin
"""
return [b if c is None else c for b, c in zip(base, change)]
def _get_mac_offset(mac_addr: str, offset: int) -> str:
octets = [int(octet, 16) for octet in mac_addr.split(":")]
octets[5] = (octets[5] + offset) % 256
return ":".join(f"{octet:02x}" for octet in octets)
def _off_by_one_mac(firmware: str) -> bool:
"""Check if the firmware version has the off by one mac."""
return bool(firmware and AwesomeVersion(firmware) >= FIX_MAC_FW)
def get_real_mac_addr(mac_addr: str, firmware: str) -> str:
"""Increment the last byte of the mac address by one for FW>3.70."""
return _get_mac_offset(mac_addr, 1) if _off_by_one_mac(firmware) else mac_addr
def formatted_serial(serial_number: str) -> str:
"""Format the serial number to match the HA device registry."""
return dr.format_mac(serial_number)
def mac_matches_serial_number(mac_addr: str, serial_number: str) -> bool:
"""Check if a mac address matches the serial number."""
formatted_mac = dr.format_mac(mac_addr)
return bool(
formatted_serial(serial_number) == formatted_mac
or _get_mac_offset(serial_number, 1) == formatted_mac
)
async def async_execute_lifx(method: Callable) -> Message:
"""Execute a lifx coroutine and wait for a response."""
future: asyncio.Future[Message] = asyncio.Future()
def _callback(bulb: Light, message: Message) -> None:
if not future.done():
# The future will get canceled out from under
# us by async_timeout when we hit the OVERALL_TIMEOUT
future.set_result(message)
_LOGGER.debug("Sending LIFX command: %s", method)
method(callb=_callback)
result = None
async with async_timeout.timeout(OVERALL_TIMEOUT):
result = await future
if result is None:
raise asyncio.TimeoutError("No response from LIFX bulb")
return result

View File

@ -58,6 +58,8 @@ DHCP: list[dict[str, str | bool]] = [
{'domain': 'isy994', 'registered_devices': True},
{'domain': 'isy994', 'hostname': 'isy*', 'macaddress': '0021B9*'},
{'domain': 'isy994', 'hostname': 'polisy*', 'macaddress': '000DB9*'},
{'domain': 'lifx', 'macaddress': 'D073D5*'},
{'domain': 'lifx', 'registered_devices': True},
{'domain': 'lyric', 'hostname': 'lyric-*', 'macaddress': '48A2E6*'},
{'domain': 'lyric', 'hostname': 'lyric-*', 'macaddress': 'B82CA0*'},
{'domain': 'lyric', 'hostname': 'lyric-*', 'macaddress': '00D02D*'},

View File

@ -1369,6 +1369,17 @@ no_implicit_optional = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.lifx.*]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
no_implicit_optional = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.local_ip.*]
check_untyped_defs = true
disallow_incomplete_defs = true

View File

@ -186,6 +186,9 @@ aiokafka==0.7.2
# homeassistant.components.kef
aiokef==0.2.16
# homeassistant.components.lifx
aiolifx-connection==1.0.0
# homeassistant.components.lifx
aiolifx==0.8.1

View File

@ -164,6 +164,15 @@ aiohue==4.4.2
# homeassistant.components.apache_kafka
aiokafka==0.7.2
# homeassistant.components.lifx
aiolifx-connection==1.0.0
# homeassistant.components.lifx
aiolifx==0.8.1
# homeassistant.components.lifx
aiolifx_effects==0.2.2
# homeassistant.components.lookin
aiolookin==0.1.1

View File

@ -0,0 +1,217 @@
"""Tests for the lifx integration."""
from __future__ import annotations
import asyncio
from contextlib import contextmanager
from unittest.mock import AsyncMock, MagicMock, patch
from aiolifx.aiolifx import Light
from homeassistant.components.lifx import discovery
from homeassistant.components.lifx.const import TARGET_ANY
MODULE = "homeassistant.components.lifx"
MODULE_CONFIG_FLOW = "homeassistant.components.lifx.config_flow"
IP_ADDRESS = "127.0.0.1"
LABEL = "My Bulb"
SERIAL = "aa:bb:cc:dd:ee:cc"
MAC_ADDRESS = "aa:bb:cc:dd:ee:cd"
DEFAULT_ENTRY_TITLE = LABEL
class MockMessage:
"""Mock a lifx message."""
def __init__(self):
"""Init message."""
self.target_addr = SERIAL
self.count = 9
class MockFailingLifxCommand:
"""Mock a lifx command that fails."""
def __init__(self, bulb, **kwargs):
"""Init command."""
self.bulb = bulb
self.calls = []
def __call__(self, *args, **kwargs):
"""Call command."""
if callb := kwargs.get("callb"):
callb(self.bulb, None)
self.calls.append([args, kwargs])
def reset_mock(self):
"""Reset mock."""
self.calls = []
class MockLifxCommand:
"""Mock a lifx command."""
def __init__(self, bulb, **kwargs):
"""Init command."""
self.bulb = bulb
self.calls = []
def __call__(self, *args, **kwargs):
"""Call command."""
if callb := kwargs.get("callb"):
callb(self.bulb, MockMessage())
self.calls.append([args, kwargs])
def reset_mock(self):
"""Reset mock."""
self.calls = []
def _mocked_bulb() -> Light:
bulb = Light(asyncio.get_running_loop(), SERIAL, IP_ADDRESS)
bulb.host_firmware_version = "3.00"
bulb.label = LABEL
bulb.color = [1, 2, 3, 4]
bulb.power_level = 0
bulb.try_sending = AsyncMock()
bulb.set_infrared = MockLifxCommand(bulb)
bulb.get_color = MockLifxCommand(bulb)
bulb.set_power = MockLifxCommand(bulb)
bulb.set_color = MockLifxCommand(bulb)
bulb.get_hostfirmware = MockLifxCommand(bulb)
bulb.get_version = MockLifxCommand(bulb)
bulb.product = 1 # LIFX Original 1000
return bulb
def _mocked_failing_bulb() -> Light:
bulb = _mocked_bulb()
bulb.get_color = MockFailingLifxCommand(bulb)
bulb.set_power = MockFailingLifxCommand(bulb)
bulb.set_color = MockFailingLifxCommand(bulb)
bulb.get_hostfirmware = MockFailingLifxCommand(bulb)
bulb.get_version = MockFailingLifxCommand(bulb)
return bulb
def _mocked_white_bulb() -> Light:
bulb = _mocked_bulb()
bulb.product = 19 # LIFX White 900 BR30 (High Voltage)
return bulb
def _mocked_brightness_bulb() -> Light:
bulb = _mocked_bulb()
bulb.product = 51 # LIFX Mini White
return bulb
def _mocked_light_strip() -> Light:
bulb = _mocked_bulb()
bulb.product = 31 # LIFX Z
bulb.get_color_zones = MockLifxCommand(bulb)
bulb.set_color_zones = MockLifxCommand(bulb)
bulb.color_zones = [MagicMock(), MagicMock()]
return bulb
def _mocked_bulb_new_firmware() -> Light:
bulb = _mocked_bulb()
bulb.host_firmware_version = "3.90"
return bulb
def _mocked_relay() -> Light:
bulb = _mocked_bulb()
bulb.product = 70 # LIFX Switch
return bulb
def _patch_device(device: Light | None = None, no_device: bool = False):
"""Patch out discovery."""
class MockLifxConnecton:
"""Mock lifx discovery."""
def __init__(self, *args, **kwargs):
"""Init connection."""
if no_device:
self.device = _mocked_failing_bulb()
else:
self.device = device or _mocked_bulb()
self.device.mac_addr = TARGET_ANY
async def async_setup(self):
"""Mock setup."""
def async_stop(self):
"""Mock teardown."""
@contextmanager
def _patcher():
with patch("homeassistant.components.lifx.LIFXConnection", MockLifxConnecton):
yield
return _patcher()
def _patch_discovery(device: Light | None = None, no_device: bool = False):
"""Patch out discovery."""
class MockLifxDiscovery:
"""Mock lifx discovery."""
def __init__(self, *args, **kwargs):
"""Init discovery."""
if no_device:
self.lights = {}
return
discovered = device or _mocked_bulb()
self.lights = {discovered.mac_addr: discovered}
def start(self):
"""Mock start."""
def cleanup(self):
"""Mock cleanup."""
@contextmanager
def _patcher():
with patch.object(discovery, "DEFAULT_TIMEOUT", 0), patch(
"homeassistant.components.lifx.discovery.LifxDiscovery", MockLifxDiscovery
):
yield
return _patcher()
def _patch_config_flow_try_connect(
device: Light | None = None, no_device: bool = False
):
"""Patch out discovery."""
class MockLifxConnecton:
"""Mock lifx discovery."""
def __init__(self, *args, **kwargs):
"""Init connection."""
if no_device:
self.device = _mocked_failing_bulb()
else:
self.device = device or _mocked_bulb()
self.device.mac_addr = TARGET_ANY
async def async_setup(self):
"""Mock setup."""
def async_stop(self):
"""Mock teardown."""
@contextmanager
def _patcher():
with patch(
"homeassistant.components.lifx.config_flow.LIFXConnection",
MockLifxConnecton,
):
yield
return _patcher()

View File

@ -0,0 +1,57 @@
"""Tests for the lifx integration."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from tests.common import mock_device_registry, mock_registry
@pytest.fixture
def mock_effect_conductor():
"""Mock the effect conductor."""
class MockConductor:
def __init__(self, *args, **kwargs) -> None:
"""Mock the conductor."""
self.start = AsyncMock()
self.stop = AsyncMock()
def effect(self, bulb):
"""Mock effect."""
return MagicMock()
mock_conductor = MockConductor()
with patch(
"homeassistant.components.lifx.manager.aiolifx_effects.Conductor",
return_value=mock_conductor,
):
yield mock_conductor
@pytest.fixture(autouse=True)
def lifx_mock_get_source_ip(mock_get_source_ip):
"""Mock network util's async_get_source_ip."""
@pytest.fixture(autouse=True)
def lifx_mock_async_get_ipv4_broadcast_addresses():
"""Mock network util's async_get_ipv4_broadcast_addresses."""
with patch(
"homeassistant.components.network.async_get_ipv4_broadcast_addresses",
return_value=["255.255.255.255"],
):
yield
@pytest.fixture(name="device_reg")
def device_reg_fixture(hass):
"""Return an empty, loaded, registry."""
return mock_device_registry(hass)
@pytest.fixture(name="entity_reg")
def entity_reg_fixture(hass):
"""Return an empty, loaded, registry."""
return mock_registry(hass)

View File

@ -0,0 +1,508 @@
"""Tests for the lifx integration config flow."""
import socket
from unittest.mock import patch
import pytest
from homeassistant import config_entries
from homeassistant.components import dhcp, zeroconf
from homeassistant.components.lifx import DOMAIN
from homeassistant.components.lifx.const import CONF_SERIAL
from homeassistant.const import CONF_DEVICE, CONF_HOST
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import RESULT_TYPE_ABORT, RESULT_TYPE_FORM
from . import (
DEFAULT_ENTRY_TITLE,
IP_ADDRESS,
LABEL,
MAC_ADDRESS,
MODULE,
SERIAL,
_mocked_failing_bulb,
_mocked_relay,
_patch_config_flow_try_connect,
_patch_discovery,
)
from tests.common import MockConfigEntry
async def test_discovery(hass: HomeAssistant):
"""Test setting up discovery."""
with _patch_discovery(), _patch_config_flow_try_connect():
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await hass.async_block_till_done()
assert result["type"] == "form"
assert result["step_id"] == "user"
assert not result["errors"]
result2 = await hass.config_entries.flow.async_configure(result["flow_id"], {})
await hass.async_block_till_done()
assert result2["type"] == "form"
assert result2["step_id"] == "pick_device"
assert not result2["errors"]
# test we can try again
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
assert not result["errors"]
result2 = await hass.config_entries.flow.async_configure(result["flow_id"], {})
await hass.async_block_till_done()
assert result2["type"] == "form"
assert result2["step_id"] == "pick_device"
assert not result2["errors"]
with _patch_discovery(), _patch_config_flow_try_connect(), patch(
f"{MODULE}.async_setup", return_value=True
) as mock_setup, patch(
f"{MODULE}.async_setup_entry", return_value=True
) as mock_setup_entry:
result3 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_DEVICE: SERIAL},
)
await hass.async_block_till_done()
assert result3["type"] == "create_entry"
assert result3["title"] == DEFAULT_ENTRY_TITLE
assert result3["data"] == {CONF_HOST: IP_ADDRESS}
mock_setup.assert_called_once()
mock_setup_entry.assert_called_once()
# ignore configured devices
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
assert not result["errors"]
with _patch_discovery(), _patch_config_flow_try_connect():
result2 = await hass.config_entries.flow.async_configure(result["flow_id"], {})
await hass.async_block_till_done()
assert result2["type"] == "abort"
assert result2["reason"] == "no_devices_found"
async def test_discovery_but_cannot_connect(hass: HomeAssistant):
"""Test we can discover the device but we cannot connect."""
with _patch_discovery(), _patch_config_flow_try_connect(no_device=True):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await hass.async_block_till_done()
assert result["type"] == "form"
assert result["step_id"] == "user"
assert not result["errors"]
result2 = await hass.config_entries.flow.async_configure(result["flow_id"], {})
await hass.async_block_till_done()
assert result2["type"] == "form"
assert result2["step_id"] == "pick_device"
assert not result2["errors"]
result3 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_DEVICE: SERIAL},
)
await hass.async_block_till_done()
assert result3["type"] == "abort"
assert result3["reason"] == "cannot_connect"
async def test_discovery_with_existing_device_present(hass: HomeAssistant):
"""Test setting up discovery."""
config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: "127.0.0.2"}, unique_id="dd:dd:dd:dd:dd:dd"
)
config_entry.add_to_hass(hass)
with _patch_discovery(), _patch_config_flow_try_connect(no_device=True):
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
assert not result["errors"]
with _patch_discovery(), _patch_config_flow_try_connect():
result2 = await hass.config_entries.flow.async_configure(result["flow_id"], {})
await hass.async_block_till_done()
assert result2["type"] == "form"
assert result2["step_id"] == "pick_device"
assert not result2["errors"]
# Now abort and make sure we can start over
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
assert not result["errors"]
with _patch_discovery(), _patch_config_flow_try_connect():
result2 = await hass.config_entries.flow.async_configure(result["flow_id"], {})
await hass.async_block_till_done()
assert result2["type"] == "form"
assert result2["step_id"] == "pick_device"
assert not result2["errors"]
with _patch_discovery(), _patch_config_flow_try_connect(), patch(
f"{MODULE}.async_setup_entry", return_value=True
) as mock_setup_entry:
result3 = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_DEVICE: SERIAL}
)
assert result3["type"] == "create_entry"
assert result3["title"] == DEFAULT_ENTRY_TITLE
assert result3["data"] == {
CONF_HOST: IP_ADDRESS,
}
await hass.async_block_till_done()
mock_setup_entry.assert_called_once()
# ignore configured devices
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
assert not result["errors"]
with _patch_discovery(), _patch_config_flow_try_connect():
result2 = await hass.config_entries.flow.async_configure(result["flow_id"], {})
await hass.async_block_till_done()
assert result2["type"] == "abort"
assert result2["reason"] == "no_devices_found"
async def test_discovery_no_device(hass: HomeAssistant):
"""Test discovery without device."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
with _patch_discovery(no_device=True), _patch_config_flow_try_connect(
no_device=True
):
result2 = await hass.config_entries.flow.async_configure(result["flow_id"], {})
await hass.async_block_till_done()
assert result2["type"] == "abort"
assert result2["reason"] == "no_devices_found"
async def test_manual(hass: HomeAssistant):
"""Test manually setup."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
assert not result["errors"]
# Cannot connect (timeout)
with _patch_discovery(no_device=True), _patch_config_flow_try_connect(
no_device=True
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_HOST: IP_ADDRESS}
)
await hass.async_block_till_done()
assert result2["type"] == "form"
assert result2["step_id"] == "user"
assert result2["errors"] == {"base": "cannot_connect"}
# Success
with _patch_discovery(), _patch_config_flow_try_connect(), patch(
f"{MODULE}.async_setup", return_value=True
), patch(f"{MODULE}.async_setup_entry", return_value=True):
result4 = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_HOST: IP_ADDRESS}
)
await hass.async_block_till_done()
assert result4["type"] == "create_entry"
assert result4["title"] == DEFAULT_ENTRY_TITLE
assert result4["data"] == {
CONF_HOST: IP_ADDRESS,
}
# Duplicate
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
with _patch_discovery(no_device=True), _patch_config_flow_try_connect(
no_device=True
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_HOST: IP_ADDRESS}
)
await hass.async_block_till_done()
assert result2["type"] == "abort"
assert result2["reason"] == "already_configured"
async def test_manual_dns_error(hass: HomeAssistant):
"""Test manually setup with unresolving host."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
assert not result["errors"]
class MockLifxConnectonDnsError:
"""Mock lifx discovery."""
def __init__(self, *args, **kwargs):
"""Init connection."""
self.device = _mocked_failing_bulb()
async def async_setup(self):
"""Mock setup."""
raise socket.gaierror()
def async_stop(self):
"""Mock teardown."""
# Cannot connect due to dns error
with _patch_discovery(no_device=True), patch(
"homeassistant.components.lifx.config_flow.LIFXConnection",
MockLifxConnectonDnsError,
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_HOST: "does.not.resolve"}
)
await hass.async_block_till_done()
assert result2["type"] == "form"
assert result2["step_id"] == "user"
assert result2["errors"] == {"base": "cannot_connect"}
async def test_manual_no_capabilities(hass: HomeAssistant):
"""Test manually setup without successful get_capabilities."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
assert not result["errors"]
with _patch_discovery(no_device=True), _patch_config_flow_try_connect(), patch(
f"{MODULE}.async_setup", return_value=True
), patch(f"{MODULE}.async_setup_entry", return_value=True):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_HOST: IP_ADDRESS}
)
await hass.async_block_till_done()
assert result["type"] == "create_entry"
assert result["data"] == {
CONF_HOST: IP_ADDRESS,
}
async def test_discovered_by_discovery_and_dhcp(hass):
"""Test we get the form with discovery and abort for dhcp source when we get both."""
with _patch_discovery(), _patch_config_flow_try_connect():
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_INTEGRATION_DISCOVERY},
data={CONF_HOST: IP_ADDRESS, CONF_SERIAL: SERIAL},
)
await hass.async_block_till_done()
assert result["type"] == RESULT_TYPE_FORM
assert result["errors"] is None
with _patch_discovery(), _patch_config_flow_try_connect():
result2 = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_DHCP},
data=dhcp.DhcpServiceInfo(
ip=IP_ADDRESS, macaddress=MAC_ADDRESS, hostname=LABEL
),
)
await hass.async_block_till_done()
assert result2["type"] == RESULT_TYPE_ABORT
assert result2["reason"] == "already_in_progress"
with _patch_discovery(), _patch_config_flow_try_connect():
result3 = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_DHCP},
data=dhcp.DhcpServiceInfo(
ip=IP_ADDRESS, macaddress="00:00:00:00:00:00", hostname="mock_hostname"
),
)
await hass.async_block_till_done()
assert result3["type"] == RESULT_TYPE_ABORT
assert result3["reason"] == "already_in_progress"
with _patch_discovery(no_device=True), _patch_config_flow_try_connect(
no_device=True
):
result3 = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_DHCP},
data=dhcp.DhcpServiceInfo(
ip="1.2.3.5", macaddress="00:00:00:00:00:01", hostname="mock_hostname"
),
)
await hass.async_block_till_done()
assert result3["type"] == RESULT_TYPE_ABORT
assert result3["reason"] == "cannot_connect"
@pytest.mark.parametrize(
"source, data",
[
(
config_entries.SOURCE_DHCP,
dhcp.DhcpServiceInfo(ip=IP_ADDRESS, macaddress=MAC_ADDRESS, hostname=LABEL),
),
(
config_entries.SOURCE_HOMEKIT,
zeroconf.ZeroconfServiceInfo(
host=IP_ADDRESS,
addresses=[IP_ADDRESS],
hostname=LABEL,
name=LABEL,
port=None,
properties={zeroconf.ATTR_PROPERTIES_ID: "any"},
type="mock_type",
),
),
(
config_entries.SOURCE_INTEGRATION_DISCOVERY,
{CONF_HOST: IP_ADDRESS, CONF_SERIAL: SERIAL},
),
],
)
async def test_discovered_by_dhcp_or_discovery(hass, source, data):
"""Test we can setup when discovered from dhcp or discovery."""
with _patch_discovery(), _patch_config_flow_try_connect():
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": source}, data=data
)
await hass.async_block_till_done()
assert result["type"] == RESULT_TYPE_FORM
assert result["errors"] is None
with _patch_discovery(), _patch_config_flow_try_connect(), patch(
f"{MODULE}.async_setup", return_value=True
) as mock_async_setup, patch(
f"{MODULE}.async_setup_entry", return_value=True
) as mock_async_setup_entry:
result2 = await hass.config_entries.flow.async_configure(result["flow_id"], {})
await hass.async_block_till_done()
assert result2["type"] == "create_entry"
assert result2["data"] == {
CONF_HOST: IP_ADDRESS,
}
assert mock_async_setup.called
assert mock_async_setup_entry.called
@pytest.mark.parametrize(
"source, data",
[
(
config_entries.SOURCE_DHCP,
dhcp.DhcpServiceInfo(ip=IP_ADDRESS, macaddress=MAC_ADDRESS, hostname=LABEL),
),
(
config_entries.SOURCE_HOMEKIT,
zeroconf.ZeroconfServiceInfo(
host=IP_ADDRESS,
addresses=[IP_ADDRESS],
hostname=LABEL,
name=LABEL,
port=None,
properties={zeroconf.ATTR_PROPERTIES_ID: "any"},
type="mock_type",
),
),
(
config_entries.SOURCE_INTEGRATION_DISCOVERY,
{CONF_HOST: IP_ADDRESS, CONF_SERIAL: SERIAL},
),
],
)
async def test_discovered_by_dhcp_or_discovery_failed_to_get_device(hass, source, data):
"""Test we abort if we cannot get the unique id when discovered from dhcp."""
with _patch_discovery(no_device=True), _patch_config_flow_try_connect(
no_device=True
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": source}, data=data
)
await hass.async_block_till_done()
assert result["type"] == RESULT_TYPE_ABORT
assert result["reason"] == "cannot_connect"
async def test_discovered_by_dhcp_updates_ip(hass):
"""Update host from dhcp."""
config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: "127.0.0.2"}, unique_id=SERIAL
)
config_entry.add_to_hass(hass)
with _patch_discovery(no_device=True), _patch_config_flow_try_connect(
no_device=True
):
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_DHCP},
data=dhcp.DhcpServiceInfo(
ip=IP_ADDRESS, macaddress=MAC_ADDRESS, hostname=LABEL
),
)
await hass.async_block_till_done()
assert result["type"] == RESULT_TYPE_ABORT
assert result["reason"] == "already_configured"
assert config_entry.data[CONF_HOST] == IP_ADDRESS
async def test_refuse_relays(hass: HomeAssistant):
"""Test we refuse to setup relays."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
assert not result["errors"]
with _patch_discovery(device=_mocked_relay()), _patch_config_flow_try_connect(
device=_mocked_relay()
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_HOST: IP_ADDRESS}
)
await hass.async_block_till_done()
assert result2["type"] == "form"
assert result2["errors"] == {"base": "cannot_connect"}

View File

@ -0,0 +1,150 @@
"""Tests for the lifx component."""
from __future__ import annotations
from datetime import timedelta
import socket
from unittest.mock import patch
from homeassistant.components import lifx
from homeassistant.components.lifx import DOMAIN, discovery
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import CONF_HOST, EVENT_HOMEASSISTANT_STARTED
from homeassistant.setup import async_setup_component
from homeassistant.util import dt as dt_util
from . import (
IP_ADDRESS,
SERIAL,
MockFailingLifxCommand,
_mocked_bulb,
_mocked_failing_bulb,
_patch_config_flow_try_connect,
_patch_device,
_patch_discovery,
)
from tests.common import MockConfigEntry, async_fire_time_changed
async def test_configuring_lifx_causes_discovery(hass):
"""Test that specifying empty config does discovery."""
start_calls = 0
class MockLifxDiscovery:
"""Mock lifx discovery."""
def __init__(self, *args, **kwargs):
"""Init discovery."""
discovered = _mocked_bulb()
self.lights = {discovered.mac_addr: discovered}
def start(self):
"""Mock start."""
nonlocal start_calls
start_calls += 1
def cleanup(self):
"""Mock cleanup."""
with _patch_config_flow_try_connect(), patch.object(
discovery, "DEFAULT_TIMEOUT", 0
), patch(
"homeassistant.components.lifx.discovery.LifxDiscovery", MockLifxDiscovery
):
await async_setup_component(hass, lifx.DOMAIN, {lifx.DOMAIN: {}})
await hass.async_block_till_done()
assert start_calls == 0
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
assert start_calls == 1
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(minutes=5))
await hass.async_block_till_done()
assert start_calls == 2
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(minutes=15))
await hass.async_block_till_done()
assert start_calls == 3
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(minutes=30))
await hass.async_block_till_done()
assert start_calls == 4
async def test_config_entry_reload(hass):
"""Test that a config entry can be reloaded."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: "127.0.0.1"}, unique_id=SERIAL
)
already_migrated_config_entry.add_to_hass(hass)
with _patch_discovery(), _patch_config_flow_try_connect(), _patch_device():
await async_setup_component(hass, lifx.DOMAIN, {lifx.DOMAIN: {}})
await hass.async_block_till_done()
assert already_migrated_config_entry.state == ConfigEntryState.LOADED
await hass.config_entries.async_unload(already_migrated_config_entry.entry_id)
await hass.async_block_till_done()
assert already_migrated_config_entry.state == ConfigEntryState.NOT_LOADED
async def test_config_entry_retry(hass):
"""Test that a config entry can be retried."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: IP_ADDRESS}, unique_id=SERIAL
)
already_migrated_config_entry.add_to_hass(hass)
with _patch_discovery(no_device=True), _patch_config_flow_try_connect(
no_device=True
), _patch_device(no_device=True):
await async_setup_component(hass, lifx.DOMAIN, {lifx.DOMAIN: {}})
await hass.async_block_till_done()
assert already_migrated_config_entry.state == ConfigEntryState.SETUP_RETRY
async def test_get_version_fails(hass):
"""Test we handle get version failing."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: IP_ADDRESS}, unique_id=SERIAL
)
already_migrated_config_entry.add_to_hass(hass)
bulb = _mocked_bulb()
bulb.product = None
bulb.host_firmware_version = None
bulb.get_version = MockFailingLifxCommand(bulb)
with _patch_discovery(device=bulb), _patch_device(device=bulb):
await async_setup_component(hass, lifx.DOMAIN, {lifx.DOMAIN: {}})
await hass.async_block_till_done()
assert already_migrated_config_entry.state == ConfigEntryState.SETUP_RETRY
async def test_dns_error_at_startup(hass):
"""Test we handle get version failing."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: IP_ADDRESS}, unique_id=SERIAL
)
already_migrated_config_entry.add_to_hass(hass)
bulb = _mocked_failing_bulb()
class MockLifxConnectonDnsError:
"""Mock lifx connection with a dns error."""
def __init__(self, *args, **kwargs):
"""Init connection."""
self.device = bulb
async def async_setup(self):
"""Mock setup."""
raise socket.gaierror()
def async_stop(self):
"""Mock teardown."""
# Cannot connect due to dns error
with _patch_discovery(device=bulb), patch(
"homeassistant.components.lifx.LIFXConnection",
MockLifxConnectonDnsError,
):
await async_setup_component(hass, lifx.DOMAIN, {lifx.DOMAIN: {}})
await hass.async_block_till_done()
assert already_migrated_config_entry.state == ConfigEntryState.SETUP_RETRY

View File

@ -0,0 +1,993 @@
"""Tests for the lifx integration light platform."""
from datetime import timedelta
from unittest.mock import patch
import aiolifx_effects
import pytest
from homeassistant.components import lifx
from homeassistant.components.lifx import DOMAIN
from homeassistant.components.lifx.light import ATTR_INFRARED, ATTR_ZONES
from homeassistant.components.lifx.manager import SERVICE_EFFECT_COLORLOOP
from homeassistant.components.light import (
ATTR_BRIGHTNESS,
ATTR_COLOR_MODE,
ATTR_COLOR_TEMP,
ATTR_EFFECT,
ATTR_HS_COLOR,
ATTR_RGB_COLOR,
ATTR_SUPPORTED_COLOR_MODES,
ATTR_TRANSITION,
ATTR_XY_COLOR,
DOMAIN as LIGHT_DOMAIN,
ColorMode,
)
from homeassistant.const import ATTR_ENTITY_ID, CONF_HOST, STATE_OFF, STATE_UNAVAILABLE
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.setup import async_setup_component
from homeassistant.util import dt as dt_util
from . import (
IP_ADDRESS,
MAC_ADDRESS,
SERIAL,
MockFailingLifxCommand,
MockLifxCommand,
MockMessage,
_mocked_brightness_bulb,
_mocked_bulb,
_mocked_bulb_new_firmware,
_mocked_light_strip,
_mocked_white_bulb,
_patch_config_flow_try_connect,
_patch_device,
_patch_discovery,
)
from tests.common import MockConfigEntry, async_fire_time_changed
async def test_light_unique_id(hass: HomeAssistant) -> None:
"""Test a light unique id."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: "1.2.3.4"}, unique_id=SERIAL
)
already_migrated_config_entry.add_to_hass(hass)
bulb = _mocked_bulb()
with _patch_discovery(device=bulb), _patch_config_flow_try_connect(
device=bulb
), _patch_device(device=bulb):
await async_setup_component(hass, lifx.DOMAIN, {lifx.DOMAIN: {}})
await hass.async_block_till_done()
entity_id = "light.my_bulb"
entity_registry = er.async_get(hass)
assert entity_registry.async_get(entity_id).unique_id == SERIAL
device_registry = dr.async_get(hass)
device = device_registry.async_get_device(
identifiers=set(), connections={(dr.CONNECTION_NETWORK_MAC, SERIAL)}
)
assert device.identifiers == {(DOMAIN, SERIAL)}
async def test_light_unique_id_new_firmware(hass: HomeAssistant) -> None:
"""Test a light unique id with newer firmware."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: "1.2.3.4"}, unique_id=SERIAL
)
already_migrated_config_entry.add_to_hass(hass)
bulb = _mocked_bulb_new_firmware()
with _patch_discovery(device=bulb), _patch_config_flow_try_connect(
device=bulb
), _patch_device(device=bulb):
await async_setup_component(hass, lifx.DOMAIN, {lifx.DOMAIN: {}})
await hass.async_block_till_done()
entity_id = "light.my_bulb"
entity_registry = er.async_get(hass)
assert entity_registry.async_get(entity_id).unique_id == SERIAL
device_registry = dr.async_get(hass)
device = device_registry.async_get_device(
identifiers=set(),
connections={(dr.CONNECTION_NETWORK_MAC, MAC_ADDRESS)},
)
assert device.identifiers == {(DOMAIN, SERIAL)}
@patch("homeassistant.components.lifx.light.COLOR_ZONE_POPULATE_DELAY", 0)
async def test_light_strip(hass: HomeAssistant) -> None:
"""Test a light strip."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: "127.0.0.1"}, unique_id=SERIAL
)
already_migrated_config_entry.add_to_hass(hass)
bulb = _mocked_light_strip()
bulb.power_level = 65535
bulb.color = [65535, 65535, 65535, 65535]
with _patch_discovery(device=bulb), _patch_config_flow_try_connect(
device=bulb
), _patch_device(device=bulb):
await async_setup_component(hass, lifx.DOMAIN, {lifx.DOMAIN: {}})
await hass.async_block_till_done()
entity_id = "light.my_bulb"
state = hass.states.get(entity_id)
assert state.state == "on"
attributes = state.attributes
assert attributes[ATTR_BRIGHTNESS] == 255
assert attributes[ATTR_COLOR_MODE] == ColorMode.HS
assert attributes[ATTR_SUPPORTED_COLOR_MODES] == [
ColorMode.COLOR_TEMP,
ColorMode.HS,
]
assert attributes[ATTR_HS_COLOR] == (360.0, 100.0)
assert attributes[ATTR_RGB_COLOR] == (255, 0, 0)
assert attributes[ATTR_XY_COLOR] == (0.701, 0.299)
await hass.services.async_call(
LIGHT_DOMAIN, "turn_off", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
assert bulb.set_power.calls[0][0][0] is False
bulb.set_power.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN, "turn_on", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
assert bulb.set_power.calls[0][0][0] is True
bulb.set_power.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{ATTR_ENTITY_ID: entity_id, ATTR_BRIGHTNESS: 100},
blocking=True,
)
call_dict = bulb.set_color_zones.calls[0][1]
call_dict.pop("callb")
assert call_dict == {
"apply": 0,
"color": [],
"duration": 0,
"end_index": 0,
"start_index": 0,
}
bulb.set_color_zones.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{ATTR_ENTITY_ID: entity_id, ATTR_HS_COLOR: (10, 30)},
blocking=True,
)
call_dict = bulb.set_color_zones.calls[0][1]
call_dict.pop("callb")
assert call_dict == {
"apply": 0,
"color": [],
"duration": 0,
"end_index": 0,
"start_index": 0,
}
bulb.set_color_zones.reset_mock()
bulb.color_zones = [
(0, 65535, 65535, 3500),
(54612, 65535, 65535, 3500),
(54612, 65535, 65535, 3500),
(54612, 65535, 65535, 3500),
(46420, 65535, 65535, 3500),
(46420, 65535, 65535, 3500),
(46420, 65535, 65535, 3500),
(46420, 65535, 65535, 3500),
]
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{ATTR_ENTITY_ID: entity_id, ATTR_HS_COLOR: (10, 30)},
blocking=True,
)
# Single color uses the fast path
assert bulb.set_color.calls[0][0][0] == [1820, 19660, 65535, 3500]
bulb.set_color.reset_mock()
assert len(bulb.set_color_zones.calls) == 0
bulb.color_zones = [
(0, 65535, 65535, 3500),
(54612, 65535, 65535, 3500),
(54612, 65535, 65535, 3500),
(54612, 65535, 65535, 3500),
(46420, 65535, 65535, 3500),
(46420, 65535, 65535, 3500),
(46420, 65535, 65535, 3500),
(46420, 65535, 65535, 3500),
]
await hass.services.async_call(
DOMAIN,
"set_state",
{ATTR_ENTITY_ID: entity_id, ATTR_RGB_COLOR: (255, 10, 30)},
blocking=True,
)
# Single color uses the fast path
assert bulb.set_color.calls[0][0][0] == [64643, 62964, 65535, 3500]
bulb.set_color.reset_mock()
assert len(bulb.set_color_zones.calls) == 0
bulb.color_zones = [
(0, 65535, 65535, 3500),
(54612, 65535, 65535, 3500),
(54612, 65535, 65535, 3500),
(54612, 65535, 65535, 3500),
(46420, 65535, 65535, 3500),
(46420, 65535, 65535, 3500),
(46420, 65535, 65535, 3500),
(46420, 65535, 65535, 3500),
]
await hass.services.async_call(
DOMAIN,
"set_state",
{ATTR_ENTITY_ID: entity_id, ATTR_XY_COLOR: (0.3, 0.7)},
blocking=True,
)
# Single color uses the fast path
assert bulb.set_color.calls[0][0][0] == [15848, 65535, 65535, 3500]
bulb.set_color.reset_mock()
assert len(bulb.set_color_zones.calls) == 0
bulb.color_zones = [
(0, 65535, 65535, 3500),
(54612, 65535, 65535, 3500),
(54612, 65535, 65535, 3500),
(54612, 65535, 65535, 3500),
(46420, 65535, 65535, 3500),
(46420, 65535, 65535, 3500),
(46420, 65535, 65535, 3500),
(46420, 65535, 65535, 3500),
]
await hass.services.async_call(
DOMAIN,
"set_state",
{ATTR_ENTITY_ID: entity_id, ATTR_BRIGHTNESS: 128},
blocking=True,
)
# multiple zones in effect and we are changing the brightness
# we need to do each zone individually
assert len(bulb.set_color.calls) == 0
call_dict = bulb.set_color_zones.calls[0][1]
call_dict.pop("callb")
assert call_dict == {
"apply": 0,
"color": [0, 65535, 32896, 3500],
"duration": 0,
"end_index": 0,
"start_index": 0,
}
call_dict = bulb.set_color_zones.calls[1][1]
call_dict.pop("callb")
assert call_dict == {
"apply": 0,
"color": [54612, 65535, 32896, 3500],
"duration": 0,
"end_index": 1,
"start_index": 1,
}
call_dict = bulb.set_color_zones.calls[7][1]
call_dict.pop("callb")
assert call_dict == {
"apply": 1,
"color": [46420, 65535, 32896, 3500],
"duration": 0,
"end_index": 7,
"start_index": 7,
}
bulb.set_color_zones.reset_mock()
await hass.services.async_call(
DOMAIN,
"set_state",
{
ATTR_ENTITY_ID: entity_id,
ATTR_RGB_COLOR: (255, 255, 255),
ATTR_ZONES: [0, 2],
},
blocking=True,
)
# set a two zones
assert len(bulb.set_color.calls) == 0
call_dict = bulb.set_color_zones.calls[0][1]
call_dict.pop("callb")
assert call_dict == {
"apply": 0,
"color": [0, 0, 65535, 3500],
"duration": 0,
"end_index": 0,
"start_index": 0,
}
call_dict = bulb.set_color_zones.calls[1][1]
call_dict.pop("callb")
assert call_dict == {
"apply": 1,
"color": [0, 0, 65535, 3500],
"duration": 0,
"end_index": 2,
"start_index": 2,
}
bulb.set_color_zones.reset_mock()
bulb.get_color_zones.reset_mock()
bulb.set_power.reset_mock()
bulb.power_level = 0
await hass.services.async_call(
DOMAIN,
"set_state",
{ATTR_ENTITY_ID: entity_id, ATTR_RGB_COLOR: (255, 255, 255), ATTR_ZONES: [3]},
blocking=True,
)
# set a one zone
assert len(bulb.set_power.calls) == 2
assert len(bulb.get_color_zones.calls) == 2
assert len(bulb.set_color.calls) == 0
call_dict = bulb.set_color_zones.calls[0][1]
call_dict.pop("callb")
assert call_dict == {
"apply": 1,
"color": [0, 0, 65535, 3500],
"duration": 0,
"end_index": 3,
"start_index": 3,
}
bulb.get_color_zones.reset_mock()
bulb.set_power.reset_mock()
bulb.set_color_zones.reset_mock()
bulb.set_color_zones = MockFailingLifxCommand(bulb)
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
DOMAIN,
"set_state",
{
ATTR_ENTITY_ID: entity_id,
ATTR_RGB_COLOR: (255, 255, 255),
ATTR_ZONES: [3],
},
blocking=True,
)
bulb.set_color_zones = MockLifxCommand(bulb)
bulb.get_color_zones = MockFailingLifxCommand(bulb)
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
DOMAIN,
"set_state",
{
ATTR_ENTITY_ID: entity_id,
ATTR_RGB_COLOR: (255, 255, 255),
ATTR_ZONES: [3],
},
blocking=True,
)
bulb.get_color_zones = MockLifxCommand(bulb)
bulb.get_color = MockFailingLifxCommand(bulb)
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
DOMAIN,
"set_state",
{
ATTR_ENTITY_ID: entity_id,
ATTR_RGB_COLOR: (255, 255, 255),
ATTR_ZONES: [3],
},
blocking=True,
)
async def test_color_light_with_temp(
hass: HomeAssistant, mock_effect_conductor
) -> None:
"""Test a color light with temp."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: "127.0.0.1"}, unique_id=SERIAL
)
already_migrated_config_entry.add_to_hass(hass)
bulb = _mocked_bulb()
bulb.power_level = 65535
bulb.color = [65535, 65535, 65535, 65535]
with _patch_discovery(device=bulb), _patch_config_flow_try_connect(
device=bulb
), _patch_device(device=bulb):
await async_setup_component(hass, lifx.DOMAIN, {lifx.DOMAIN: {}})
await hass.async_block_till_done()
entity_id = "light.my_bulb"
state = hass.states.get(entity_id)
assert state.state == "on"
attributes = state.attributes
assert attributes[ATTR_BRIGHTNESS] == 255
assert attributes[ATTR_COLOR_MODE] == ColorMode.HS
assert attributes[ATTR_SUPPORTED_COLOR_MODES] == [
ColorMode.COLOR_TEMP,
ColorMode.HS,
]
assert attributes[ATTR_HS_COLOR] == (360.0, 100.0)
assert attributes[ATTR_RGB_COLOR] == (255, 0, 0)
assert attributes[ATTR_XY_COLOR] == (0.701, 0.299)
bulb.color = [32000, None, 32000, 6000]
await hass.services.async_call(
LIGHT_DOMAIN, "turn_on", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
assert bulb.set_power.calls[0][0][0] is True
bulb.set_power.reset_mock()
state = hass.states.get(entity_id)
assert state.state == "on"
attributes = state.attributes
assert attributes[ATTR_BRIGHTNESS] == 125
assert attributes[ATTR_COLOR_MODE] == ColorMode.COLOR_TEMP
assert attributes[ATTR_SUPPORTED_COLOR_MODES] == [
ColorMode.COLOR_TEMP,
ColorMode.HS,
]
assert attributes[ATTR_HS_COLOR] == (31.007, 6.862)
assert attributes[ATTR_RGB_COLOR] == (255, 246, 237)
assert attributes[ATTR_XY_COLOR] == (0.339, 0.338)
bulb.color = [65535, 65535, 65535, 65535]
await hass.services.async_call(
LIGHT_DOMAIN, "turn_off", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
assert bulb.set_power.calls[0][0][0] is False
bulb.set_power.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN, "turn_on", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
assert bulb.set_power.calls[0][0][0] is True
bulb.set_power.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{ATTR_ENTITY_ID: entity_id, ATTR_BRIGHTNESS: 100},
blocking=True,
)
assert bulb.set_color.calls[0][0][0] == [65535, 65535, 25700, 65535]
bulb.set_color.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{ATTR_ENTITY_ID: entity_id, ATTR_HS_COLOR: (10, 30)},
blocking=True,
)
assert bulb.set_color.calls[0][0][0] == [1820, 19660, 65535, 3500]
bulb.set_color.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{ATTR_ENTITY_ID: entity_id, ATTR_RGB_COLOR: (255, 30, 80)},
blocking=True,
)
assert bulb.set_color.calls[0][0][0] == [63107, 57824, 65535, 3500]
bulb.set_color.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{ATTR_ENTITY_ID: entity_id, ATTR_XY_COLOR: (0.46, 0.376)},
blocking=True,
)
assert bulb.set_color.calls[0][0][0] == [4956, 30583, 65535, 3500]
bulb.set_color.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{ATTR_ENTITY_ID: entity_id, ATTR_EFFECT: "effect_colorloop"},
blocking=True,
)
start_call = mock_effect_conductor.start.mock_calls
first_call = start_call[0][1]
assert isinstance(first_call[0], aiolifx_effects.EffectColorloop)
assert first_call[1][0] == bulb
mock_effect_conductor.start.reset_mock()
mock_effect_conductor.stop.reset_mock()
await hass.services.async_call(
DOMAIN,
SERVICE_EFFECT_COLORLOOP,
{ATTR_ENTITY_ID: entity_id, ATTR_BRIGHTNESS: 128},
blocking=True,
)
start_call = mock_effect_conductor.start.mock_calls
first_call = start_call[0][1]
assert isinstance(first_call[0], aiolifx_effects.EffectColorloop)
assert first_call[1][0] == bulb
mock_effect_conductor.start.reset_mock()
mock_effect_conductor.stop.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{ATTR_ENTITY_ID: entity_id, ATTR_EFFECT: "effect_pulse"},
blocking=True,
)
assert len(mock_effect_conductor.stop.mock_calls) == 1
start_call = mock_effect_conductor.start.mock_calls
first_call = start_call[0][1]
assert isinstance(first_call[0], aiolifx_effects.EffectPulse)
assert first_call[1][0] == bulb
mock_effect_conductor.start.reset_mock()
mock_effect_conductor.stop.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{ATTR_ENTITY_ID: entity_id, ATTR_EFFECT: "effect_stop"},
blocking=True,
)
assert len(mock_effect_conductor.stop.mock_calls) == 2
async def test_white_bulb(hass: HomeAssistant) -> None:
"""Test a white bulb."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: "127.0.0.1"}, unique_id=SERIAL
)
already_migrated_config_entry.add_to_hass(hass)
bulb = _mocked_white_bulb()
bulb.power_level = 65535
bulb.color = [32000, None, 32000, 6000]
with _patch_discovery(device=bulb), _patch_config_flow_try_connect(
device=bulb
), _patch_device(device=bulb):
await async_setup_component(hass, lifx.DOMAIN, {lifx.DOMAIN: {}})
await hass.async_block_till_done()
entity_id = "light.my_bulb"
state = hass.states.get(entity_id)
assert state.state == "on"
attributes = state.attributes
assert attributes[ATTR_BRIGHTNESS] == 125
assert attributes[ATTR_COLOR_MODE] == ColorMode.COLOR_TEMP
assert attributes[ATTR_SUPPORTED_COLOR_MODES] == [
ColorMode.COLOR_TEMP,
]
assert attributes[ATTR_COLOR_TEMP] == 166
await hass.services.async_call(
LIGHT_DOMAIN, "turn_off", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
assert bulb.set_power.calls[0][0][0] is False
bulb.set_power.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN, "turn_on", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
assert bulb.set_power.calls[0][0][0] is True
bulb.set_power.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{ATTR_ENTITY_ID: entity_id, ATTR_BRIGHTNESS: 100},
blocking=True,
)
assert bulb.set_color.calls[0][0][0] == [32000, None, 25700, 6000]
bulb.set_color.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{ATTR_ENTITY_ID: entity_id, ATTR_COLOR_TEMP: 400},
blocking=True,
)
assert bulb.set_color.calls[0][0][0] == [32000, 0, 32000, 2500]
bulb.set_color.reset_mock()
async def test_config_zoned_light_strip_fails(hass):
"""Test we handle failure to update zones."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: IP_ADDRESS}, unique_id=SERIAL
)
already_migrated_config_entry.add_to_hass(hass)
light_strip = _mocked_light_strip()
entity_id = "light.my_bulb"
class MockFailingLifxCommand:
"""Mock a lifx command that fails on the 3rd try."""
def __init__(self, bulb, **kwargs):
"""Init command."""
self.bulb = bulb
self.call_count = 0
def __call__(self, callb=None, *args, **kwargs):
"""Call command."""
self.call_count += 1
response = None if self.call_count >= 3 else MockMessage()
if callb:
callb(self.bulb, response)
light_strip.get_color_zones = MockFailingLifxCommand(light_strip)
with _patch_discovery(device=light_strip), _patch_device(device=light_strip):
await async_setup_component(hass, lifx.DOMAIN, {lifx.DOMAIN: {}})
await hass.async_block_till_done()
entity_registry = er.async_get(hass)
assert entity_registry.async_get(entity_id).unique_id == SERIAL
assert hass.states.get(entity_id).state == STATE_OFF
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=30))
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
async def test_white_light_fails(hass):
"""Test we handle failure to power on off."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: IP_ADDRESS}, unique_id=SERIAL
)
already_migrated_config_entry.add_to_hass(hass)
bulb = _mocked_white_bulb()
entity_id = "light.my_bulb"
bulb.set_power = MockFailingLifxCommand(bulb)
with _patch_discovery(device=bulb), _patch_device(device=bulb):
await async_setup_component(hass, lifx.DOMAIN, {lifx.DOMAIN: {}})
await hass.async_block_till_done()
entity_registry = er.async_get(hass)
assert entity_registry.async_get(entity_id).unique_id == SERIAL
assert hass.states.get(entity_id).state == STATE_OFF
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
LIGHT_DOMAIN, "turn_on", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
assert bulb.set_power.calls[0][0][0] is True
bulb.set_power.reset_mock()
bulb.set_power = MockLifxCommand(bulb)
bulb.set_color = MockFailingLifxCommand(bulb)
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{ATTR_ENTITY_ID: entity_id, ATTR_COLOR_TEMP: 153},
blocking=True,
)
assert bulb.set_color.calls[0][0][0] == [1, 0, 3, 6535]
bulb.set_color.reset_mock()
async def test_brightness_bulb(hass: HomeAssistant) -> None:
"""Test a brightness only bulb."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: "127.0.0.1"}, unique_id=SERIAL
)
already_migrated_config_entry.add_to_hass(hass)
bulb = _mocked_brightness_bulb()
bulb.power_level = 65535
bulb.color = [32000, None, 32000, 6000]
with _patch_discovery(device=bulb), _patch_config_flow_try_connect(
device=bulb
), _patch_device(device=bulb):
await async_setup_component(hass, lifx.DOMAIN, {lifx.DOMAIN: {}})
await hass.async_block_till_done()
entity_id = "light.my_bulb"
state = hass.states.get(entity_id)
assert state.state == "on"
attributes = state.attributes
assert attributes[ATTR_BRIGHTNESS] == 125
assert attributes[ATTR_COLOR_MODE] == ColorMode.BRIGHTNESS
assert attributes[ATTR_SUPPORTED_COLOR_MODES] == [
ColorMode.BRIGHTNESS,
]
await hass.services.async_call(
LIGHT_DOMAIN, "turn_off", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
assert bulb.set_power.calls[0][0][0] is False
bulb.set_power.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN, "turn_on", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
assert bulb.set_power.calls[0][0][0] is True
bulb.set_power.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{ATTR_ENTITY_ID: entity_id, ATTR_BRIGHTNESS: 100},
blocking=True,
)
assert bulb.set_color.calls[0][0][0] == [32000, None, 25700, 6000]
bulb.set_color.reset_mock()
async def test_transitions_brightness_only(hass: HomeAssistant) -> None:
"""Test transitions with a brightness only device."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: "127.0.0.1"}, unique_id=SERIAL
)
already_migrated_config_entry.add_to_hass(hass)
bulb = _mocked_brightness_bulb()
bulb.power_level = 65535
bulb.color = [32000, None, 32000, 6000]
with _patch_discovery(device=bulb), _patch_config_flow_try_connect(
device=bulb
), _patch_device(device=bulb):
await async_setup_component(hass, lifx.DOMAIN, {lifx.DOMAIN: {}})
await hass.async_block_till_done()
entity_id = "light.my_bulb"
state = hass.states.get(entity_id)
assert state.state == "on"
attributes = state.attributes
assert attributes[ATTR_BRIGHTNESS] == 125
assert attributes[ATTR_COLOR_MODE] == ColorMode.BRIGHTNESS
assert attributes[ATTR_SUPPORTED_COLOR_MODES] == [
ColorMode.BRIGHTNESS,
]
await hass.services.async_call(
LIGHT_DOMAIN, "turn_off", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
assert bulb.set_power.calls[0][0][0] is False
bulb.set_power.reset_mock()
bulb.power_level = 0
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{ATTR_ENTITY_ID: entity_id, ATTR_TRANSITION: 5, ATTR_BRIGHTNESS: 100},
blocking=True,
)
assert bulb.set_power.calls[0][0][0] is True
call_dict = bulb.set_power.calls[0][1]
call_dict.pop("callb")
assert call_dict == {"duration": 5000}
bulb.set_power.reset_mock()
bulb.power_level = 0
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{ATTR_ENTITY_ID: entity_id, ATTR_TRANSITION: 5, ATTR_BRIGHTNESS: 200},
blocking=True,
)
assert bulb.set_power.calls[0][0][0] is True
call_dict = bulb.set_power.calls[0][1]
call_dict.pop("callb")
assert call_dict == {"duration": 5000}
bulb.set_power.reset_mock()
await hass.async_block_till_done()
bulb.get_color.reset_mock()
# Ensure we force an update after the transition
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5))
await hass.async_block_till_done()
assert len(bulb.get_color.calls) == 2
async def test_transitions_color_bulb(hass: HomeAssistant) -> None:
"""Test transitions with a color bulb."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: "127.0.0.1"}, unique_id=SERIAL
)
already_migrated_config_entry.add_to_hass(hass)
bulb = _mocked_bulb_new_firmware()
bulb.power_level = 65535
bulb.color = [32000, None, 32000, 6000]
with _patch_discovery(device=bulb), _patch_config_flow_try_connect(
device=bulb
), _patch_device(device=bulb):
await async_setup_component(hass, lifx.DOMAIN, {lifx.DOMAIN: {}})
await hass.async_block_till_done()
entity_id = "light.my_bulb"
state = hass.states.get(entity_id)
assert state.state == "on"
attributes = state.attributes
assert attributes[ATTR_BRIGHTNESS] == 125
assert attributes[ATTR_COLOR_MODE] == ColorMode.COLOR_TEMP
await hass.services.async_call(
LIGHT_DOMAIN, "turn_off", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
assert bulb.set_power.calls[0][0][0] is False
bulb.set_power.reset_mock()
bulb.power_level = 0
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_off",
{
ATTR_ENTITY_ID: entity_id,
ATTR_TRANSITION: 5,
},
blocking=True,
)
assert bulb.set_power.calls[0][0][0] is False
call_dict = bulb.set_power.calls[0][1]
call_dict.pop("callb")
assert call_dict == {"duration": 0} # already off
bulb.set_power.reset_mock()
bulb.set_color.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{
ATTR_RGB_COLOR: (255, 5, 10),
ATTR_ENTITY_ID: entity_id,
ATTR_TRANSITION: 5,
ATTR_BRIGHTNESS: 100,
},
blocking=True,
)
assert bulb.set_color.calls[0][0][0] == [65316, 64249, 25700, 3500]
assert bulb.set_power.calls[0][0][0] is True
call_dict = bulb.set_power.calls[0][1]
call_dict.pop("callb")
assert call_dict == {"duration": 5000}
bulb.set_power.reset_mock()
bulb.set_color.reset_mock()
bulb.power_level = 12800
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{
ATTR_RGB_COLOR: (5, 5, 10),
ATTR_ENTITY_ID: entity_id,
ATTR_TRANSITION: 5,
ATTR_BRIGHTNESS: 200,
},
blocking=True,
)
assert bulb.set_color.calls[0][0][0] == [43690, 32767, 51400, 3500]
call_dict = bulb.set_color.calls[0][1]
call_dict.pop("callb")
assert call_dict == {"duration": 5000}
bulb.set_power.reset_mock()
bulb.set_color.reset_mock()
await hass.async_block_till_done()
bulb.get_color.reset_mock()
# Ensure we force an update after the transition
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5))
await hass.async_block_till_done()
assert len(bulb.get_color.calls) == 2
bulb.set_power.reset_mock()
bulb.set_color.reset_mock()
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_off",
{
ATTR_ENTITY_ID: entity_id,
ATTR_TRANSITION: 5,
},
blocking=True,
)
assert bulb.set_power.calls[0][0][0] is False
call_dict = bulb.set_power.calls[0][1]
call_dict.pop("callb")
assert call_dict == {"duration": 5000}
bulb.set_power.reset_mock()
bulb.set_color.reset_mock()
async def test_infrared_color_bulb(hass: HomeAssistant) -> None:
"""Test setting infrared with a color bulb."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: "127.0.0.1"}, unique_id=SERIAL
)
already_migrated_config_entry.add_to_hass(hass)
bulb = _mocked_bulb_new_firmware()
bulb.power_level = 65535
bulb.color = [32000, None, 32000, 6000]
with _patch_discovery(device=bulb), _patch_config_flow_try_connect(
device=bulb
), _patch_device(device=bulb):
await async_setup_component(hass, lifx.DOMAIN, {lifx.DOMAIN: {}})
await hass.async_block_till_done()
entity_id = "light.my_bulb"
state = hass.states.get(entity_id)
assert state.state == "on"
attributes = state.attributes
assert attributes[ATTR_BRIGHTNESS] == 125
assert attributes[ATTR_COLOR_MODE] == ColorMode.COLOR_TEMP
await hass.services.async_call(
LIGHT_DOMAIN, "turn_off", {ATTR_ENTITY_ID: entity_id}, blocking=True
)
assert bulb.set_power.calls[0][0][0] is False
bulb.set_power.reset_mock()
await hass.services.async_call(
DOMAIN,
"set_state",
{
ATTR_INFRARED: 100,
ATTR_ENTITY_ID: entity_id,
ATTR_BRIGHTNESS: 100,
},
blocking=True,
)
assert bulb.set_infrared.calls[0][0][0] == 25700
async def test_color_bulb_is_actually_off(hass: HomeAssistant) -> None:
"""Test setting a color when we think a bulb is on but its actually off."""
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: "127.0.0.1"}, unique_id=SERIAL
)
already_migrated_config_entry.add_to_hass(hass)
bulb = _mocked_bulb_new_firmware()
bulb.power_level = 65535
bulb.color = [32000, None, 32000, 6000]
with _patch_discovery(device=bulb), _patch_config_flow_try_connect(
device=bulb
), _patch_device(device=bulb):
await async_setup_component(hass, lifx.DOMAIN, {lifx.DOMAIN: {}})
await hass.async_block_till_done()
entity_id = "light.my_bulb"
state = hass.states.get(entity_id)
assert state.state == "on"
class MockLifxCommandActuallyOff:
"""Mock a lifx command that will update our power level state."""
def __init__(self, bulb, **kwargs):
"""Init command."""
self.bulb = bulb
self.calls = []
def __call__(self, *args, **kwargs):
"""Call command."""
bulb.power_level = 0
if callb := kwargs.get("callb"):
callb(self.bulb, MockMessage())
self.calls.append([args, kwargs])
bulb.set_color = MockLifxCommandActuallyOff(bulb)
await hass.services.async_call(
LIGHT_DOMAIN,
"turn_on",
{
ATTR_RGB_COLOR: (100, 100, 100),
ATTR_ENTITY_ID: entity_id,
ATTR_BRIGHTNESS: 100,
},
blocking=True,
)
assert bulb.set_color.calls[0][0][0] == [0, 0, 25700, 3500]
assert len(bulb.set_power.calls) == 1

View File

@ -0,0 +1,281 @@
"""Tests the lifx migration."""
from __future__ import annotations
from datetime import timedelta
from unittest.mock import patch
from homeassistant import setup
from homeassistant.components import lifx
from homeassistant.components.lifx import DOMAIN, discovery
from homeassistant.const import CONF_HOST, EVENT_HOMEASSISTANT_STARTED
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers.device_registry import DeviceRegistry
from homeassistant.helpers.entity_registry import EntityRegistry
from homeassistant.setup import async_setup_component
from homeassistant.util import dt as dt_util
from . import (
IP_ADDRESS,
LABEL,
MAC_ADDRESS,
SERIAL,
_mocked_bulb,
_patch_config_flow_try_connect,
_patch_device,
_patch_discovery,
)
from tests.common import MockConfigEntry, async_fire_time_changed
async def test_migration_device_online_end_to_end(
hass: HomeAssistant, device_reg: DeviceRegistry, entity_reg: EntityRegistry
):
"""Test migration from single config entry."""
config_entry = MockConfigEntry(
domain=DOMAIN, title="LEGACY", data={}, unique_id=DOMAIN
)
config_entry.add_to_hass(hass)
device = device_reg.async_get_or_create(
config_entry_id=config_entry.entry_id,
identifiers={(DOMAIN, SERIAL)},
connections={(dr.CONNECTION_NETWORK_MAC, MAC_ADDRESS)},
name=LABEL,
)
light_entity_reg = entity_reg.async_get_or_create(
config_entry=config_entry,
platform=DOMAIN,
domain="light",
unique_id=dr.format_mac(SERIAL),
original_name=LABEL,
device_id=device.id,
)
with _patch_discovery(), _patch_config_flow_try_connect(), _patch_device():
await setup.async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
migrated_entry = None
for entry in hass.config_entries.async_entries(DOMAIN):
if entry.unique_id == DOMAIN:
migrated_entry = entry
break
assert migrated_entry is not None
assert device.config_entries == {migrated_entry.entry_id}
assert light_entity_reg.config_entry_id == migrated_entry.entry_id
assert er.async_entries_for_config_entry(entity_reg, config_entry) == []
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(minutes=20))
await hass.async_block_till_done()
legacy_entry = None
for entry in hass.config_entries.async_entries(DOMAIN):
if entry.unique_id == DOMAIN:
legacy_entry = entry
break
assert legacy_entry is None
async def test_discovery_is_more_frequent_during_migration(
hass: HomeAssistant, device_reg: DeviceRegistry, entity_reg: EntityRegistry
):
"""Test that discovery is more frequent during migration."""
config_entry = MockConfigEntry(
domain=DOMAIN, title="LEGACY", data={}, unique_id=DOMAIN
)
config_entry.add_to_hass(hass)
device = device_reg.async_get_or_create(
config_entry_id=config_entry.entry_id,
identifiers={(DOMAIN, SERIAL)},
connections={(dr.CONNECTION_NETWORK_MAC, MAC_ADDRESS)},
name=LABEL,
)
entity_reg.async_get_or_create(
config_entry=config_entry,
platform=DOMAIN,
domain="light",
unique_id=dr.format_mac(SERIAL),
original_name=LABEL,
device_id=device.id,
)
bulb = _mocked_bulb()
start_calls = 0
class MockLifxDiscovery:
"""Mock lifx discovery."""
def __init__(self, *args, **kwargs):
"""Init discovery."""
self.bulb = bulb
self.lights = {}
def start(self):
"""Mock start."""
nonlocal start_calls
start_calls += 1
# Discover the bulb so we can complete migration
# and verify we switch back to normal discovery
# interval
if start_calls == 4:
self.lights = {self.bulb.mac_addr: self.bulb}
def cleanup(self):
"""Mock cleanup."""
with _patch_device(device=bulb), _patch_config_flow_try_connect(
device=bulb
), patch.object(discovery, "DEFAULT_TIMEOUT", 0), patch(
"homeassistant.components.lifx.discovery.LifxDiscovery", MockLifxDiscovery
):
await async_setup_component(hass, lifx.DOMAIN, {lifx.DOMAIN: {}})
await hass.async_block_till_done()
assert start_calls == 0
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
assert start_calls == 1
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(minutes=5))
await hass.async_block_till_done()
assert start_calls == 3
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(minutes=10))
await hass.async_block_till_done()
assert start_calls == 4
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(minutes=15))
await hass.async_block_till_done()
assert start_calls == 5
async def test_migration_device_online_end_to_end_after_downgrade(
hass: HomeAssistant, device_reg: DeviceRegistry, entity_reg: EntityRegistry
):
"""Test migration from single config entry can happen again after a downgrade."""
config_entry = MockConfigEntry(
domain=DOMAIN, title="LEGACY", data={}, unique_id=DOMAIN
)
config_entry.add_to_hass(hass)
already_migrated_config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_HOST: IP_ADDRESS}, unique_id=SERIAL
)
already_migrated_config_entry.add_to_hass(hass)
device = device_reg.async_get_or_create(
config_entry_id=config_entry.entry_id,
identifiers={(DOMAIN, SERIAL)},
connections={(dr.CONNECTION_NETWORK_MAC, MAC_ADDRESS)},
name=LABEL,
)
light_entity_reg = entity_reg.async_get_or_create(
config_entry=config_entry,
platform=DOMAIN,
domain="light",
unique_id=SERIAL,
original_name=LABEL,
device_id=device.id,
)
with _patch_discovery(), _patch_config_flow_try_connect(), _patch_device():
await setup.async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(minutes=20))
await hass.async_block_till_done()
assert device.config_entries == {config_entry.entry_id}
assert light_entity_reg.config_entry_id == config_entry.entry_id
assert er.async_entries_for_config_entry(entity_reg, config_entry) == []
legacy_entry = None
for entry in hass.config_entries.async_entries(DOMAIN):
if entry.unique_id == DOMAIN:
legacy_entry = entry
break
assert legacy_entry is None
async def test_migration_device_online_end_to_end_ignores_other_devices(
hass: HomeAssistant, device_reg: DeviceRegistry, entity_reg: EntityRegistry
):
"""Test migration from single config entry."""
legacy_config_entry = MockConfigEntry(
domain=DOMAIN, title="LEGACY", data={}, unique_id=DOMAIN
)
legacy_config_entry.add_to_hass(hass)
other_domain_config_entry = MockConfigEntry(
domain="other_domain", data={}, unique_id="other_domain"
)
other_domain_config_entry.add_to_hass(hass)
device = device_reg.async_get_or_create(
config_entry_id=legacy_config_entry.entry_id,
identifiers={(DOMAIN, SERIAL)},
connections={(dr.CONNECTION_NETWORK_MAC, MAC_ADDRESS)},
name=LABEL,
)
other_device = device_reg.async_get_or_create(
config_entry_id=other_domain_config_entry.entry_id,
connections={(dr.CONNECTION_NETWORK_MAC, "556655665566")},
name=LABEL,
)
light_entity_reg = entity_reg.async_get_or_create(
config_entry=legacy_config_entry,
platform=DOMAIN,
domain="light",
unique_id=SERIAL,
original_name=LABEL,
device_id=device.id,
)
ignored_entity_reg = entity_reg.async_get_or_create(
config_entry=other_domain_config_entry,
platform=DOMAIN,
domain="sensor",
unique_id="00:00:00:00:00:00_sensor",
original_name=LABEL,
device_id=device.id,
)
garbage_entity_reg = entity_reg.async_get_or_create(
config_entry=legacy_config_entry,
platform=DOMAIN,
domain="sensor",
unique_id="garbage",
original_name=LABEL,
device_id=other_device.id,
)
with _patch_discovery(), _patch_config_flow_try_connect(), _patch_device():
await setup.async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(minutes=20))
await hass.async_block_till_done()
new_entry = None
legacy_entry = None
for entry in hass.config_entries.async_entries(DOMAIN):
if entry.unique_id == DOMAIN:
legacy_entry = entry
else:
new_entry = entry
assert new_entry is not None
assert legacy_entry is None
assert device.config_entries == {legacy_config_entry.entry_id}
assert light_entity_reg.config_entry_id == legacy_config_entry.entry_id
assert ignored_entity_reg.config_entry_id == other_domain_config_entry.entry_id
assert garbage_entity_reg.config_entry_id == legacy_config_entry.entry_id
assert er.async_entries_for_config_entry(entity_reg, legacy_config_entry) == []
assert dr.async_entries_for_config_entry(device_reg, legacy_config_entry) == []