diff --git a/homeassistant/components/mqtt/mixins.py b/homeassistant/components/mqtt/mixins.py index 6b181f2e4b54..400413c73d70 100644 --- a/homeassistant/components/mqtt/mixins.py +++ b/homeassistant/components/mqtt/mixins.py @@ -1107,7 +1107,7 @@ class MqttEntity( payload: PublishPayloadType, qos: int = 0, retain: bool = False, - encoding: str = DEFAULT_ENCODING, + encoding: str | None = DEFAULT_ENCODING, ) -> None: """Publish message to an MQTT topic.""" log_message(self.hass, self.entity_id, topic, payload, qos, retain) diff --git a/homeassistant/components/mqtt/vacuum/__init__.py b/homeassistant/components/mqtt/vacuum/__init__.py index abab55c632c4..98183a929115 100644 --- a/homeassistant/components/mqtt/vacuum/__init__.py +++ b/homeassistant/components/mqtt/vacuum/__init__.py @@ -11,8 +11,9 @@ from homeassistant.core import HomeAssistant from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType +from ..const import CONF_SCHEMA from ..mixins import async_setup_entry_helper, async_setup_platform_helper -from .schema import CONF_SCHEMA, LEGACY, MQTT_VACUUM_SCHEMA, STATE +from .schema import LEGACY, MQTT_VACUUM_SCHEMA, STATE from .schema_legacy import ( DISCOVERY_SCHEMA_LEGACY, PLATFORM_SCHEMA_LEGACY, @@ -27,26 +28,29 @@ from .schema_state import ( ) -def validate_mqtt_vacuum_discovery(value): +def validate_mqtt_vacuum_discovery(config_value: ConfigType) -> ConfigType: """Validate MQTT vacuum schema.""" schemas = {LEGACY: DISCOVERY_SCHEMA_LEGACY, STATE: DISCOVERY_SCHEMA_STATE} - return schemas[value[CONF_SCHEMA]](value) + config: ConfigType = schemas[config_value[CONF_SCHEMA]](config_value) + return config # Configuring MQTT Vacuums under the vacuum platform key is deprecated in HA Core 2022.6 -def validate_mqtt_vacuum(value): +def validate_mqtt_vacuum(config_value: ConfigType) -> ConfigType: """Validate MQTT vacuum schema (deprecated).""" schemas = {LEGACY: PLATFORM_SCHEMA_LEGACY, STATE: PLATFORM_SCHEMA_STATE} - return schemas[value[CONF_SCHEMA]](value) + config: ConfigType = schemas[config_value[CONF_SCHEMA]](config_value) + return config -def validate_mqtt_vacuum_modern(value): +def validate_mqtt_vacuum_modern(config_value: ConfigType) -> ConfigType: """Validate MQTT vacuum modern schema.""" schemas = { LEGACY: PLATFORM_SCHEMA_LEGACY_MODERN, STATE: PLATFORM_SCHEMA_STATE_MODERN, } - return schemas[value[CONF_SCHEMA]](value) + config: ConfigType = schemas[config_value[CONF_SCHEMA]](config_value) + return config DISCOVERY_SCHEMA = vol.All( @@ -96,8 +100,8 @@ async def _async_setup_entity( hass: HomeAssistant, async_add_entities: AddEntitiesCallback, config: ConfigType, - config_entry: ConfigEntry | None = None, - discovery_data: dict | None = None, + config_entry: ConfigEntry, + discovery_data: DiscoveryInfoType | None = None, ) -> None: """Set up the MQTT vacuum.""" setup_entity = { diff --git a/homeassistant/components/mqtt/vacuum/schema.py b/homeassistant/components/mqtt/vacuum/schema.py index d2ed91fa631a..20b7bced99d4 100644 --- a/homeassistant/components/mqtt/vacuum/schema.py +++ b/homeassistant/components/mqtt/vacuum/schema.py @@ -1,6 +1,10 @@ """Shared schema code.""" +from __future__ import annotations + import voluptuous as vol +from homeassistant.components.vacuum import VacuumEntityFeature + from ..const import CONF_SCHEMA LEGACY = "legacy" @@ -15,18 +19,23 @@ MQTT_VACUUM_SCHEMA = vol.Schema( ) -def services_to_strings(services, service_to_string): +def services_to_strings( + services: VacuumEntityFeature | int, + service_to_string: dict[VacuumEntityFeature, str], +) -> list[str]: """Convert SUPPORT_* service bitmask to list of service strings.""" - strings = [] - for service in service_to_string: - if service & services: - strings.append(service_to_string[service]) - return strings + return [ + service_to_string[service] + for service in service_to_string + if service & services + ] -def strings_to_services(strings, string_to_service): +def strings_to_services( + strings: list[str], string_to_service: dict[str, VacuumEntityFeature] +) -> VacuumEntityFeature | int: """Convert service strings to SUPPORT_* service bitmask.""" - services = 0 + services: VacuumEntityFeature | int = 0 for string in strings: services |= string_to_service[string] return services diff --git a/homeassistant/components/mqtt/vacuum/schema_legacy.py b/homeassistant/components/mqtt/vacuum/schema_legacy.py index 4425a3775d91..68618aba20c6 100644 --- a/homeassistant/components/mqtt/vacuum/schema_legacy.py +++ b/homeassistant/components/mqtt/vacuum/schema_legacy.py @@ -1,4 +1,9 @@ """Support for Legacy MQTT vacuum.""" +from __future__ import annotations + +from collections.abc import Callable +from typing import Any + import voluptuous as vol from homeassistant.components.vacuum import ( @@ -8,18 +13,26 @@ from homeassistant.components.vacuum import ( VacuumEntity, VacuumEntityFeature, ) +from homeassistant.config_entries import ConfigEntry from homeassistant.const import ATTR_SUPPORTED_FEATURES, CONF_NAME -from homeassistant.core import callback +from homeassistant.core import HomeAssistant, callback import homeassistant.helpers.config_validation as cv +from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.icon import icon_for_battery_level from homeassistant.helpers.json import json_dumps +from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType from .. import subscription from ..config import MQTT_BASE_SCHEMA from ..const import CONF_COMMAND_TOPIC, CONF_ENCODING, CONF_QOS, CONF_RETAIN from ..debug_info import log_messages from ..mixins import MQTT_ENTITY_COMMON_SCHEMA, MqttEntity, warn_for_legacy_schema -from ..models import MqttValueTemplate, PayloadSentinel, ReceiveMessage +from ..models import ( + MqttValueTemplate, + PayloadSentinel, + ReceiveMessage, + ReceivePayloadType, +) from ..util import get_mqtt_data, valid_publish_topic from .const import MQTT_VACUUM_ATTRIBUTES_BLOCKED from .schema import MQTT_VACUUM_SCHEMA, services_to_strings, strings_to_services @@ -158,9 +171,45 @@ DISCOVERY_SCHEMA_LEGACY = PLATFORM_SCHEMA_LEGACY_MODERN.extend( ) +_COMMANDS = { + VacuumEntityFeature.TURN_ON: { + "payload": CONF_PAYLOAD_TURN_ON, + "status": "Cleaning", + }, + VacuumEntityFeature.TURN_OFF: { + "payload": CONF_PAYLOAD_TURN_OFF, + "status": "Turning Off", + }, + VacuumEntityFeature.STOP: { + "payload": CONF_PAYLOAD_STOP, + "status": "Stopping the current task", + }, + VacuumEntityFeature.CLEAN_SPOT: { + "payload": CONF_PAYLOAD_CLEAN_SPOT, + "status": "Cleaning spot", + }, + VacuumEntityFeature.LOCATE: { + "payload": CONF_PAYLOAD_LOCATE, + "status": "Hi, I'm over here!", + }, + VacuumEntityFeature.PAUSE: { + "payload": CONF_PAYLOAD_START_PAUSE, + "status": "Pausing/Resuming cleaning...", + }, + VacuumEntityFeature.RETURN_HOME: { + "payload": CONF_PAYLOAD_RETURN_TO_BASE, + "status": "Returning home...", + }, +} + + async def async_setup_entity_legacy( - hass, config, async_add_entities, config_entry, discovery_data -): + hass: HomeAssistant, + config: ConfigType, + async_add_entities: AddEntitiesCallback, + config_entry: ConfigEntry, + discovery_data: DiscoveryInfoType | None, +) -> None: """Set up a MQTT Vacuum Legacy.""" async_add_entities([MqttVacuum(hass, config, config_entry, discovery_data)]) @@ -171,24 +220,42 @@ class MqttVacuum(MqttEntity, VacuumEntity): _entity_id_format = ENTITY_ID_FORMAT _attributes_extra_blocked = MQTT_LEGACY_VACUUM_ATTRIBUTES_BLOCKED - def __init__(self, hass, config, config_entry, discovery_data): + _encoding: str | None + _qos: bool + _retain: bool + _payloads: dict[str, str] + _send_command_topic: str | None + _set_fan_speed_topic: str | None + _state_topics: dict[str, str | None] + _templates: dict[ + str, Callable[[ReceivePayloadType, PayloadSentinel], ReceivePayloadType] + ] + + def __init__( + self, + hass: HomeAssistant, + config: ConfigType, + config_entry: ConfigEntry, + discovery_data: DiscoveryInfoType | None, + ) -> None: """Initialize the vacuum.""" self._attr_battery_level = 0 self._attr_is_on = False self._attr_fan_speed = "unknown" self._charging = False + self._cleaning = False self._docked = False - self._error = None + self._error: str | None = None MqttEntity.__init__(self, hass, config, config_entry, discovery_data) @staticmethod - def config_schema(): + def config_schema() -> vol.Schema: """Return the config schema.""" return DISCOVERY_SCHEMA_LEGACY - def _setup_from_config(self, config): + def _setup_from_config(self, config: ConfigType) -> None: """(Re)Setup the entity.""" supported_feature_strings = config[CONF_SUPPORTED_FEATURES] self._attr_supported_features = strings_to_services( @@ -204,7 +271,7 @@ class MqttVacuum(MqttEntity, VacuumEntity): self._send_command_topic = config.get(CONF_SEND_COMMAND_TOPIC) self._payloads = { - key: config.get(key) + key: config[key] for key in ( CONF_PAYLOAD_TURN_ON, CONF_PAYLOAD_TURN_OFF, @@ -227,7 +294,9 @@ class MqttVacuum(MqttEntity, VacuumEntity): ) } self._templates = { - key: config.get(key) + key: MqttValueTemplate( + config[key], entity=self + ).async_render_with_possible_json_value for key in ( CONF_BATTERY_LEVEL_TEMPLATE, CONF_CHARGING_TEMPLATE, @@ -236,13 +305,11 @@ class MqttVacuum(MqttEntity, VacuumEntity): CONF_ERROR_TEMPLATE, CONF_FAN_SPEED_TEMPLATE, ) + if key in config } - def _prepare_subscribe_topics(self): + def _prepare_subscribe_topics(self) -> None: """(Re)Subscribe to topics.""" - for tpl in self._templates.values(): - if tpl is not None: - tpl = MqttValueTemplate(tpl, entity=self) @callback @log_messages(self.hass, self.entity_id) @@ -250,11 +317,9 @@ class MqttVacuum(MqttEntity, VacuumEntity): """Handle new MQTT message.""" if ( msg.topic == self._state_topics[CONF_BATTERY_LEVEL_TOPIC] - and self._templates[CONF_BATTERY_LEVEL_TEMPLATE] + and CONF_BATTERY_LEVEL_TEMPLATE in self._config ): - battery_level = self._templates[ - CONF_BATTERY_LEVEL_TEMPLATE - ].async_render_with_possible_json_value( + battery_level = self._templates[CONF_BATTERY_LEVEL_TEMPLATE]( msg.payload, PayloadSentinel.DEFAULT ) if battery_level and battery_level is not PayloadSentinel.DEFAULT: @@ -262,11 +327,9 @@ class MqttVacuum(MqttEntity, VacuumEntity): if ( msg.topic == self._state_topics[CONF_CHARGING_TOPIC] - and self._templates[CONF_CHARGING_TEMPLATE] + and CONF_CHARGING_TEMPLATE in self._templates ): - charging = self._templates[ - CONF_CHARGING_TEMPLATE - ].async_render_with_possible_json_value( + charging = self._templates[CONF_CHARGING_TEMPLATE]( msg.payload, PayloadSentinel.DEFAULT ) if charging and charging is not PayloadSentinel.DEFAULT: @@ -274,11 +337,9 @@ class MqttVacuum(MqttEntity, VacuumEntity): if ( msg.topic == self._state_topics[CONF_CLEANING_TOPIC] - and self._templates[CONF_CLEANING_TEMPLATE] + and CONF_CLEANING_TEMPLATE in self._config ): - cleaning = self._templates[ - CONF_CLEANING_TEMPLATE - ].async_render_with_possible_json_value( + cleaning = self._templates[CONF_CLEANING_TEMPLATE]( msg.payload, PayloadSentinel.DEFAULT ) if cleaning and cleaning is not PayloadSentinel.DEFAULT: @@ -286,11 +347,9 @@ class MqttVacuum(MqttEntity, VacuumEntity): if ( msg.topic == self._state_topics[CONF_DOCKED_TOPIC] - and self._templates[CONF_DOCKED_TEMPLATE] + and CONF_DOCKED_TEMPLATE in self._config ): - docked = self._templates[ - CONF_DOCKED_TEMPLATE - ].async_render_with_possible_json_value( + docked = self._templates[CONF_DOCKED_TEMPLATE]( msg.payload, PayloadSentinel.DEFAULT ) if docked and docked is not PayloadSentinel.DEFAULT: @@ -298,11 +357,9 @@ class MqttVacuum(MqttEntity, VacuumEntity): if ( msg.topic == self._state_topics[CONF_ERROR_TOPIC] - and self._templates[CONF_ERROR_TEMPLATE] + and CONF_ERROR_TEMPLATE in self._config ): - error = self._templates[ - CONF_ERROR_TEMPLATE - ].async_render_with_possible_json_value( + error = self._templates[CONF_ERROR_TEMPLATE]( msg.payload, PayloadSentinel.DEFAULT ) if error is not PayloadSentinel.DEFAULT: @@ -322,15 +379,13 @@ class MqttVacuum(MqttEntity, VacuumEntity): if ( msg.topic == self._state_topics[CONF_FAN_SPEED_TOPIC] - and self._templates[CONF_FAN_SPEED_TEMPLATE] + and CONF_FAN_SPEED_TEMPLATE in self._config ): - fan_speed = self._templates[ - CONF_FAN_SPEED_TEMPLATE - ].async_render_with_possible_json_value( + fan_speed = self._templates[CONF_FAN_SPEED_TEMPLATE]( msg.payload, PayloadSentinel.DEFAULT ) if fan_speed and fan_speed is not PayloadSentinel.DEFAULT: - self._attr_fan_speed = fan_speed + self._attr_fan_speed = str(fan_speed) get_mqtt_data(self.hass).state_write_requests.write_state_request(self) @@ -349,12 +404,12 @@ class MqttVacuum(MqttEntity, VacuumEntity): }, ) - async def _subscribe_topics(self): + async def _subscribe_topics(self) -> None: """(Re)Subscribe to topics.""" await subscription.async_subscribe_topics(self.hass, self._sub_state) @property - def battery_icon(self): + def battery_icon(self) -> str: """Return the battery icon for the vacuum cleaner. No need to check VacuumEntityFeature.BATTERY, this won't be called if battery_level is None. @@ -363,116 +418,57 @@ class MqttVacuum(MqttEntity, VacuumEntity): battery_level=self.battery_level, charging=self._charging ) - async def async_turn_on(self, **kwargs): - """Turn the vacuum on.""" - if self.supported_features & VacuumEntityFeature.TURN_ON == 0: + async def _async_publish_command(self, feature: VacuumEntityFeature) -> None: + """Check for a missing feature or command topic.""" + + if self._command_topic is None or self.supported_features & feature == 0: return await self.async_publish( self._command_topic, - self._payloads[CONF_PAYLOAD_TURN_ON], - self._qos, - self._retain, - self._encoding, + self._payloads[_COMMANDS[feature]["payload"]], + qos=self._qos, + retain=self._retain, + encoding=self._encoding, ) - self._attr_status = "Cleaning" + self._attr_status = _COMMANDS[feature]["status"] self.async_write_ha_state() - async def async_turn_off(self, **kwargs): + async def async_turn_on(self, **kwargs: Any) -> None: + """Turn the vacuum on.""" + await self._async_publish_command(VacuumEntityFeature.TURN_ON) + + async def async_turn_off(self, **kwargs: Any) -> None: """Turn the vacuum off.""" - if self.supported_features & VacuumEntityFeature.TURN_OFF == 0: - return None + await self._async_publish_command(VacuumEntityFeature.TURN_OFF) - await self.async_publish( - self._command_topic, - self._payloads[CONF_PAYLOAD_TURN_OFF], - self._qos, - self._retain, - self._encoding, - ) - self._attr_status = "Turning Off" - self.async_write_ha_state() - - async def async_stop(self, **kwargs): + async def async_stop(self, **kwargs: Any) -> None: """Stop the vacuum.""" - if self.supported_features & VacuumEntityFeature.STOP == 0: - return None + await self._async_publish_command(VacuumEntityFeature.STOP) - await self.async_publish( - self._command_topic, - self._payloads[CONF_PAYLOAD_STOP], - self._qos, - self._retain, - self._encoding, - ) - self._attr_status = "Stopping the current task" - self.async_write_ha_state() - - async def async_clean_spot(self, **kwargs): + async def async_clean_spot(self, **kwargs: Any) -> None: """Perform a spot clean-up.""" - if self.supported_features & VacuumEntityFeature.CLEAN_SPOT == 0: - return None + await self._async_publish_command(VacuumEntityFeature.CLEAN_SPOT) - await self.async_publish( - self._command_topic, - self._payloads[CONF_PAYLOAD_CLEAN_SPOT], - self._qos, - self._retain, - self._encoding, - ) - self._attr_status = "Cleaning spot" - self.async_write_ha_state() - - async def async_locate(self, **kwargs): + async def async_locate(self, **kwargs: Any) -> None: """Locate the vacuum (usually by playing a song).""" - if self.supported_features & VacuumEntityFeature.LOCATE == 0: - return None + await self._async_publish_command(VacuumEntityFeature.LOCATE) - await self.async_publish( - self._command_topic, - self._payloads[CONF_PAYLOAD_LOCATE], - self._qos, - self._retain, - self._encoding, - ) - self._attr_status = "Hi, I'm over here!" - self.async_write_ha_state() - - async def async_start_pause(self, **kwargs): + async def async_start_pause(self, **kwargs: Any) -> None: """Start, pause or resume the cleaning task.""" - if self.supported_features & VacuumEntityFeature.PAUSE == 0: - return None + await self._async_publish_command(VacuumEntityFeature.PAUSE) - await self.async_publish( - self._command_topic, - self._payloads[CONF_PAYLOAD_START_PAUSE], - self._qos, - self._retain, - self._encoding, - ) - self._attr_status = "Pausing/Resuming cleaning..." - self.async_write_ha_state() - - async def async_return_to_base(self, **kwargs): + async def async_return_to_base(self, **kwargs: Any) -> None: """Tell the vacuum to return to its dock.""" - if self.supported_features & VacuumEntityFeature.RETURN_HOME == 0: - return None + await self._async_publish_command(VacuumEntityFeature.RETURN_HOME) - await self.async_publish( - self._command_topic, - self._payloads[CONF_PAYLOAD_RETURN_TO_BASE], - self._qos, - self._retain, - self._encoding, - ) - self._attr_status = "Returning home..." - self.async_write_ha_state() - - async def async_set_fan_speed(self, fan_speed, **kwargs): + async def async_set_fan_speed(self, fan_speed: str, **kwargs: Any) -> None: """Set fan speed.""" if ( - self.supported_features & VacuumEntityFeature.FAN_SPEED == 0 - ) or fan_speed not in self.fan_speed_list: + self._set_fan_speed_topic is None + or (self.supported_features & VacuumEntityFeature.FAN_SPEED == 0) + or fan_speed not in self.fan_speed_list + ): return None await self.async_publish( @@ -485,22 +481,30 @@ class MqttVacuum(MqttEntity, VacuumEntity): self._attr_status = f"Setting fan to {fan_speed}..." self.async_write_ha_state() - async def async_send_command(self, command, params=None, **kwargs): + async def async_send_command( + self, + command: str, + params: dict[str, Any] | list[Any] | None = None, + **kwargs: Any, + ) -> None: """Send a command to a vacuum cleaner.""" - if self.supported_features & VacuumEntityFeature.SEND_COMMAND == 0: + if ( + self._send_command_topic is None + or self.supported_features & VacuumEntityFeature.SEND_COMMAND == 0 + ): return if params: - message = {"command": command} + message: dict[str, Any] = {"command": command} message.update(params) - message = json_dumps(message) + message_payload = json_dumps(message) else: - message = command + message_payload = command await self.async_publish( self._send_command_topic, - message, + message_payload, self._qos, self._retain, self._encoding, ) - self._attr_status = f"Sending command {message}..." + self._attr_status = f"Sending command {message_payload}..." self.async_write_ha_state() diff --git a/homeassistant/components/mqtt/vacuum/schema_state.py b/homeassistant/components/mqtt/vacuum/schema_state.py index ede258102d7f..2bacf4f36de9 100644 --- a/homeassistant/components/mqtt/vacuum/schema_state.py +++ b/homeassistant/components/mqtt/vacuum/schema_state.py @@ -1,4 +1,8 @@ """Support for a State MQTT vacuum.""" +from __future__ import annotations + +from typing import Any + import voluptuous as vol from homeassistant.components.vacuum import ( @@ -11,15 +15,18 @@ from homeassistant.components.vacuum import ( StateVacuumEntity, VacuumEntityFeature, ) +from homeassistant.config_entries import ConfigEntry from homeassistant.const import ( ATTR_SUPPORTED_FEATURES, CONF_NAME, STATE_IDLE, STATE_PAUSED, ) -from homeassistant.core import callback +from homeassistant.core import HomeAssistant, callback import homeassistant.helpers.config_validation as cv +from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.json import json_dumps, json_loads +from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType from .. import subscription from ..config import MQTT_BASE_SCHEMA @@ -32,11 +39,12 @@ from ..const import ( ) from ..debug_info import log_messages from ..mixins import MQTT_ENTITY_COMMON_SCHEMA, MqttEntity, warn_for_legacy_schema +from ..models import ReceiveMessage from ..util import get_mqtt_data, valid_publish_topic from .const import MQTT_VACUUM_ATTRIBUTES_BLOCKED from .schema import MQTT_VACUUM_SCHEMA, services_to_strings, strings_to_services -SERVICE_TO_STRING = { +SERVICE_TO_STRING: dict[VacuumEntityFeature, str] = { VacuumEntityFeature.START: "start", VacuumEntityFeature.PAUSE: "pause", VacuumEntityFeature.STOP: "stop", @@ -52,7 +60,7 @@ SERVICE_TO_STRING = { STRING_TO_SERVICE = {v: k for k, v in SERVICE_TO_STRING.items()} -DEFAULT_SERVICES = ( +DEFAULT_SERVICES: VacuumEntityFeature | int = ( VacuumEntityFeature.START | VacuumEntityFeature.STOP | VacuumEntityFeature.RETURN_HOME @@ -60,7 +68,7 @@ DEFAULT_SERVICES = ( | VacuumEntityFeature.BATTERY | VacuumEntityFeature.CLEAN_SPOT ) -ALL_SERVICES = ( +ALL_SERVICES: VacuumEntityFeature | int = ( DEFAULT_SERVICES | VacuumEntityFeature.PAUSE | VacuumEntityFeature.LOCATE @@ -72,7 +80,7 @@ BATTERY = "battery_level" FAN_SPEED = "fan_speed" STATE = "state" -POSSIBLE_STATES = { +POSSIBLE_STATES: dict[str, str] = { STATE_IDLE: STATE_IDLE, STATE_DOCKED: STATE_DOCKED, STATE_ERROR: STATE_ERROR, @@ -104,6 +112,15 @@ DEFAULT_PAYLOAD_LOCATE = "locate" DEFAULT_PAYLOAD_START = "start" DEFAULT_PAYLOAD_PAUSE = "pause" +_FEATURE_PAYLOADS = { + VacuumEntityFeature.START: CONF_PAYLOAD_START, + VacuumEntityFeature.STOP: CONF_PAYLOAD_STOP, + VacuumEntityFeature.PAUSE: CONF_PAYLOAD_PAUSE, + VacuumEntityFeature.CLEAN_SPOT: CONF_PAYLOAD_CLEAN_SPOT, + VacuumEntityFeature.LOCATE: CONF_PAYLOAD_LOCATE, + VacuumEntityFeature.RETURN_HOME: CONF_PAYLOAD_RETURN_TO_BASE, +} + PLATFORM_SCHEMA_STATE_MODERN = ( MQTT_BASE_SCHEMA.extend( { @@ -147,8 +164,12 @@ DISCOVERY_SCHEMA_STATE = PLATFORM_SCHEMA_STATE_MODERN.extend({}, extra=vol.REMOV async def async_setup_entity_state( - hass, config, async_add_entities, config_entry, discovery_data -): + hass: HomeAssistant, + config: ConfigType, + async_add_entities: AddEntitiesCallback, + config_entry: ConfigEntry, + discovery_data: DiscoveryInfoType | None, +) -> None: """Set up a State MQTT Vacuum.""" async_add_entities([MqttStateVacuum(hass, config, config_entry, discovery_data)]) @@ -159,20 +180,30 @@ class MqttStateVacuum(MqttEntity, StateVacuumEntity): _entity_id_format = ENTITY_ID_FORMAT _attributes_extra_blocked = MQTT_VACUUM_ATTRIBUTES_BLOCKED - def __init__(self, hass, config, config_entry, discovery_data): + _set_fan_speed_topic: str | None + _send_command_topic: str | None + _payloads: dict[str, str | None] + + def __init__( + self, + hass: HomeAssistant, + config: ConfigType, + config_entry: ConfigEntry, + discovery_data: DiscoveryInfoType | None, + ) -> None: """Initialize the vacuum.""" - self._state_attrs = {} + self._state_attrs: dict[str, Any] = {} MqttEntity.__init__(self, hass, config, config_entry, discovery_data) @staticmethod - def config_schema(): + def config_schema() -> vol.Schema: """Return the config schema.""" return DISCOVERY_SCHEMA_STATE - def _setup_from_config(self, config): + def _setup_from_config(self, config: ConfigType) -> None: """(Re)Setup the entity.""" - supported_feature_strings = config[CONF_SUPPORTED_FEATURES] + supported_feature_strings: list[str] = config[CONF_SUPPORTED_FEATURES] self._attr_supported_features = strings_to_services( supported_feature_strings, STRING_TO_SERVICE ) @@ -193,21 +224,21 @@ class MqttStateVacuum(MqttEntity, StateVacuumEntity): ) } - def _update_state_attributes(self, payload): + def _update_state_attributes(self, payload: dict[str, Any]) -> None: """Update the entity state attributes.""" self._state_attrs.update(payload) self._attr_fan_speed = self._state_attrs.get(FAN_SPEED, 0) self._attr_battery_level = max(0, min(100, self._state_attrs.get(BATTERY, 0))) - def _prepare_subscribe_topics(self): + def _prepare_subscribe_topics(self) -> None: """(Re)Subscribe to topics.""" - topics = {} + topics: dict[str, Any] = {} @callback @log_messages(self.hass, self.entity_id) - def state_message_received(msg): + def state_message_received(msg: ReceiveMessage) -> None: """Handle state MQTT message.""" - payload = json_loads(msg.payload) + payload: dict[str, Any] = json_loads(msg.payload) if STATE in payload and ( payload[STATE] in POSSIBLE_STATES or payload[STATE] is None ): @@ -218,9 +249,9 @@ class MqttStateVacuum(MqttEntity, StateVacuumEntity): self._update_state_attributes(payload) get_mqtt_data(self.hass).state_write_requests.write_state_request(self) - if self._config.get(CONF_STATE_TOPIC): + if state_topic := self._config.get(CONF_STATE_TOPIC): topics["state_position_topic"] = { - "topic": self._config.get(CONF_STATE_TOPIC), + "topic": state_topic, "msg_callback": state_message_received, "qos": self._config[CONF_QOS], "encoding": self._config[CONF_ENCODING] or None, @@ -229,50 +260,54 @@ class MqttStateVacuum(MqttEntity, StateVacuumEntity): self.hass, self._sub_state, topics ) - async def _subscribe_topics(self): + async def _subscribe_topics(self) -> None: """(Re)Subscribe to topics.""" await subscription.async_subscribe_topics(self.hass, self._sub_state) - async def async_start(self): + async def _async_publish_command(self, feature: VacuumEntityFeature) -> None: + """Check for a missing feature or command topic.""" + if self._command_topic is None or self.supported_features & feature == 0: + return + + await self.async_publish( + self._command_topic, + self._payloads[_FEATURE_PAYLOADS[feature]], + qos=self._config[CONF_QOS], + retain=self._config[CONF_RETAIN], + encoding=self._config[CONF_ENCODING], + ) + self.async_write_ha_state() + + async def async_start(self) -> None: """Start the vacuum.""" - if self.supported_features & VacuumEntityFeature.START == 0: - return None - await self.async_publish( - self._command_topic, - self._config[CONF_PAYLOAD_START], - self._config[CONF_QOS], - self._config[CONF_RETAIN], - self._config[CONF_ENCODING], - ) + await self._async_publish_command(VacuumEntityFeature.START) - async def async_pause(self): + async def async_pause(self) -> None: """Pause the vacuum.""" - if self.supported_features & VacuumEntityFeature.PAUSE == 0: - return - await self.async_publish( - self._command_topic, - self._config[CONF_PAYLOAD_PAUSE], - self._config[CONF_QOS], - self._config[CONF_RETAIN], - self._config[CONF_ENCODING], - ) + await self._async_publish_command(VacuumEntityFeature.PAUSE) - async def async_stop(self, **kwargs): + async def async_stop(self, **kwargs: Any) -> None: """Stop the vacuum.""" - if self.supported_features & VacuumEntityFeature.STOP == 0: - return - await self.async_publish( - self._command_topic, - self._config[CONF_PAYLOAD_STOP], - self._config[CONF_QOS], - self._config[CONF_RETAIN], - self._config[CONF_ENCODING], - ) + await self._async_publish_command(VacuumEntityFeature.STOP) - async def async_set_fan_speed(self, fan_speed, **kwargs): + async def async_return_to_base(self, **kwargs: Any) -> None: + """Tell the vacuum to return to its dock.""" + await self._async_publish_command(VacuumEntityFeature.RETURN_HOME) + + async def async_clean_spot(self, **kwargs: Any) -> None: + """Perform a spot clean-up.""" + await self._async_publish_command(VacuumEntityFeature.CLEAN_SPOT) + + async def async_locate(self, **kwargs: Any) -> None: + """Locate the vacuum (usually by playing a song).""" + await self._async_publish_command(VacuumEntityFeature.LOCATE) + + async def async_set_fan_speed(self, fan_speed: str, **kwargs: Any) -> None: """Set fan speed.""" - if (self.supported_features & VacuumEntityFeature.FAN_SPEED == 0) or ( - fan_speed not in self.fan_speed_list + if ( + self._set_fan_speed_topic is None + or (self.supported_features & VacuumEntityFeature.FAN_SPEED == 0) + or (fan_speed not in self.fan_speed_list) ): return await self.async_publish( @@ -283,55 +318,27 @@ class MqttStateVacuum(MqttEntity, StateVacuumEntity): self._config[CONF_ENCODING], ) - async def async_return_to_base(self, **kwargs): - """Tell the vacuum to return to its dock.""" - if self.supported_features & VacuumEntityFeature.RETURN_HOME == 0: - return - await self.async_publish( - self._command_topic, - self._config[CONF_PAYLOAD_RETURN_TO_BASE], - self._config[CONF_QOS], - self._config[CONF_RETAIN], - self._config[CONF_ENCODING], - ) - - async def async_clean_spot(self, **kwargs): - """Perform a spot clean-up.""" - if self.supported_features & VacuumEntityFeature.CLEAN_SPOT == 0: - return - await self.async_publish( - self._command_topic, - self._config[CONF_PAYLOAD_CLEAN_SPOT], - self._config[CONF_QOS], - self._config[CONF_RETAIN], - self._config[CONF_ENCODING], - ) - - async def async_locate(self, **kwargs): - """Locate the vacuum (usually by playing a song).""" - if self.supported_features & VacuumEntityFeature.LOCATE == 0: - return - await self.async_publish( - self._command_topic, - self._config[CONF_PAYLOAD_LOCATE], - self._config[CONF_QOS], - self._config[CONF_RETAIN], - self._config[CONF_ENCODING], - ) - - async def async_send_command(self, command, params=None, **kwargs): + async def async_send_command( + self, + command: str, + params: dict[str, Any] | list[Any] | None = None, + **kwargs: Any, + ) -> None: """Send a command to a vacuum cleaner.""" - if self.supported_features & VacuumEntityFeature.SEND_COMMAND == 0: + if ( + self._send_command_topic is None + or self.supported_features & VacuumEntityFeature.SEND_COMMAND == 0 + ): return - if params: - message = {"command": command} + if isinstance(params, dict): + message: dict[str, Any] = {"command": command} message.update(params) - message = json_dumps(message) + payload = json_dumps(message) else: - message = command + payload = command await self.async_publish( self._send_command_topic, - message, + payload, self._config[CONF_QOS], self._config[CONF_RETAIN], self._config[CONF_ENCODING],