1
mirror of https://github.com/home-assistant/core synced 2024-07-15 09:42:11 +02:00

Add support for bluetooth local name matchers shorter than 3 chars (#107411)

This commit is contained in:
J. Nick Koston 2024-01-07 18:25:56 -10:00 committed by GitHub
parent 69307374f4
commit efffbc08aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 67 additions and 34 deletions

View File

@ -237,10 +237,12 @@ class BluetoothMatcherIndexBase(Generic[_T]):
def match(self, service_info: BluetoothServiceInfoBleak) -> list[_T]:
"""Check for a match."""
matches = []
if service_info.name and len(service_info.name) >= LOCAL_NAME_MIN_MATCH_LENGTH:
for matcher in self.local_name.get(
service_info.name[:LOCAL_NAME_MIN_MATCH_LENGTH], []
):
if (name := service_info.name) and (
local_name_matchers := self.local_name.get(
name[:LOCAL_NAME_MIN_MATCH_LENGTH]
)
):
for matcher in local_name_matchers:
if ble_device_matches(matcher, service_info):
matches.append(matcher)
@ -351,11 +353,6 @@ def _local_name_to_index_key(local_name: str) -> str:
if they try to setup a matcher that will is overly broad
as would match too many devices and cause a performance hit.
"""
if len(local_name) < LOCAL_NAME_MIN_MATCH_LENGTH:
raise ValueError(
"Local name matchers must be at least "
f"{LOCAL_NAME_MIN_MATCH_LENGTH} characters long ({local_name})"
)
match_part = local_name[:LOCAL_NAME_MIN_MATCH_LENGTH]
if "*" in match_part or "[" in match_part:
raise ValueError(
@ -377,35 +374,29 @@ def ble_device_matches(
if matcher.get(CONNECTABLE, True) and not service_info.connectable:
return False
advertisement_data = service_info.advertisement
if (
service_uuid := matcher.get(SERVICE_UUID)
) and service_uuid not in advertisement_data.service_uuids:
) and service_uuid not in service_info.service_uuids:
return False
if (
service_data_uuid := matcher.get(SERVICE_DATA_UUID)
) and service_data_uuid not in advertisement_data.service_data:
) and service_data_uuid not in service_info.service_data:
return False
if manfacturer_id := matcher.get(MANUFACTURER_ID):
if manfacturer_id not in advertisement_data.manufacturer_data:
if manufacturer_id := matcher.get(MANUFACTURER_ID):
if manufacturer_id not in service_info.manufacturer_data:
return False
if manufacturer_data_start := matcher.get(MANUFACTURER_DATA_START):
manufacturer_data_start_bytes = bytearray(manufacturer_data_start)
if not any(
manufacturer_data.startswith(manufacturer_data_start_bytes)
for manufacturer_data in advertisement_data.manufacturer_data.values()
if not service_info.manufacturer_data[manufacturer_id].startswith(
bytes(manufacturer_data_start)
):
return False
if (local_name := matcher.get(LOCAL_NAME)) and (
(device_name := advertisement_data.local_name or service_info.device.name)
is None
or not _memorized_fnmatch(
device_name,
local_name,
)
if (local_name := matcher.get(LOCAL_NAME)) and not _memorized_fnmatch(
service_info.name,
local_name,
):
return False

View File

@ -2,7 +2,7 @@
import asyncio
from datetime import timedelta
import time
from unittest.mock import ANY, MagicMock, Mock, patch
from unittest.mock import ANY, AsyncMock, MagicMock, Mock, patch
from bleak import BleakError
from bleak.backends.scanner import AdvertisementData, BLEDevice
@ -376,6 +376,56 @@ async def test_discovery_match_by_service_uuid(
assert mock_config_flow.mock_calls[0][1][0] == "switchbot"
@patch.object(
bluetooth,
"async_get_bluetooth",
return_value=[
{
"domain": "sensorpush",
"local_name": "s",
"service_uuid": "ef090000-11d6-42ba-93b8-9dd7ec090aa9",
}
],
)
async def test_discovery_match_by_service_uuid_and_short_local_name(
mock_async_get_bluetooth: AsyncMock,
hass: HomeAssistant,
mock_bleak_scanner_start: MagicMock,
mock_bluetooth_adapters: None,
) -> None:
"""Test bluetooth discovery match by service_uuid and short local name."""
entry = MockConfigEntry(domain="bluetooth", unique_id="00:00:00:00:00:01")
entry.add_to_hass(hass)
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
with patch.object(hass.config_entries.flow, "async_init") as mock_config_flow:
await async_setup_with_default_adapter(hass)
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
assert len(mock_bleak_scanner_start.mock_calls) == 1
wrong_device = generate_ble_device("44:44:33:11:23:45", "wrong_name")
wrong_adv = generate_advertisement_data(local_name="s", service_uuids=[])
inject_advertisement(hass, wrong_device, wrong_adv)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 0
ht1_device = generate_ble_device("44:44:33:11:23:45", "s")
ht1_adv = generate_advertisement_data(
local_name="s", service_uuids=["ef090000-11d6-42ba-93b8-9dd7ec090aa9"]
)
inject_advertisement(hass, ht1_device, ht1_adv)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 1
assert mock_config_flow.mock_calls[0][1][0] == "sensorpush"
def _domains_from_mock_config_flow(mock_config_flow: Mock) -> list[str]:
"""Get all the domains that were passed to async_init except bluetooth."""
return [call[1][0] for call in mock_config_flow.mock_calls if call[1][0] != DOMAIN]
@ -2016,14 +2066,6 @@ async def test_register_callback_by_local_name_overly_broad(
):
await async_setup_with_default_adapter(hass)
with pytest.raises(ValueError):
bluetooth.async_register_callback(
hass,
_fake_subscriber,
{LOCAL_NAME: "a"},
BluetoothScanningMode.ACTIVE,
)
with pytest.raises(ValueError):
bluetooth.async_register_callback(
hass,