GeoNet NZ Quakes Sensor (#26078)

* working version of status sensor

* changed unit of measurement

* align naming with feed source

* simplified sensor name

* fix potential issue during initialisation

* fixed tests

* changed icon to constant

* added tests for new sensor

* split tests for geolocation vs sensor

* fixed lint

* fixed pylint

* fixed test

* removed config entry id from attributes

* moved entity manager to component

* fix issue with multiple config entries overriding each other's data

* creating async tasks instead of awaiting each unloading

* moved manager to component

* correctly triggering update only when this component is loaded

* fixed tests after major code refactorings

* fixed pylint

* moved actual creation of new events to geolocation platform

* changed all timestamps to utc

* changed the way platforms are setup and manager is updated

* simplify assert statement

* changed the way waiting for unloading platforms
This commit is contained in:
Malte Franken 2019-09-04 01:16:13 +10:00 committed by Martin Hjelmare
parent fa79ef1220
commit 13bb2ea35a
7 changed files with 474 additions and 169 deletions

View File

@ -1,27 +1,47 @@
"""The GeoNet NZ Quakes integration.""" """The GeoNet NZ Quakes integration."""
import voluptuous as vol import asyncio
import logging
from datetime import timedelta
import voluptuous as vol
from aio_geojson_geonetnz_quakes import GeonetnzQuakesFeedManager
from homeassistant.core import callback
from homeassistant.util.unit_system import METRIC_SYSTEM
from homeassistant.config_entries import SOURCE_IMPORT from homeassistant.config_entries import SOURCE_IMPORT
from homeassistant.const import ( from homeassistant.const import (
CONF_LATITUDE, CONF_LATITUDE,
CONF_LONGITUDE, CONF_LONGITUDE,
CONF_RADIUS, CONF_RADIUS,
CONF_SCAN_INTERVAL, CONF_SCAN_INTERVAL,
CONF_UNIT_SYSTEM_IMPERIAL,
CONF_UNIT_SYSTEM,
LENGTH_MILES,
) )
from homeassistant.helpers import config_validation as cv from homeassistant.helpers import config_validation as cv, aiohttp_client
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.event import async_track_time_interval
from .config_flow import configured_instances from .config_flow import configured_instances
from .const import ( from .const import (
PLATFORMS,
CONF_MINIMUM_MAGNITUDE, CONF_MINIMUM_MAGNITUDE,
CONF_MMI, CONF_MMI,
DEFAULT_FILTER_TIME_INTERVAL,
DEFAULT_MINIMUM_MAGNITUDE, DEFAULT_MINIMUM_MAGNITUDE,
DEFAULT_MMI, DEFAULT_MMI,
DEFAULT_RADIUS, DEFAULT_RADIUS,
DEFAULT_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL,
DOMAIN, DOMAIN,
FEED, FEED,
SIGNAL_DELETE_ENTITY,
SIGNAL_NEW_GEOLOCATION,
SIGNAL_STATUS,
SIGNAL_UPDATE_ENTITY,
) )
_LOGGER = logging.getLogger(__name__)
CONFIG_SCHEMA = vol.Schema( CONFIG_SCHEMA = vol.Schema(
{ {
DOMAIN: vol.Schema( DOMAIN: vol.Schema(
@ -81,13 +101,20 @@ async def async_setup(hass, config):
async def async_setup_entry(hass, config_entry): async def async_setup_entry(hass, config_entry):
"""Set up the GeoNet NZ Quakes component as config entry.""" """Set up the GeoNet NZ Quakes component as config entry."""
hass.data[DOMAIN] = {} if DOMAIN not in hass.data:
hass.data[DOMAIN][FEED] = {} hass.data[DOMAIN] = {}
if FEED not in hass.data[DOMAIN]:
hass.async_create_task( hass.data[DOMAIN][FEED] = {}
hass.config_entries.async_forward_entry_setup(config_entry, "geo_location")
)
radius = config_entry.data[CONF_RADIUS]
unit_system = config_entry.data[CONF_UNIT_SYSTEM]
if unit_system == CONF_UNIT_SYSTEM_IMPERIAL:
radius = METRIC_SYSTEM.length(radius, LENGTH_MILES)
# Create feed entity manager for all platforms.
manager = GeonetnzQuakesFeedEntityManager(hass, config_entry, radius, unit_system)
hass.data[DOMAIN][FEED][config_entry.entry_id] = manager
_LOGGER.debug("Feed entity manager added for %s", config_entry.entry_id)
await manager.async_init()
return True return True
@ -95,7 +122,114 @@ async def async_unload_entry(hass, config_entry):
"""Unload an GeoNet NZ Quakes component config entry.""" """Unload an GeoNet NZ Quakes component config entry."""
manager = hass.data[DOMAIN][FEED].pop(config_entry.entry_id) manager = hass.data[DOMAIN][FEED].pop(config_entry.entry_id)
await manager.async_stop() await manager.async_stop()
await asyncio.wait(
await hass.config_entries.async_forward_entry_unload(config_entry, "geo_location") [
hass.config_entries.async_forward_entry_unload(config_entry, domain)
for domain in PLATFORMS
]
)
return True return True
class GeonetnzQuakesFeedEntityManager:
"""Feed Entity Manager for GeoNet NZ Quakes feed."""
def __init__(self, hass, config_entry, radius_in_km, unit_system):
"""Initialize the Feed Entity Manager."""
self._hass = hass
self._config_entry = config_entry
coordinates = (
config_entry.data[CONF_LATITUDE],
config_entry.data[CONF_LONGITUDE],
)
websession = aiohttp_client.async_get_clientsession(hass)
self._feed_manager = GeonetnzQuakesFeedManager(
websession,
self._generate_entity,
self._update_entity,
self._remove_entity,
coordinates,
mmi=config_entry.data[CONF_MMI],
filter_radius=radius_in_km,
filter_minimum_magnitude=config_entry.data[CONF_MINIMUM_MAGNITUDE],
filter_time=DEFAULT_FILTER_TIME_INTERVAL,
status_callback=self._status_update,
)
self._config_entry_id = config_entry.entry_id
self._scan_interval = timedelta(seconds=config_entry.data[CONF_SCAN_INTERVAL])
self._unit_system = unit_system
self._track_time_remove_callback = None
self._status_info = None
self.listeners = []
async def async_init(self):
"""Schedule initial and regular updates based on configured time interval."""
for domain in PLATFORMS:
self._hass.async_create_task(
self._hass.config_entries.async_forward_entry_setup(
self._config_entry, domain
)
)
async def update(event_time):
"""Update."""
await self.async_update()
# Trigger updates at regular intervals.
self._track_time_remove_callback = async_track_time_interval(
self._hass, update, self._scan_interval
)
_LOGGER.debug("Feed entity manager initialized")
async def async_update(self):
"""Refresh data."""
await self._feed_manager.update()
_LOGGER.debug("Feed entity manager updated")
async def async_stop(self):
"""Stop this feed entity manager from refreshing."""
for unsub_dispatcher in self.listeners:
unsub_dispatcher()
self.listeners = []
if self._track_time_remove_callback:
self._track_time_remove_callback()
_LOGGER.debug("Feed entity manager stopped")
@callback
def async_event_new_entity(self):
"""Return manager specific event to signal new entity."""
return SIGNAL_NEW_GEOLOCATION.format(self._config_entry_id)
def get_entry(self, external_id):
"""Get feed entry by external id."""
return self._feed_manager.feed_entries.get(external_id)
def status_info(self):
"""Return latest status update info received."""
return self._status_info
async def _generate_entity(self, external_id):
"""Generate new entity."""
async_dispatcher_send(
self._hass,
self.async_event_new_entity(),
self,
external_id,
self._unit_system,
)
async def _update_entity(self, external_id):
"""Update entity."""
async_dispatcher_send(self._hass, SIGNAL_UPDATE_ENTITY.format(external_id))
async def _remove_entity(self, external_id):
"""Remove entity."""
async_dispatcher_send(self._hass, SIGNAL_DELETE_ENTITY.format(external_id))
async def _status_update(self, status_info):
"""Propagate status update."""
_LOGGER.debug("Status update received: %s", status_info)
self._status_info = status_info
async_dispatcher_send(self._hass, SIGNAL_STATUS.format(self._config_entry_id))

View File

@ -3,12 +3,21 @@ from datetime import timedelta
DOMAIN = "geonetnz_quakes" DOMAIN = "geonetnz_quakes"
PLATFORMS = ("sensor", "geo_location")
CONF_MINIMUM_MAGNITUDE = "minimum_magnitude" CONF_MINIMUM_MAGNITUDE = "minimum_magnitude"
CONF_MMI = "mmi" CONF_MMI = "mmi"
FEED = "feed" FEED = "feed"
DEFAULT_FILTER_TIME_INTERVAL = timedelta(days=7)
DEFAULT_MINIMUM_MAGNITUDE = 0.0 DEFAULT_MINIMUM_MAGNITUDE = 0.0
DEFAULT_MMI = 3 DEFAULT_MMI = 3
DEFAULT_RADIUS = 50.0 DEFAULT_RADIUS = 50.0
DEFAULT_SCAN_INTERVAL = timedelta(minutes=5) DEFAULT_SCAN_INTERVAL = timedelta(minutes=5)
SIGNAL_DELETE_ENTITY = "geonetnz_quakes_delete_{}"
SIGNAL_UPDATE_ENTITY = "geonetnz_quakes_update_{}"
SIGNAL_STATUS = "geonetnz_quakes_status_{}"
SIGNAL_NEW_GEOLOCATION = "geonetnz_quakes_new_geolocation_{}"

View File

@ -1,33 +1,20 @@
"""Geolocation support for GeoNet NZ Quakes Feeds.""" """Geolocation support for GeoNet NZ Quakes Feeds."""
from datetime import timedelta
import logging import logging
from typing import Optional from typing import Optional
from aio_geojson_geonetnz_quakes import GeonetnzQuakesFeedManager
from homeassistant.components.geo_location import GeolocationEvent from homeassistant.components.geo_location import GeolocationEvent
from homeassistant.const import ( from homeassistant.const import (
ATTR_ATTRIBUTION, ATTR_ATTRIBUTION,
CONF_LATITUDE,
CONF_LONGITUDE,
CONF_RADIUS,
CONF_SCAN_INTERVAL,
CONF_UNIT_SYSTEM,
CONF_UNIT_SYSTEM_IMPERIAL, CONF_UNIT_SYSTEM_IMPERIAL,
LENGTH_KILOMETERS, LENGTH_KILOMETERS,
LENGTH_MILES, LENGTH_MILES,
ATTR_TIME, ATTR_TIME,
) )
from homeassistant.core import callback from homeassistant.core import callback
from homeassistant.helpers import aiohttp_client from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.dispatcher import ( from homeassistant.util.unit_system import IMPERIAL_SYSTEM
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.util.unit_system import IMPERIAL_SYSTEM, METRIC_SYSTEM
from .const import CONF_MINIMUM_MAGNITUDE, CONF_MMI, DOMAIN, FEED from .const import DOMAIN, FEED, SIGNAL_DELETE_ENTITY, SIGNAL_UPDATE_ENTITY
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -39,111 +26,27 @@ ATTR_MMI = "mmi"
ATTR_PUBLICATION_DATE = "publication_date" ATTR_PUBLICATION_DATE = "publication_date"
ATTR_QUALITY = "quality" ATTR_QUALITY = "quality"
DEFAULT_FILTER_TIME_INTERVAL = timedelta(days=7)
SIGNAL_DELETE_ENTITY = "geonetnz_quakes_delete_{}"
SIGNAL_UPDATE_ENTITY = "geonetnz_quakes_update_{}"
SOURCE = "geonetnz_quakes" SOURCE = "geonetnz_quakes"
async def async_setup_entry(hass, entry, async_add_entities): async def async_setup_entry(hass, entry, async_add_entities):
"""Set up the GeoNet NZ Quakes Feed platform.""" """Set up the GeoNet NZ Quakes Feed platform."""
radius = entry.data[CONF_RADIUS] manager = hass.data[DOMAIN][FEED][entry.entry_id]
unit_system = entry.data[CONF_UNIT_SYSTEM]
if unit_system == CONF_UNIT_SYSTEM_IMPERIAL: @callback
radius = METRIC_SYSTEM.length(radius, LENGTH_MILES) def async_add_geolocation(feed_manager, external_id, unit_system):
manager = GeonetnzQuakesFeedEntityManager( """Add gelocation entity from feed."""
hass, new_entity = GeonetnzQuakesEvent(feed_manager, external_id, unit_system)
async_add_entities, _LOGGER.debug("Adding geolocation %s", new_entity)
entry.data[CONF_SCAN_INTERVAL], async_add_entities([new_entity], True)
entry.data[CONF_LATITUDE],
entry.data[CONF_LONGITUDE], manager.listeners.append(
entry.data[CONF_MMI], async_dispatcher_connect(
radius, hass, manager.async_event_new_entity(), async_add_geolocation
unit_system, )
entry.data[CONF_MINIMUM_MAGNITUDE],
) )
hass.data[DOMAIN][FEED][entry.entry_id] = manager hass.async_create_task(manager.async_update())
await manager.async_init() _LOGGER.debug("Geolocation setup done")
class GeonetnzQuakesFeedEntityManager:
"""Feed Entity Manager for GeoNet NZ Quakes feed."""
def __init__(
self,
hass,
async_add_entities,
scan_interval,
latitude,
longitude,
mmi,
radius_in_km,
unit_system,
minimum_magnitude,
):
"""Initialize the Feed Entity Manager."""
self._hass = hass
coordinates = (latitude, longitude)
websession = aiohttp_client.async_get_clientsession(hass)
self._feed_manager = GeonetnzQuakesFeedManager(
websession,
self._generate_entity,
self._update_entity,
self._remove_entity,
coordinates,
mmi=mmi,
filter_radius=radius_in_km,
filter_minimum_magnitude=minimum_magnitude,
filter_time=DEFAULT_FILTER_TIME_INTERVAL,
)
self._async_add_entities = async_add_entities
self._scan_interval = timedelta(seconds=scan_interval)
self._unit_system = unit_system
self._track_time_remove_callback = None
async def async_init(self):
"""Schedule regular updates based on configured time interval."""
async def update(event_time):
"""Update."""
await self.async_update()
await self.async_update()
self._track_time_remove_callback = async_track_time_interval(
self._hass, update, self._scan_interval
)
_LOGGER.debug("Feed entity manager initialized")
async def async_update(self):
"""Refresh data."""
await self._feed_manager.update()
_LOGGER.debug("Feed entity manager updated")
async def async_stop(self):
"""Stop this feed entity manager from refreshing."""
if self._track_time_remove_callback:
self._track_time_remove_callback()
_LOGGER.debug("Feed entity manager stopped")
def get_entry(self, external_id):
"""Get feed entry by external id."""
return self._feed_manager.feed_entries.get(external_id)
async def _generate_entity(self, external_id):
"""Generate new entity."""
new_entity = GeonetnzQuakesEvent(self, external_id, self._unit_system)
# Add new entities to HA.
self._async_add_entities([new_entity], True)
async def _update_entity(self, external_id):
"""Update entity."""
async_dispatcher_send(self._hass, SIGNAL_UPDATE_ENTITY.format(external_id))
async def _remove_entity(self, external_id):
"""Remove entity."""
async_dispatcher_send(self._hass, SIGNAL_DELETE_ENTITY.format(external_id))
class GeonetnzQuakesEvent(GeolocationEvent): class GeonetnzQuakesEvent(GeolocationEvent):

View File

@ -0,0 +1,139 @@
"""Feed Entity Manager Sensor support for GeoNet NZ Quakes Feeds."""
import logging
from typing import Optional
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity import Entity
from homeassistant.util import dt
from .const import DOMAIN, FEED, SIGNAL_STATUS
_LOGGER = logging.getLogger(__name__)
ATTR_STATUS = "status"
ATTR_LAST_UPDATE = "last_update"
ATTR_LAST_UPDATE_SUCCESSFUL = "last_update_successful"
ATTR_LAST_TIMESTAMP = "last_timestamp"
ATTR_CREATED = "created"
ATTR_UPDATED = "updated"
ATTR_REMOVED = "removed"
DEFAULT_ICON = "mdi:pulse"
DEFAULT_UNIT_OF_MEASUREMENT = "quakes"
async def async_setup_entry(hass, entry, async_add_entities):
"""Set up the GeoNet NZ Quakes Feed platform."""
manager = hass.data[DOMAIN][FEED][entry.entry_id]
sensor = GeonetnzQuakesSensor(entry.entry_id, entry.title, manager)
async_add_entities([sensor])
_LOGGER.debug("Sensor setup done")
class GeonetnzQuakesSensor(Entity):
"""This is a status sensor for the GeoNet NZ Quakes integration."""
def __init__(self, config_entry_id, config_title, manager):
"""Initialize entity."""
self._config_entry_id = config_entry_id
self._config_title = config_title
self._manager = manager
self._status = None
self._last_update = None
self._last_update_successful = None
self._last_timestamp = None
self._total = None
self._created = None
self._updated = None
self._removed = None
self._remove_signal_status = None
async def async_added_to_hass(self):
"""Call when entity is added to hass."""
self._remove_signal_status = async_dispatcher_connect(
self.hass,
SIGNAL_STATUS.format(self._config_entry_id),
self._update_status_callback,
)
_LOGGER.debug("Waiting for updates %s", self._config_entry_id)
# First update is manual because of how the feed entity manager is updated.
await self.async_update()
async def async_will_remove_from_hass(self) -> None:
"""Call when entity will be removed from hass."""
if self._remove_signal_status:
self._remove_signal_status()
@callback
def _update_status_callback(self):
"""Call status update method."""
_LOGGER.debug("Received status update for %s", self._config_entry_id)
self.async_schedule_update_ha_state(True)
@property
def should_poll(self):
"""No polling needed for GeoNet NZ Quakes status sensor."""
return False
async def async_update(self):
"""Update this entity from the data held in the feed manager."""
_LOGGER.debug("Updating %s", self._config_entry_id)
if self._manager:
status_info = self._manager.status_info()
if status_info:
self._update_from_status_info(status_info)
def _update_from_status_info(self, status_info):
"""Update the internal state from the provided information."""
self._status = status_info.status
self._last_update = (
dt.as_utc(status_info.last_update) if status_info.last_update else None
)
self._last_update_successful = (
dt.as_utc(status_info.last_update_successful)
if status_info.last_update_successful
else None
)
self._last_timestamp = status_info.last_timestamp
self._total = status_info.total
self._created = status_info.created
self._updated = status_info.updated
self._removed = status_info.removed
@property
def state(self):
"""Return the state of the sensor."""
return self._total
@property
def name(self) -> Optional[str]:
"""Return the name of the entity."""
return f"GeoNet NZ Quakes ({self._config_title})"
@property
def icon(self):
"""Return the icon to use in the frontend, if any."""
return DEFAULT_ICON
@property
def unit_of_measurement(self):
"""Return the unit of measurement."""
return DEFAULT_UNIT_OF_MEASUREMENT
@property
def device_state_attributes(self):
"""Return the device state attributes."""
attributes = {}
for key, value in (
(ATTR_STATUS, self._status),
(ATTR_LAST_UPDATE, self._last_update),
(ATTR_LAST_UPDATE_SUCCESSFUL, self._last_update_successful),
(ATTR_LAST_TIMESTAMP, self._last_timestamp),
(ATTR_CREATED, self._created),
(ATTR_UPDATED, self._updated),
(ATTR_REMOVED, self._removed),
):
if value or isinstance(value, bool):
attributes[key] = value
return attributes

View File

@ -1 +1,31 @@
"""Tests for the geonetnz_quakes component.""" """Tests for the geonetnz_quakes component."""
from unittest.mock import MagicMock
def _generate_mock_feed_entry(
external_id,
title,
distance_to_home,
coordinates,
attribution=None,
depth=None,
magnitude=None,
mmi=None,
locality=None,
quality=None,
time=None,
):
"""Construct a mock feed entry for testing purposes."""
feed_entry = MagicMock()
feed_entry.external_id = external_id
feed_entry.title = title
feed_entry.distance_to_home = distance_to_home
feed_entry.coordinates = coordinates
feed_entry.attribution = attribution
feed_entry.depth = depth
feed_entry.magnitude = magnitude
feed_entry.mmi = mmi
feed_entry.locality = locality
feed_entry.quality = quality
feed_entry.time = time
return feed_entry

View File

@ -1,6 +1,5 @@
"""The tests for the GeoNet NZ Quakes Feed integration.""" """The tests for the GeoNet NZ Quakes Feed integration."""
import datetime import datetime
from unittest.mock import MagicMock
from asynctest import patch, CoroutineMock from asynctest import patch, CoroutineMock
@ -30,39 +29,11 @@ from homeassistant.setup import async_setup_component
from homeassistant.util.unit_system import IMPERIAL_SYSTEM from homeassistant.util.unit_system import IMPERIAL_SYSTEM
from tests.common import async_fire_time_changed from tests.common import async_fire_time_changed
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
from tests.components.geonetnz_quakes import _generate_mock_feed_entry
CONFIG = {geonetnz_quakes.DOMAIN: {CONF_RADIUS: 200}} CONFIG = {geonetnz_quakes.DOMAIN: {CONF_RADIUS: 200}}
def _generate_mock_feed_entry(
external_id,
title,
distance_to_home,
coordinates,
attribution=None,
depth=None,
magnitude=None,
mmi=None,
locality=None,
quality=None,
time=None,
):
"""Construct a mock feed entry for testing purposes."""
feed_entry = MagicMock()
feed_entry.external_id = external_id
feed_entry.title = title
feed_entry.distance_to_home = distance_to_home
feed_entry.coordinates = coordinates
feed_entry.attribution = attribution
feed_entry.depth = depth
feed_entry.magnitude = magnitude
feed_entry.mmi = mmi
feed_entry.locality = locality
feed_entry.quality = quality
feed_entry.time = time
return feed_entry
async def test_setup(hass): async def test_setup(hass):
"""Test the general setup of the integration.""" """Test the general setup of the integration."""
# Set up some mock feed entries for this test. # Set up some mock feed entries for this test.
@ -94,13 +65,13 @@ async def test_setup(hass):
) as mock_feed_update: ) as mock_feed_update:
mock_feed_update.return_value = "OK", [mock_entry_1, mock_entry_2, mock_entry_3] mock_feed_update.return_value = "OK", [mock_entry_1, mock_entry_2, mock_entry_3]
assert await async_setup_component(hass, geonetnz_quakes.DOMAIN, CONFIG) assert await async_setup_component(hass, geonetnz_quakes.DOMAIN, CONFIG)
# Artificially trigger update. # Artificially trigger update and collect events.
hass.bus.async_fire(EVENT_HOMEASSISTANT_START) hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
# Collect events.
await hass.async_block_till_done() await hass.async_block_till_done()
all_states = hass.states.async_all() all_states = hass.states.async_all()
assert len(all_states) == 3 # 3 geolocation and 1 sensor entities
assert len(all_states) == 4
state = hass.states.get("geo_location.title_1") state = hass.states.get("geo_location.title_1")
assert state is not None assert state is not None
@ -155,14 +126,13 @@ async def test_setup(hass):
} }
assert float(state.state) == 25.5 assert float(state.state) == 25.5
# Simulate an update - one existing, one new entry, # Simulate an update - two existing, one new entry, one outdated entry
# one outdated entry
mock_feed_update.return_value = "OK", [mock_entry_1, mock_entry_4, mock_entry_3] mock_feed_update.return_value = "OK", [mock_entry_1, mock_entry_4, mock_entry_3]
async_fire_time_changed(hass, utcnow + DEFAULT_SCAN_INTERVAL) async_fire_time_changed(hass, utcnow + DEFAULT_SCAN_INTERVAL)
await hass.async_block_till_done() await hass.async_block_till_done()
all_states = hass.states.async_all() all_states = hass.states.async_all()
assert len(all_states) == 3 assert len(all_states) == 4
# Simulate an update - empty data, but successful update, # Simulate an update - empty data, but successful update,
# so no changes to entities. # so no changes to entities.
@ -171,7 +141,7 @@ async def test_setup(hass):
await hass.async_block_till_done() await hass.async_block_till_done()
all_states = hass.states.async_all() all_states = hass.states.async_all()
assert len(all_states) == 3 assert len(all_states) == 4
# Simulate an update - empty data, removes all entities # Simulate an update - empty data, removes all entities
mock_feed_update.return_value = "ERROR", None mock_feed_update.return_value = "ERROR", None
@ -179,7 +149,7 @@ async def test_setup(hass):
await hass.async_block_till_done() await hass.async_block_till_done()
all_states = hass.states.async_all() all_states = hass.states.async_all()
assert len(all_states) == 0 assert len(all_states) == 1
async def test_setup_imperial(hass): async def test_setup_imperial(hass):
@ -193,17 +163,22 @@ async def test_setup_imperial(hass):
with patch("homeassistant.util.dt.utcnow", return_value=utcnow), patch( with patch("homeassistant.util.dt.utcnow", return_value=utcnow), patch(
"aio_geojson_client.feed.GeoJsonFeed.update", new_callable=CoroutineMock "aio_geojson_client.feed.GeoJsonFeed.update", new_callable=CoroutineMock
) as mock_feed_update, patch( ) as mock_feed_update, patch(
"aio_geojson_client.feed.GeoJsonFeed.__init__", new_callable=CoroutineMock "aio_geojson_client.feed.GeoJsonFeed.__init__",
) as mock_feed_init: new_callable=CoroutineMock,
create=True,
) as mock_feed_init, patch(
"aio_geojson_client.feed.GeoJsonFeed.last_timestamp",
new_callable=CoroutineMock,
create=True,
):
mock_feed_update.return_value = "OK", [mock_entry_1] mock_feed_update.return_value = "OK", [mock_entry_1]
assert await async_setup_component(hass, geonetnz_quakes.DOMAIN, CONFIG) assert await async_setup_component(hass, geonetnz_quakes.DOMAIN, CONFIG)
# Artificially trigger update. # Artificially trigger update and collect events.
hass.bus.async_fire(EVENT_HOMEASSISTANT_START) hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
# Collect events.
await hass.async_block_till_done() await hass.async_block_till_done()
all_states = hass.states.async_all() all_states = hass.states.async_all()
assert len(all_states) == 1 assert len(all_states) == 2
# Test conversion of 200 miles to kilometers. # Test conversion of 200 miles to kilometers.
assert mock_feed_init.call_args[1].get("filter_radius") == 321.8688 assert mock_feed_init.call_args[1].get("filter_radius") == 321.8688

View File

@ -0,0 +1,115 @@
"""The tests for the GeoNet NZ Quakes Feed integration."""
import datetime
from asynctest import patch, CoroutineMock
from homeassistant.components import geonetnz_quakes
from homeassistant.components.geonetnz_quakes import DEFAULT_SCAN_INTERVAL
from homeassistant.components.geonetnz_quakes.sensor import (
ATTR_STATUS,
ATTR_LAST_UPDATE,
ATTR_CREATED,
ATTR_UPDATED,
ATTR_REMOVED,
ATTR_LAST_UPDATE_SUCCESSFUL,
)
from homeassistant.const import (
EVENT_HOMEASSISTANT_START,
CONF_RADIUS,
ATTR_UNIT_OF_MEASUREMENT,
ATTR_ICON,
)
from homeassistant.setup import async_setup_component
from tests.common import async_fire_time_changed
import homeassistant.util.dt as dt_util
from tests.components.geonetnz_quakes import _generate_mock_feed_entry
CONFIG = {geonetnz_quakes.DOMAIN: {CONF_RADIUS: 200}}
async def test_setup(hass):
"""Test the general setup of the integration."""
# Set up some mock feed entries for this test.
mock_entry_1 = _generate_mock_feed_entry(
"1234",
"Title 1",
15.5,
(38.0, -3.0),
locality="Locality 1",
attribution="Attribution 1",
time=datetime.datetime(2018, 9, 22, 8, 0, tzinfo=datetime.timezone.utc),
magnitude=5.7,
mmi=5,
depth=10.5,
quality="best",
)
mock_entry_2 = _generate_mock_feed_entry(
"2345", "Title 2", 20.5, (38.1, -3.1), magnitude=4.6
)
mock_entry_3 = _generate_mock_feed_entry(
"3456", "Title 3", 25.5, (38.2, -3.2), locality="Locality 3"
)
mock_entry_4 = _generate_mock_feed_entry("4567", "Title 4", 12.5, (38.3, -3.3))
# Patching 'utcnow' to gain more control over the timed update.
utcnow = dt_util.utcnow()
with patch("homeassistant.util.dt.utcnow", return_value=utcnow), patch(
"aio_geojson_client.feed.GeoJsonFeed.update", new_callable=CoroutineMock
) as mock_feed_update:
mock_feed_update.return_value = "OK", [mock_entry_1, mock_entry_2, mock_entry_3]
assert await async_setup_component(hass, geonetnz_quakes.DOMAIN, CONFIG)
# Artificially trigger update and collect events.
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
await hass.async_block_till_done()
all_states = hass.states.async_all()
# 3 geolocation and 1 sensor entities
assert len(all_states) == 4
state = hass.states.get("sensor.geonet_nz_quakes_32_87336_117_22743")
assert state is not None
assert int(state.state) == 3
assert state.name == "GeoNet NZ Quakes (32.87336, -117.22743)"
attributes = state.attributes
assert attributes[ATTR_STATUS] == "OK"
assert attributes[ATTR_CREATED] == 3
assert attributes[ATTR_LAST_UPDATE].tzinfo == dt_util.UTC
assert attributes[ATTR_LAST_UPDATE_SUCCESSFUL].tzinfo == dt_util.UTC
assert attributes[ATTR_LAST_UPDATE] == attributes[ATTR_LAST_UPDATE_SUCCESSFUL]
assert attributes[ATTR_UNIT_OF_MEASUREMENT] == "quakes"
assert attributes[ATTR_ICON] == "mdi:pulse"
# Simulate an update - two existing, one new entry, one outdated entry
mock_feed_update.return_value = "OK", [mock_entry_1, mock_entry_4, mock_entry_3]
async_fire_time_changed(hass, utcnow + DEFAULT_SCAN_INTERVAL)
await hass.async_block_till_done()
all_states = hass.states.async_all()
assert len(all_states) == 4
state = hass.states.get("sensor.geonet_nz_quakes_32_87336_117_22743")
attributes = state.attributes
assert attributes[ATTR_CREATED] == 1
assert attributes[ATTR_UPDATED] == 2
assert attributes[ATTR_REMOVED] == 1
# Simulate an update - empty data, but successful update,
# so no changes to entities.
mock_feed_update.return_value = "OK_NO_DATA", None
async_fire_time_changed(hass, utcnow + 2 * DEFAULT_SCAN_INTERVAL)
await hass.async_block_till_done()
all_states = hass.states.async_all()
assert len(all_states) == 4
# Simulate an update - empty data, removes all entities
mock_feed_update.return_value = "ERROR", None
async_fire_time_changed(hass, utcnow + 3 * DEFAULT_SCAN_INTERVAL)
await hass.async_block_till_done()
all_states = hass.states.async_all()
assert len(all_states) == 1
state = hass.states.get("sensor.geonet_nz_quakes_32_87336_117_22743")
attributes = state.attributes
assert attributes[ATTR_REMOVED] == 3