1
mirror of https://github.com/home-assistant/core synced 2024-09-25 00:41:32 +02:00

Add event sensors for risco (#39594)

* Add Risco event sensors

* Fix lint
This commit is contained in:
On Freund 2020-09-04 22:11:07 +03:00 committed by GitHub
parent 9b23d7c2fd
commit ad6e8b2d62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 404 additions and 84 deletions

View File

@ -15,13 +15,15 @@ from homeassistant.const import (
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.storage import Store
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DATA_COORDINATOR, DEFAULT_SCAN_INTERVAL, DOMAIN
from .const import DATA_COORDINATOR, DEFAULT_SCAN_INTERVAL, DOMAIN, EVENTS_COORDINATOR
PLATFORMS = ["alarm_control_panel", "binary_sensor"]
PLATFORMS = ["alarm_control_panel", "binary_sensor", "sensor"]
UNDO_UPDATE_LISTENER = "undo_update_listener"
LAST_EVENT_STORAGE_VERSION = 1
LAST_EVENT_TIMESTAMP_KEY = "last_event_timestamp"
_LOGGER = logging.getLogger(__name__)
@ -46,12 +48,16 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry):
scan_interval = entry.options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
coordinator = RiscoDataUpdateCoordinator(hass, risco, scan_interval)
await coordinator.async_refresh()
events_coordinator = RiscoEventsDataUpdateCoordinator(
hass, risco, entry.entry_id, 60
)
undo_listener = entry.add_update_listener(_update_listener)
hass.data[DOMAIN][entry.entry_id] = {
DATA_COORDINATOR: coordinator,
UNDO_UPDATE_LISTENER: undo_listener,
EVENTS_COORDINATOR: events_coordinator,
}
for component in PLATFORMS:
@ -105,3 +111,37 @@ class RiscoDataUpdateCoordinator(DataUpdateCoordinator):
return await self.risco.get_state()
except (CannotConnectError, UnauthorizedError, OperationError) as error:
raise UpdateFailed(error) from error
class RiscoEventsDataUpdateCoordinator(DataUpdateCoordinator):
"""Class to manage fetching risco data."""
def __init__(self, hass, risco, eid, scan_interval):
"""Initialize global risco data updater."""
self.risco = risco
self._store = Store(
hass, LAST_EVENT_STORAGE_VERSION, f"risco_{eid}_last_event_timestamp"
)
interval = timedelta(seconds=scan_interval)
super().__init__(
hass,
_LOGGER,
name=f"{DOMAIN}_events",
update_interval=interval,
)
async def _async_update_data(self):
"""Fetch data from risco."""
last_store = await self._store.async_load() or {}
last_timestamp = last_store.get(
LAST_EVENT_TIMESTAMP_KEY, "2020-01-01T00:00:00Z"
)
try:
events = await self.risco.get_events(last_timestamp, 10)
except (CannotConnectError, UnauthorizedError, OperationError) as error:
raise UpdateFailed(error) from error
if len(events) > 0:
await self._store.async_save({LAST_EVENT_TIMESTAMP_KEY: events[0].time})
return events

View File

@ -8,7 +8,10 @@ from homeassistant.const import (
DOMAIN = "risco"
RISCO_EVENT = "risco_event"
DATA_COORDINATOR = "risco"
EVENTS_COORDINATOR = "risco_events"
DEFAULT_SCAN_INTERVAL = 30

View File

@ -4,7 +4,7 @@
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/risco",
"requirements": [
"pyrisco==0.2.4"
"pyrisco==0.3.0"
],
"codeowners": [
"@OnFreund"

View File

@ -0,0 +1,96 @@
"""Sensor for Risco Events."""
from homeassistant.const import DEVICE_CLASS_TIMESTAMP
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN, EVENTS_COORDINATOR
CATEGORIES = {
2: "Alarm",
4: "Status",
7: "Trouble",
}
EVENT_ATTRIBUTES = [
"category_id",
"category_name",
"type_id",
"type_name",
"name",
"text",
"partition_id",
"zone_id",
"user_id",
"group",
"priority",
"raw",
]
async def async_setup_entry(hass, config_entry, async_add_entities):
"""Set up sensors for device."""
coordinator = hass.data[DOMAIN][config_entry.entry_id][EVENTS_COORDINATOR]
sensors = [
RiscoSensor(coordinator, id, [], name) for id, name in CATEGORIES.items()
]
sensors.append(RiscoSensor(coordinator, None, CATEGORIES.keys(), "Other"))
async_add_entities(sensors)
class RiscoSensor(CoordinatorEntity):
"""Sensor for Risco events."""
def __init__(self, coordinator, category_id, excludes, name) -> None:
"""Initialize sensor."""
super().__init__(coordinator)
self._event = None
self._category_id = category_id
self._excludes = excludes
self._name = name
@property
def name(self):
"""Return the name of the sensor."""
return f"Risco {self.coordinator.risco.site_name} {self._name} Events"
@property
def unique_id(self):
"""Return a unique id for this sensor."""
return f"events_{self._name}_{self.coordinator.risco.site_uuid}"
async def async_added_to_hass(self):
"""When entity is added to hass."""
self.async_on_remove(
self.coordinator.async_add_listener(self._refresh_from_coordinator)
)
await self.coordinator.async_request_refresh()
def _refresh_from_coordinator(self):
events = self.coordinator.data
for event in reversed(events):
if event.category_id in self._excludes:
continue
if self._category_id is not None and event.category_id != self._category_id:
continue
self._event = event
self.async_write_ha_state()
@property
def state(self):
"""Value of sensor."""
if self._event is None:
return None
return self._event.time
@property
def device_state_attributes(self):
"""State attributes."""
if self._event is None:
return None
return {atr: getattr(self._event, atr, None) for atr in EVENT_ATTRIBUTES}
@property
def device_class(self):
"""Device class of sensor."""
return DEVICE_CLASS_TIMESTAMP

View File

@ -1591,7 +1591,7 @@ pyrecswitch==1.0.2
pyrepetier==3.0.5
# homeassistant.components.risco
pyrisco==0.2.4
pyrisco==0.3.0
# homeassistant.components.sabnzbd
pysabnzbd==1.1.0

View File

@ -766,7 +766,7 @@ pyps4-2ndscreen==1.1.1
pyqwikswitch==0.93
# homeassistant.components.risco
pyrisco==0.2.4
pyrisco==0.3.0
# homeassistant.components.acer_projector
# homeassistant.components.zha

View File

@ -11,9 +11,6 @@ from homeassistant.components.alarm_control_panel.const import (
from homeassistant.components.risco import CannotConnectError, UnauthorizedError
from homeassistant.components.risco.const import DOMAIN
from homeassistant.const import (
CONF_PASSWORD,
CONF_PIN,
CONF_USERNAME,
SERVICE_ALARM_ARM_AWAY,
SERVICE_ALARM_ARM_CUSTOM_BYPASS,
SERVICE_ALARM_ARM_HOME,
@ -30,16 +27,11 @@ from homeassistant.const import (
)
from homeassistant.helpers.entity_component import async_update_entity
from .util import TEST_CONFIG, TEST_SITE_UUID, setup_risco
from tests.async_mock import MagicMock, PropertyMock, patch
from tests.common import MockConfigEntry
TEST_CONFIG = {
CONF_USERNAME: "test-username",
CONF_PASSWORD: "test-password",
CONF_PIN: "1234",
}
TEST_SITE_UUID = "test-site-uuid"
TEST_SITE_NAME = "test-site-name"
FIRST_ENTITY_ID = "alarm_control_panel.risco_test_site_name_partition_0"
SECOND_ENTITY_ID = "alarm_control_panel.risco_test_site_name_partition_1"
@ -110,28 +102,6 @@ def two_part_alarm():
yield alarm_mock
async def _setup_risco(hass, options={}):
config_entry = MockConfigEntry(domain=DOMAIN, data=TEST_CONFIG, options=options)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.risco.RiscoAPI.login",
return_value=True,
), patch(
"homeassistant.components.risco.RiscoAPI.site_uuid",
new_callable=PropertyMock(return_value=TEST_SITE_UUID),
), patch(
"homeassistant.components.risco.RiscoAPI.site_name",
new_callable=PropertyMock(return_value=TEST_SITE_NAME),
), patch(
"homeassistant.components.risco.RiscoAPI.close"
):
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
return config_entry
async def test_cannot_connect(hass):
"""Test connection error."""
@ -171,7 +141,7 @@ async def test_setup(hass, two_part_alarm):
assert not registry.async_is_registered(FIRST_ENTITY_ID)
assert not registry.async_is_registered(SECOND_ENTITY_ID)
await _setup_risco(hass)
await setup_risco(hass)
assert registry.async_is_registered(FIRST_ENTITY_ID)
assert registry.async_is_registered(SECOND_ENTITY_ID)
@ -196,7 +166,7 @@ async def _check_state(hass, alarm, property, state, entity_id, partition_id):
async def test_states(hass, two_part_alarm):
"""Test the various alarm states."""
await _setup_risco(hass, CUSTOM_MAPPING_OPTIONS)
await setup_risco(hass, CUSTOM_MAPPING_OPTIONS)
assert hass.states.get(FIRST_ENTITY_ID).state == STATE_UNKNOWN
for partition_id, entity_id in {0: FIRST_ENTITY_ID, 1: SECOND_ENTITY_ID}.items():
@ -278,7 +248,7 @@ async def _call_alarm_service(hass, service, entity_id, **kwargs):
async def test_sets_custom_mapping(hass, two_part_alarm):
"""Test settings the various modes when mapping some states."""
await _setup_risco(hass, CUSTOM_MAPPING_OPTIONS)
await setup_risco(hass, CUSTOM_MAPPING_OPTIONS)
registry = await hass.helpers.entity_registry.async_get_registry()
entity = registry.async_get(FIRST_ENTITY_ID)
@ -304,7 +274,7 @@ async def test_sets_custom_mapping(hass, two_part_alarm):
async def test_sets_full_custom_mapping(hass, two_part_alarm):
"""Test settings the various modes when mapping all states."""
await _setup_risco(hass, FULL_CUSTOM_MAPPING)
await setup_risco(hass, FULL_CUSTOM_MAPPING)
registry = await hass.helpers.entity_registry.async_get_registry()
entity = registry.async_get(FIRST_ENTITY_ID)
@ -338,7 +308,7 @@ async def test_sets_full_custom_mapping(hass, two_part_alarm):
async def test_sets_with_correct_code(hass, two_part_alarm):
"""Test settings the various modes when code is required."""
await _setup_risco(hass, {**CUSTOM_MAPPING_OPTIONS, **CODES_REQUIRED_OPTIONS})
await setup_risco(hass, {**CUSTOM_MAPPING_OPTIONS, **CODES_REQUIRED_OPTIONS})
code = {"code": 1234}
await _test_service_call(
@ -380,7 +350,7 @@ async def test_sets_with_correct_code(hass, two_part_alarm):
async def test_sets_with_incorrect_code(hass, two_part_alarm):
"""Test settings the various modes when code is required and incorrect."""
await _setup_risco(hass, {**CUSTOM_MAPPING_OPTIONS, **CODES_REQUIRED_OPTIONS})
await setup_risco(hass, {**CUSTOM_MAPPING_OPTIONS, **CODES_REQUIRED_OPTIONS})
code = {"code": 4321}
await _test_no_service_call(

View File

@ -3,25 +3,14 @@ import pytest
from homeassistant.components.risco import CannotConnectError, UnauthorizedError
from homeassistant.components.risco.const import DOMAIN
from homeassistant.const import (
CONF_PASSWORD,
CONF_PIN,
CONF_USERNAME,
STATE_OFF,
STATE_ON,
)
from homeassistant.const import STATE_OFF, STATE_ON
from homeassistant.helpers.entity_component import async_update_entity
from .util import TEST_CONFIG, TEST_SITE_UUID, setup_risco
from tests.async_mock import MagicMock, PropertyMock, patch
from tests.common import MockConfigEntry
TEST_CONFIG = {
CONF_USERNAME: "test-username",
CONF_PASSWORD: "test-password",
CONF_PIN: "1234",
}
TEST_SITE_UUID = "test-site-uuid"
TEST_SITE_NAME = "test-site-name"
FIRST_ENTITY_ID = "binary_sensor.zone_0"
SECOND_ENTITY_ID = "binary_sensor.zone_1"
@ -57,28 +46,6 @@ def two_zone_alarm():
yield alarm_mock
async def _setup_risco(hass):
config_entry = MockConfigEntry(domain=DOMAIN, data=TEST_CONFIG)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.risco.RiscoAPI.login",
return_value=True,
), patch(
"homeassistant.components.risco.RiscoAPI.site_uuid",
new_callable=PropertyMock(return_value=TEST_SITE_UUID),
), patch(
"homeassistant.components.risco.RiscoAPI.site_name",
new_callable=PropertyMock(return_value=TEST_SITE_NAME),
), patch(
"homeassistant.components.risco.RiscoAPI.close"
):
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
return config_entry
async def test_cannot_connect(hass):
"""Test connection error."""
@ -118,7 +85,7 @@ async def test_setup(hass, two_zone_alarm):
assert not registry.async_is_registered(FIRST_ENTITY_ID)
assert not registry.async_is_registered(SECOND_ENTITY_ID)
await _setup_risco(hass)
await setup_risco(hass)
assert registry.async_is_registered(FIRST_ENTITY_ID)
assert registry.async_is_registered(SECOND_ENTITY_ID)
@ -153,7 +120,7 @@ async def _check_state(hass, alarm, triggered, bypassed, entity_id, zone_id):
async def test_states(hass, two_zone_alarm):
"""Test the various alarm states."""
await _setup_risco(hass)
await setup_risco(hass)
await _check_state(hass, two_zone_alarm, True, True, FIRST_ENTITY_ID, 0)
await _check_state(hass, two_zone_alarm, True, False, FIRST_ENTITY_ID, 0)
@ -167,7 +134,7 @@ async def test_states(hass, two_zone_alarm):
async def test_bypass(hass, two_zone_alarm):
"""Test bypassing a zone."""
await _setup_risco(hass)
await setup_risco(hass)
with patch("homeassistant.components.risco.RiscoAPI.bypass_zone") as mock:
data = {"entity_id": FIRST_ENTITY_ID}
@ -180,7 +147,7 @@ async def test_bypass(hass, two_zone_alarm):
async def test_unbypass(hass, two_zone_alarm):
"""Test unbypassing a zone."""
await _setup_risco(hass)
await setup_risco(hass)
with patch("homeassistant.components.risco.RiscoAPI.bypass_zone") as mock:
data = {"entity_id": FIRST_ENTITY_ID}

View File

@ -0,0 +1,207 @@
"""Tests for the Risco event sensors."""
import pytest
from homeassistant.components.risco import (
LAST_EVENT_TIMESTAMP_KEY,
CannotConnectError,
UnauthorizedError,
)
from homeassistant.components.risco.const import DOMAIN, EVENTS_COORDINATOR
from .util import TEST_CONFIG, setup_risco
from tests.async_mock import MagicMock, patch
from tests.common import MockConfigEntry
ENTITY_IDS = {
"Alarm": "sensor.risco_test_site_name_alarm_events",
"Status": "sensor.risco_test_site_name_status_events",
"Trouble": "sensor.risco_test_site_name_trouble_events",
"Other": "sensor.risco_test_site_name_other_events",
}
TEST_EVENTS = [
MagicMock(
time="2020-09-02T10:00:00Z",
category_id=4,
category_name="System Status",
type_id=16,
type_name="disarmed",
name="'user' disarmed 'partition'",
text="",
partition_id=0,
zone_id=None,
user_id=3,
group=None,
priority=2,
raw={},
),
MagicMock(
time="2020-09-02T09:00:00Z",
category_id=7,
category_name="Troubles",
type_id=36,
type_name="service needed",
name="Device Fault",
text="Service is needed.",
partition_id=None,
zone_id=None,
user_id=None,
group=None,
priority=1,
raw={},
),
MagicMock(
time="2020-09-02T08:00:00Z",
category_id=2,
category_name="Alarms",
type_id=3,
type_name="triggered",
name="Alarm is on",
text="Yes it is.",
partition_id=0,
zone_id=12,
user_id=None,
group=None,
priority=0,
raw={},
),
MagicMock(
time="2020-09-02T07:00:00Z",
category_id=4,
category_name="System Status",
type_id=119,
type_name="group arm",
name="You armed a group",
text="",
partition_id=0,
zone_id=None,
user_id=1,
group="C",
priority=2,
raw={},
),
MagicMock(
time="2020-09-02T06:00:00Z",
category_id=8,
category_name="Made up",
type_id=200,
type_name="also made up",
name="really made up",
text="",
partition_id=2,
zone_id=None,
user_id=1,
group=None,
priority=2,
raw={},
),
]
CATEGORIES_TO_EVENTS = {
"Alarm": 2,
"Status": 0,
"Trouble": 1,
"Other": 4,
}
@pytest.fixture
def emptry_alarm():
"""Fixture to mock an empty alarm."""
with patch(
"homeassistant.components.risco.RiscoAPI.get_state",
return_value=MagicMock(paritions={}, zones={}),
):
yield
async def test_cannot_connect(hass):
"""Test connection error."""
with patch(
"homeassistant.components.risco.RiscoAPI.login",
side_effect=CannotConnectError,
):
config_entry = MockConfigEntry(domain=DOMAIN, data=TEST_CONFIG)
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
registry = await hass.helpers.entity_registry.async_get_registry()
for id in ENTITY_IDS.values():
assert not registry.async_is_registered(id)
async def test_unauthorized(hass):
"""Test unauthorized error."""
with patch(
"homeassistant.components.risco.RiscoAPI.login",
side_effect=UnauthorizedError,
):
config_entry = MockConfigEntry(domain=DOMAIN, data=TEST_CONFIG)
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
registry = await hass.helpers.entity_registry.async_get_registry()
for id in ENTITY_IDS.values():
assert not registry.async_is_registered(id)
def _check_state(hass, category, entity_id):
event = TEST_EVENTS[CATEGORIES_TO_EVENTS[category]]
assert hass.states.get(entity_id).state == event.time
assert hass.states.get(entity_id).attributes["category_id"] == event.category_id
assert hass.states.get(entity_id).attributes["category_name"] == event.category_name
assert hass.states.get(entity_id).attributes["type_id"] == event.type_id
assert hass.states.get(entity_id).attributes["type_name"] == event.type_name
assert hass.states.get(entity_id).attributes["name"] == event.name
assert hass.states.get(entity_id).attributes["text"] == event.text
assert hass.states.get(entity_id).attributes["partition_id"] == event.partition_id
assert hass.states.get(entity_id).attributes["zone_id"] == event.zone_id
assert hass.states.get(entity_id).attributes["user_id"] == event.user_id
assert hass.states.get(entity_id).attributes["group"] == event.group
assert hass.states.get(entity_id).attributes["priority"] == event.priority
assert hass.states.get(entity_id).attributes["raw"] == event.raw
async def test_setup(hass, emptry_alarm):
"""Test entity setup."""
registry = await hass.helpers.entity_registry.async_get_registry()
for id in ENTITY_IDS.values():
assert not registry.async_is_registered(id)
with patch(
"homeassistant.components.risco.RiscoAPI.get_events",
return_value=TEST_EVENTS,
), patch(
"homeassistant.components.risco.Store.async_save",
) as save_mock:
entry = await setup_risco(hass)
await hass.async_block_till_done()
save_mock.assert_awaited_once_with(
{LAST_EVENT_TIMESTAMP_KEY: TEST_EVENTS[0].time}
)
for id in ENTITY_IDS.values():
assert registry.async_is_registered(id)
for category, entity_id in ENTITY_IDS.items():
_check_state(hass, category, entity_id)
coordinator = hass.data[DOMAIN][entry.entry_id][EVENTS_COORDINATOR]
with patch(
"homeassistant.components.risco.RiscoAPI.get_events", return_value=[]
) as events_mock, patch(
"homeassistant.components.risco.Store.async_load",
return_value={LAST_EVENT_TIMESTAMP_KEY: TEST_EVENTS[0].time},
):
await coordinator.async_refresh()
await hass.async_block_till_done()
events_mock.assert_awaited_once_with(TEST_EVENTS[0].time, 10)
for category, entity_id in ENTITY_IDS.items():
_check_state(hass, category, entity_id)

View File

@ -0,0 +1,37 @@
"""Utilities for Risco tests."""
from homeassistant.components.risco.const import DOMAIN
from homeassistant.const import CONF_PASSWORD, CONF_PIN, CONF_USERNAME
from tests.async_mock import PropertyMock, patch
from tests.common import MockConfigEntry
TEST_CONFIG = {
CONF_USERNAME: "test-username",
CONF_PASSWORD: "test-password",
CONF_PIN: "1234",
}
TEST_SITE_UUID = "test-site-uuid"
TEST_SITE_NAME = "test-site-name"
async def setup_risco(hass, options={}):
"""Set up a Risco integration for testing."""
config_entry = MockConfigEntry(domain=DOMAIN, data=TEST_CONFIG, options=options)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.risco.RiscoAPI.login",
return_value=True,
), patch(
"homeassistant.components.risco.RiscoAPI.site_uuid",
new_callable=PropertyMock(return_value=TEST_SITE_UUID),
), patch(
"homeassistant.components.risco.RiscoAPI.site_name",
new_callable=PropertyMock(return_value=TEST_SITE_NAME),
), patch(
"homeassistant.components.risco.RiscoAPI.close"
):
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
return config_entry