ha-core/homeassistant/components/zwave_js/migrate.py

241 lines
8.0 KiB
Python

"""Functions used to migrate unique IDs for Z-Wave JS entities."""
from __future__ import annotations
from dataclasses import dataclass
import logging
from zwave_js_server.model.driver import Driver
from zwave_js_server.model.value import Value as ZwaveValue
from homeassistant.const import STATE_UNAVAILABLE
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.device_registry import DeviceEntry
from homeassistant.helpers.entity_registry import (
EntityRegistry,
RegistryEntry,
async_entries_for_device,
)
from .const import DOMAIN
from .discovery import ZwaveDiscoveryInfo
from .helpers import get_unique_id
_LOGGER = logging.getLogger(__name__)
@dataclass
class ValueID:
    """Parsed pieces of a dash-separated Z-Wave JS value ID string.

    The string is split on ``-``; the first segment is skipped and the
    following segments are stored as the fields below.
    """

    # Second dash-separated segment of the value ID string.
    command_class: str
    # Third dash-separated segment of the value ID string.
    endpoint: str
    # Fourth dash-separated segment of the value ID string.
    property_: str
    # Fifth segment when present; None for a four-segment value ID.
    property_key: str | None = None

    @staticmethod
    def from_unique_id(unique_id: str) -> ValueID:
        """Build a ValueID from an entity unique ID.

        The value ID is read from the second dot-separated segment of the
        unique ID, so unique IDs that carry an extra trailing dot segment
        (the Notification CC Binary Sensor format) are handled the same way.

        Raises:
            IndexError: If the unique ID has no dot, or the embedded value ID
                has fewer than four dash-separated segments.
        """
        dot_segments = unique_id.split(".")
        return ValueID.from_string_id(dot_segments[1])

    @staticmethod
    def from_string_id(value_id_str: str) -> ValueID:
        """Build a ValueID from the string form of a value ID.

        Raises:
            IndexError: If the string has fewer than four dash-separated
                segments.
        """
        segments = value_id_str.split("-")
        # The property key is optional; only a fifth segment supplies it.
        key = None
        if len(segments) > 4:
            key = segments[4]
        return ValueID(
            command_class=segments[1],
            endpoint=segments[2],
            property_=segments[3],
            property_key=key,
        )

    def is_same_value_different_endpoints(self, other: ValueID) -> bool:
        """Return True when only the endpoint differs between the two IDs."""
        if self.endpoint == other.endpoint:
            return False
        mine = (self.command_class, self.property_, self.property_key)
        theirs = (other.command_class, other.property_, other.property_key)
        return mine == theirs
@callback
def async_migrate_old_entity(
    hass: HomeAssistant,
    ent_reg: EntityRegistry,
    registered_unique_ids: set[str],
    platform: str,
    device: DeviceEntry,
    unique_id: str,
) -> None:
    """Move a lone same-value entity on another endpoint to ``unique_id``.

    Nothing happens when an entity with ``unique_id`` is already registered,
    when no candidate is found on the device, or when more than one candidate
    matches. A single candidate is migrated only if it has no state or its
    state is unavailable.
    """
    # An entity already registered under the target unique ID needs no work.
    if ent_reg.async_get_entity_id(platform, DOMAIN, unique_id):
        return

    target_value_id = ValueID.from_unique_id(unique_id)

    # Scan the device's entities for the same value on a different endpoint.
    match: RegistryEntry | None = None
    for candidate in async_entries_for_device(ent_reg, device.id):
        # Entities from another platform cannot be the one we are looking for.
        if candidate.domain != platform:
            continue
        # Entities that were already processed are left alone.
        if candidate.unique_id in registered_unique_ids:
            continue

        try:
            candidate_value_id = ValueID.from_unique_id(candidate.unique_id)
        except IndexError:
            # Unique ID is not value ID based (e.g. node status sensor).
            continue

        if not target_value_id.is_same_value_different_endpoints(candidate_value_id):
            continue

        if match is not None:
            # A second hit makes the choice ambiguous, so give up.
            return
        match = candidate

    if match is None:
        return

    # Only take over the old entity when it is not currently providing state.
    current_state = hass.states.get(match.entity_id)
    if current_state and current_state.state != STATE_UNAVAILABLE:
        return

    async_migrate_unique_id(ent_reg, platform, match.unique_id, unique_id)
@callback
def async_migrate_unique_id(
    ent_reg: EntityRegistry, platform: str, old_unique_id: str, new_unique_id: str
) -> None:
    """Rename an entity's unique ID from ``old_unique_id`` to ``new_unique_id``.

    Does nothing when no entity is registered under the old unique ID. If the
    registry rejects the update with ValueError, the old entity is removed
    from the registry instead.
    """
    entity_id = ent_reg.async_get_entity_id(platform, DOMAIN, old_unique_id)
    if not entity_id:
        return

    _LOGGER.debug(
        "Migrating entity %s from old unique ID '%s' to new unique ID '%s'",
        entity_id,
        old_unique_id,
        new_unique_id,
    )

    try:
        ent_reg.async_update_entity(entity_id, new_unique_id=new_unique_id)
    except ValueError:
        # The update was refused; drop the stale entry rather than keep it.
        _LOGGER.debug(
            (
                "Entity %s can't be migrated because the unique ID is taken; "
                "Cleaning it up since it is likely no longer valid"
            ),
            entity_id,
        )
        ent_reg.async_remove(entity_id)
@callback
def async_migrate_discovered_value(
    hass: HomeAssistant,
    ent_reg: EntityRegistry,
    registered_unique_ids: set[str],
    device: DeviceEntry,
    driver: Driver,
    disc_info: ZwaveDiscoveryInfo,
) -> None:
    """Migrate unique IDs of the entity/entities behind a discovered value.

    Each unique ID handled here is added to ``registered_unique_ids`` so it
    is skipped on later calls (e.g. after a reinterview). Notification binary
    sensors get one unique ID per non-idle state key; every other discovery
    gets exactly one.
    """
    primary_value = disc_info.primary_value
    new_unique_id = get_unique_id(driver, primary_value.value_id)

    # Already handled on an earlier pass (e.g. a reinterview); nothing to do.
    if new_unique_id in registered_unique_ids:
        return

    # Migration logic was added in 2021.3 to handle a breaking change to the
    # value_id format. Some time in the future, the logic to migrate unique IDs
    # can be removed.

    # Unique IDs in the 2021.2.*, 2021.3.0b0, and 2021.3.0 formats
    old_unique_ids = [
        get_unique_id(driver, old_value_id)
        for old_value_id in get_old_value_ids(primary_value)
    ]

    platform = disc_info.platform
    is_notification_binary_sensor = (
        platform == "binary_sensor" and disc_info.platform_hint == "notification"
    )

    if not is_notification_binary_sensor:
        # Single-entity case: migrate each legacy unique ID to the new one.
        for old_unique_id in old_unique_ids:
            async_migrate_unique_id(ent_reg, platform, old_unique_id, new_unique_id)

        # Also cover upstream changes that moved the value to another endpoint.
        async_migrate_old_entity(
            hass, ent_reg, registered_unique_ids, platform, device, new_unique_id
        )
        registered_unique_ids.add(new_unique_id)
        return

    # Notification binary sensors: one entity per state key, with the state
    # key appended to the unique ID.
    for state_key in primary_value.metadata.states:
        # The idle key (0) is ignored.
        if state_key == "0":
            continue

        state_unique_id = f"{new_unique_id}.{state_key}"

        # Already handled on an earlier pass; skip this state key.
        if state_unique_id in registered_unique_ids:
            continue

        for old_unique_id in old_unique_ids:
            async_migrate_unique_id(
                ent_reg,
                platform,
                f"{old_unique_id}.{state_key}",
                state_unique_id,
            )

        # Also cover upstream changes that moved the value to another endpoint.
        async_migrate_old_entity(
            hass,
            ent_reg,
            registered_unique_ids,
            platform,
            device,
            state_unique_id,
        )
        registered_unique_ids.add(state_unique_id)
@callback
def get_old_value_ids(value: ZwaveValue) -> list[str]:
    """Return the legacy value ID strings a value may have been stored under.

    The result holds three entries, in order: the pre-2021.3.0 form, then the
    2021.3.0b0/2021.3.0 form with and without the leading node ID prefix.
    """
    node_id = value.node.node_id
    command_class = value.command_class
    property_ = value.property_
    property_key_name = value.property_key_name or "00"

    # Pre 2021.3.0 value ID: any falsy endpoint (None or 0) is written as "00".
    legacy_endpoint = value.endpoint or "00"
    pre_2021_3_value_id = (
        f"{node_id}.{node_id}-{command_class}-{legacy_endpoint}-"
        f"{property_}-{property_key_name}"
    )

    # 2021.3.0b0 and 2021.3.0 value IDs: only a missing (None) endpoint or
    # property key is replaced with "00".
    endpoint = value.endpoint if value.endpoint is not None else "00"
    property_key = value.property_key if value.property_key is not None else "00"
    value_id = (
        f"{node_id}-{command_class}-{endpoint}-"
        f"{property_}-{property_key}-{property_key_name}"
    )

    return [pre_2021_3_value_id, f"{node_id}.{value_id}", value_id]