Config-flow for DLNA-DMR integration (#55267)

* Modernize dlna_dmr component: configflow, test, types

* Support config-flow with ssdp discovery
* Add unit tests
* Enforce strict typing
* Gracefully handle network devices (dis)appearing

* Fix Aiohttp mock response headers type to match actual response class

* Fixes from code review

* Fixes from code review

* Import device config in flow if unavailable at hass start

* Support SSDP advertisements

* Ignore bad BOOTID, fix ssdp:byebye handling

* Only listen for events on interface connected to device

* Release all listeners when entities are removed

* Warn about deprecated dlna_dmr configuration

* Use sublogger for dlna_dmr.config_flow for easier filtering

* Tests for dlna_dmr.data module

* Rewrite DMR tests for HA style

* Fix DMR strings: "Digital Media *Renderer*"

* Update DMR entity state and device info when changed

* Replace deprecated async_upnp_client State with TransportState

* supported_features are dynamic, based on current device state

* Cleanup fully when subscription fails

* Log warnings when device connection fails unexpectedly

* Set PARALLEL_UPDATES to unlimited

* Fix spelling

* Fixes from code review

* Simplify has & can checks to just can, which includes has

* Treat transitioning state as playing (not idle) to reduce UI jerking

* Test if device is usable

* Handle ssdp:update messages properly

* Fix _remove_ssdp_callbacks being shared by all DlnaDmrEntity instances

* Fix tests for transitioning state

* Mock DmrDevice.is_profile_device (added to support embedded devices)

* Use ST & NT SSDP headers to find DMR devices, not deviceType

The deviceType is extracted from the device's description XML, and will not
be what we want when dealing with embedded devices.

* Use UDN from SSDP headers, not device description, as unique_id

The SSDP headers have the UDN of the embedded device that we're interested
in, whereas the device description (`ATTR_UPNP_UDN`) field will always be
for the root device.

* Fix DMR string English localization

* Test config flow with UDN from SSDP headers

* Bump async-upnp-client==0.22.1, fix flake8 error

* fix test for remapping

* DMR HA Device connections based on root and embedded UDN

* DmrDevice's UpnpDevice is now named profile_device

* Use device type from SSDP headers, not device description

* Mark dlna_dmr constants as Final

* Use embedded device UDN and type for unique ID when connected via URL

* More informative connection error messages

* Also match SSDP messages on NT headers

The NT header is to ssdp:alive messages what ST is to M-SEARCH responses.

* Bump async-upnp-client==0.22.2

* fix merge

* Bump async-upnp-client==0.22.3

Co-authored-by: Steven Looman <steven.looman@gmail.com>
Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
Michael Chisholm 2021-09-28 06:47:01 +10:00 committed by GitHub
parent b15f11f46a
commit a28fd7d61b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 3443 additions and 257 deletions

View File

@ -212,7 +212,6 @@ omit =
homeassistant/components/dlib_face_detect/image_processing.py
homeassistant/components/dlib_face_identify/image_processing.py
homeassistant/components/dlink/switch.py
homeassistant/components/dlna_dmr/media_player.py
homeassistant/components/dnsip/sensor.py
homeassistant/components/dominos/*
homeassistant/components/doods/*

View File

@ -31,6 +31,7 @@ homeassistant.components.crownstone.*
homeassistant.components.device_automation.*
homeassistant.components.device_tracker.*
homeassistant.components.devolo_home_control.*
homeassistant.components.dlna_dmr.*
homeassistant.components.dnsip.*
homeassistant.components.dsmr.*
homeassistant.components.dunehd.*

View File

@ -122,6 +122,7 @@ homeassistant/components/dhcp/* @bdraco
homeassistant/components/dht/* @thegardenmonkey
homeassistant/components/digital_ocean/* @fabaff
homeassistant/components/discogs/* @thibmaek
homeassistant/components/dlna_dmr/* @StevenLooman @chishm
homeassistant/components/doorbird/* @oblogic7 @bdraco
homeassistant/components/dsmr/* @Robbie1221 @frenck
homeassistant/components/dsmr_reader/* @depl0y

View File

@ -1,4 +1,6 @@
"""Starts a service to scan in intervals for new devices."""
from __future__ import annotations
from datetime import timedelta
import json
import logging
@ -56,7 +58,7 @@ SERVICE_HANDLERS = {
"lg_smart_device": ("media_player", "lg_soundbar"),
}
OPTIONAL_SERVICE_HANDLERS = {SERVICE_DLNA_DMR: ("media_player", "dlna_dmr")}
OPTIONAL_SERVICE_HANDLERS: dict[str, tuple[str, str | None]] = {}
MIGRATED_SERVICE_HANDLERS = [
SERVICE_APPLE_TV,
@ -64,6 +66,7 @@ MIGRATED_SERVICE_HANDLERS = [
"deconz",
SERVICE_DAIKIN,
"denonavr",
SERVICE_DLNA_DMR,
"esphome",
"google_cast",
SERVICE_HASS_IOS_APP,

View File

@ -1 +1,56 @@
"""The dlna_dmr component."""
from __future__ import annotations
from homeassistant import config_entries
from homeassistant.components.media_player.const import DOMAIN as MEDIA_PLAYER_DOMAIN
from homeassistant.const import CONF_PLATFORM, CONF_URL
from homeassistant.core import HomeAssistant
from homeassistant.helpers.typing import ConfigType
from .const import DOMAIN, LOGGER
PLATFORMS = [MEDIA_PLAYER_DOMAIN]
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up DLNA component."""
if MEDIA_PLAYER_DOMAIN not in config:
return True
for entry_config in config[MEDIA_PLAYER_DOMAIN]:
if entry_config.get(CONF_PLATFORM) != DOMAIN:
continue
LOGGER.warning(
"Configuring dlna_dmr via yaml is deprecated; the configuration for"
" %s has been migrated to a config entry and can be safely removed",
entry_config.get(CONF_URL),
)
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data=entry_config,
)
)
return True
async def async_setup_entry(
hass: HomeAssistant, entry: config_entries.ConfigEntry
) -> bool:
"""Set up a DLNA DMR device from a config entry."""
LOGGER.debug("Setting up config entry: %s", entry.unique_id)
# Forward setup to the appropriate platform
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
return True
async def async_unload_entry(
hass: HomeAssistant, config_entry: config_entries.ConfigEntry
) -> bool:
"""Unload a config entry."""
# Forward to the same platform as async_setup_entry did
return await hass.config_entries.async_unload_platforms(config_entry, PLATFORMS)

View File

@ -0,0 +1,340 @@
"""Config flow for DLNA DMR."""
from __future__ import annotations
from collections.abc import Callable
import logging
from pprint import pformat
from typing import Any, Mapping, Optional
from urllib.parse import urlparse
from async_upnp_client.client import UpnpError
from async_upnp_client.profiles.dlna import DmrDevice
from async_upnp_client.profiles.profile import find_device_of_type
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components import ssdp
from homeassistant.const import CONF_DEVICE_ID, CONF_NAME, CONF_TYPE, CONF_URL
from homeassistant.core import callback
from homeassistant.data_entry_flow import FlowResult
from homeassistant.exceptions import IntegrationError
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.typing import DiscoveryInfoType
from .const import (
CONF_CALLBACK_URL_OVERRIDE,
CONF_LISTEN_PORT,
CONF_POLL_AVAILABILITY,
DOMAIN,
)
from .data import get_domain_data
LOGGER = logging.getLogger(__name__)
FlowInput = Optional[Mapping[str, Any]]
class ConnectError(IntegrationError):
"""Error occurred when trying to connect to a device."""
class DlnaDmrFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a DLNA DMR config flow.
The Unique Device Name (UDN) of the DMR device is used as the unique_id for
config entries and for entities. This UDN may differ from the root UDN if
the DMR is an embedded device.
"""
VERSION = 1
def __init__(self) -> None:
"""Initialize flow."""
self._discoveries: list[Mapping[str, str]] = []
@staticmethod
@callback
def async_get_options_flow(
config_entry: config_entries.ConfigEntry,
) -> config_entries.OptionsFlow:
"""Define the config flow to handle options."""
return DlnaDmrOptionsFlowHandler(config_entry)
async def async_step_user(self, user_input: FlowInput = None) -> FlowResult:
"""Handle a flow initialized by the user: manual URL entry.
Discovered devices will already be displayed, no need to prompt user
with them here.
"""
LOGGER.debug("async_step_user: user_input: %s", user_input)
errors = {}
if user_input is not None:
try:
discovery = await self._async_connect(user_input[CONF_URL])
except ConnectError as err:
errors["base"] = err.args[0]
else:
# If unmigrated config was imported earlier then use it
import_data = get_domain_data(self.hass).unmigrated_config.get(
user_input[CONF_URL]
)
if import_data is not None:
return await self.async_step_import(import_data)
# Device setup manually, assume we don't get SSDP broadcast notifications
options = {CONF_POLL_AVAILABILITY: True}
return await self._async_create_entry_from_discovery(discovery, options)
data_schema = vol.Schema({CONF_URL: str})
return self.async_show_form(
step_id="user", data_schema=data_schema, errors=errors
)
async def async_step_import(self, import_data: FlowInput = None) -> FlowResult:
"""Import a new DLNA DMR device from a config entry.
This flow is triggered by `async_setup`. If no device has been
configured before, find any matching device and create a config_entry
for it. Otherwise, do nothing.
"""
LOGGER.debug("async_step_import: import_data: %s", import_data)
if not import_data or CONF_URL not in import_data:
LOGGER.debug("Entry not imported: incomplete_config")
return self.async_abort(reason="incomplete_config")
self._async_abort_entries_match({CONF_URL: import_data[CONF_URL]})
location = import_data[CONF_URL]
self._discoveries = await self._async_get_discoveries()
poll_availability = True
# Find the device in the list of unconfigured devices
for discovery in self._discoveries:
if discovery[ssdp.ATTR_SSDP_LOCATION] == location:
# Device found via SSDP, it shouldn't need polling
poll_availability = False
LOGGER.debug(
"Entry %s found via SSDP, with UDN %s",
import_data[CONF_URL],
discovery[ssdp.ATTR_SSDP_UDN],
)
break
else:
# Not in discoveries. Try connecting directly.
try:
discovery = await self._async_connect(location)
except ConnectError as err:
LOGGER.debug(
"Entry %s not imported: %s", import_data[CONF_URL], err.args[0]
)
# Store the config to apply if the device is added later
get_domain_data(self.hass).unmigrated_config[location] = import_data
return self.async_abort(reason=err.args[0])
# Set options from the import_data, except listen_ip which is no longer used
options = {
CONF_LISTEN_PORT: import_data.get(CONF_LISTEN_PORT),
CONF_CALLBACK_URL_OVERRIDE: import_data.get(CONF_CALLBACK_URL_OVERRIDE),
CONF_POLL_AVAILABILITY: poll_availability,
}
# Override device name if it's set in the YAML
if CONF_NAME in import_data:
discovery = dict(discovery)
discovery[ssdp.ATTR_UPNP_FRIENDLY_NAME] = import_data[CONF_NAME]
LOGGER.debug("Entry %s ready for import", import_data[CONF_URL])
return await self._async_create_entry_from_discovery(discovery, options)
async def async_step_ssdp(self, discovery_info: DiscoveryInfoType) -> FlowResult:
"""Handle a flow initialized by SSDP discovery."""
LOGGER.debug("async_step_ssdp: discovery_info %s", pformat(discovery_info))
self._discoveries = [discovery_info]
udn = discovery_info[ssdp.ATTR_SSDP_UDN]
location = discovery_info[ssdp.ATTR_SSDP_LOCATION]
# Abort if already configured, but update the last-known location
await self.async_set_unique_id(udn)
self._abort_if_unique_id_configured(
updates={CONF_URL: location}, reload_on_update=False
)
# If the device needs migration because it wasn't turned on when HA
# started, silently migrate it now.
import_data = get_domain_data(self.hass).unmigrated_config.get(location)
if import_data is not None:
return await self.async_step_import(import_data)
parsed_url = urlparse(location)
name = discovery_info.get(ssdp.ATTR_UPNP_FRIENDLY_NAME) or parsed_url.hostname
self.context["title_placeholders"] = {"name": name}
return await self.async_step_confirm()
async def async_step_confirm(self, user_input: FlowInput = None) -> FlowResult:
"""Allow the user to confirm adding the device.
Also check that the device is still available, otherwise when it is
added to HA it won't report the correct DeviceInfo.
"""
LOGGER.debug("async_step_confirm: %s", user_input)
errors = {}
if user_input is not None:
discovery = self._discoveries[0]
try:
await self._async_connect(discovery[ssdp.ATTR_SSDP_LOCATION])
except ConnectError as err:
errors["base"] = err.args[0]
else:
return await self._async_create_entry_from_discovery(discovery)
self._set_confirm_only()
return self.async_show_form(step_id="confirm", errors=errors)
async def _async_create_entry_from_discovery(
self,
discovery: Mapping[str, Any],
options: Mapping[str, Any] | None = None,
) -> FlowResult:
"""Create an entry from discovery."""
LOGGER.debug("_async_create_entry_from_discovery: discovery: %s", discovery)
location = discovery[ssdp.ATTR_SSDP_LOCATION]
udn = discovery[ssdp.ATTR_SSDP_UDN]
# Abort if already configured, but update the last-known location
await self.async_set_unique_id(udn)
self._abort_if_unique_id_configured(updates={CONF_URL: location})
parsed_url = urlparse(location)
title = discovery.get(ssdp.ATTR_UPNP_FRIENDLY_NAME) or parsed_url.hostname
data = {
CONF_URL: discovery[ssdp.ATTR_SSDP_LOCATION],
CONF_DEVICE_ID: discovery[ssdp.ATTR_SSDP_UDN],
CONF_TYPE: discovery.get(ssdp.ATTR_SSDP_NT) or discovery[ssdp.ATTR_SSDP_ST],
}
return self.async_create_entry(title=title, data=data, options=options)
async def _async_get_discoveries(self) -> list[Mapping[str, str]]:
"""Get list of unconfigured DLNA devices discovered by SSDP."""
LOGGER.debug("_get_discoveries")
# Get all compatible devices from ssdp's cache
discoveries: list[Mapping[str, str]] = []
for udn_st in DmrDevice.DEVICE_TYPES:
st_discoveries = await ssdp.async_get_discovery_info_by_st(
self.hass, udn_st
)
discoveries.extend(st_discoveries)
# Filter out devices already configured
current_unique_ids = {
entry.unique_id for entry in self._async_current_entries()
}
discoveries = [
disc
for disc in discoveries
if disc[ssdp.ATTR_SSDP_UDN] not in current_unique_ids
]
return discoveries
async def _async_connect(self, location: str) -> dict[str, str]:
"""Connect to a device to confirm it works and get discovery information.
Raises ConnectError if something goes wrong.
"""
LOGGER.debug("_async_connect(location=%s)", location)
domain_data = get_domain_data(self.hass)
try:
device = await domain_data.upnp_factory.async_create_device(location)
except UpnpError as err:
raise ConnectError("could_not_connect") from err
try:
device = find_device_of_type(device, DmrDevice.DEVICE_TYPES)
except UpnpError as err:
raise ConnectError("not_dmr") from err
discovery = {
ssdp.ATTR_SSDP_LOCATION: location,
ssdp.ATTR_SSDP_UDN: device.udn,
ssdp.ATTR_SSDP_ST: device.device_type,
ssdp.ATTR_UPNP_FRIENDLY_NAME: device.name,
}
return discovery
class DlnaDmrOptionsFlowHandler(config_entries.OptionsFlow):
"""Handle a DLNA DMR options flow.
Configures the single instance and updates the existing config entry.
"""
def __init__(self, config_entry: config_entries.ConfigEntry) -> None:
"""Initialize."""
self.config_entry = config_entry
async def async_step_init(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Manage the options."""
errors: dict[str, str] = {}
# Don't modify existing (read-only) options -- copy and update instead
options = dict(self.config_entry.options)
if user_input is not None:
LOGGER.debug("user_input: %s", user_input)
listen_port = user_input.get(CONF_LISTEN_PORT) or None
callback_url_override = user_input.get(CONF_CALLBACK_URL_OVERRIDE) or None
try:
# Cannot use cv.url validation in the schema itself so apply
# extra validation here
if callback_url_override:
cv.url(callback_url_override)
except vol.Invalid:
errors["base"] = "invalid_url"
options[CONF_LISTEN_PORT] = listen_port
options[CONF_CALLBACK_URL_OVERRIDE] = callback_url_override
options[CONF_POLL_AVAILABILITY] = user_input[CONF_POLL_AVAILABILITY]
# Save if there's no errors, else fall through and show the form again
if not errors:
return self.async_create_entry(title="", data=options)
fields = {}
def _add_with_suggestion(key: str, validator: Callable) -> None:
"""Add a field to with a suggested, not default, value."""
suggested_value = options.get(key)
if suggested_value is None:
fields[vol.Optional(key)] = validator
else:
fields[
vol.Optional(key, description={"suggested_value": suggested_value})
] = validator
# listen_port can be blank or 0 for "bind any free port"
_add_with_suggestion(CONF_LISTEN_PORT, cv.port)
_add_with_suggestion(CONF_CALLBACK_URL_OVERRIDE, str)
fields[
vol.Required(
CONF_POLL_AVAILABILITY,
default=options.get(CONF_POLL_AVAILABILITY, False),
)
] = bool
return self.async_show_form(
step_id="init",
data_schema=vol.Schema(fields),
errors=errors,
)

View File

@ -0,0 +1,16 @@
"""Constants for the DLNA DMR component."""
import logging
from typing import Final
LOGGER = logging.getLogger(__package__)
DOMAIN: Final = "dlna_dmr"
CONF_LISTEN_PORT: Final = "listen_port"
CONF_CALLBACK_URL_OVERRIDE: Final = "callback_url_override"
CONF_POLL_AVAILABILITY: Final = "poll_availability"
DEFAULT_NAME: Final = "DLNA Digital Media Renderer"
CONNECT_TIMEOUT: Final = 10

View File

@ -0,0 +1,126 @@
"""Data used by this integration."""
from __future__ import annotations
import asyncio
from collections import defaultdict
from collections.abc import Mapping
from typing import Any, NamedTuple, cast
from async_upnp_client import UpnpEventHandler, UpnpFactory, UpnpRequester
from async_upnp_client.aiohttp import AiohttpNotifyServer, AiohttpSessionRequester
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant
from homeassistant.helpers import aiohttp_client
from .const import DOMAIN, LOGGER
class EventListenAddr(NamedTuple):
"""Unique identifier for an event listener."""
host: str | None # Specific local IP(v6) address for listening on
port: int # Listening port, 0 means use an ephemeral port
callback_url: str | None
class DlnaDmrData:
"""Storage class for domain global data."""
lock: asyncio.Lock
requester: UpnpRequester
upnp_factory: UpnpFactory
event_notifiers: dict[EventListenAddr, AiohttpNotifyServer]
event_notifier_refs: defaultdict[EventListenAddr, int]
stop_listener_remove: CALLBACK_TYPE | None = None
unmigrated_config: dict[str, Mapping[str, Any]]
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize global data."""
self.lock = asyncio.Lock()
session = aiohttp_client.async_get_clientsession(hass, verify_ssl=False)
self.requester = AiohttpSessionRequester(session, with_sleep=False)
self.upnp_factory = UpnpFactory(self.requester, non_strict=True)
self.event_notifiers = {}
self.event_notifier_refs = defaultdict(int)
self.unmigrated_config = {}
async def async_cleanup_event_notifiers(self, event: Event) -> None:
"""Clean up resources when Home Assistant is stopped."""
del event # unused
LOGGER.debug("Cleaning resources in DlnaDmrData")
async with self.lock:
tasks = (server.stop_server() for server in self.event_notifiers.values())
asyncio.gather(*tasks)
self.event_notifiers = {}
self.event_notifier_refs = defaultdict(int)
async def async_get_event_notifier(
self, listen_addr: EventListenAddr, hass: HomeAssistant
) -> UpnpEventHandler:
"""Return existing event notifier for the listen_addr, or create one.
Only one event notify server is kept for each listen_addr. Must call
async_release_event_notifier when done to cleanup resources.
"""
LOGGER.debug("Getting event handler for %s", listen_addr)
async with self.lock:
# Stop all servers when HA shuts down, to release resources on devices
if not self.stop_listener_remove:
self.stop_listener_remove = hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_STOP, self.async_cleanup_event_notifiers
)
# Always increment the reference counter, for existing or new event handlers
self.event_notifier_refs[listen_addr] += 1
# Return an existing event handler if we can
if listen_addr in self.event_notifiers:
return self.event_notifiers[listen_addr].event_handler
# Start event handler
server = AiohttpNotifyServer(
requester=self.requester,
listen_port=listen_addr.port,
listen_host=listen_addr.host,
callback_url=listen_addr.callback_url,
loop=hass.loop,
)
await server.start_server()
LOGGER.debug("Started event handler at %s", server.callback_url)
self.event_notifiers[listen_addr] = server
return server.event_handler
async def async_release_event_notifier(self, listen_addr: EventListenAddr) -> None:
"""Indicate that the event notifier for listen_addr is not used anymore.
This is called once by each caller of async_get_event_notifier, and will
stop the listening server when all users are done.
"""
async with self.lock:
assert self.event_notifier_refs[listen_addr] > 0
self.event_notifier_refs[listen_addr] -= 1
# Shutdown the server when it has no more users
if self.event_notifier_refs[listen_addr] == 0:
server = self.event_notifiers.pop(listen_addr)
await server.stop_server()
# Remove the cleanup listener when there's nothing left to cleanup
if not self.event_notifiers:
assert self.stop_listener_remove is not None
self.stop_listener_remove()
self.stop_listener_remove = None
def get_domain_data(hass: HomeAssistant) -> DlnaDmrData:
"""Obtain this integration's domain data, creating it if needed."""
if DOMAIN in hass.data:
return cast(DlnaDmrData, hass.data[DOMAIN])
data = DlnaDmrData(hass)
hass.data[DOMAIN] = data
return data

View File

@ -1,9 +1,30 @@
{
"domain": "dlna_dmr",
"name": "DLNA Digital Media Renderer",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/dlna_dmr",
"requirements": ["async-upnp-client==0.22.1"],
"dependencies": ["network"],
"codeowners": [],
"requirements": ["async-upnp-client==0.22.3"],
"dependencies": ["network", "ssdp"],
"ssdp": [
{
"st": "urn:schemas-upnp-org:device:MediaRenderer:1"
},
{
"st": "urn:schemas-upnp-org:device:MediaRenderer:2"
},
{
"st": "urn:schemas-upnp-org:device:MediaRenderer:3"
},
{
"nt": "urn:schemas-upnp-org:device:MediaRenderer:1"
},
{
"nt": "urn:schemas-upnp-org:device:MediaRenderer:2"
},
{
"nt": "urn:schemas-upnp-org:device:MediaRenderer:3"
}
],
"codeowners": ["@StevenLooman", "@chishm"],
"iot_class": "local_push"
}

View File

@ -2,16 +2,19 @@
from __future__ import annotations
import asyncio
from datetime import timedelta
from collections.abc import Mapping, Sequence
from datetime import datetime, timedelta
import functools
import logging
from typing import Any, Callable, TypeVar, cast
import aiohttp
from async_upnp_client import UpnpFactory
from async_upnp_client.aiohttp import AiohttpNotifyServer, AiohttpSessionRequester
from async_upnp_client.profiles.dlna import DeviceState, DmrDevice
from async_upnp_client import UpnpError, UpnpService, UpnpStateVariable
from async_upnp_client.const import NotificationSubType
from async_upnp_client.profiles.dlna import DmrDevice, TransportState
from async_upnp_client.utils import async_get_local_ip
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components import ssdp
from homeassistant.components.media_player import PLATFORM_SCHEMA, MediaPlayerEntity
from homeassistant.components.media_player.const import (
SUPPORT_NEXT_TRACK,
@ -24,298 +27,499 @@ from homeassistant.components.media_player.const import (
SUPPORT_VOLUME_MUTE,
SUPPORT_VOLUME_SET,
)
from homeassistant.components.network import async_get_source_ip
from homeassistant.components.network.const import PUBLIC_TARGET_IP
from homeassistant.const import (
CONF_DEVICE_ID,
CONF_NAME,
CONF_TYPE,
CONF_URL,
EVENT_HOMEASSISTANT_STOP,
STATE_IDLE,
STATE_OFF,
STATE_ON,
STATE_PAUSED,
STATE_PLAYING,
STATE_UNKNOWN,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import PlatformNotReady
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers import device_registry, entity_registry
import homeassistant.helpers.config_validation as cv
import homeassistant.util.dt as dt_util
from homeassistant.helpers.entity_platform import AddEntitiesCallback
_LOGGER = logging.getLogger(__name__)
from .const import (
CONF_CALLBACK_URL_OVERRIDE,
CONF_LISTEN_PORT,
CONF_POLL_AVAILABILITY,
DEFAULT_NAME,
DOMAIN,
LOGGER as _LOGGER,
)
from .data import EventListenAddr, get_domain_data
DLNA_DMR_DATA = "dlna_dmr"
DEFAULT_NAME = "DLNA Digital Media Renderer"
DEFAULT_LISTEN_PORT = 8301
PARALLEL_UPDATES = 0
# Configuration via YAML is deprecated in favour of config flow
CONF_LISTEN_IP = "listen_ip"
CONF_LISTEN_PORT = "listen_port"
CONF_CALLBACK_URL_OVERRIDE = "callback_url_override"
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_URL): cv.string,
vol.Optional(CONF_LISTEN_IP): cv.string,
vol.Optional(CONF_LISTEN_PORT, default=DEFAULT_LISTEN_PORT): cv.port,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_CALLBACK_URL_OVERRIDE): cv.url,
}
PLATFORM_SCHEMA = vol.All(
cv.deprecated(CONF_URL),
cv.deprecated(CONF_LISTEN_IP),
cv.deprecated(CONF_LISTEN_PORT),
cv.deprecated(CONF_NAME),
cv.deprecated(CONF_CALLBACK_URL_OVERRIDE),
PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_URL): cv.string,
vol.Optional(CONF_LISTEN_IP): cv.string,
vol.Optional(CONF_LISTEN_PORT): cv.port,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_CALLBACK_URL_OVERRIDE): cv.url,
}
),
)
def catch_request_errors():
"""Catch asyncio.TimeoutError, aiohttp.ClientError errors."""
def call_wrapper(func):
"""Call wrapper for decorator."""
@functools.wraps(func)
async def wrapper(self, *args, **kwargs):
"""Catch asyncio.TimeoutError, aiohttp.ClientError errors."""
try:
return await func(self, *args, **kwargs)
except (asyncio.TimeoutError, aiohttp.ClientError):
_LOGGER.error("Error during call %s", func.__name__)
return wrapper
return call_wrapper
Func = TypeVar("Func", bound=Callable[..., Any])
async def async_start_event_handler(
def catch_request_errors(func: Func) -> Func:
"""Catch UpnpError errors."""
@functools.wraps(func)
async def wrapper(self: "DlnaDmrEntity", *args: Any, **kwargs: Any) -> Any:
"""Catch UpnpError errors and check availability before and after request."""
if not self.available:
_LOGGER.warning(
"Device disappeared when trying to call service %s", func.__name__
)
return
try:
return await func(self, *args, **kwargs)
except UpnpError as err:
self.check_available = True
_LOGGER.error("Error during call %s: %r", func.__name__, err)
return cast(Func, wrapper)
async def async_setup_entry(
hass: HomeAssistant,
server_host: str,
server_port: int,
requester,
callback_url_override: str | None = None,
):
"""Register notify view."""
hass_data = hass.data[DLNA_DMR_DATA]
if "event_handler" in hass_data:
return hass_data["event_handler"]
entry: config_entries.ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the DlnaDmrEntity from a config entry."""
del hass # Unused
_LOGGER.debug("media_player.async_setup_entry %s (%s)", entry.entry_id, entry.title)
# start event handler
server = AiohttpNotifyServer(
requester,
listen_port=server_port,
listen_host=server_host,
callback_url=callback_url_override,
# Create our own device-wrapping entity
entity = DlnaDmrEntity(
udn=entry.data[CONF_DEVICE_ID],
device_type=entry.data[CONF_TYPE],
name=entry.title,
event_port=entry.options.get(CONF_LISTEN_PORT) or 0,
event_callback_url=entry.options.get(CONF_CALLBACK_URL_OVERRIDE),
poll_availability=entry.options.get(CONF_POLL_AVAILABILITY, False),
location=entry.data[CONF_URL],
)
await server.start_server()
_LOGGER.info("UPNP/DLNA event handler listening, url: %s", server.callback_url)
hass_data["notify_server"] = server
hass_data["event_handler"] = server.event_handler
# register for graceful shutdown
async def async_stop_server(event):
"""Stop server."""
_LOGGER.debug("Stopping UPNP/DLNA event handler")
await server.stop_server()
entry.async_on_unload(
entry.add_update_listener(entity.async_config_update_listener)
)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, async_stop_server)
return hass_data["event_handler"]
async_add_entities([entity])
async def async_setup_platform(
hass: HomeAssistant, config, async_add_entities, discovery_info=None
):
"""Set up DLNA DMR platform."""
if config.get(CONF_URL) is not None:
url = config[CONF_URL]
name = config.get(CONF_NAME)
elif discovery_info is not None:
url = discovery_info["ssdp_description"]
name = discovery_info.get("name")
class DlnaDmrEntity(MediaPlayerEntity):
"""Representation of a DLNA DMR device as a HA entity."""
if DLNA_DMR_DATA not in hass.data:
hass.data[DLNA_DMR_DATA] = {}
udn: str
device_type: str
if "lock" not in hass.data[DLNA_DMR_DATA]:
hass.data[DLNA_DMR_DATA]["lock"] = asyncio.Lock()
_event_addr: EventListenAddr
poll_availability: bool
# Last known URL for the device, used when adding this entity to hass to try
# to connect before SSDP has rediscovered it, or when SSDP discovery fails.
location: str
# build upnp/aiohttp requester
session = async_get_clientsession(hass)
requester = AiohttpSessionRequester(session, True)
_device_lock: asyncio.Lock # Held when connecting or disconnecting the device
_device: DmrDevice | None = None
_remove_ssdp_callbacks: list[Callable]
check_available: bool = False
# ensure event handler has been started
async with hass.data[DLNA_DMR_DATA]["lock"]:
server_host = config.get(CONF_LISTEN_IP)
if server_host is None:
server_host = await async_get_source_ip(hass, PUBLIC_TARGET_IP)
server_port = config.get(CONF_LISTEN_PORT, DEFAULT_LISTEN_PORT)
callback_url_override = config.get(CONF_CALLBACK_URL_OVERRIDE)
event_handler = await async_start_event_handler(
hass, server_host, server_port, requester, callback_url_override
# Track BOOTID in SSDP advertisements for device changes
_bootid: int | None = None
# DMR devices need polling for track position information. async_update will
# determine whether further device polling is required.
_attr_should_poll = True
def __init__(
self,
udn: str,
device_type: str,
name: str,
event_port: int,
event_callback_url: str | None,
poll_availability: bool,
location: str,
) -> None:
"""Initialize DLNA DMR entity."""
self.udn = udn
self.device_type = device_type
self._attr_name = name
self._event_addr = EventListenAddr(None, event_port, event_callback_url)
self.poll_availability = poll_availability
self.location = location
self._device_lock = asyncio.Lock()
self._remove_ssdp_callbacks = []
async def async_added_to_hass(self) -> None:
"""Handle addition."""
# Try to connect to the last known location, but don't worry if not available
if not self._device:
try:
await self._device_connect(self.location)
except UpnpError as err:
_LOGGER.debug("Couldn't connect immediately: %r", err)
# Get SSDP notifications for only this device
self._remove_ssdp_callbacks.append(
await ssdp.async_register_callback(
self.hass, self.async_ssdp_callback, {"USN": self.usn}
)
)
# create upnp device
factory = UpnpFactory(requester, non_strict=True)
try:
upnp_device = await factory.async_create_device(url)
except (asyncio.TimeoutError, aiohttp.ClientError) as err:
raise PlatformNotReady() from err
# async_upnp_client.SsdpListener only reports byebye once for each *UDN*
# (device name) which often is not the USN (service within the device)
# that we're interested in. So also listen for byebye advertisements for
# the UDN, which is reported in the _udn field of the combined_headers.
self._remove_ssdp_callbacks.append(
await ssdp.async_register_callback(
self.hass,
self.async_ssdp_callback,
{"_udn": self.udn, "NTS": NotificationSubType.SSDP_BYEBYE},
)
)
# wrap with DmrDevice
dlna_device = DmrDevice(upnp_device, event_handler)
async def async_will_remove_from_hass(self) -> None:
"""Handle removal."""
for callback in self._remove_ssdp_callbacks:
callback()
self._remove_ssdp_callbacks.clear()
await self._device_disconnect()
# create our own device
device = DlnaDmrDevice(dlna_device, name)
_LOGGER.debug("Adding device: %s", device)
async_add_entities([device], True)
class DlnaDmrDevice(MediaPlayerEntity):
"""Representation of a DLNA DMR device."""
def __init__(self, dmr_device, name=None):
"""Initialize DLNA DMR device."""
self._device = dmr_device
self._name = name
self._available = False
self._subscription_renew_time = None
async def async_added_to_hass(self):
"""Handle addition."""
self._device.on_event = self._on_event
# Register unsubscribe on stop
bus = self.hass.bus
bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self._async_on_hass_stop)
@property
def available(self):
"""Device is available."""
return self._available
async def _async_on_hass_stop(self, event):
"""Event handler on Home Assistant stop."""
async with self.hass.data[DLNA_DMR_DATA]["lock"]:
await self._device.async_unsubscribe_services()
async def async_update(self):
"""Retrieve the latest data."""
was_available = self._available
async def async_ssdp_callback(
self, info: Mapping[str, Any], change: ssdp.SsdpChange
) -> None:
"""Handle notification from SSDP of device state change."""
_LOGGER.debug(
"SSDP %s notification of device %s at %s",
change,
info[ssdp.ATTR_SSDP_USN],
info.get(ssdp.ATTR_SSDP_LOCATION),
)
try:
await self._device.async_update()
self._available = True
except (asyncio.TimeoutError, aiohttp.ClientError):
self._available = False
_LOGGER.debug("Device unavailable")
bootid_str = info[ssdp.ATTR_SSDP_BOOTID]
bootid: int | None = int(bootid_str, 10)
except (KeyError, ValueError):
bootid = None
if change == ssdp.SsdpChange.UPDATE:
# This is an announcement that bootid is about to change
if self._bootid is not None and self._bootid == bootid:
# Store the new value (because our old value matches) so that we
# can ignore subsequent ssdp:alive messages
try:
next_bootid_str = info[ssdp.ATTR_SSDP_NEXTBOOTID]
self._bootid = int(next_bootid_str, 10)
except (KeyError, ValueError):
pass
# Nothing left to do until ssdp:alive comes through
return
# do we need to (re-)subscribe?
now = dt_util.utcnow()
should_renew = (
self._subscription_renew_time and now >= self._subscription_renew_time
)
if should_renew or not was_available and self._available:
try:
timeout = await self._device.async_subscribe_services()
self._subscription_renew_time = dt_util.utcnow() + timeout / 2
except (asyncio.TimeoutError, aiohttp.ClientError):
self._available = False
_LOGGER.debug("Could not (re)subscribe")
if self._bootid is not None and self._bootid != bootid and self._device:
# Device has rebooted, drop existing connection and maybe reconnect
await self._device_disconnect()
self._bootid = bootid
def _on_event(self, service, state_variables):
if change == ssdp.SsdpChange.BYEBYE and self._device:
# Device is going away, disconnect
await self._device_disconnect()
if change == ssdp.SsdpChange.ALIVE and not self._device:
location = info[ssdp.ATTR_SSDP_LOCATION]
try:
await self._device_connect(location)
except UpnpError as err:
_LOGGER.warning(
"Failed connecting to recently alive device at %s: %r",
location,
err,
)
# Device could have been de/re-connected, state probably changed
self.schedule_update_ha_state()
async def async_config_update_listener(
self, hass: HomeAssistant, entry: config_entries.ConfigEntry
) -> None:
"""Handle options update by modifying self in-place."""
del hass # Unused
_LOGGER.debug(
"Updating: %s with data=%s and options=%s",
self.name,
entry.data,
entry.options,
)
self.location = entry.data[CONF_URL]
self.poll_availability = entry.options.get(CONF_POLL_AVAILABILITY, False)
new_port = entry.options.get(CONF_LISTEN_PORT) or 0
new_callback_url = entry.options.get(CONF_CALLBACK_URL_OVERRIDE)
if (
new_port == self._event_addr.port
and new_callback_url == self._event_addr.callback_url
):
return
# Changes to eventing requires a device reconnect for it to update correctly
await self._device_disconnect()
# Update _event_addr after disconnecting, to stop the right event listener
self._event_addr = self._event_addr._replace(
port=new_port, callback_url=new_callback_url
)
try:
await self._device_connect(self.location)
except UpnpError as err:
_LOGGER.warning("Couldn't (re)connect after config change: %r", err)
# Device was de/re-connected, state might have changed
self.schedule_update_ha_state()
async def _device_connect(self, location: str) -> None:
"""Connect to the device now that it's available."""
_LOGGER.debug("Connecting to device at %s", location)
async with self._device_lock:
if self._device:
_LOGGER.debug("Trying to connect when device already connected")
return
domain_data = get_domain_data(self.hass)
# Connect to the base UPNP device
upnp_device = await domain_data.upnp_factory.async_create_device(location)
# Create/get event handler that is reachable by the device, using
# the connection's local IP to listen only on the relevant interface
_, event_ip = await async_get_local_ip(location, self.hass.loop)
self._event_addr = self._event_addr._replace(host=event_ip)
event_handler = await domain_data.async_get_event_notifier(
self._event_addr, self.hass
)
# Create profile wrapper
self._device = DmrDevice(upnp_device, event_handler)
self.location = location
# Subscribe to event notifications
try:
self._device.on_event = self._on_event
await self._device.async_subscribe_services(auto_resubscribe=True)
except UpnpError as err:
# Don't leave the device half-constructed
self._device.on_event = None
self._device = None
await domain_data.async_release_event_notifier(self._event_addr)
_LOGGER.debug("Error while subscribing during device connect: %r", err)
raise
if (
not self.registry_entry
or not self.registry_entry.config_entry_id
or self.registry_entry.device_id
):
return
# Create linked HA DeviceEntry now the information is known.
dev_reg = device_registry.async_get(self.hass)
device_entry = dev_reg.async_get_or_create(
config_entry_id=self.registry_entry.config_entry_id,
# Connections are based on the root device's UDN, and the DMR
# embedded device's UDN. They may be the same, if the DMR is the
# root device.
connections={
(
device_registry.CONNECTION_UPNP,
self._device.profile_device.root_device.udn,
),
(device_registry.CONNECTION_UPNP, self._device.udn),
},
identifiers={(DOMAIN, self.unique_id)},
default_manufacturer=self._device.manufacturer,
default_model=self._device.model_name,
default_name=self._device.name,
)
# Update entity registry to link to the device
ent_reg = entity_registry.async_get(self.hass)
ent_reg.async_get_or_create(
self.registry_entry.domain,
self.registry_entry.platform,
self.unique_id,
device_id=device_entry.id,
)
async def _device_disconnect(self) -> None:
"""Destroy connections to the device now that it's not available.
Also call when removing this entity from hass to clean up connections.
"""
async with self._device_lock:
if not self._device:
_LOGGER.debug("Disconnecting from device that's not connected")
return
_LOGGER.debug("Disconnecting from %s", self._device.name)
self._device.on_event = None
old_device = self._device
self._device = None
await old_device.async_unsubscribe_services()
domain_data = get_domain_data(self.hass)
await domain_data.async_release_event_notifier(self._event_addr)
@property
def available(self) -> bool:
"""Device is available when we have a connection to it."""
return self._device is not None and self._device.profile_device.available
async def async_update(self) -> None:
"""Retrieve the latest data."""
if not self._device:
if not self.poll_availability:
return
try:
await self._device_connect(self.location)
except UpnpError:
return
assert self._device is not None
try:
do_ping = self.poll_availability or self.check_available
await self._device.async_update(do_ping=do_ping)
except UpnpError:
_LOGGER.debug("Device unavailable")
await self._device_disconnect()
return
finally:
self.check_available = False
def _on_event(
self, service: UpnpService, state_variables: Sequence[UpnpStateVariable]
) -> None:
"""State variable(s) changed, let home-assistant know."""
del service # Unused
if not state_variables:
# Indicates a failure to resubscribe, check if device is still available
self.check_available = True
self.schedule_update_ha_state()
@property
def supported_features(self):
"""Flag media player features that are supported."""
def supported_features(self) -> int:
"""Flag media player features that are supported at this moment.
Supported features may change as the device enters different states.
"""
if not self._device:
return 0
supported_features = 0
if self._device.has_volume_level:
supported_features |= SUPPORT_VOLUME_SET
if self._device.has_volume_mute:
supported_features |= SUPPORT_VOLUME_MUTE
if self._device.has_play:
if self._device.can_play:
supported_features |= SUPPORT_PLAY
if self._device.has_pause:
if self._device.can_pause:
supported_features |= SUPPORT_PAUSE
if self._device.has_stop:
if self._device.can_stop:
supported_features |= SUPPORT_STOP
if self._device.has_previous:
if self._device.can_previous:
supported_features |= SUPPORT_PREVIOUS_TRACK
if self._device.has_next:
if self._device.can_next:
supported_features |= SUPPORT_NEXT_TRACK
if self._device.has_play_media:
supported_features |= SUPPORT_PLAY_MEDIA
if self._device.has_seek_rel_time:
if self._device.can_seek_rel_time:
supported_features |= SUPPORT_SEEK
return supported_features
@property
def volume_level(self):
def volume_level(self) -> float | None:
"""Volume level of the media player (0..1)."""
if self._device.has_volume_level:
return self._device.volume_level
return 0
if not self._device or not self._device.has_volume_level:
return None
return self._device.volume_level
@catch_request_errors()
async def async_set_volume_level(self, volume):
@catch_request_errors
async def async_set_volume_level(self, volume: float) -> None:
"""Set volume level, range 0..1."""
assert self._device is not None
await self._device.async_set_volume_level(volume)
@property
def is_volume_muted(self):
def is_volume_muted(self) -> bool | None:
"""Boolean if volume is currently muted."""
if not self._device:
return None
return self._device.is_volume_muted
@catch_request_errors()
async def async_mute_volume(self, mute):
@catch_request_errors
async def async_mute_volume(self, mute: bool) -> None:
"""Mute the volume."""
assert self._device is not None
desired_mute = bool(mute)
await self._device.async_mute_volume(desired_mute)
@catch_request_errors()
async def async_media_pause(self):
@catch_request_errors
async def async_media_pause(self) -> None:
"""Send pause command."""
if not self._device.can_pause:
_LOGGER.debug("Cannot do Pause")
return
assert self._device is not None
await self._device.async_pause()
@catch_request_errors()
async def async_media_play(self):
@catch_request_errors
async def async_media_play(self) -> None:
"""Send play command."""
if not self._device.can_play:
_LOGGER.debug("Cannot do Play")
return
assert self._device is not None
await self._device.async_play()
@catch_request_errors()
async def async_media_stop(self):
@catch_request_errors
async def async_media_stop(self) -> None:
"""Send stop command."""
if not self._device.can_stop:
_LOGGER.debug("Cannot do Stop")
return
assert self._device is not None
await self._device.async_stop()
@catch_request_errors()
async def async_media_seek(self, position):
@catch_request_errors
async def async_media_seek(self, position: int | float) -> None:
"""Send seek command."""
if not self._device.can_seek_rel_time:
_LOGGER.debug("Cannot do Seek/rel_time")
return
assert self._device is not None
time = timedelta(seconds=position)
await self._device.async_seek_rel_time(time)
@catch_request_errors()
async def async_play_media(self, media_type, media_id, **kwargs):
@catch_request_errors
async def async_play_media(
self, media_type: str, media_id: str, **kwargs: Any
) -> None:
"""Play a piece of media."""
_LOGGER.debug("Playing media: %s, %s, %s", media_type, media_id, kwargs)
title = "Home Assistant"
assert self._device is not None
# Stop current playing media
if self._device.can_stop:
await self.async_media_stop()
@ -325,81 +529,90 @@ class DlnaDmrDevice(MediaPlayerEntity):
await self._device.async_wait_for_can_play()
# If already playing, no need to call Play
if self._device.state == DeviceState.PLAYING:
if self._device.transport_state == TransportState.PLAYING:
return
# Play it
await self.async_media_play()
@catch_request_errors()
async def async_media_previous_track(self):
@catch_request_errors
async def async_media_previous_track(self) -> None:
"""Send previous track command."""
if not self._device.can_previous:
_LOGGER.debug("Cannot do Previous")
return
assert self._device is not None
await self._device.async_previous()
@catch_request_errors()
async def async_media_next_track(self):
@catch_request_errors
async def async_media_next_track(self) -> None:
"""Send next track command."""
if not self._device.can_next:
_LOGGER.debug("Cannot do Next")
return
assert self._device is not None
await self._device.async_next()
@property
def media_title(self):
def media_title(self) -> str | None:
"""Title of current playing media."""
if not self._device:
return None
return self._device.media_title
@property
def media_image_url(self):
def media_image_url(self) -> str | None:
"""Image url of current playing media."""
if not self._device:
return None
return self._device.media_image_url
@property
def state(self):
def state(self) -> str:
"""State of the player."""
if not self._available:
if not self._device or not self.available:
return STATE_OFF
if self._device.state is None:
if self._device.transport_state is None:
return STATE_ON
if self._device.state == DeviceState.PLAYING:
if self._device.transport_state in (
TransportState.PLAYING,
TransportState.TRANSITIONING,
):
return STATE_PLAYING
if self._device.state == DeviceState.PAUSED:
if self._device.transport_state in (
TransportState.PAUSED_PLAYBACK,
TransportState.PAUSED_RECORDING,
):
return STATE_PAUSED
if self._device.transport_state == TransportState.VENDOR_DEFINED:
return STATE_UNKNOWN
return STATE_IDLE
@property
def media_duration(self):
def media_duration(self) -> int | None:
"""Duration of current playing media in seconds."""
if not self._device:
return None
return self._device.media_duration
@property
def media_position(self):
def media_position(self) -> int | None:
"""Position of current playing media in seconds."""
if not self._device:
return None
return self._device.media_position
@property
def media_position_updated_at(self):
def media_position_updated_at(self) -> datetime | None:
"""When was the position of the current playing media valid.
Returns value from homeassistant.util.dt.utcnow().
"""
if not self._device:
return None
return self._device.media_position_updated_at
@property
def name(self) -> str:
"""Return the name of the device."""
if self._name:
return self._name
return self._device.name
def unique_id(self) -> str:
"""Report the UDN (Unique Device Name) as this entity's unique ID."""
return self.udn
@property
def unique_id(self) -> str:
"""Return an unique ID."""
return self._device.udn
def usn(self) -> str:
"""Get the USN based on the UDN (Unique Device Name) and device type."""
return f"{self.udn}::{self.device_type}"

View File

@ -0,0 +1,44 @@
{
"config": {
"flow_title": "{name}",
"step": {
"user": {
"title": "DLNA Digital Media Renderer",
"description": "URL to a device description XML file",
"data": {
"url": "[%key:common::config_flow::data::url%]"
}
},
"confirm": {
"description": "[%key:common::config_flow::description::confirm_setup%]"
}
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"could_not_connect": "Failed to connect to DLNA device",
"discovery_error": "Failed to discover a matching DLNA device",
"incomplete_config": "Configuration is missing a required variable",
"non_unique_id": "Multiple devices found with the same unique ID",
"not_dmr": "Device is not a Digital Media Renderer"
},
"error": {
"could_not_connect": "Failed to connect to DLNA device",
"not_dmr": "Device is not a Digital Media Renderer"
}
},
"options": {
"step": {
"init": {
"title": "DLNA Digital Media Renderer configuration",
"data": {
"listen_port": "Event listener port (random if not set)",
"callback_url_override": "Event listener callback URL",
"poll_availability": "Poll for device availability"
}
}
},
"error": {
"invalid_url": "Invalid URL"
}
}
}

View File

@ -0,0 +1,44 @@
{
"config": {
"abort": {
"already_configured": "Device is already configured",
"could_not_connect": "Failed to connect to DLNA device",
"discovery_error": "Failed to discover a matching DLNA device",
"incomplete_config": "Configuration is missing a required variable",
"non_unique_id": "Multiple devices found with the same unique ID",
"not_dmr": "Device is not a Digital Media Renderer"
},
"error": {
"could_not_connect": "Failed to connect to DLNA device",
"not_dmr": "Device is not a Digital Media Renderer"
},
"flow_title": "{name}",
"step": {
"confirm": {
"description": "Do you want to start set up?"
},
"user": {
"data": {
"url": "URL"
},
"description": "URL to a device description XML file",
"title": "DLNA Digital Media Renderer"
}
}
},
"options": {
"error": {
"invalid_url": "Invalid URL"
},
"step": {
"init": {
"data": {
"callback_url_override": "Event listener callback URL",
"listen_port": "Event listener port (random if not set)",
"poll_availability": "Poll for device availability"
},
"title": "DLNA Digital Media Renderer configuration"
}
}
}
}

View File

@ -39,9 +39,13 @@ IPV4_BROADCAST = IPv4Address("255.255.255.255")
# Attributes for accessing info from SSDP response
ATTR_SSDP_LOCATION = "ssdp_location"
ATTR_SSDP_ST = "ssdp_st"
ATTR_SSDP_NT = "ssdp_nt"
ATTR_SSDP_UDN = "ssdp_udn"
ATTR_SSDP_USN = "ssdp_usn"
ATTR_SSDP_EXT = "ssdp_ext"
ATTR_SSDP_SERVER = "ssdp_server"
ATTR_SSDP_BOOTID = "BOOTID.UPNP.ORG"
ATTR_SSDP_NEXTBOOTID = "NEXTBOOTID.UPNP.ORG"
# Attributes for accessing info from retrieved UPnP device description
ATTR_UPNP_DEVICE_TYPE = "deviceType"
ATTR_UPNP_FRIENDLY_NAME = "friendlyName"
@ -56,7 +60,7 @@ ATTR_UPNP_UDN = "UDN"
ATTR_UPNP_UPC = "UPC"
ATTR_UPNP_PRESENTATION_URL = "presentationURL"
PRIMARY_MATCH_KEYS = [ATTR_UPNP_MANUFACTURER, "st", ATTR_UPNP_DEVICE_TYPE]
PRIMARY_MATCH_KEYS = [ATTR_UPNP_MANUFACTURER, "st", ATTR_UPNP_DEVICE_TYPE, "nt"]
DISCOVERY_MAPPING = {
"usn": ATTR_SSDP_USN,
@ -64,6 +68,8 @@ DISCOVERY_MAPPING = {
"server": ATTR_SSDP_SERVER,
"st": ATTR_SSDP_ST,
"location": ATTR_SSDP_LOCATION,
"_udn": ATTR_SSDP_UDN,
"nt": ATTR_SSDP_NT,
}
SsdpChange = Enum("SsdpChange", "ALIVE BYEBYE UPDATE")

View File

@ -2,7 +2,7 @@
"domain": "ssdp",
"name": "Simple Service Discovery Protocol (SSDP)",
"documentation": "https://www.home-assistant.io/integrations/ssdp",
"requirements": ["async-upnp-client==0.22.1"],
"requirements": ["async-upnp-client==0.22.3"],
"dependencies": ["network"],
"after_dependencies": ["zeroconf"],
"codeowners": [],

View File

@ -3,7 +3,7 @@
"name": "UPnP/IGD",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/upnp",
"requirements": ["async-upnp-client==0.22.1"],
"requirements": ["async-upnp-client==0.22.3"],
"dependencies": ["network", "ssdp"],
"codeowners": ["@StevenLooman","@ehendrix23"],
"ssdp": [

View File

@ -2,7 +2,7 @@
"domain": "yeelight",
"name": "Yeelight",
"documentation": "https://www.home-assistant.io/integrations/yeelight",
"requirements": ["yeelight==0.7.5", "async-upnp-client==0.22.1"],
"requirements": ["yeelight==0.7.5", "async-upnp-client==0.22.3"],
"codeowners": ["@rytilahti", "@zewelor", "@shenxn", "@starkillerOG"],
"config_flow": true,
"dependencies": ["network"],

View File

@ -61,6 +61,7 @@ FLOWS = [
"dexcom",
"dialogflow",
"directv",
"dlna_dmr",
"doorbird",
"dsmr",
"dunehd",

View File

@ -83,6 +83,26 @@ SSDP = {
"manufacturer": "DIRECTV"
}
],
"dlna_dmr": [
{
"st": "urn:schemas-upnp-org:device:MediaRenderer:1"
},
{
"st": "urn:schemas-upnp-org:device:MediaRenderer:2"
},
{
"st": "urn:schemas-upnp-org:device:MediaRenderer:3"
},
{
"nt": "urn:schemas-upnp-org:device:MediaRenderer:1"
},
{
"nt": "urn:schemas-upnp-org:device:MediaRenderer:2"
},
{
"nt": "urn:schemas-upnp-org:device:MediaRenderer:3"
}
],
"fritz": [
{
"st": "urn:schemas-upnp-org:device:fritzbox:1"

View File

@ -4,7 +4,7 @@ aiodiscover==1.4.2
aiohttp==3.7.4.post0
aiohttp_cors==0.7.0
astral==2.2
async-upnp-client==0.22.1
async-upnp-client==0.22.3
async_timeout==3.0.1
attrs==21.2.0
awesomeversion==21.8.1

View File

@ -352,6 +352,17 @@ no_implicit_optional = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.dlna_dmr.*]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
no_implicit_optional = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.dnsip.*]
check_untyped_defs = true
disallow_incomplete_defs = true

View File

@ -327,7 +327,7 @@ asterisk_mbox==0.5.0
# homeassistant.components.ssdp
# homeassistant.components.upnp
# homeassistant.components.yeelight
async-upnp-client==0.22.1
async-upnp-client==0.22.3
# homeassistant.components.supla
asyncpysupla==0.0.5

View File

@ -221,7 +221,7 @@ arcam-fmj==0.7.0
# homeassistant.components.ssdp
# homeassistant.components.upnp
# homeassistant.components.yeelight
async-upnp-client==0.22.1
async-upnp-client==0.22.3
# homeassistant.components.aurora
auroranoaa==0.0.2

View File

@ -0,0 +1 @@
"""Tests for the DLNA component."""

View File

@ -0,0 +1,141 @@
"""Fixtures for DLNA tests."""
from __future__ import annotations
from collections.abc import Iterable
from socket import AddressFamily # pylint: disable=no-name-in-module
from unittest.mock import Mock, create_autospec, patch, seal
from async_upnp_client import UpnpDevice, UpnpFactory
import pytest
from homeassistant.components.dlna_dmr.const import DOMAIN as DLNA_DOMAIN
from homeassistant.components.dlna_dmr.data import DlnaDmrData
from homeassistant.const import CONF_DEVICE_ID, CONF_TYPE, CONF_URL
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry
MOCK_DEVICE_BASE_URL = "http://192.88.99.4"
MOCK_DEVICE_LOCATION = MOCK_DEVICE_BASE_URL + "/dmr_description.xml"
MOCK_DEVICE_NAME = "Test Renderer Device"
MOCK_DEVICE_TYPE = "urn:schemas-upnp-org:device:MediaRenderer:1"
MOCK_DEVICE_UDN = "uuid:7cc6da13-7f5d-4ace-9729-58b275c52f1e"
MOCK_DEVICE_USN = f"{MOCK_DEVICE_UDN}::{MOCK_DEVICE_TYPE}"
LOCAL_IP = "192.88.99.1"
EVENT_CALLBACK_URL = "http://192.88.99.1/notify"
NEW_DEVICE_LOCATION = "http://192.88.99.7" + "/dmr_description.xml"
@pytest.fixture
def domain_data_mock(hass: HomeAssistant) -> Iterable[Mock]:
"""Mock the global data used by this component.
This includes network clients and library object factories. Mocking it
prevents network use.
"""
domain_data = create_autospec(DlnaDmrData, instance=True)
domain_data.upnp_factory = create_autospec(
UpnpFactory, spec_set=True, instance=True
)
upnp_device = create_autospec(UpnpDevice, instance=True)
upnp_device.name = MOCK_DEVICE_NAME
upnp_device.udn = MOCK_DEVICE_UDN
upnp_device.device_url = MOCK_DEVICE_LOCATION
upnp_device.device_type = "urn:schemas-upnp-org:device:MediaRenderer:1"
upnp_device.available = True
upnp_device.parent_device = None
upnp_device.root_device = upnp_device
upnp_device.all_devices = [upnp_device]
seal(upnp_device)
domain_data.upnp_factory.async_create_device.return_value = upnp_device
domain_data.unmigrated_config = {}
with patch.dict(hass.data, {DLNA_DOMAIN: domain_data}):
yield domain_data
# Make sure the event notifiers are released
assert (
domain_data.async_get_event_notifier.await_count
== domain_data.async_release_event_notifier.await_count
)
@pytest.fixture
def config_entry_mock() -> Iterable[MockConfigEntry]:
"""Mock a config entry for this platform."""
mock_entry = MockConfigEntry(
unique_id=MOCK_DEVICE_UDN,
domain=DLNA_DOMAIN,
data={
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_DEVICE_ID: MOCK_DEVICE_UDN,
CONF_TYPE: MOCK_DEVICE_TYPE,
},
title=MOCK_DEVICE_NAME,
options={},
)
yield mock_entry
@pytest.fixture
def dmr_device_mock(domain_data_mock: Mock) -> Iterable[Mock]:
"""Mock the async_upnp_client DMR device, initially connected."""
with patch(
"homeassistant.components.dlna_dmr.media_player.DmrDevice", autospec=True
) as constructor:
device = constructor.return_value
device.on_event = None
device.profile_device = (
domain_data_mock.upnp_factory.async_create_device.return_value
)
device.media_image_url = "http://192.88.99.20:8200/AlbumArt/2624-17620.jpg"
device.udn = "device_udn"
device.manufacturer = "device_manufacturer"
device.model_name = "device_model_name"
device.name = "device_name"
yield device
# Make sure the device is disconnected
assert (
device.async_subscribe_services.await_count
== device.async_unsubscribe_services.await_count
)
assert device.on_event is None
@pytest.fixture(name="skip_notifications", autouse=True)
def skip_notifications_fixture() -> Iterable[None]:
"""Skip notification calls."""
with patch("homeassistant.components.persistent_notification.async_create"), patch(
"homeassistant.components.persistent_notification.async_dismiss"
):
yield
@pytest.fixture(autouse=True)
def ssdp_scanner_mock() -> Iterable[Mock]:
"""Mock the SSDP module."""
with patch("homeassistant.components.ssdp.Scanner", autospec=True) as mock_scanner:
reg_callback = mock_scanner.return_value.async_register_callback
reg_callback.return_value = Mock(return_value=None)
yield mock_scanner.return_value
assert (
reg_callback.call_count == reg_callback.return_value.call_count
), "Not all callbacks unregistered"
@pytest.fixture(autouse=True)
def async_get_local_ip_mock() -> Iterable[Mock]:
"""Mock the async_get_local_ip utility function to prevent network access."""
with patch(
"homeassistant.components.dlna_dmr.media_player.async_get_local_ip",
autospec=True,
) as func:
func.return_value = AddressFamily.AF_INET, LOCAL_IP
yield func

View File

@ -0,0 +1,624 @@
"""Test the DLNA config flow."""
from __future__ import annotations
from unittest.mock import Mock
from async_upnp_client import UpnpDevice, UpnpError
import pytest
from homeassistant import config_entries, data_entry_flow
from homeassistant.components import ssdp
from homeassistant.components.dlna_dmr.const import (
CONF_CALLBACK_URL_OVERRIDE,
CONF_LISTEN_PORT,
CONF_POLL_AVAILABILITY,
DOMAIN as DLNA_DOMAIN,
)
from homeassistant.const import (
CONF_DEVICE_ID,
CONF_NAME,
CONF_PLATFORM,
CONF_TYPE,
CONF_URL,
)
from homeassistant.core import HomeAssistant
from .conftest import (
MOCK_DEVICE_LOCATION,
MOCK_DEVICE_NAME,
MOCK_DEVICE_TYPE,
MOCK_DEVICE_UDN,
NEW_DEVICE_LOCATION,
)
from tests.common import MockConfigEntry
# Auto-use the domain_data_mock and dmr_device_mock fixtures for every test in this module
pytestmark = [
pytest.mark.usefixtures("domain_data_mock"),
pytest.mark.usefixtures("dmr_device_mock"),
]
WRONG_DEVICE_TYPE = "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
IMPORTED_DEVICE_NAME = "Imported DMR device"
MOCK_CONFIG_IMPORT_DATA = {
CONF_PLATFORM: DLNA_DOMAIN,
CONF_URL: MOCK_DEVICE_LOCATION,
}
MOCK_ROOT_DEVICE_UDN = "ROOT_DEVICE"
MOCK_ROOT_DEVICE_TYPE = "ROOT_DEVICE_TYPE"
MOCK_DISCOVERY = {
ssdp.ATTR_SSDP_LOCATION: MOCK_DEVICE_LOCATION,
ssdp.ATTR_SSDP_UDN: MOCK_DEVICE_UDN,
ssdp.ATTR_SSDP_ST: MOCK_DEVICE_TYPE,
ssdp.ATTR_UPNP_UDN: MOCK_ROOT_DEVICE_UDN,
ssdp.ATTR_UPNP_DEVICE_TYPE: MOCK_ROOT_DEVICE_TYPE,
ssdp.ATTR_UPNP_FRIENDLY_NAME: MOCK_DEVICE_NAME,
}
async def test_user_flow(hass: HomeAssistant) -> None:
"""Test user-init'd config flow with user entering a valid URL."""
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {}
assert result["step_id"] == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={CONF_URL: MOCK_DEVICE_LOCATION}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == MOCK_DEVICE_NAME
assert result["data"] == {
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_DEVICE_ID: MOCK_DEVICE_UDN,
CONF_TYPE: MOCK_DEVICE_TYPE,
}
assert result["options"] == {CONF_POLL_AVAILABILITY: True}
# Wait for platform to be fully setup
await hass.async_block_till_done()
# Remove the device to clean up all resources, completing its life cycle
entry_id = result["result"].entry_id
assert await hass.config_entries.async_remove(entry_id) == {
"require_restart": False
}
async def test_user_flow_uncontactable(
hass: HomeAssistant, domain_data_mock: Mock
) -> None:
"""Test user-init'd config flow with user entering an uncontactable URL."""
# Device is not contactable
domain_data_mock.upnp_factory.async_create_device.side_effect = UpnpError
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {}
assert result["step_id"] == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={CONF_URL: MOCK_DEVICE_LOCATION}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {"base": "could_not_connect"}
assert result["step_id"] == "user"
async def test_user_flow_embedded_st(
hass: HomeAssistant, domain_data_mock: Mock
) -> None:
"""Test user-init'd flow for device with an embedded DMR."""
# Device is the wrong type
upnp_device = domain_data_mock.upnp_factory.async_create_device.return_value
upnp_device.udn = MOCK_ROOT_DEVICE_UDN
upnp_device.device_type = MOCK_ROOT_DEVICE_TYPE
upnp_device.name = "ROOT_DEVICE_NAME"
embedded_device = Mock(spec=UpnpDevice)
embedded_device.udn = MOCK_DEVICE_UDN
embedded_device.device_type = MOCK_DEVICE_TYPE
embedded_device.name = MOCK_DEVICE_NAME
upnp_device.all_devices.append(embedded_device)
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {}
assert result["step_id"] == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={CONF_URL: MOCK_DEVICE_LOCATION}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == MOCK_DEVICE_NAME
assert result["data"] == {
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_DEVICE_ID: MOCK_DEVICE_UDN,
CONF_TYPE: MOCK_DEVICE_TYPE,
}
assert result["options"] == {CONF_POLL_AVAILABILITY: True}
# Wait for platform to be fully setup
await hass.async_block_till_done()
# Remove the device to clean up all resources, completing its life cycle
entry_id = result["result"].entry_id
assert await hass.config_entries.async_remove(entry_id) == {
"require_restart": False
}
async def test_user_flow_wrong_st(hass: HomeAssistant, domain_data_mock: Mock) -> None:
"""Test user-init'd config flow with user entering a URL for the wrong device."""
# Device has a sub device of the right type
upnp_device = domain_data_mock.upnp_factory.async_create_device.return_value
upnp_device.device_type = WRONG_DEVICE_TYPE
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {}
assert result["step_id"] == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={CONF_URL: MOCK_DEVICE_LOCATION}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {"base": "not_dmr"}
assert result["step_id"] == "user"
async def test_import_flow_invalid(hass: HomeAssistant, domain_data_mock: Mock) -> None:
"""Test import flow of invalid YAML config."""
# Missing CONF_URL
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={CONF_PLATFORM: DLNA_DOMAIN},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "incomplete_config"
# Device is not contactable
domain_data_mock.upnp_factory.async_create_device.side_effect = UpnpError
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={CONF_PLATFORM: DLNA_DOMAIN, CONF_URL: MOCK_DEVICE_LOCATION},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "could_not_connect"
# Device is the wrong type
domain_data_mock.upnp_factory.async_create_device.side_effect = None
upnp_device = domain_data_mock.upnp_factory.async_create_device.return_value
upnp_device.device_type = WRONG_DEVICE_TYPE
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={CONF_PLATFORM: DLNA_DOMAIN, CONF_URL: MOCK_DEVICE_LOCATION},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "not_dmr"
async def test_import_flow_ssdp_discovered(
hass: HomeAssistant, ssdp_scanner_mock: Mock
) -> None:
"""Test import of YAML config with a device also found via SSDP."""
ssdp_scanner_mock.async_get_discovery_info_by_st.side_effect = [
[MOCK_DISCOVERY],
[],
[],
]
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data=MOCK_CONFIG_IMPORT_DATA,
)
await hass.async_block_till_done()
assert ssdp_scanner_mock.async_get_discovery_info_by_st.call_count >= 1
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == MOCK_DEVICE_NAME
assert result["data"] == {
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_DEVICE_ID: MOCK_DEVICE_UDN,
CONF_TYPE: MOCK_DEVICE_TYPE,
}
assert result["options"] == {
CONF_LISTEN_PORT: None,
CONF_CALLBACK_URL_OVERRIDE: None,
CONF_POLL_AVAILABILITY: False,
}
entry_id = result["result"].entry_id
# The config entry should not be duplicated when dlna_dmr is restarted
ssdp_scanner_mock.async_get_discovery_info_by_st.side_effect = [
[MOCK_DISCOVERY],
[],
[],
]
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data=MOCK_CONFIG_IMPORT_DATA,
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_configured"
# Wait for platform to be fully setup
await hass.async_block_till_done()
# Remove the device to clean up all resources, completing its life cycle
assert await hass.config_entries.async_remove(entry_id) == {
"require_restart": False
}
async def test_import_flow_direct_connect(
hass: HomeAssistant, ssdp_scanner_mock: Mock
) -> None:
"""Test import of YAML config with a device *not found* via SSDP."""
ssdp_scanner_mock.async_get_discovery_info_by_st.return_value = []
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data=MOCK_CONFIG_IMPORT_DATA,
)
await hass.async_block_till_done()
assert ssdp_scanner_mock.async_get_discovery_info_by_st.call_count >= 1
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == MOCK_DEVICE_NAME
assert result["data"] == {
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_DEVICE_ID: MOCK_DEVICE_UDN,
CONF_TYPE: MOCK_DEVICE_TYPE,
}
assert result["options"] == {
CONF_LISTEN_PORT: None,
CONF_CALLBACK_URL_OVERRIDE: None,
CONF_POLL_AVAILABILITY: True,
}
entry_id = result["result"].entry_id
# The config entry should not be duplicated when dlna_dmr is restarted
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data=MOCK_CONFIG_IMPORT_DATA,
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_configured"
# Remove the device to clean up all resources, completing its life cycle
assert await hass.config_entries.async_remove(entry_id) == {
"require_restart": False
}
async def test_import_flow_options(
hass: HomeAssistant, ssdp_scanner_mock: Mock
) -> None:
"""Test import of YAML config with options set."""
ssdp_scanner_mock.async_get_discovery_info_by_st.return_value = []
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={
CONF_PLATFORM: DLNA_DOMAIN,
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_NAME: IMPORTED_DEVICE_NAME,
CONF_LISTEN_PORT: 2222,
CONF_CALLBACK_URL_OVERRIDE: "http://override/callback",
},
)
await hass.async_block_till_done()
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == IMPORTED_DEVICE_NAME
assert result["data"] == {
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_DEVICE_ID: MOCK_DEVICE_UDN,
CONF_TYPE: MOCK_DEVICE_TYPE,
}
assert result["options"] == {
CONF_LISTEN_PORT: 2222,
CONF_CALLBACK_URL_OVERRIDE: "http://override/callback",
CONF_POLL_AVAILABILITY: True,
}
# Wait for platform to be fully setup
await hass.async_block_till_done()
# Remove the device to clean up all resources, completing its life cycle
entry_id = result["result"].entry_id
assert await hass.config_entries.async_remove(entry_id) == {
"require_restart": False
}
async def test_import_flow_deferred_ssdp(
hass: HomeAssistant, domain_data_mock: Mock, ssdp_scanner_mock: Mock
) -> None:
"""Test YAML import of unavailable device later found via SSDP."""
# Attempted import at hass start fails because device is unavailable
ssdp_scanner_mock.async_get_discovery_info_by_st.side_effect = [
[],
[],
[],
]
domain_data_mock.upnp_factory.async_create_device.side_effect = UpnpError
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={
CONF_PLATFORM: DLNA_DOMAIN,
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_NAME: IMPORTED_DEVICE_NAME,
CONF_LISTEN_PORT: 2222,
CONF_CALLBACK_URL_OVERRIDE: "http://override/callback",
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "could_not_connect"
# Device becomes available then discovered via SSDP, import now occurs automatically
ssdp_scanner_mock.async_get_discovery_info_by_st.side_effect = [
[MOCK_DISCOVERY],
[],
[],
]
domain_data_mock.upnp_factory.async_create_device.side_effect = None
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data=MOCK_DISCOVERY,
)
await hass.async_block_till_done()
assert hass.config_entries.flow.async_progress(include_uninitialized=True) == []
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == IMPORTED_DEVICE_NAME
assert result["data"] == {
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_DEVICE_ID: MOCK_DEVICE_UDN,
CONF_TYPE: MOCK_DEVICE_TYPE,
}
assert result["options"] == {
CONF_LISTEN_PORT: 2222,
CONF_CALLBACK_URL_OVERRIDE: "http://override/callback",
CONF_POLL_AVAILABILITY: False,
}
# Remove the device to clean up all resources, completing its life cycle
entry_id = result["result"].entry_id
assert await hass.config_entries.async_remove(entry_id) == {
"require_restart": False
}
async def test_import_flow_deferred_user(
hass: HomeAssistant, domain_data_mock: Mock, ssdp_scanner_mock: Mock
) -> None:
"""Test YAML import of unavailable device later added by user."""
# Attempted import at hass start fails because device is unavailable
ssdp_scanner_mock.async_get_discovery_info_by_st.return_value = []
domain_data_mock.upnp_factory.async_create_device.side_effect = UpnpError
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={
CONF_PLATFORM: DLNA_DOMAIN,
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_NAME: IMPORTED_DEVICE_NAME,
CONF_LISTEN_PORT: 2222,
CONF_CALLBACK_URL_OVERRIDE: "http://override/callback",
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "could_not_connect"
# Device becomes available then added by user, use all imported settings
domain_data_mock.upnp_factory.async_create_device.side_effect = None
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {}
assert result["step_id"] == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={CONF_URL: MOCK_DEVICE_LOCATION}
)
await hass.async_block_till_done()
assert hass.config_entries.flow.async_progress(include_uninitialized=True) == []
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == IMPORTED_DEVICE_NAME
assert result["data"] == {
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_DEVICE_ID: MOCK_DEVICE_UDN,
CONF_TYPE: MOCK_DEVICE_TYPE,
}
assert result["options"] == {
CONF_LISTEN_PORT: 2222,
CONF_CALLBACK_URL_OVERRIDE: "http://override/callback",
CONF_POLL_AVAILABILITY: True,
}
# Remove the device to clean up all resources, completing its life cycle
entry_id = result["result"].entry_id
assert await hass.config_entries.async_remove(entry_id) == {
"require_restart": False
}
async def test_ssdp_flow_success(hass: HomeAssistant) -> None:
"""Test that SSDP discovery with an available device works."""
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data=MOCK_DISCOVERY,
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {}
assert result["step_id"] == "confirm"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={}
)
await hass.async_block_till_done()
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == MOCK_DEVICE_NAME
assert result["data"] == {
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_DEVICE_ID: MOCK_DEVICE_UDN,
CONF_TYPE: MOCK_DEVICE_TYPE,
}
assert result["options"] == {}
# Remove the device to clean up all resources, completing its life cycle
entry_id = result["result"].entry_id
assert await hass.config_entries.async_remove(entry_id) == {
"require_restart": False
}
async def test_ssdp_flow_unavailable(
hass: HomeAssistant, domain_data_mock: Mock
) -> None:
"""Test that SSDP discovery with an unavailable device gives an error message.
This may occur if the device is turned on, discovered, then turned off
before the user attempts to add it.
"""
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data=MOCK_DISCOVERY,
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {}
assert result["step_id"] == "confirm"
domain_data_mock.upnp_factory.async_create_device.side_effect = UpnpError
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {"base": "could_not_connect"}
assert result["step_id"] == "confirm"
async def test_ssdp_flow_existing(
hass: HomeAssistant, config_entry_mock: MockConfigEntry
) -> None:
"""Test that SSDP discovery of existing config entry updates the URL."""
config_entry_mock.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data={
ssdp.ATTR_SSDP_LOCATION: NEW_DEVICE_LOCATION,
ssdp.ATTR_SSDP_UDN: MOCK_DEVICE_UDN,
ssdp.ATTR_UPNP_UDN: MOCK_ROOT_DEVICE_UDN,
ssdp.ATTR_UPNP_DEVICE_TYPE: MOCK_DEVICE_TYPE,
ssdp.ATTR_UPNP_FRIENDLY_NAME: MOCK_DEVICE_NAME,
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_configured"
assert config_entry_mock.data[CONF_URL] == NEW_DEVICE_LOCATION
async def test_ssdp_flow_upnp_udn(
hass: HomeAssistant, config_entry_mock: MockConfigEntry
) -> None:
"""Test that SSDP discovery ignores the root device's UDN."""
config_entry_mock.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data={
ssdp.ATTR_SSDP_LOCATION: NEW_DEVICE_LOCATION,
ssdp.ATTR_SSDP_UDN: MOCK_DEVICE_UDN,
ssdp.ATTR_SSDP_ST: MOCK_DEVICE_TYPE,
ssdp.ATTR_UPNP_UDN: "DIFFERENT_ROOT_DEVICE",
ssdp.ATTR_UPNP_DEVICE_TYPE: "DIFFERENT_ROOT_DEVICE_TYPE",
ssdp.ATTR_UPNP_FRIENDLY_NAME: MOCK_DEVICE_NAME,
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_configured"
assert config_entry_mock.data[CONF_URL] == NEW_DEVICE_LOCATION
async def test_options_flow(
hass: HomeAssistant, config_entry_mock: MockConfigEntry
) -> None:
"""Test config flow options."""
config_entry_mock.add_to_hass(hass)
result = await hass.config_entries.options.async_init(config_entry_mock.entry_id)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "init"
assert result["errors"] == {}
# Invalid URL for callback (can't be validated automatically by voluptuous)
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
CONF_CALLBACK_URL_OVERRIDE: "Bad url",
CONF_POLL_AVAILABILITY: False,
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "init"
assert result["errors"] == {"base": "invalid_url"}
# Good data for all fields
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
CONF_LISTEN_PORT: 2222,
CONF_CALLBACK_URL_OVERRIDE: "http://override/callback",
CONF_POLL_AVAILABILITY: True,
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["data"] == {
CONF_LISTEN_PORT: 2222,
CONF_CALLBACK_URL_OVERRIDE: "http://override/callback",
CONF_POLL_AVAILABILITY: True,
}

View File

@ -0,0 +1,121 @@
"""Tests for the DLNA DMR data module."""
from __future__ import annotations
from collections.abc import Iterable
from unittest.mock import ANY, Mock, patch
from async_upnp_client import UpnpEventHandler
from async_upnp_client.aiohttp import AiohttpNotifyServer
import pytest
from homeassistant.components.dlna_dmr.const import DOMAIN
from homeassistant.components.dlna_dmr.data import EventListenAddr, get_domain_data
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.core import Event, HomeAssistant
@pytest.fixture
def aiohttp_notify_servers_mock() -> Iterable[Mock]:
"""Construct mock AiohttpNotifyServer on demand, eliminating network use.
This fixture provides a list of the constructed servers.
"""
with patch(
"homeassistant.components.dlna_dmr.data.AiohttpNotifyServer"
) as mock_constructor:
servers = []
def make_server(*_args, **_kwargs):
server = Mock(spec=AiohttpNotifyServer)
servers.append(server)
server.event_handler = Mock(spec=UpnpEventHandler)
return server
mock_constructor.side_effect = make_server
yield mock_constructor
# Every server must be stopped if it was started
for server in servers:
assert server.start_server.call_count == server.stop_server.call_count
async def test_get_domain_data(hass: HomeAssistant) -> None:
"""Test the get_domain_data function returns the same data every time."""
assert DOMAIN not in hass.data
domain_data = get_domain_data(hass)
assert domain_data is not None
assert get_domain_data(hass) is domain_data
async def test_event_notifier(
hass: HomeAssistant, aiohttp_notify_servers_mock: Mock
) -> None:
"""Test getting and releasing event notifiers."""
domain_data = get_domain_data(hass)
listen_addr = EventListenAddr(None, 0, None)
event_notifier = await domain_data.async_get_event_notifier(listen_addr, hass)
assert event_notifier is not None
# Check that the parameters were passed through to the AiohttpNotifyServer
aiohttp_notify_servers_mock.assert_called_with(
requester=ANY, listen_port=0, listen_host=None, callback_url=None, loop=ANY
)
# Same address should give same notifier
listen_addr_2 = EventListenAddr(None, 0, None)
event_notifier_2 = await domain_data.async_get_event_notifier(listen_addr_2, hass)
assert event_notifier_2 is event_notifier
# Different address should give different notifier
listen_addr_3 = EventListenAddr(
"192.88.99.4", 9999, "http://192.88.99.4:9999/notify"
)
event_notifier_3 = await domain_data.async_get_event_notifier(listen_addr_3, hass)
assert event_notifier_3 is not None
assert event_notifier_3 is not event_notifier
# Check that the parameters were passed through to the AiohttpNotifyServer
aiohttp_notify_servers_mock.assert_called_with(
requester=ANY,
listen_port=9999,
listen_host="192.88.99.4",
callback_url="http://192.88.99.4:9999/notify",
loop=ANY,
)
# There should be 2 notifiers total, one with 2 references, and a stop callback
assert set(domain_data.event_notifiers.keys()) == {listen_addr, listen_addr_3}
assert domain_data.event_notifier_refs == {listen_addr: 2, listen_addr_3: 1}
assert domain_data.stop_listener_remove is not None
# Releasing notifiers should delete them when they have not more references
await domain_data.async_release_event_notifier(listen_addr)
assert set(domain_data.event_notifiers.keys()) == {listen_addr, listen_addr_3}
assert domain_data.event_notifier_refs == {listen_addr: 1, listen_addr_3: 1}
assert domain_data.stop_listener_remove is not None
await domain_data.async_release_event_notifier(listen_addr)
assert set(domain_data.event_notifiers.keys()) == {listen_addr_3}
assert domain_data.event_notifier_refs == {listen_addr: 0, listen_addr_3: 1}
assert domain_data.stop_listener_remove is not None
await domain_data.async_release_event_notifier(listen_addr_3)
assert set(domain_data.event_notifiers.keys()) == set()
assert domain_data.event_notifier_refs == {listen_addr: 0, listen_addr_3: 0}
assert domain_data.stop_listener_remove is None
async def test_cleanup_event_notifiers(hass: HomeAssistant) -> None:
"""Test cleanup function clears all event notifiers."""
domain_data = get_domain_data(hass)
await domain_data.async_get_event_notifier(EventListenAddr(None, 0, None), hass)
await domain_data.async_get_event_notifier(
EventListenAddr(None, 0, "different"), hass
)
await domain_data.async_cleanup_event_notifiers(Event(EVENT_HOMEASSISTANT_STOP))
assert not domain_data.event_notifiers
assert not domain_data.event_notifier_refs

View File

@ -0,0 +1,59 @@
"""Tests for the DLNA DMR __init__ module."""
from unittest.mock import Mock
from async_upnp_client import UpnpError
from homeassistant.components.dlna_dmr.const import (
CONF_LISTEN_PORT,
DOMAIN as DLNA_DOMAIN,
)
from homeassistant.components.media_player.const import DOMAIN as MEDIA_PLAYER_DOMAIN
from homeassistant.const import CONF_NAME, CONF_PLATFORM, CONF_URL
from homeassistant.core import HomeAssistant
from homeassistant.helpers.typing import ConfigType
from homeassistant.setup import async_setup_component
from .conftest import MOCK_DEVICE_LOCATION
async def test_import_flow_started(hass: HomeAssistant, domain_data_mock: Mock) -> None:
"""Test import flow of YAML config is started if there's config data."""
mock_config: ConfigType = {
MEDIA_PLAYER_DOMAIN: [
{
CONF_PLATFORM: DLNA_DOMAIN,
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_LISTEN_PORT: 1234,
},
{
CONF_PLATFORM: "other_domain",
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_NAME: "another device",
},
]
}
# Device is not available yet
domain_data_mock.upnp_factory.async_create_device.side_effect = UpnpError
# Run the setup
await async_setup_component(hass, DLNA_DOMAIN, mock_config)
await hass.async_block_till_done()
# Check config_flow has completed
assert hass.config_entries.flow.async_progress(include_uninitialized=True) == []
# Check device contact attempt was made
domain_data_mock.upnp_factory.async_create_device.assert_awaited_once_with(
MOCK_DEVICE_LOCATION
)
# Check the device is added to the unmigrated configs
assert domain_data_mock.unmigrated_config == {
MOCK_DEVICE_LOCATION: {
CONF_PLATFORM: DLNA_DOMAIN,
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_LISTEN_PORT: 1234,
}
}

File diff suppressed because it is too large Load Diff

View File

@ -69,7 +69,7 @@ async def test_ssdp_flow_dispatched_on_st(mock_get_ssdp, hass, caplog, mock_flow
ssdp.ATTR_SSDP_SERVER: "mock-server",
ssdp.ATTR_SSDP_EXT: "",
ssdp.ATTR_UPNP_UDN: "uuid:mock-udn",
"_udn": ANY,
ssdp.ATTR_SSDP_UDN: ANY,
"_timestamp": ANY,
}
assert "Failed to fetch ssdp data" not in caplog.text
@ -411,7 +411,7 @@ async def test_scan_with_registered_callback(
ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::mock-st",
ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
"x-rincon-bootseq": "55",
"_udn": ANY,
ssdp.ATTR_SSDP_UDN: ANY,
"_timestamp": ANY,
},
ssdp.SsdpChange.ALIVE,
@ -465,7 +465,7 @@ async def test_getting_existing_headers(mock_get_ssdp, hass, aioclient_mock):
ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
"_udn": ANY,
ssdp.ATTR_SSDP_UDN: ANY,
"_timestamp": ANY,
}
]
@ -482,7 +482,7 @@ async def test_getting_existing_headers(mock_get_ssdp, hass, aioclient_mock):
ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
"_udn": ANY,
ssdp.ATTR_SSDP_UDN: ANY,
"_timestamp": ANY,
}
]
@ -498,7 +498,7 @@ async def test_getting_existing_headers(mock_get_ssdp, hass, aioclient_mock):
ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
"_udn": ANY,
ssdp.ATTR_SSDP_UDN: ANY,
"_timestamp": ANY,
}

View File

@ -9,6 +9,7 @@ from urllib.parse import parse_qs
from aiohttp import ClientSession
from aiohttp.client_exceptions import ClientError, ClientResponseError
from aiohttp.streams import StreamReader
from multidict import CIMultiDict
from yarl import URL
from homeassistant.const import EVENT_HOMEASSISTANT_CLOSE
@ -179,7 +180,7 @@ class AiohttpClientMockResponse:
self.response = response
self.exc = exc
self.side_effect = side_effect
self._headers = headers or {}
self._headers = CIMultiDict(headers or {})
self._cookies = {}
if cookies: