1
mirror of https://github.com/home-assistant/core synced 2024-07-27 18:58:57 +02:00

Refactor zwave_js.cover (#93433)

* Refactor zwave_js.cover and improve test coverage

* Remove extra fixtures

* cleanup old stuff

* Get coverage to 100

* Remove redundant stuff

* Revert all changes to tests

* Update conftest.py
This commit is contained in:
Raman Gupta 2023-05-24 15:37:33 -04:00 committed by GitHub
parent 46c63dd70b
commit f0874791d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 235 additions and 131 deletions

View File

@ -1,4 +1,6 @@
"""Constants for the Z-Wave JS integration."""
from __future__ import annotations
import logging
from homeassistant.const import APPLICATION_NAME, __version__ as HA_VERSION

View File

@ -54,7 +54,7 @@ async def async_setup_entry(
elif info.platform_hint and info.platform_hint.endswith("tilt"):
entities.append(ZWaveTiltCover(config_entry, driver, info))
else:
entities.append(ZWaveCover(config_entry, driver, info))
entities.append(ZWaveMultilevelSwitchCover(config_entry, driver, info))
async_add_entities(entities)
config_entry.async_on_unload(
@ -66,44 +66,199 @@ async def async_setup_entry(
)
def percent_to_zwave_position(value: int) -> int:
"""Convert position in 0-100 scale to 0-99 scale.
class CoverPositionMixin(ZWaveBaseEntity, CoverEntity):
"""Mix-in class for cover with position support."""
`value` -- (int) Position byte value from 0-100.
"""
if value > 0:
return max(1, round((value / 100) * 99))
return 0
_current_position_value: ZwaveValue | None = None
_target_position_value: ZwaveValue | None = None
_stop_position_value: ZwaveValue | None = None
def _set_position_values(
self,
current_value: ZwaveValue,
target_value: ZwaveValue | None = None,
stop_value: ZwaveValue | None = None,
) -> None:
"""Set values for position."""
self._attr_supported_features = (
(self._attr_supported_features or 0)
| CoverEntityFeature.OPEN
| CoverEntityFeature.CLOSE
| CoverEntityFeature.SET_POSITION
)
self._current_position_value = current_value
self._target_position_value = target_value or self.get_zwave_value(
TARGET_VALUE_PROPERTY, value_property_key=current_value.property_key
)
if stop_value:
self._stop_position_value = stop_value
self._attr_supported_features |= CoverEntityFeature.STOP
def percent_to_zwave_position(self, value: int) -> int:
"""Convert position in 0-100 scale to closed_value-open_value scale."""
return (
round(max(min(1, (value / 100)), 0) * self._position_range)
+ self._fully_closed_position
)
def zwave_to_percent_position(self, value: int) -> int:
"""Convert closed_value-open_value scale to position in 0-100 scale."""
return round(
((value - self._fully_closed_position) / self._position_range) * 100
)
@property
def _fully_open_position(self) -> int:
"""Return value that represents fully opened position."""
max_ = self.info.primary_value.metadata.max
return 99 if max_ is None else max_
@property
def _fully_closed_position(self) -> int:
"""Return value that represents fully closed position."""
min_ = self.info.primary_value.metadata.min
return 0 if min_ is None else min_
@property
def _position_range(self) -> int:
"""Return range between fully opened and fully closed position."""
return self._fully_open_position - self._fully_closed_position
@property
def is_closed(self) -> bool | None:
"""Return true if cover is closed."""
if not (value := self._current_position_value) or value.value is None:
return None
return bool(value.value == 0)
@property
def current_cover_position(self) -> int | None:
"""Return the current position of cover where 0 means closed and 100 is fully open."""
if (
self._current_position_value is None
or self._current_position_value.value is None
):
# guard missing value
return None
return self.zwave_to_percent_position(self._current_position_value.value)
async def async_set_cover_position(self, **kwargs: Any) -> None:
"""Move the cover to a specific position."""
assert self._target_position_value
await self.info.node.async_set_value(
self._target_position_value,
self.percent_to_zwave_position(kwargs[ATTR_POSITION]),
)
async def async_open_cover(self, **kwargs: Any) -> None:
"""Open the cover."""
assert self._target_position_value
await self.info.node.async_set_value(
self._target_position_value, self._fully_open_position
)
async def async_close_cover(self, **kwargs: Any) -> None:
"""Close cover."""
assert self._target_position_value
await self.info.node.async_set_value(
self._target_position_value, self._fully_closed_position
)
async def async_stop_cover(self, **kwargs: Any) -> None:
"""Stop cover."""
assert self._stop_position_value
# Stop the cover, will stop regardless of the actual direction of travel.
await self.info.node.async_set_value(self._stop_position_value, False)
def percent_to_zwave_tilt(value: int) -> int:
"""Convert position in 0-100 scale to 0-99 scale.
class CoverTiltMixin(ZWaveBaseEntity, CoverEntity):
"""Mix-in class for cover with tilt support."""
`value` -- (int) Position byte value from 0-100.
"""
if value > 0:
return round((value / 100) * 99)
return 0
_current_tilt_value: ZwaveValue | None = None
_target_tilt_value: ZwaveValue | None = None
_stop_tilt_value: ZwaveValue | None = None
def _set_tilt_values(
self,
current_value: ZwaveValue,
target_value: ZwaveValue | None = None,
) -> None:
"""Set values for tilt."""
self._attr_supported_features = (
(self._attr_supported_features or 0)
| CoverEntityFeature.OPEN_TILT
| CoverEntityFeature.CLOSE_TILT
| CoverEntityFeature.SET_TILT_POSITION
)
self._current_tilt_value = current_value
self._target_tilt_value = target_value or self.get_zwave_value(
TARGET_VALUE_PROPERTY, value_property_key=current_value.property_key
)
def percent_to_zwave_tilt(self, value: int) -> int:
"""Convert position in 0-100 scale to closed_value-open_value scale."""
return (
round(max(min(1, (value / 100)), 0) * self._tilt_range)
+ self._fully_closed_tilt
)
def zwave_to_percent_tilt(self, value: int) -> int:
"""Convert closed_value-open_value scale to position in 0-100 scale."""
return round(((value - self._fully_closed_tilt) / self._tilt_range) * 100)
@property
def _fully_open_tilt(self) -> int:
"""Return value that represents fully opened tilt."""
max_ = self.info.primary_value.metadata.max
return 99 if max_ is None else max_
@property
def _fully_closed_tilt(self) -> int:
"""Return value that represents fully closed tilt."""
min_ = self.info.primary_value.metadata.min
return 0 if min_ is None else min_
@property
def _tilt_range(self) -> int:
"""Return range between fully opened and fully closed tilt."""
return self._fully_open_tilt - self._fully_closed_tilt
@property
def current_cover_tilt_position(self) -> int | None:
"""Return current position of cover tilt.
None is unknown, 0 is closed, 100 is fully open.
"""
if (value := self._current_tilt_value) is None or value.value is None:
return None
return self.zwave_to_percent_tilt(int(value.value))
async def async_set_cover_tilt_position(self, **kwargs: Any) -> None:
"""Move the cover tilt to a specific position."""
assert self._target_tilt_value
await self.info.node.async_set_value(
self._target_tilt_value,
self.percent_to_zwave_tilt(kwargs[ATTR_TILT_POSITION]),
)
async def async_open_cover_tilt(self, **kwargs: Any) -> None:
"""Open the cover tilt."""
assert self._target_tilt_value
await self.info.node.async_set_value(
self._target_tilt_value, self._fully_open_tilt
)
async def async_close_cover_tilt(self, **kwargs: Any) -> None:
"""Close the cover tilt."""
assert self._target_tilt_value
await self.info.node.async_set_value(
self._target_tilt_value, self._fully_closed_tilt
)
def zwave_tilt_to_percent(value: int) -> int:
"""Convert 0-99 scale to position in 0-100 scale.
`value` -- (int) Position byte value from 0-99.
"""
if value > 0:
return round((value / 99) * 100)
return 0
class ZWaveCover(ZWaveBaseEntity, CoverEntity):
"""Representation of a Z-Wave Cover device."""
_attr_supported_features = (
CoverEntityFeature.OPEN
| CoverEntityFeature.CLOSE
| CoverEntityFeature.SET_POSITION
)
class ZWaveMultilevelSwitchCover(CoverPositionMixin):
"""Representation of a Z-Wave Cover that uses Multilevel Switch CC for position."""
def __init__(
self,
@ -113,16 +268,15 @@ class ZWaveCover(ZWaveBaseEntity, CoverEntity):
) -> None:
"""Initialize a ZWaveCover entity."""
super().__init__(config_entry, driver, info)
self._stop_cover_value = (
self.get_zwave_value(COVER_OPEN_PROPERTY)
or self.get_zwave_value(COVER_UP_PROPERTY)
or self.get_zwave_value(COVER_ON_PROPERTY)
self._set_position_values(
self.info.primary_value,
stop_value=(
self.get_zwave_value(COVER_OPEN_PROPERTY)
or self.get_zwave_value(COVER_UP_PROPERTY)
or self.get_zwave_value(COVER_ON_PROPERTY)
),
)
if self._stop_cover_value:
self._attr_supported_features |= CoverEntityFeature.STOP
# Entity class attributes
self._attr_device_class = CoverDeviceClass.WINDOW
if self.info.platform_hint and self.info.platform_hint.startswith("shutter"):
@ -130,50 +284,8 @@ class ZWaveCover(ZWaveBaseEntity, CoverEntity):
if self.info.platform_hint and self.info.platform_hint.startswith("blind"):
self._attr_device_class = CoverDeviceClass.BLIND
@property
def is_closed(self) -> bool | None:
"""Return true if cover is closed."""
if self.info.primary_value.value is None:
# guard missing value
return None
return bool(self.info.primary_value.value == 0)
@property
def current_cover_position(self) -> int | None:
"""Return the current position of cover where 0 means closed and 100 is fully open."""
if self.info.primary_value.value is None:
# guard missing value
return None
return round((cast(int, self.info.primary_value.value) / 99) * 100)
async def async_set_cover_position(self, **kwargs: Any) -> None:
"""Move the cover to a specific position."""
target_value = self.get_zwave_value(TARGET_VALUE_PROPERTY)
assert target_value is not None
await self.info.node.async_set_value(
target_value, percent_to_zwave_position(kwargs[ATTR_POSITION])
)
async def async_open_cover(self, **kwargs: Any) -> None:
"""Open the cover."""
target_value = self.get_zwave_value(TARGET_VALUE_PROPERTY)
assert target_value is not None
await self.info.node.async_set_value(target_value, 99)
async def async_close_cover(self, **kwargs: Any) -> None:
"""Close cover."""
target_value = self.get_zwave_value(TARGET_VALUE_PROPERTY)
assert target_value is not None
await self.info.node.async_set_value(target_value, 0)
async def async_stop_cover(self, **kwargs: Any) -> None:
"""Stop cover."""
assert self._stop_cover_value
# Stop the cover, will stop regardless of the actual direction of travel.
await self.info.node.async_set_value(self._stop_cover_value, False)
class ZWaveTiltCover(ZWaveCover):
class ZWaveTiltCover(ZWaveMultilevelSwitchCover, CoverTiltMixin):
"""Representation of a Z-Wave cover device with tilt."""
def __init__(
@ -185,43 +297,12 @@ class ZWaveTiltCover(ZWaveCover):
"""Initialize a ZWaveCover entity."""
super().__init__(config_entry, driver, info)
self._current_tilt_value = cast(
CoverTiltDataTemplate, self.info.platform_data_template
).current_tilt_value(self.info.platform_data)
self._attr_supported_features |= (
CoverEntityFeature.OPEN_TILT
| CoverEntityFeature.CLOSE_TILT
| CoverEntityFeature.SET_TILT_POSITION
template = cast(CoverTiltDataTemplate, self.info.platform_data_template)
self._set_tilt_values(
template.current_tilt_value(self.info.platform_data),
template.target_tilt_value(self.info.platform_data),
)
@property
def current_cover_tilt_position(self) -> int | None:
"""Return current position of cover tilt.
None is unknown, 0 is closed, 100 is fully open.
"""
value = self._current_tilt_value
if value is None or value.value is None:
return None
return zwave_tilt_to_percent(int(value.value))
async def async_set_cover_tilt_position(self, **kwargs: Any) -> None:
"""Move the cover tilt to a specific position."""
assert self._current_tilt_value
await self.info.node.async_set_value(
self._current_tilt_value,
percent_to_zwave_tilt(kwargs[ATTR_TILT_POSITION]),
)
async def async_open_cover_tilt(self, **kwargs: Any) -> None:
"""Open the cover tilt."""
await self.async_set_cover_tilt_position(tilt_position=100)
async def async_close_cover_tilt(self, **kwargs: Any) -> None:
"""Close the cover tilt."""
await self.async_set_cover_tilt_position(tilt_position=0)
class ZwaveMotorizedBarrier(ZWaveBaseEntity, CoverEntity):
"""Representation of a Z-Wave motorized barrier device."""

View File

@ -363,12 +363,18 @@ DISCOVERY_SCHEMAS = [
product_type={0x0301, 0x0302},
primary_value=SWITCH_MULTILEVEL_CURRENT_VALUE_SCHEMA,
data_template=CoverTiltDataTemplate(
tilt_value_id=ZwaveValueID(
current_tilt_value_id=ZwaveValueID(
property_="fibaro",
command_class=CommandClass.MANUFACTURER_PROPRIETARY,
endpoint=0,
property_key="venetianBlindsTilt",
)
),
target_tilt_value_id=ZwaveValueID(
property_="fibaro",
command_class=CommandClass.MANUFACTURER_PROPRIETARY,
endpoint=0,
property_key="venetianBlindsTilt",
),
),
required_values=[
ZWaveValueDiscoverySchema(
@ -854,7 +860,7 @@ DISCOVERY_SCHEMAS = [
# window coverings
ZWaveDiscoverySchema(
platform=Platform.COVER,
hint="cover",
hint="multilevel_switch",
device_class_generic={"Multilevel Switch"},
device_class_specific={
"Motor Control Class A",

View File

@ -392,31 +392,46 @@ class NumericSensorDataTemplate(BaseDiscoverySchemaDataTemplate):
@dataclass
class TiltValueMix:
"""Mixin data class for the tilt_value."""
"""Mixin data class for the current_tilt_value and target_tilt_value."""
tilt_value_id: ZwaveValueID
current_tilt_value_id: ZwaveValueID
target_tilt_value_id: ZwaveValueID
@dataclass
class CoverTiltDataTemplate(BaseDiscoverySchemaDataTemplate, TiltValueMix):
"""Tilt data template class for Z-Wave Cover entities."""
def resolve_data(self, value: ZwaveValue) -> dict[str, ZwaveValue | None]:
def resolve_data(self, value: ZwaveValue) -> dict[str, ZwaveValue]:
"""Resolve helper class data for a discovered value."""
return {"tilt_value": self._get_value_from_id(value.node, self.tilt_value_id)}
current_tilt_value = self._get_value_from_id(
value.node, self.current_tilt_value_id
)
assert current_tilt_value
target_tilt_value = self._get_value_from_id(
value.node, self.target_tilt_value_id
)
assert target_tilt_value
return {
"current_tilt_value": current_tilt_value,
"target_tilt_value": target_tilt_value,
}
def values_to_watch(
self, resolved_data: dict[str, Any]
) -> Iterable[ZwaveValue | None]:
"""Return list of all ZwaveValues resolved by helper that should be watched."""
return [resolved_data["tilt_value"]]
return [resolved_data["current_tilt_value"], resolved_data["target_tilt_value"]]
@staticmethod
def current_tilt_value(
resolved_data: dict[str, ZwaveValue | None]
) -> ZwaveValue | None:
def current_tilt_value(resolved_data: dict[str, ZwaveValue]) -> ZwaveValue:
"""Get current tilt ZwaveValue from resolved data."""
return resolved_data["tilt_value"]
return resolved_data["current_tilt_value"]
@staticmethod
def target_tilt_value(resolved_data: dict[str, ZwaveValue]) -> ZwaveValue:
"""Get target tilt ZwaveValue from resolved data."""
return resolved_data["target_tilt_value"]
@dataclass