1
mirror of https://github.com/home-assistant/supervisor synced 2024-07-11 06:27:55 +02:00
ha-supervisor/tests/test_bus.py
Pascal Vizeli caacb421c1
Support dynamic device access cgroup (#3421)
* Support dynamic device access cgroup

* Clean listener better

* Update supervisor/docker/addon.py

Co-authored-by: Stefan Agner <stefan@agner.ch>

* Update addon.py

* Fix black

Co-authored-by: Stefan Agner <stefan@agner.ch>
2022-01-26 16:48:23 +01:00

71 lines
1.8 KiB
Python

"""Test bus backend."""
import asyncio
import pytest
from supervisor.const import BusEvent
from supervisor.coresys import CoreSys
@pytest.mark.asyncio
async def test_bus_event(coresys: CoreSys) -> None:
    """Check that a registered listener receives every payload fired for its event."""
    received = []

    async def record(payload) -> None:
        """Remember the payload handed to the listener."""
        received.append(payload)

    bus = coresys.bus
    event = BusEvent.HARDWARE_NEW_DEVICE
    bus.register_event(event, record)

    # First fire carries no payload; yield to the event loop once before
    # inspecting what the listener recorded.
    bus.fire_event(event, None)
    await asyncio.sleep(0)
    assert received[-1] is None

    # Second fire carries a string payload.
    bus.fire_event(event, "test")
    await asyncio.sleep(0)
    assert received[-1] == "test"
@pytest.mark.asyncio
async def test_bus_event_not_called(coresys: CoreSys) -> None:
    """Check that a listener stays silent when a different event is fired."""
    received = []

    async def record(payload) -> None:
        """Remember the payload handed to the listener."""
        received.append(payload)

    bus = coresys.bus
    bus.register_event(BusEvent.HARDWARE_NEW_DEVICE, record)

    # The listener is registered for HARDWARE_NEW_DEVICE only, so firing
    # HARDWARE_REMOVE_DEVICE must leave the recorded list empty.
    bus.fire_event(BusEvent.HARDWARE_REMOVE_DEVICE, None)
    await asyncio.sleep(0)
    assert not received
@pytest.mark.asyncio
async def test_bus_event_removed(coresys: CoreSys) -> None:
    """Check that a listener stops receiving events once it has been removed."""
    received = []

    async def record(payload) -> None:
        """Remember the payload handed to the listener."""
        received.append(payload)

    bus = coresys.bus
    event = BusEvent.HARDWARE_NEW_DEVICE
    handle = bus.register_event(event, record)

    # While registered, the listener sees both fired payloads.
    bus.fire_event(event, None)
    await asyncio.sleep(0)
    assert received[-1] is None

    bus.fire_event(event, "test")
    await asyncio.sleep(0)
    assert received[-1] == "test"

    # After removal, firing None must not reach the listener: the last
    # recorded payload has to remain "test".
    bus.remove_listener(handle)
    bus.fire_event(event, None)
    await asyncio.sleep(0)
    assert received[-1] == "test"