Add new Rabbit Air integration (#66130)

* Add new Rabbit Air integration

* Remove py.typed file

It is not needed and was just accidentally added to the commit.

* Enable strict type checking for rabbitair component

Keeping the code fully type hinted is a good idea.

* Add missing type annotations

* Remove translation file

* Prevent data to be added to hass.data if refresh fails

* Reload the config entry when the options change

* Add missing type parameters for generics

* Avoid using assert in production code

* Move zeroconf to optional dependencies

* Remove unnecessary logging

Co-authored-by: Franck Nijhof <frenck@frenck.nl>

* Remove unused keys from the manifest

Co-authored-by: Franck Nijhof <frenck@frenck.nl>

* Replace property with attr

Co-authored-by: Franck Nijhof <frenck@frenck.nl>

* Allow to return None for power

The type of the is_on property now allows this.

Co-authored-by: Franck Nijhof <frenck@frenck.nl>

* Remove unnecessary method call

Co-authored-by: Franck Nijhof <frenck@frenck.nl>

* Update the python library

The new version properly re-exports names from the package root.

* Remove options flow

Scan interval should not be part of integration configuration. This was
the only option, so the options flow can be fully removed.

* Replace properties with attrs

* Remove multiline ternary operator

* Use NamedTuple for hass.data

* Remove unused logger variable

* Move async_setup_entry up in the file

* Adjust debouncer settings to use request_refresh

* Prevent status updates during the cooldown period

* Move device polling code to the update coordinator

* Fix the problem with the switch jumping back and forth

The UI seems to have a timeout of 2 seconds somewhere, which is just a
little bit less than what we normally need to get an updated state. So
the power switch would jump to its previous state and then immediately
return to the new state.

* Update the python library

The new version fixes errors when multiple requests are executed
simultaneously.

* Fix incorrect check for pending call in debouncer

This caused the polling to stop.

* Fix tests

* Update .coveragerc to exclude new file.
* Remove test for Options Flow.

* Update the existing entry when device access details change

* Add Zeroconf discovery step

* Fix tests

The ZeroconfServiceInfo constructor now requires one more argument.

* Fix typing for CoordinatorEntity

* Fix signature of async_turn_on

* Fix depreciation warnings

* Fix manifest formatting

* Fix warning about debouncer typing

relates to 5ae5ae5392

* Wait for config entry platform forwards

* Apply some of the suggested changes

* Do not put the MAC address in the title. Use a fixed title instead.
* Do not format the MAC to use as a unique ID.
* Do not catch exceptions in _async_update_data().
* Remove unused _entry field in the base entity class.
* Use the standard attribute self._attr_is_on to keep the power state.

* Store the MAC in the config entry data

* Change the order of except clauses

OSError is an ancestor class of TimeoutError, so TimeoutError should be
handled first

* Fix depreciation warnings

* Fix tests

The ZeroconfServiceInfo constructor arguments have changed.

* Fix DeviceInfo import

* Rename the method to make it clearer what it does

* Apply suggestions from code review

* Fix tests

* Change speed/mode logic to use is_on from the base class

* A zero value is more appropriate than None

since None means "unknown", but we actually know that the speed is zero
when the power is off.

---------

Co-authored-by: Franck Nijhof <frenck@frenck.nl>
Co-authored-by: Erik Montnemery <erik@montnemery.com>
This commit is contained in:
Alexander Somov 2024-01-05 18:34:28 +03:00 committed by GitHub
parent 8bbfee7801
commit d754ea7e22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 743 additions and 0 deletions

View File

@ -1002,6 +1002,11 @@ omit =
homeassistant/components/qrcode/image_processing.py
homeassistant/components/quantum_gateway/device_tracker.py
homeassistant/components/qvr_pro/*
homeassistant/components/rabbitair/__init__.py
homeassistant/components/rabbitair/const.py
homeassistant/components/rabbitair/coordinator.py
homeassistant/components/rabbitair/entity.py
homeassistant/components/rabbitair/fan.py
homeassistant/components/rachio/__init__.py
homeassistant/components/rachio/binary_sensor.py
homeassistant/components/rachio/device.py

View File

@ -307,6 +307,7 @@ homeassistant.components.purpleair.*
homeassistant.components.pushbullet.*
homeassistant.components.pvoutput.*
homeassistant.components.qnap_qsw.*
homeassistant.components.rabbitair.*
homeassistant.components.radarr.*
homeassistant.components.rainforest_raven.*
homeassistant.components.rainmachine.*

View File

@ -1036,6 +1036,8 @@ build.json @home-assistant/supervisor
/homeassistant/components/qvr_pro/ @oblogic7
/homeassistant/components/qwikswitch/ @kellerza
/tests/components/qwikswitch/ @kellerza
/homeassistant/components/rabbitair/ @rabbit-air
/tests/components/rabbitair/ @rabbit-air
/homeassistant/components/rachio/ @bdraco @rfverbruggen
/tests/components/rachio/ @bdraco @rfverbruggen
/homeassistant/components/radarr/ @tkdrob

View File

@ -0,0 +1,51 @@
"""The Rabbit Air integration."""
from __future__ import annotations
from rabbitair import Client, UdpClient
from homeassistant.components import zeroconf
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_ACCESS_TOKEN, CONF_HOST, Platform
from homeassistant.core import HomeAssistant
from .const import DOMAIN
from .coordinator import RabbitAirDataUpdateCoordinator
PLATFORMS: list[Platform] = [Platform.FAN]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Rabbit Air from a config entry."""
hass.data.setdefault(DOMAIN, {})
host: str = entry.data[CONF_HOST]
token: str = entry.data[CONF_ACCESS_TOKEN]
zeroconf_instance = await zeroconf.async_get_async_instance(hass)
device: Client = UdpClient(host, token, zeroconf=zeroconf_instance)
coordinator = RabbitAirDataUpdateCoordinator(hass, device)
await coordinator.async_config_entry_first_refresh()
hass.data[DOMAIN][entry.entry_id] = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(entry.add_update_listener(update_listener))
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok
async def update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Handle options update."""
await hass.config_entries.async_reload(entry.entry_id)

View File

@ -0,0 +1,126 @@
"""Config flow for Rabbit Air integration."""
from __future__ import annotations
import asyncio
import logging
from typing import Any
from rabbitair import UdpClient
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components import zeroconf
from homeassistant.const import CONF_ACCESS_TOKEN, CONF_HOST, CONF_MAC
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResult
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import device_registry as dr
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
async def validate_input(hass: HomeAssistant, data: dict[str, Any]) -> dict[str, Any]:
"""Validate the user input allows us to connect."""
try:
try:
zeroconf_instance = await zeroconf.async_get_async_instance(hass)
with UdpClient(
data[CONF_HOST], data[CONF_ACCESS_TOKEN], zeroconf=zeroconf_instance
) as client:
info = await client.get_info()
except Exception as err:
_LOGGER.debug("Connection attempt failed: %s", err)
raise
except ValueError as err:
# Most likely caused by the invalid access token.
raise InvalidAccessToken from err
except asyncio.TimeoutError as err:
# Either the host doesn't respond or the auth failed.
raise TimeoutConnect from err
except OSError as err:
# Most likely caused by the invalid host.
raise InvalidHost from err
except Exception as err:
# Other possible errors.
raise CannotConnect from err
# Return info to store in the config entry.
return {"mac": info.mac}
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Rabbit Air."""
VERSION = 1
_discovered_host: str | None = None
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle the initial step."""
errors = {}
if user_input is not None:
try:
info = await validate_input(self.hass, user_input)
except CannotConnect:
errors["base"] = "cannot_connect"
except InvalidAccessToken:
errors["base"] = "invalid_access_token"
except InvalidHost:
errors["base"] = "invalid_host"
except TimeoutConnect:
errors["base"] = "timeout_connect"
except Exception as err: # pylint: disable=broad-except
_LOGGER.debug("Unexpected exception: %s", err)
errors["base"] = "unknown"
else:
user_input[CONF_MAC] = info["mac"]
await self.async_set_unique_id(dr.format_mac(info["mac"]))
self._abort_if_unique_id_configured(updates=user_input)
return self.async_create_entry(title="Rabbit Air", data=user_input)
user_input = user_input or {}
host = user_input.get(CONF_HOST, self._discovered_host)
token = user_input.get(CONF_ACCESS_TOKEN)
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
{
vol.Required(CONF_HOST, default=host): str,
vol.Required(CONF_ACCESS_TOKEN, default=token): vol.All(
str, vol.Length(min=32, max=32)
),
}
),
errors=errors,
)
async def async_step_zeroconf(
self, discovery_info: zeroconf.ZeroconfServiceInfo
) -> FlowResult:
"""Handle zeroconf discovery."""
mac = dr.format_mac(discovery_info.properties["id"])
await self.async_set_unique_id(mac)
self._abort_if_unique_id_configured()
self._discovered_host = discovery_info.hostname.rstrip(".")
return await self.async_step_user()
class CannotConnect(HomeAssistantError):
"""Error to indicate we cannot connect."""
class InvalidAccessToken(HomeAssistantError):
"""Error to indicate the access token is not valid."""
class InvalidHost(HomeAssistantError):
"""Error to indicate the host is not valid."""
class TimeoutConnect(HomeAssistantError):
"""Error to indicate the connection attempt is timed out."""

View File

@ -0,0 +1,3 @@
"""Constants for the Rabbit Air integration."""
DOMAIN = "rabbitair"

View File

@ -0,0 +1,74 @@
"""Rabbit Air Update Coordinator."""
from collections.abc import Coroutine
from datetime import timedelta
import logging
from typing import Any, cast
from rabbitair import Client, State
from homeassistant.core import HomeAssistant
from homeassistant.helpers.debounce import Debouncer
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
_LOGGER = logging.getLogger(__name__)
class RabbitAirDebouncer(Debouncer[Coroutine[Any, Any, None]]):
"""Class to rate limit calls to a specific command."""
def __init__(
self,
hass: HomeAssistant,
) -> None:
"""Initialize debounce."""
# We don't want an immediate refresh since the device needs some time
# to apply the changes and reflect the updated state. Two seconds
# should be sufficient, since the internal cycle of the device runs at
# one-second intervals.
super().__init__(hass, _LOGGER, cooldown=2.0, immediate=False)
async def async_call(self) -> None:
"""Call the function."""
# Restart the timer.
self.async_cancel()
await super().async_call()
def has_pending_call(self) -> bool:
"""Indicate that the debouncer has a call waiting for cooldown."""
return self._execute_at_end_of_timer
class RabbitAirDataUpdateCoordinator(DataUpdateCoordinator[State]):
"""Class to manage fetching data from single endpoint."""
def __init__(self, hass: HomeAssistant, device: Client) -> None:
"""Initialize global data updater."""
self.device = device
super().__init__(
hass,
_LOGGER,
name="rabbitair",
update_interval=timedelta(seconds=10),
request_refresh_debouncer=RabbitAirDebouncer(hass),
)
async def _async_update_data(self) -> State:
return await self.device.get_state()
async def _async_refresh(
self,
log_failures: bool = True,
raise_on_auth_failed: bool = False,
scheduled: bool = False,
raise_on_entry_error: bool = False,
) -> None:
"""Refresh data."""
# Skip a scheduled refresh if there is a pending requested refresh.
debouncer = cast(RabbitAirDebouncer, self._debounced_refresh)
if scheduled and debouncer.has_pending_call():
return
await super()._async_refresh(
log_failures, raise_on_auth_failed, scheduled, raise_on_entry_error
)

View File

@ -0,0 +1,62 @@
"""A base class for Rabbit Air entities."""
from __future__ import annotations
import logging
from typing import Any
from rabbitair import Model
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_MAC
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN
from .coordinator import RabbitAirDataUpdateCoordinator
_LOGGER = logging.getLogger(__name__)
MODELS = {
Model.A3: "A3",
Model.BioGS: "BioGS 2.0",
Model.MinusA2: "MinusA2",
None: None,
}
class RabbitAirBaseEntity(CoordinatorEntity[RabbitAirDataUpdateCoordinator]):
"""Base class for Rabbit Air entity."""
def __init__(
self,
coordinator: RabbitAirDataUpdateCoordinator,
entry: ConfigEntry,
) -> None:
"""Initialize the entity."""
super().__init__(coordinator)
self._attr_name = entry.title
self._attr_unique_id = entry.unique_id
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, entry.data[CONF_MAC])},
manufacturer="Rabbit Air",
model=MODELS.get(coordinator.data.model),
name=entry.title,
sw_version=coordinator.data.wifi_firmware,
hw_version=coordinator.data.main_firmware,
)
def _is_model(self, model: Model | list[Model]) -> bool:
"""Check the model of the device."""
if isinstance(model, list):
return self.coordinator.data.model in model
return self.coordinator.data.model is model
async def _set_state(self, **kwargs: Any) -> None:
"""Change the state of the device."""
_LOGGER.debug("Set state %s", kwargs)
await self.coordinator.device.set_state(**kwargs)
# Force polling of the device, because changing one parameter often
# causes other parameters to change as well. By getting updated status
# we provide a better user experience, especially if the default
# polling interval is set too long.
await self.coordinator.async_request_refresh()

View File

@ -0,0 +1,147 @@
"""Support for Rabbit Air fan entity."""
from __future__ import annotations
from typing import Any
from rabbitair import Mode, Model, Speed
from homeassistant.components.fan import FanEntity, FanEntityFeature
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.util.percentage import (
ordered_list_item_to_percentage,
percentage_to_ordered_list_item,
)
from .const import DOMAIN
from .coordinator import RabbitAirDataUpdateCoordinator
from .entity import RabbitAirBaseEntity
SPEED_LIST = [
Speed.Silent,
Speed.Low,
Speed.Medium,
Speed.High,
Speed.Turbo,
]
PRESET_MODE_AUTO = "Auto"
PRESET_MODE_MANUAL = "Manual"
PRESET_MODE_POLLEN = "Pollen"
PRESET_MODES = {
PRESET_MODE_AUTO: Mode.Auto,
PRESET_MODE_MANUAL: Mode.Manual,
PRESET_MODE_POLLEN: Mode.Pollen,
}
async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
"""Set up a config entry."""
coordinator: RabbitAirDataUpdateCoordinator = hass.data[DOMAIN][entry.entry_id]
async_add_entities([RabbitAirFanEntity(coordinator, entry)])
class RabbitAirFanEntity(RabbitAirBaseEntity, FanEntity):
"""Fan control functions of the Rabbit Air air purifier."""
_attr_supported_features = FanEntityFeature.PRESET_MODE | FanEntityFeature.SET_SPEED
def __init__(
self,
coordinator: RabbitAirDataUpdateCoordinator,
entry: ConfigEntry,
) -> None:
"""Initialize the entity."""
super().__init__(coordinator, entry)
if self._is_model(Model.MinusA2):
self._attr_preset_modes = list(PRESET_MODES)
elif self._is_model(Model.A3):
# A3 does not support Pollen mode
self._attr_preset_modes = [
k for k in PRESET_MODES if k != PRESET_MODE_POLLEN
]
self._attr_speed_count = len(SPEED_LIST)
self._get_state_from_coordinator_data()
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
self._get_state_from_coordinator_data()
super()._handle_coordinator_update()
def _get_state_from_coordinator_data(self) -> None:
"""Populate the entity fields with values from the coordinator data."""
data = self.coordinator.data
# Speed as a percentage
if not data.power:
self._attr_percentage = 0
elif data.speed is None:
self._attr_percentage = None
elif data.speed is Speed.SuperSilent:
self._attr_percentage = 1
else:
self._attr_percentage = ordered_list_item_to_percentage(
SPEED_LIST, data.speed
)
# Preset mode
if not data.power or data.mode is None:
self._attr_preset_mode = None
else:
# Get key by value in dictionary
self._attr_preset_mode = next(
k for k, v in PRESET_MODES.items() if v == data.mode
)
async def async_set_preset_mode(self, preset_mode: str) -> None:
"""Set new preset mode."""
await self._set_state(power=True, mode=PRESET_MODES[preset_mode])
self._attr_preset_mode = preset_mode
self.async_write_ha_state()
async def async_set_percentage(self, percentage: int) -> None:
"""Set the speed of the fan, as a percentage."""
if percentage > 0:
value = percentage_to_ordered_list_item(SPEED_LIST, percentage)
await self._set_state(power=True, speed=value)
self._attr_percentage = percentage
else:
await self._set_state(power=False)
self._attr_percentage = 0
self._attr_preset_mode = None
self.async_write_ha_state()
async def async_turn_on(
self,
percentage: int | None = None,
preset_mode: str | None = None,
**kwargs: Any,
) -> None:
"""Turn on the fan."""
mode_value: Mode | None = None
if preset_mode is not None:
mode_value = PRESET_MODES[preset_mode]
speed_value: Speed | None = None
if percentage is not None:
speed_value = percentage_to_ordered_list_item(SPEED_LIST, percentage)
await self._set_state(power=True, mode=mode_value, speed=speed_value)
if percentage is not None:
self._attr_percentage = percentage
if preset_mode is not None:
self._attr_preset_mode = preset_mode
self.async_write_ha_state()
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the fan off."""
await self._set_state(power=False)
self._attr_percentage = 0
self._attr_preset_mode = None
self.async_write_ha_state()

View File

@ -0,0 +1,11 @@
{
"domain": "rabbitair",
"name": "Rabbit Air",
"after_dependencies": ["zeroconf"],
"codeowners": ["@rabbit-air"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/rabbitair",
"iot_class": "local_polling",
"requirements": ["python-rabbitair==0.0.8"],
"zeroconf": ["_rabbitair._udp.local."]
}

View File

@ -0,0 +1,22 @@
{
"config": {
"step": {
"user": {
"data": {
"host": "[%key:common::config_flow::data::host%]",
"access_token": "[%key:common::config_flow::data::access_token%]"
}
}
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_access_token": "[%key:common::config_flow::error::invalid_access_token%]",
"invalid_host": "[%key:common::config_flow::error::invalid_host%]",
"timeout_connect": "[%key:common::config_flow::error::timeout_connect%]",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]"
}
}
}

View File

@ -394,6 +394,7 @@ FLOWS = {
"qingping",
"qnap",
"qnap_qsw",
"rabbitair",
"rachio",
"radarr",
"radio_browser",

View File

@ -4664,6 +4664,12 @@
"config_flow": false,
"iot_class": "local_push"
},
"rabbitair": {
"name": "Rabbit Air",
"integration_type": "hub",
"config_flow": true,
"iot_class": "local_polling"
},
"rachio": {
"name": "Rachio",
"integration_type": "hub",

View File

@ -621,6 +621,11 @@ ZEROCONF = {
"name": "brother*",
},
],
"_rabbitair._udp.local.": [
{
"domain": "rabbitair",
},
],
"_raop._tcp.local.": [
{
"domain": "apple_tv",

View File

@ -2831,6 +2831,16 @@ disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.rabbitair.*]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.radarr.*]
check_untyped_defs = true
disallow_incomplete_defs = true

View File

@ -2237,6 +2237,9 @@ python-picnic-api==1.1.0
# homeassistant.components.qbittorrent
python-qbittorrent==0.4.3
# homeassistant.components.rabbitair
python-rabbitair==0.0.8
# homeassistant.components.ripple
python-ripple-api==0.0.3

View File

@ -1695,6 +1695,9 @@ python-picnic-api==1.1.0
# homeassistant.components.qbittorrent
python-qbittorrent==0.4.3
# homeassistant.components.rabbitair
python-rabbitair==0.0.8
# homeassistant.components.roborock
python-roborock==0.38.0

View File

@ -0,0 +1 @@
"""Tests for the RabbitAir integration."""

View File

@ -0,0 +1,210 @@
"""Test the RabbitAir config flow."""
from __future__ import annotations
import asyncio
from collections.abc import Generator
from ipaddress import ip_address
from unittest.mock import Mock, patch
import pytest
from rabbitair import Mode, Model, Speed
from homeassistant import config_entries
from homeassistant.components import zeroconf
from homeassistant.components.rabbitair.const import DOMAIN
from homeassistant.const import CONF_ACCESS_TOKEN, CONF_HOST, CONF_MAC
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from homeassistant.helpers.device_registry import format_mac
TEST_HOST = "1.1.1.1"
TEST_NAME = "abcdef1234_123456789012345678"
TEST_TOKEN = "0123456789abcdef0123456789abcdef"
TEST_MAC = "01:23:45:67:89:AB"
TEST_FIRMWARE = "2.3.17"
TEST_HARDWARE = "1.0.0.4"
TEST_UNIQUE_ID = format_mac(TEST_MAC)
TEST_TITLE = "Rabbit Air"
ZEROCONF_DATA = zeroconf.ZeroconfServiceInfo(
ip_address=ip_address(TEST_HOST),
ip_addresses=[ip_address(TEST_HOST)],
port=9009,
hostname=f"{TEST_NAME}.local.",
type="_rabbitair._udp.local.",
name=f"{TEST_NAME}._rabbitair._udp.local.",
properties={"id": TEST_MAC.replace(":", "")},
)
@pytest.fixture(autouse=True)
def use_mocked_zeroconf(mock_async_zeroconf):
"""Mock zeroconf in all tests."""
@pytest.fixture
def rabbitair_connect() -> Generator[None, None, None]:
"""Mock connection."""
with patch("rabbitair.UdpClient.get_info", return_value=get_mock_info()), patch(
"rabbitair.UdpClient.get_state", return_value=get_mock_state()
):
yield
def get_mock_info(mac: str = TEST_MAC) -> Mock:
"""Return a mock device info instance."""
mock_info = Mock()
mock_info.mac = mac
return mock_info
def get_mock_state(
model: Model | None = Model.A3,
main_firmware: str | None = TEST_HARDWARE,
power: bool | None = True,
mode: Mode | None = Mode.Auto,
speed: Speed | None = Speed.Low,
wifi_firmware: str | None = TEST_FIRMWARE,
) -> Mock:
"""Return a mock device state instance."""
mock_state = Mock()
mock_state.model = model
mock_state.main_firmware = main_firmware
mock_state.power = power
mock_state.mode = mode
mock_state.speed = speed
mock_state.wifi_firmware = wifi_firmware
return mock_state
@pytest.mark.usefixtures("rabbitair_connect")
async def test_form(hass: HomeAssistant) -> None:
"""Test we get the form."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == FlowResultType.FORM
assert not result["errors"]
with patch(
"homeassistant.components.rabbitair.async_setup_entry",
return_value=True,
) as mock_setup_entry:
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_HOST: TEST_HOST,
CONF_ACCESS_TOKEN: TEST_TOKEN,
},
)
await hass.async_block_till_done()
assert result2["type"] == FlowResultType.CREATE_ENTRY
assert result2["title"] == TEST_TITLE
assert result2["data"] == {
CONF_HOST: TEST_HOST,
CONF_ACCESS_TOKEN: TEST_TOKEN,
CONF_MAC: TEST_MAC,
}
assert result2["result"].unique_id == TEST_UNIQUE_ID
assert len(mock_setup_entry.mock_calls) == 1
@pytest.mark.parametrize(
("error_type", "base_value"),
[
(ValueError, "invalid_access_token"),
(OSError, "invalid_host"),
(asyncio.TimeoutError, "timeout_connect"),
(Exception, "cannot_connect"),
],
)
async def test_form_cannot_connect(
hass: HomeAssistant, error_type: type[Exception], base_value: str
) -> None:
"""Test we handle cannot connect error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == FlowResultType.FORM
assert not result["errors"]
with patch(
"rabbitair.UdpClient.get_info",
side_effect=error_type,
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_HOST: TEST_HOST,
CONF_ACCESS_TOKEN: TEST_TOKEN,
},
)
assert result2["type"] == FlowResultType.FORM
assert result2["errors"] == {"base": base_value}
async def test_form_unknown_error(hass: HomeAssistant) -> None:
"""Test we handle unknown error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == FlowResultType.FORM
assert not result["errors"]
with patch(
"homeassistant.components.rabbitair.config_flow.validate_input",
side_effect=Exception,
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_HOST: TEST_HOST,
CONF_ACCESS_TOKEN: TEST_TOKEN,
},
)
assert result2["type"] == FlowResultType.FORM
assert result2["errors"] == {"base": "unknown"}
@pytest.mark.usefixtures("rabbitair_connect")
async def test_zeroconf_discovery(hass: HomeAssistant) -> None:
"""Test zeroconf discovery setup flow."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_ZEROCONF}, data=ZEROCONF_DATA
)
assert result["type"] == FlowResultType.FORM
assert not result["errors"]
with patch(
"homeassistant.components.rabbitair.async_setup_entry",
return_value=True,
) as mock_setup_entry:
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_HOST: TEST_NAME + ".local",
CONF_ACCESS_TOKEN: TEST_TOKEN,
},
)
await hass.async_block_till_done()
assert result2["type"] == FlowResultType.CREATE_ENTRY
assert result2["title"] == TEST_TITLE
assert result2["data"] == {
CONF_HOST: TEST_NAME + ".local",
CONF_ACCESS_TOKEN: TEST_TOKEN,
CONF_MAC: TEST_MAC,
}
assert result2["result"].unique_id == TEST_UNIQUE_ID
assert len(mock_setup_entry.mock_calls) == 1
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_ZEROCONF}, data=ZEROCONF_DATA
)
assert result["type"] == FlowResultType.ABORT
assert result["reason"] == "already_configured"