Add network and callback support to SSDP (#51019)

Co-authored-by: Ruslan Sayfutdinov <ruslan@sayfutdinov.com>
Co-authored-by: Paulus Schoutsen <paulus@home-assistant.io>
This commit is contained in:
J. Nick Koston 2021-05-28 21:18:59 -05:00 committed by GitHub
parent 02cbb2025e
commit fb50cf9840
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 900 additions and 314 deletions

View File

@ -61,6 +61,7 @@ homeassistant.components.scene.*
homeassistant.components.sensor.*
homeassistant.components.slack.*
homeassistant.components.sonos.media_player
homeassistant.components.ssdp.*
homeassistant.components.sun.*
homeassistant.components.switch.*
homeassistant.components.synology_dsm.*

View File

@ -4,18 +4,23 @@ from __future__ import annotations
import asyncio
from collections.abc import Mapping
from datetime import timedelta
from ipaddress import IPv4Address, IPv6Address
import logging
from typing import Any
from typing import Any, Callable
import aiohttp
from async_upnp_client.search import async_search
from defusedxml import ElementTree
from netdisco import ssdp, util
from async_upnp_client.search import SSDPListener
from async_upnp_client.utils import CaseInsensitiveDict
from homeassistant import config_entries
from homeassistant.components import network
from homeassistant.const import EVENT_HOMEASSISTANT_STARTED, EVENT_HOMEASSISTANT_STOP
from homeassistant.core import callback
from homeassistant.core import CoreState, HomeAssistant, callback as core_callback
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.loader import async_get_ssdp
from homeassistant.helpers.typing import ConfigType
from homeassistant.loader import async_get_ssdp, bind_hass
from .descriptions import DescriptionManager
from .flow import FlowDispatcher, SSDPFlow
DOMAIN = "ssdp"
SCAN_INTERVAL = timedelta(seconds=60)
@ -40,188 +45,321 @@ ATTR_UPNP_UDN = "UDN"
ATTR_UPNP_UPC = "UPC"
ATTR_UPNP_PRESENTATION_URL = "presentationURL"
DISCOVERY_MAPPING = {
"usn": ATTR_SSDP_USN,
"ext": ATTR_SSDP_EXT,
"server": ATTR_SSDP_SERVER,
"st": ATTR_SSDP_ST,
"location": ATTR_SSDP_LOCATION,
}
_LOGGER = logging.getLogger(__name__)
async def async_setup(hass, config):
@bind_hass
def async_register_callback(
hass: HomeAssistant,
callback: Callable[[dict], None],
match_dict: None | dict[str, str] = None,
) -> Callable[[], None]:
"""Register to receive a callback on ssdp broadcast.
Returns a callback that can be used to cancel the registration.
"""
scanner: Scanner = hass.data[DOMAIN]
return scanner.async_register_callback(callback, match_dict)
@bind_hass
def async_get_discovery_info_by_udn_st( # pylint: disable=invalid-name
hass: HomeAssistant, udn: str, st: str
) -> dict[str, str] | None:
"""Fetch the discovery info cache."""
scanner: Scanner = hass.data[DOMAIN]
return scanner.async_get_discovery_info_by_udn_st(udn, st)
@bind_hass
def async_get_discovery_info_by_st( # pylint: disable=invalid-name
hass: HomeAssistant, st: str
) -> list[dict[str, str]]:
"""Fetch all the entries matching the st."""
scanner: Scanner = hass.data[DOMAIN]
return scanner.async_get_discovery_info_by_st(st)
@bind_hass
def async_get_discovery_info_by_udn(
hass: HomeAssistant, udn: str
) -> list[dict[str, str]]:
"""Fetch all the entries matching the udn."""
scanner: Scanner = hass.data[DOMAIN]
return scanner.async_get_discovery_info_by_udn(udn)
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the SSDP integration."""
async def _async_initialize(_):
scanner = Scanner(hass, await async_get_ssdp(hass))
await scanner.async_scan(None)
cancel_scan = async_track_time_interval(hass, scanner.async_scan, SCAN_INTERVAL)
scanner = hass.data[DOMAIN] = Scanner(hass, await async_get_ssdp(hass))
@callback
def _async_stop_scans(event):
cancel_scan()
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_stop_scans)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, _async_initialize)
asyncio.create_task(scanner.async_start())
return True
@core_callback
def _async_use_default_interface(adapters: list[network.Adapter]) -> bool:
for adapter in adapters:
if adapter["enabled"] and not adapter["default"]:
return False
return True
@core_callback
def _async_process_callbacks(
callbacks: list[Callable[[dict], None]], discovery_info: dict[str, str]
) -> None:
for callback in callbacks:
try:
callback(discovery_info)
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Failed to callback info: %s", discovery_info)
class Scanner:
"""Class to manage SSDP scanning."""
def __init__(self, hass, integration_matchers):
def __init__(
self, hass: HomeAssistant, integration_matchers: dict[str, list[dict[str, str]]]
) -> None:
"""Initialize class."""
self.hass = hass
self.seen = set()
self._entries = []
self.seen: set[tuple[str, str]] = set()
self.cache: dict[tuple[str, str], Mapping[str, str]] = {}
self._integration_matchers = integration_matchers
self._description_cache = {}
self._cancel_scan: Callable[[], None] | None = None
self._ssdp_listeners: list[SSDPListener] = []
self._callbacks: list[tuple[Callable[[dict], None], dict[str, str]]] = []
self.flow_dispatcher: FlowDispatcher | None = None
self.description_manager: DescriptionManager | None = None
async def _on_ssdp_response(self, data: Mapping[str, Any]) -> None:
"""Process an ssdp response."""
self.async_store_entry(
ssdp.UPNPEntry({key.lower(): item for key, item in data.items()})
@core_callback
def async_register_callback(
self, callback: Callable[[dict], None], match_dict: None | dict[str, str] = None
) -> Callable[[], None]:
"""Register a callback."""
if match_dict is None:
match_dict = {}
# Make sure any entries that happened
# before the callback was registered are fired
if self.hass.state != CoreState.running:
for headers in self.cache.values():
self._async_callback_if_match(callback, headers, match_dict)
callback_entry = (callback, match_dict)
self._callbacks.append(callback_entry)
@core_callback
def _async_remove_callback() -> None:
self._callbacks.remove(callback_entry)
return _async_remove_callback
@core_callback
def _async_callback_if_match(
self,
callback: Callable[[dict], None],
headers: Mapping[str, str],
match_dict: dict[str, str],
) -> None:
"""Fire a callback if info matches the match dict."""
if not all(headers.get(k) == v for (k, v) in match_dict.items()):
return
_async_process_callbacks(
[callback], self._async_headers_to_discovery_info(headers)
)
@callback
def async_store_entry(self, entry):
"""Save an entry for later processing."""
self._entries.append(entry)
@core_callback
def async_stop(self, *_: Any) -> None:
"""Stop the scanner."""
assert self._cancel_scan is not None
self._cancel_scan()
for listener in self._ssdp_listeners:
listener.async_stop()
self._ssdp_listeners = []
async def async_scan(self, _):
async def _async_build_source_set(self) -> set[IPv4Address | IPv6Address]:
"""Build the list of ssdp sources."""
adapters = await network.async_get_adapters(self.hass)
sources: set[IPv4Address | IPv6Address] = set()
if _async_use_default_interface(adapters):
sources.add(IPv4Address("0.0.0.0"))
return sources
for adapter in adapters:
if not adapter["enabled"]:
continue
if adapter["ipv4"]:
ipv4 = adapter["ipv4"][0]
sources.add(IPv4Address(ipv4["address"]))
if adapter["ipv6"]:
ipv6 = adapter["ipv6"][0]
# With python 3.9 add scope_ids can be
# added by enumerating adapter["ipv6"]s
# IPv6Address(f"::%{ipv6['scope_id']}")
sources.add(IPv6Address(ipv6["address"]))
return sources
@core_callback
def async_scan(self, *_: Any) -> None:
"""Scan for new entries."""
for listener in self._ssdp_listeners:
listener.async_search()
await async_search(async_callback=self._on_ssdp_response)
await self._process_entries()
# We clear the cache after each run. We track discovered entries
# so will never need a description twice.
self._description_cache.clear()
self._entries.clear()
async def _process_entries(self):
"""Process SSDP entries."""
entries_to_process = []
unseen_locations = set()
for entry in self._entries:
key = (entry.st, entry.location)
if key in self.seen:
continue
self.seen.add(key)
entries_to_process.append(entry)
if (
entry.location is not None
and entry.location not in self._description_cache
):
unseen_locations.add(entry.location)
if not entries_to_process:
return
if unseen_locations:
await self._fetch_descriptions(list(unseen_locations))
tasks = []
for entry in entries_to_process:
info, domains = self._process_entry(entry)
for domain in domains:
_LOGGER.debug("Discovered %s at %s", domain, entry.location)
tasks.append(
self.hass.config_entries.flow.async_init(
domain, context={"source": DOMAIN}, data=info
)
async def async_start(self) -> None:
"""Start the scanner."""
self.description_manager = DescriptionManager(self.hass)
self.flow_dispatcher = FlowDispatcher(self.hass)
for source_ip in await self._async_build_source_set():
self._ssdp_listeners.append(
SSDPListener(
async_callback=self._async_process_entry, source_ip=source_ip
)
if tasks:
await asyncio.gather(*tasks)
async def _fetch_descriptions(self, locations):
"""Fetch descriptions from locations."""
for idx, result in enumerate(
await asyncio.gather(
*[self._fetch_description(location) for location in locations],
return_exceptions=True,
)
):
location = locations[idx]
if isinstance(result, Exception):
_LOGGER.exception(
"Failed to fetch ssdp data from: %s", location, exc_info=result
)
continue
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.async_stop)
self.hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_STARTED, self.flow_dispatcher.async_start
)
await asyncio.gather(
*[listener.async_start() for listener in self._ssdp_listeners]
)
self._cancel_scan = async_track_time_interval(
self.hass, self.async_scan, SCAN_INTERVAL
)
self._description_cache[location] = result
def _process_entry(self, entry):
"""Process a single entry."""
info = {"st": entry.st}
for key in "usn", "ext", "server":
if key in entry.values:
info[key] = entry.values[key]
if entry.location:
# Multiple entries usually share same location. Make sure
# we fetch it only once.
info_req = self._description_cache.get(entry.location)
if info_req is None:
return (None, [])
info.update(info_req)
@core_callback
def _async_get_matching_callbacks(
self, headers: Mapping[str, str]
) -> list[Callable[[dict], None]]:
"""Return a list of callbacks that match."""
return [
callback
for callback, match_dict in self._callbacks
if all(headers.get(k) == v for (k, v) in match_dict.items())
]
@core_callback
def _async_matching_domains(self, info_with_req: CaseInsensitiveDict) -> set[str]:
domains = set()
for domain, matchers in self._integration_matchers.items():
for matcher in matchers:
if all(info.get(k) == v for (k, v) in matcher.items()):
if all(info_with_req.get(k) == v for (k, v) in matcher.items()):
domains.add(domain)
return domains
if domains:
return (info_from_entry(entry, info), domains)
async def _async_process_entry(self, headers: Mapping[str, str]) -> None:
"""Process SSDP entries."""
_LOGGER.debug("_async_process_entry: %s", headers)
if "st" not in headers or "location" not in headers:
return
h_st = headers["st"]
h_location = headers["location"]
key = (h_st, h_location)
return (None, [])
if udn := _udn_from_usn(headers.get("usn")):
self.cache[(udn, h_st)] = headers
async def _fetch_description(self, xml_location):
"""Fetch an XML description."""
session = self.hass.helpers.aiohttp_client.async_get_clientsession()
try:
for _ in range(2):
resp = await session.get(xml_location, timeout=5)
xml = await resp.text(errors="replace")
# Samsung Smart TV sometimes returns an empty document the
# first time. Retry once.
if xml:
break
except (aiohttp.ClientError, asyncio.TimeoutError) as err:
_LOGGER.debug("Error fetching %s: %s", xml_location, err)
return {}
callbacks = self._async_get_matching_callbacks(headers)
if key in self.seen and not callbacks:
return
try:
tree = ElementTree.fromstring(xml)
except ElementTree.ParseError as err:
_LOGGER.debug("Error parsing %s: %s", xml_location, err)
return {}
assert self.description_manager is not None
info_req = await self.description_manager.fetch_description(h_location) or {}
info_with_req = CaseInsensitiveDict(**headers, **info_req)
discovery_info = discovery_info_from_headers_and_request(info_with_req)
return util.etree_to_dict(tree).get("root", {}).get("device", {})
_async_process_callbacks(callbacks, discovery_info)
if key in self.seen:
return
self.seen.add(key)
for domain in self._async_matching_domains(info_with_req):
_LOGGER.debug("Discovered %s at %s", domain, h_location)
flow: SSDPFlow = {
"domain": domain,
"context": {"source": config_entries.SOURCE_SSDP},
"data": discovery_info,
}
assert self.flow_dispatcher is not None
self.flow_dispatcher.create(flow)
@core_callback
def _async_headers_to_discovery_info(
self, headers: Mapping[str, str]
) -> dict[str, str]:
"""Combine the headers and description into discovery_info.
Building this is a bit expensive so we only do it on demand.
"""
assert self.description_manager is not None
location = headers["location"]
info_req = self.description_manager.async_cached_description(location) or {}
return discovery_info_from_headers_and_request(
CaseInsensitiveDict(**headers, **info_req)
)
@core_callback
def async_get_discovery_info_by_udn_st( # pylint: disable=invalid-name
self, udn: str, st: str
) -> dict[str, str] | None:
"""Return discovery_info for a udn and st."""
if headers := self.cache.get((udn, st)):
return self._async_headers_to_discovery_info(headers)
return None
@core_callback
def async_get_discovery_info_by_st( # pylint: disable=invalid-name
self, st: str
) -> list[dict[str, str]]:
"""Return matching discovery_infos for a st."""
return [
self._async_headers_to_discovery_info(headers)
for udn_st, headers in self.cache.items()
if udn_st[1] == st
]
@core_callback
def async_get_discovery_info_by_udn(self, udn: str) -> list[dict[str, str]]:
"""Return matching discovery_infos for a udn."""
return [
self._async_headers_to_discovery_info(headers)
for udn_st, headers in self.cache.items()
if udn_st[0] == udn
]
def info_from_entry(entry, device_info):
"""Get info from an entry."""
info = {
ATTR_SSDP_LOCATION: entry.location,
ATTR_SSDP_ST: entry.st,
}
if device_info:
info.update(device_info)
info.pop("st", None)
if "usn" in info:
info[ATTR_SSDP_USN] = info.pop("usn")
if "ext" in info:
info[ATTR_SSDP_EXT] = info.pop("ext")
if "server" in info:
info[ATTR_SSDP_SERVER] = info.pop("server")
def discovery_info_from_headers_and_request(
info_with_req: CaseInsensitiveDict,
) -> dict[str, str]:
"""Convert headers and description to discovery_info."""
info = {DISCOVERY_MAPPING.get(k.lower(), k): v for k, v in info_with_req.items()}
if ATTR_UPNP_UDN not in info and ATTR_SSDP_USN in info:
if udn := _udn_from_usn(info[ATTR_SSDP_USN]):
info[ATTR_UPNP_UDN] = udn
return info
def _udn_from_usn(usn: str | None) -> str | None:
"""Get the UDN from the USN."""
if usn is None:
return None
if usn.startswith("uuid:"):
return usn.split("::")[0]
return None

View File

@ -0,0 +1,70 @@
"""The SSDP integration."""
from __future__ import annotations
import asyncio
import logging
import aiohttp
from defusedxml import ElementTree
from netdisco import util
from homeassistant.core import HomeAssistant, callback
_LOGGER = logging.getLogger(__name__)
class DescriptionManager:
"""Class to cache and manage fetching descriptions."""
def __init__(self, hass: HomeAssistant) -> None:
"""Init the manager."""
self.hass = hass
self._description_cache: dict[str, None | dict[str, str]] = {}
async def fetch_description(
self, xml_location: str | None
) -> None | dict[str, str]:
"""Fetch the location or get it from the cache."""
if xml_location is None:
return None
if xml_location not in self._description_cache:
try:
self._description_cache[xml_location] = await self._fetch_description(
xml_location
)
except Exception: # pylint: disable=broad-except
# If it fails, cache the failure so we do not keep trying over and over
self._description_cache[xml_location] = None
_LOGGER.exception("Failed to fetch ssdp data from: %s", xml_location)
return self._description_cache[xml_location]
@callback
def async_cached_description(self, xml_location: str) -> None | dict[str, str]:
"""Fetch the description from the cache."""
return self._description_cache[xml_location]
async def _fetch_description(self, xml_location: str) -> None | dict[str, str]:
"""Fetch an XML description."""
session = self.hass.helpers.aiohttp_client.async_get_clientsession()
try:
for _ in range(2):
resp = await session.get(xml_location, timeout=5)
# Samsung Smart TV sometimes returns an empty document the
# first time. Retry once.
if xml := await resp.text(errors="replace"):
break
except (aiohttp.ClientError, asyncio.TimeoutError) as err:
_LOGGER.debug("Error fetching %s: %s", xml_location, err)
return None
try:
tree = ElementTree.fromstring(xml)
except ElementTree.ParseError as err:
_LOGGER.debug("Error parsing %s: %s", xml_location, err)
return None
parsed: dict[str, str] = (
util.etree_to_dict(tree).get("root", {}).get("device", {})
)
return parsed

View File

@ -0,0 +1,50 @@
"""The SSDP integration."""
from __future__ import annotations
from collections.abc import Coroutine
from typing import Any, TypedDict
from homeassistant.core import HomeAssistant, callback
from homeassistant.data_entry_flow import FlowResult
class SSDPFlow(TypedDict):
"""A queued ssdp discovery flow."""
domain: str
context: dict[str, Any]
data: dict
class FlowDispatcher:
"""Dispatch discovery flows."""
def __init__(self, hass: HomeAssistant) -> None:
"""Init the discovery dispatcher."""
self.hass = hass
self.pending_flows: list[SSDPFlow] = []
self.started = False
@callback
def async_start(self, *_: Any) -> None:
"""Start processing pending flows."""
self.started = True
self.hass.loop.call_soon(self._async_process_pending_flows)
def _async_process_pending_flows(self) -> None:
for flow in self.pending_flows:
self.hass.async_create_task(self._init_flow(flow))
self.pending_flows = []
def create(self, flow: SSDPFlow) -> None:
"""Create and add or queue a flow."""
if self.started:
self.hass.async_create_task(self._init_flow(flow))
else:
self.pending_flows.append(flow)
def _init_flow(self, flow: SSDPFlow) -> Coroutine[None, None, FlowResult]:
"""Create a flow."""
return self.hass.config_entries.flow.async_init(
flow["domain"], context=flow["context"], data=flow["data"]
)

View File

@ -4,9 +4,9 @@
"documentation": "https://www.home-assistant.io/integrations/ssdp",
"requirements": [
"defusedxml==0.7.1",
"netdisco==2.8.3",
"async-upnp-client==0.18.0"
],
"dependencies": ["network"],
"after_dependencies": ["zeroconf"],
"codeowners": [],
"quality_scale": "internal",

View File

@ -21,7 +21,6 @@ home-assistant-frontend==20210528.0
httpx==0.18.0
ifaddr==0.1.7
jinja2>=3.0.1
netdisco==2.8.3
paho-mqtt==1.5.1
pillow==8.1.2
pip>=8.0.3,<20.3

View File

@ -682,6 +682,17 @@ no_implicit_optional = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.ssdp.*]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
no_implicit_optional = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.sun.*]
check_untyped_defs = true
disallow_incomplete_defs = true

View File

@ -993,7 +993,6 @@ nessclient==0.9.15
netdata==0.2.0
# homeassistant.components.discovery
# homeassistant.components.ssdp
netdisco==2.8.3
# homeassistant.components.nam

View File

@ -547,7 +547,6 @@ ndms2_client==0.1.1
nessclient==0.9.15
# homeassistant.components.discovery
# homeassistant.components.ssdp
netdisco==2.8.3
# homeassistant.components.nam

View File

@ -1,42 +1,70 @@
"""Test the SSDP integration."""
import asyncio
from datetime import timedelta
from ipaddress import IPv4Address, IPv6Address
from unittest.mock import patch
import aiohttp
from async_upnp_client.search import SSDPListener
from async_upnp_client.utils import CaseInsensitiveDict
import pytest
from homeassistant import config_entries
from homeassistant.components import ssdp
from homeassistant.const import EVENT_HOMEASSISTANT_STARTED, EVENT_HOMEASSISTANT_STOP
from homeassistant.core import CoreState, callback
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
from tests.common import async_fire_time_changed, mock_coro
async def test_scan_match_st(hass, caplog):
"""Test matching based on ST."""
scanner = ssdp.Scanner(hass, {"mock-domain": [{"st": "mock-st"}]})
def _patched_ssdp_listener(info, *args, **kwargs):
listener = SSDPListener(*args, **kwargs)
async def _mock_async_scan(*args, async_callback=None, **kwargs):
await async_callback(
{
"st": "mock-st",
"location": None,
"usn": "mock-usn",
"server": "mock-server",
"ext": "",
}
async def _async_callback(*_):
await listener.async_callback(info)
listener.async_start = _async_callback
return listener
async def _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp):
def _generate_fake_ssdp_listener(*args, **kwargs):
return _patched_ssdp_listener(
mock_ssdp_response,
*args,
**kwargs,
)
with patch(
"homeassistant.components.ssdp.async_search",
side_effect=_mock_async_scan,
"homeassistant.components.ssdp.async_get_ssdp",
return_value=mock_get_ssdp,
), patch(
"homeassistant.components.ssdp.SSDPListener",
new=_generate_fake_ssdp_listener,
), patch.object(
hass.config_entries.flow, "async_init", return_value=mock_coro()
) as mock_init:
await scanner.async_scan(None)
assert await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
await hass.async_block_till_done()
return mock_init
async def test_scan_match_st(hass, caplog):
"""Test matching based on ST."""
mock_ssdp_response = {
"st": "mock-st",
"location": None,
"usn": "mock-usn",
"server": "mock-server",
"ext": "",
}
mock_get_ssdp = {"mock-domain": [{"st": "mock-st"}]}
mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
assert len(mock_init.mock_calls) == 1
assert mock_init.mock_calls[0][1][0] == "mock-domain"
@ -53,6 +81,19 @@ async def test_scan_match_st(hass, caplog):
assert "Failed to fetch ssdp data" not in caplog.text
async def test_partial_response(hass, caplog):
"""Test location and st missing."""
mock_ssdp_response = {
"usn": "mock-usn",
"server": "mock-server",
"ext": "",
}
mock_get_ssdp = {"mock-domain": [{"st": "mock-st"}]}
mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
assert len(mock_init.mock_calls) == 0
@pytest.mark.parametrize(
"key", (ssdp.ATTR_UPNP_MANUFACTURER, ssdp.ATTR_UPNP_DEVICE_TYPE)
)
@ -68,25 +109,12 @@ async def test_scan_match_upnp_devicedesc(hass, aioclient_mock, key):
</root>
""",
)
scanner = ssdp.Scanner(hass, {"mock-domain": [{key: "Paulus"}]})
async def _mock_async_scan(*args, async_callback=None, **kwargs):
for _ in range(5):
await async_callback(
{
"st": "mock-st",
"location": "http://1.1.1.1",
}
)
with patch(
"homeassistant.components.ssdp.async_search",
side_effect=_mock_async_scan,
), patch.object(
hass.config_entries.flow, "async_init", return_value=mock_coro()
) as mock_init:
await scanner.async_scan(None)
mock_get_ssdp = {"mock-domain": [{key: "Paulus"}]}
mock_ssdp_response = {
"st": "mock-st",
"location": "http://1.1.1.1",
}
mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
# If we get duplicate respones, ensure we only look it up once
assert len(aioclient_mock.mock_calls) == 1
assert len(mock_init.mock_calls) == 1
@ -108,33 +136,19 @@ async def test_scan_not_all_present(hass, aioclient_mock):
</root>
""",
)
scanner = ssdp.Scanner(
hass,
{
"mock-domain": [
{
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_UPNP_MANUFACTURER: "Paulus",
}
]
},
)
async def _mock_async_scan(*args, async_callback=None, **kwargs):
await async_callback(
mock_ssdp_response = {
"st": "mock-st",
"location": "http://1.1.1.1",
}
mock_get_ssdp = {
"mock-domain": [
{
"st": "mock-st",
"location": "http://1.1.1.1",
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_UPNP_MANUFACTURER: "Paulus",
}
)
with patch(
"homeassistant.components.ssdp.async_search",
side_effect=_mock_async_scan,
), patch.object(
hass.config_entries.flow, "async_init", return_value=mock_coro()
) as mock_init:
await scanner.async_scan(None)
]
}
mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
assert not mock_init.mock_calls
@ -152,33 +166,19 @@ async def test_scan_not_all_match(hass, aioclient_mock):
</root>
""",
)
scanner = ssdp.Scanner(
hass,
{
"mock-domain": [
{
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_UPNP_MANUFACTURER: "Not-Paulus",
}
]
},
)
async def _mock_async_scan(*args, async_callback=None, **kwargs):
await async_callback(
mock_ssdp_response = {
"st": "mock-st",
"location": "http://1.1.1.1",
}
mock_get_ssdp = {
"mock-domain": [
{
"st": "mock-st",
"location": "http://1.1.1.1",
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_UPNP_MANUFACTURER: "Not-Paulus",
}
)
with patch(
"homeassistant.components.ssdp.async_search",
side_effect=_mock_async_scan,
), patch.object(
hass.config_entries.flow, "async_init", return_value=mock_coro()
) as mock_init:
await scanner.async_scan(None)
]
}
mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
assert not mock_init.mock_calls
@ -187,21 +187,21 @@ async def test_scan_not_all_match(hass, aioclient_mock):
async def test_scan_description_fetch_fail(hass, aioclient_mock, exc):
"""Test failing to fetch description."""
aioclient_mock.get("http://1.1.1.1", exc=exc)
scanner = ssdp.Scanner(hass, {})
async def _mock_async_scan(*args, async_callback=None, **kwargs):
await async_callback(
mock_ssdp_response = {
"st": "mock-st",
"location": "http://1.1.1.1",
}
mock_get_ssdp = {
"mock-domain": [
{
"st": "mock-st",
"location": "http://1.1.1.1",
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_UPNP_MANUFACTURER: "Paulus",
}
)
]
}
mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
with patch(
"homeassistant.components.ssdp.async_search",
side_effect=_mock_async_scan,
):
await scanner.async_scan(None)
assert not mock_init.mock_calls
async def test_scan_description_parse_fail(hass, aioclient_mock):
@ -212,21 +212,22 @@ async def test_scan_description_parse_fail(hass, aioclient_mock):
<root>INVALIDXML
""",
)
scanner = ssdp.Scanner(hass, {})
async def _mock_async_scan(*args, async_callback=None, **kwargs):
await async_callback(
mock_ssdp_response = {
"st": "mock-st",
"location": "http://1.1.1.1",
}
mock_get_ssdp = {
"mock-domain": [
{
"st": "mock-st",
"location": "http://1.1.1.1",
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_UPNP_MANUFACTURER: "Paulus",
}
)
]
}
mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
with patch(
"homeassistant.components.ssdp.async_search",
side_effect=_mock_async_scan,
):
await scanner.async_scan(None)
assert not mock_init.mock_calls
async def test_invalid_characters(hass, aioclient_mock):
@ -242,32 +243,20 @@ async def test_invalid_characters(hass, aioclient_mock):
</root>
""",
)
scanner = ssdp.Scanner(
hass,
{
"mock-domain": [
{
ssdp.ATTR_UPNP_DEVICE_TYPE: "ABC",
}
]
},
)
async def _mock_async_scan(*args, async_callback=None, **kwargs):
await async_callback(
mock_ssdp_response = {
"st": "mock-st",
"location": "http://1.1.1.1",
}
mock_get_ssdp = {
"mock-domain": [
{
"st": "mock-st",
"location": "http://1.1.1.1",
ssdp.ATTR_UPNP_DEVICE_TYPE: "ABC",
}
)
]
}
with patch(
"homeassistant.components.ssdp.async_search",
side_effect=_mock_async_scan,
), patch.object(
hass.config_entries.flow, "async_init", return_value=mock_coro()
) as mock_init:
await scanner.async_scan(None)
mock_init = await _async_run_mocked_scan(hass, mock_ssdp_response, mock_get_ssdp)
assert len(mock_init.mock_calls) == 1
assert mock_init.mock_calls[0][1][0] == "mock-domain"
@ -282,8 +271,9 @@ async def test_invalid_characters(hass, aioclient_mock):
}
@patch("homeassistant.components.ssdp.async_search")
async def test_start_stop_scanner(async_search_mock, hass):
@patch("homeassistant.components.ssdp.SSDPListener.async_start")
@patch("homeassistant.components.ssdp.SSDPListener.async_search")
async def test_start_stop_scanner(async_start_mock, async_search_mock, hass):
"""Test we start and stop the scanner."""
assert await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
@ -291,13 +281,15 @@ async def test_start_stop_scanner(async_search_mock, hass):
await hass.async_block_till_done()
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
await hass.async_block_till_done()
assert async_search_mock.call_count == 2
assert async_start_mock.call_count == 1
assert async_search_mock.call_count == 1
hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
await hass.async_block_till_done()
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
await hass.async_block_till_done()
assert async_search_mock.call_count == 2
assert async_start_mock.call_count == 1
assert async_search_mock.call_count == 1
async def test_unexpected_exception_while_fetching(hass, aioclient_mock, caplog):
@ -313,34 +305,357 @@ async def test_unexpected_exception_while_fetching(hass, aioclient_mock, caplog)
</root>
""",
)
scanner = ssdp.Scanner(
hass,
{
"mock-domain": [
{
ssdp.ATTR_UPNP_DEVICE_TYPE: "ABC",
}
]
},
)
async def _mock_async_scan(*args, async_callback=None, **kwargs):
await async_callback(
mock_ssdp_response = {
"st": "mock-st",
"location": "http://1.1.1.1",
}
mock_get_ssdp = {
"mock-domain": [
{
"st": "mock-st",
"location": "http://1.1.1.1",
ssdp.ATTR_UPNP_DEVICE_TYPE: "ABC",
}
)
]
}
with patch(
"homeassistant.components.ssdp.ElementTree.fromstring", side_effect=ValueError
), patch(
"homeassistant.components.ssdp.async_search",
side_effect=_mock_async_scan,
), patch.object(
hass.config_entries.flow, "async_init", return_value=mock_coro()
) as mock_init:
await scanner.async_scan(None)
"homeassistant.components.ssdp.descriptions.ElementTree.fromstring",
side_effect=ValueError,
):
mock_init = await _async_run_mocked_scan(
hass, mock_ssdp_response, mock_get_ssdp
)
assert len(mock_init.mock_calls) == 0
assert "Failed to fetch ssdp data from: http://1.1.1.1" in caplog.text
async def test_scan_with_registered_callback(hass, aioclient_mock, caplog):
"""Test matching based on callback."""
aioclient_mock.get(
"http://1.1.1.1",
text="""
<root>
<device>
<deviceType>Paulus</deviceType>
</device>
</root>
""",
)
mock_ssdp_response = {
"st": "mock-st",
"location": "http://1.1.1.1",
"usn": "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
"server": "mock-server",
"ext": "",
}
not_matching_intergration_callbacks = []
intergration_callbacks = []
intergration_callbacks_from_cache = []
match_any_callbacks = []
@callback
def _async_exception_callbacks(info):
raise ValueError
@callback
def _async_intergration_callbacks(info):
intergration_callbacks.append(info)
@callback
def _async_intergration_callbacks_from_cache(info):
intergration_callbacks_from_cache.append(info)
@callback
def _async_not_matching_intergration_callbacks(info):
not_matching_intergration_callbacks.append(info)
@callback
def _async_match_any_callbacks(info):
match_any_callbacks.append(info)
def _generate_fake_ssdp_listener(*args, **kwargs):
listener = SSDPListener(*args, **kwargs)
async def _async_callback(*_):
await listener.async_callback(mock_ssdp_response)
@callback
def _callback(*_):
hass.async_create_task(listener.async_callback(mock_ssdp_response))
listener.async_start = _async_callback
listener.async_search = _callback
return listener
with patch(
"homeassistant.components.ssdp.SSDPListener",
new=_generate_fake_ssdp_listener,
):
hass.state = CoreState.stopped
assert await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
await hass.async_block_till_done()
ssdp.async_register_callback(hass, _async_exception_callbacks, {})
ssdp.async_register_callback(
hass,
_async_intergration_callbacks,
{"st": "mock-st"},
)
ssdp.async_register_callback(
hass,
_async_not_matching_intergration_callbacks,
{"st": "not-match-mock-st"},
)
ssdp.async_register_callback(
hass,
_async_match_any_callbacks,
)
await hass.async_block_till_done()
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
ssdp.async_register_callback(
hass,
_async_intergration_callbacks_from_cache,
{"st": "mock-st"},
)
await hass.async_block_till_done()
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
hass.state = CoreState.running
await hass.async_block_till_done()
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
await hass.async_block_till_done()
assert hass.state == CoreState.running
assert len(intergration_callbacks) == 3
assert len(intergration_callbacks_from_cache) == 3
assert len(match_any_callbacks) == 3
assert len(not_matching_intergration_callbacks) == 0
assert intergration_callbacks[0] == {
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_SSDP_EXT: "",
ssdp.ATTR_SSDP_LOCATION: "http://1.1.1.1",
ssdp.ATTR_SSDP_SERVER: "mock-server",
ssdp.ATTR_SSDP_ST: "mock-st",
ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
}
assert "Failed to callback info" in caplog.text
async def test_scan_second_hit(hass, aioclient_mock, caplog):
"""Test matching on second scan."""
aioclient_mock.get(
"http://1.1.1.1",
text="""
<root>
<device>
<deviceType>Paulus</deviceType>
</device>
</root>
""",
)
mock_ssdp_response = CaseInsensitiveDict(
**{
"ST": "mock-st",
"LOCATION": "http://1.1.1.1",
"USN": "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
"SERVER": "mock-server",
"EXT": "",
}
)
mock_get_ssdp = {"mock-domain": [{"st": "mock-st"}]}
intergration_callbacks = []
@callback
def _async_intergration_callbacks(info):
intergration_callbacks.append(info)
def _generate_fake_ssdp_listener(*args, **kwargs):
listener = SSDPListener(*args, **kwargs)
async def _async_callback(*_):
pass
@callback
def _callback(*_):
hass.async_create_task(listener.async_callback(mock_ssdp_response))
listener.async_start = _async_callback
listener.async_search = _callback
return listener
with patch(
"homeassistant.components.ssdp.async_get_ssdp",
return_value=mock_get_ssdp,
), patch(
"homeassistant.components.ssdp.SSDPListener",
new=_generate_fake_ssdp_listener,
), patch.object(
hass.config_entries.flow, "async_init", return_value=mock_coro()
) as mock_init:
assert await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
await hass.async_block_till_done()
remove = ssdp.async_register_callback(
hass,
_async_intergration_callbacks,
{"st": "mock-st"},
)
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
await hass.async_block_till_done()
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
await hass.async_block_till_done()
remove()
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=200))
await hass.async_block_till_done()
assert len(intergration_callbacks) == 2
assert intergration_callbacks[0] == {
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_SSDP_EXT: "",
ssdp.ATTR_SSDP_LOCATION: "http://1.1.1.1",
ssdp.ATTR_SSDP_SERVER: "mock-server",
ssdp.ATTR_SSDP_ST: "mock-st",
ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
}
assert len(mock_init.mock_calls) == 1
assert mock_init.mock_calls[0][1][0] == "mock-domain"
assert mock_init.mock_calls[0][2]["context"] == {
"source": config_entries.SOURCE_SSDP
}
assert mock_init.mock_calls[0][2]["data"] == {
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_SSDP_ST: "mock-st",
ssdp.ATTR_SSDP_LOCATION: "http://1.1.1.1",
ssdp.ATTR_SSDP_SERVER: "mock-server",
ssdp.ATTR_SSDP_EXT: "",
ssdp.ATTR_SSDP_USN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3",
ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
}
assert "Failed to fetch ssdp data" not in caplog.text
udn_discovery_info = ssdp.async_get_discovery_info_by_st(hass, "mock-st")
discovery_info = udn_discovery_info[0]
assert discovery_info[ssdp.ATTR_SSDP_LOCATION] == "http://1.1.1.1"
assert discovery_info[ssdp.ATTR_SSDP_ST] == "mock-st"
assert (
discovery_info[ssdp.ATTR_UPNP_UDN]
== "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL"
)
assert (
discovery_info[ssdp.ATTR_SSDP_USN]
== "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3"
)
st_discovery_info = ssdp.async_get_discovery_info_by_udn(
hass, "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL"
)
discovery_info = st_discovery_info[0]
assert discovery_info[ssdp.ATTR_SSDP_LOCATION] == "http://1.1.1.1"
assert discovery_info[ssdp.ATTR_SSDP_ST] == "mock-st"
assert (
discovery_info[ssdp.ATTR_UPNP_UDN]
== "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL"
)
assert (
discovery_info[ssdp.ATTR_SSDP_USN]
== "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3"
)
discovery_info = ssdp.async_get_discovery_info_by_udn_st(
hass, "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL", "mock-st"
)
assert discovery_info[ssdp.ATTR_SSDP_LOCATION] == "http://1.1.1.1"
assert discovery_info[ssdp.ATTR_SSDP_ST] == "mock-st"
assert (
discovery_info[ssdp.ATTR_UPNP_UDN]
== "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL"
)
assert (
discovery_info[ssdp.ATTR_SSDP_USN]
== "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL::urn:mdx-netflix-com:service:target:3"
)
assert ssdp.async_get_discovery_info_by_udn_st(hass, "wrong", "mock-st") is None
_ADAPTERS_WITH_MANUAL_CONFIG = [
{
"auto": True,
"default": False,
"enabled": True,
"ipv4": [],
"ipv6": [
{
"address": "2001:db8::",
"network_prefix": 8,
"flowinfo": 1,
"scope_id": 1,
}
],
"name": "eth0",
},
{
"auto": True,
"default": False,
"enabled": True,
"ipv4": [{"address": "192.168.1.5", "network_prefix": 23}],
"ipv6": [],
"name": "eth1",
},
{
"auto": False,
"default": False,
"enabled": False,
"ipv4": [{"address": "169.254.3.2", "network_prefix": 16}],
"ipv6": [],
"name": "vtun0",
},
]
async def test_async_detect_interfaces_setting_empty_route(hass):
"""Test without default interface config and the route returns nothing."""
mock_get_ssdp = {
"mock-domain": [
{
ssdp.ATTR_UPNP_DEVICE_TYPE: "ABC",
}
]
}
create_args = []
def _generate_fake_ssdp_listener(*args, **kwargs):
create_args.append([args, kwargs])
listener = SSDPListener(*args, **kwargs)
async def _async_callback(*_):
pass
@callback
def _callback(*_):
pass
listener.async_start = _async_callback
listener.async_search = _callback
return listener
with patch(
"homeassistant.components.ssdp.async_get_ssdp",
return_value=mock_get_ssdp,
), patch(
"homeassistant.components.ssdp.SSDPListener",
new=_generate_fake_ssdp_listener,
), patch(
"homeassistant.components.ssdp.network.async_get_adapters",
return_value=_ADAPTERS_WITH_MANUAL_CONFIG,
):
assert await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
await hass.async_block_till_done()
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
assert {create_args[0][1]["source_ip"], create_args[1][1]["source_ip"]} == {
IPv4Address("192.168.1.5"),
IPv6Address("2001:db8::"),
}

View File

@ -364,7 +364,11 @@ async def test_discovery_requirements_ssdp(hass):
assert len(mock_process.mock_calls) == 4
assert mock_process.mock_calls[0][1][2] == ssdp.requirements
# Ensure zeroconf is a dep for ssdp
assert mock_process.mock_calls[1][1][1] == "zeroconf"
assert {
mock_process.mock_calls[1][1][1],
mock_process.mock_calls[2][1][1],
mock_process.mock_calls[3][1][1],
} == {"network", "zeroconf", "http"}
@pytest.mark.parametrize(