Migrate IQVIA to DataUpdateCoordinator (#41970)

* Migrate IQVIA to DataUpdateCoordinator

* Linting

* Code review

* Better re-raise
This commit is contained in:
Aaron Bach 2020-10-17 10:16:41 -06:00 committed by GitHub
parent c0845a3650
commit 1b94ef69d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 116 additions and 234 deletions

View File

@ -1,238 +1,125 @@
"""Support for IQVIA."""
import asyncio
from datetime import timedelta
import logging
from pyiqvia import Client
from pyiqvia.errors import InvalidZipError, IQVIAError
from pyiqvia.errors import IQVIAError
from homeassistant.const import ATTR_ATTRIBUTION
from homeassistant.core import callback
from homeassistant.helpers import aiohttp_client
from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
UpdateFailed,
)
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.event import async_track_time_interval
from .const import (
CONF_ZIP_CODE,
DATA_CLIENT,
DATA_LISTENER,
DATA_COORDINATOR,
DOMAIN,
TOPIC_DATA_UPDATE,
LOGGER,
TYPE_ALLERGY_FORECAST,
TYPE_ALLERGY_INDEX,
TYPE_ALLERGY_OUTLOOK,
TYPE_ALLERGY_TODAY,
TYPE_ALLERGY_TOMORROW,
TYPE_ASTHMA_FORECAST,
TYPE_ASTHMA_INDEX,
TYPE_ASTHMA_TODAY,
TYPE_ASTHMA_TOMORROW,
TYPE_DISEASE_FORECAST,
TYPE_DISEASE_INDEX,
TYPE_DISEASE_TODAY,
)
_LOGGER = logging.getLogger(__name__)
API_CATEGORY_MAPPING = {
TYPE_ALLERGY_TODAY: TYPE_ALLERGY_INDEX,
TYPE_ALLERGY_TOMORROW: TYPE_ALLERGY_INDEX,
TYPE_ALLERGY_TOMORROW: TYPE_ALLERGY_INDEX,
TYPE_ASTHMA_TODAY: TYPE_ASTHMA_INDEX,
TYPE_ASTHMA_TOMORROW: TYPE_ALLERGY_INDEX,
TYPE_DISEASE_TODAY: TYPE_DISEASE_INDEX,
}
DATA_CONFIG = "config"
DEFAULT_ATTRIBUTION = "Data provided by IQVIA™"
DEFAULT_SCAN_INTERVAL = timedelta(minutes=30)
@callback
def async_get_api_category(sensor_type):
"""Return the API category that a particular sensor type should use."""
return API_CATEGORY_MAPPING.get(sensor_type, sensor_type)
PLATFORMS = ["sensor"]
async def async_setup(hass, config):
"""Set up the IQVIA component."""
hass.data[DOMAIN] = {}
hass.data[DOMAIN][DATA_CLIENT] = {}
hass.data[DOMAIN][DATA_LISTENER] = {}
hass.data[DOMAIN] = {DATA_COORDINATOR: {}}
return True
async def async_setup_entry(hass, config_entry):
async def async_setup_entry(hass, entry):
"""Set up IQVIA as config entry."""
websession = aiohttp_client.async_get_clientsession(hass)
hass.data[DOMAIN][DATA_COORDINATOR][entry.entry_id] = {}
if not config_entry.unique_id:
if not entry.unique_id:
# If the config entry doesn't already have a unique ID, set one:
hass.config_entries.async_update_entry(
config_entry, **{"unique_id": config_entry.data[CONF_ZIP_CODE]}
entry, **{"unique_id": entry.data[CONF_ZIP_CODE]}
)
iqvia = IQVIAData(hass, Client(config_entry.data[CONF_ZIP_CODE], websession))
websession = aiohttp_client.async_get_clientsession(hass)
client = Client(entry.data[CONF_ZIP_CODE], websession)
try:
await iqvia.async_update()
except InvalidZipError:
_LOGGER.error("Invalid ZIP code provided: %s", config_entry.data[CONF_ZIP_CODE])
return False
async def async_get_data_from_api(api_coro):
"""Get data from a particular API coroutine."""
try:
return await api_coro()
except IQVIAError as err:
raise UpdateFailed from err
hass.data[DOMAIN][DATA_CLIENT][config_entry.entry_id] = iqvia
init_data_update_tasks = []
for sensor_type, api_coro in [
(TYPE_ALLERGY_FORECAST, client.allergens.extended),
(TYPE_ALLERGY_INDEX, client.allergens.current),
(TYPE_ALLERGY_OUTLOOK, client.allergens.outlook),
(TYPE_ASTHMA_FORECAST, client.asthma.extended),
(TYPE_ASTHMA_INDEX, client.asthma.current),
(TYPE_DISEASE_FORECAST, client.disease.extended),
(TYPE_DISEASE_INDEX, client.disease.current),
]:
coordinator = hass.data[DOMAIN][DATA_COORDINATOR][entry.entry_id][
sensor_type
] = DataUpdateCoordinator(
hass,
LOGGER,
name=f"{entry.data[CONF_ZIP_CODE]} {sensor_type}",
update_interval=DEFAULT_SCAN_INTERVAL,
update_method=lambda coro=api_coro: async_get_data_from_api(coro),
)
init_data_update_tasks.append(coordinator.async_refresh())
hass.async_create_task(
hass.config_entries.async_forward_entry_setup(config_entry, "sensor")
await asyncio.gather(*init_data_update_tasks)
for component in PLATFORMS:
hass.async_create_task(
hass.config_entries.async_forward_entry_setup(entry, component)
)
return True
async def async_unload_entry(hass, entry):
"""Unload an OpenUV config entry."""
unload_ok = all(
await asyncio.gather(
*[
hass.config_entries.async_forward_entry_unload(entry, component)
for component in PLATFORMS
]
)
)
return True
if unload_ok:
hass.data[DOMAIN][DATA_COORDINATOR].pop(entry.entry_id)
return unload_ok
async def async_unload_entry(hass, config_entry):
"""Unload an OpenUV config entry."""
hass.data[DOMAIN][DATA_CLIENT].pop(config_entry.entry_id)
remove_listener = hass.data[DOMAIN][DATA_LISTENER].pop(config_entry.entry_id)
remove_listener()
await hass.config_entries.async_forward_entry_unload(config_entry, "sensor")
return True
class IQVIAData:
"""Define a data object to retrieve info from IQVIA."""
def __init__(self, hass, client):
"""Initialize."""
self._async_cancel_time_interval_listener = None
self._client = client
self._hass = hass
self.data = {}
self.zip_code = client.zip_code
self._api_coros = {
TYPE_ALLERGY_FORECAST: client.allergens.extended,
TYPE_ALLERGY_INDEX: client.allergens.current,
TYPE_ALLERGY_OUTLOOK: client.allergens.outlook,
TYPE_ASTHMA_FORECAST: client.asthma.extended,
TYPE_ASTHMA_INDEX: client.asthma.current,
TYPE_DISEASE_FORECAST: client.disease.extended,
TYPE_DISEASE_INDEX: client.disease.current,
}
self._api_category_count = {
TYPE_ALLERGY_FORECAST: 0,
TYPE_ALLERGY_INDEX: 0,
TYPE_ALLERGY_OUTLOOK: 0,
TYPE_ASTHMA_FORECAST: 0,
TYPE_ASTHMA_INDEX: 0,
TYPE_DISEASE_FORECAST: 0,
TYPE_DISEASE_INDEX: 0,
}
self._api_category_locks = {
TYPE_ALLERGY_FORECAST: asyncio.Lock(),
TYPE_ALLERGY_INDEX: asyncio.Lock(),
TYPE_ALLERGY_OUTLOOK: asyncio.Lock(),
TYPE_ASTHMA_FORECAST: asyncio.Lock(),
TYPE_ASTHMA_INDEX: asyncio.Lock(),
TYPE_DISEASE_FORECAST: asyncio.Lock(),
TYPE_DISEASE_INDEX: asyncio.Lock(),
}
async def _async_get_data_from_api(self, api_category):
"""Update and save data for a particular API category."""
if self._api_category_count[api_category] == 0:
return
try:
self.data[api_category] = await self._api_coros[api_category]()
except IQVIAError as err:
_LOGGER.error("Unable to get %s data: %s", api_category, err)
self.data[api_category] = None
async def _async_update_listener_action(self, now):
"""Define an async_track_time_interval action to update data."""
await self.async_update()
@callback
def async_deregister_api_interest(self, sensor_type):
"""Decrement the number of entities with data needs from an API category."""
# If this deregistration should leave us with no registration at all, remove the
# time interval:
if sum(self._api_category_count.values()) == 0:
if self._async_cancel_time_interval_listener:
self._async_cancel_time_interval_listener()
self._async_cancel_time_interval_listener = None
return
api_category = async_get_api_category(sensor_type)
self._api_category_count[api_category] -= 1
async def async_register_api_interest(self, sensor_type):
"""Increment the number of entities with data needs from an API category."""
# If this is the first registration we have, start a time interval:
if not self._async_cancel_time_interval_listener:
self._async_cancel_time_interval_listener = async_track_time_interval(
self._hass,
self._async_update_listener_action,
DEFAULT_SCAN_INTERVAL,
)
api_category = async_get_api_category(sensor_type)
self._api_category_count[api_category] += 1
# If a sensor registers interest in a particular API call and the data doesn't
# exist for it yet, make the API call and grab the data:
async with self._api_category_locks[api_category]:
if api_category not in self.data:
await self._async_get_data_from_api(api_category)
async def async_update(self):
"""Update IQVIA data."""
tasks = [
self._async_get_data_from_api(api_category)
for api_category in self._api_coros
]
await asyncio.gather(*tasks)
_LOGGER.debug("Received new data")
async_dispatcher_send(self._hass, TOPIC_DATA_UPDATE)
class IQVIAEntity(Entity):
class IQVIAEntity(CoordinatorEntity):
"""Define a base IQVIA entity."""
def __init__(self, iqvia, sensor_type, name, icon, zip_code):
"""Initialize the sensor."""
def __init__(self, coordinator, entry, sensor_type, name, icon):
"""Initialize."""
super().__init__(coordinator)
self._attrs = {ATTR_ATTRIBUTION: DEFAULT_ATTRIBUTION}
self._entry = entry
self._icon = icon
self._iqvia = iqvia
self._name = name
self._state = None
self._type = sensor_type
self._zip_code = zip_code
@property
def available(self):
"""Return True if entity is available."""
if self._type in (TYPE_ALLERGY_TODAY, TYPE_ALLERGY_TOMORROW):
return self._iqvia.data.get(TYPE_ALLERGY_INDEX) is not None
if self._type in (TYPE_ASTHMA_TODAY, TYPE_ASTHMA_TOMORROW):
return self._iqvia.data.get(TYPE_ASTHMA_INDEX) is not None
if self._type == TYPE_DISEASE_TODAY:
return self._iqvia.data.get(TYPE_DISEASE_INDEX) is not None
return self._iqvia.data.get(self._type) is not None
@property
def device_state_attributes(self):
@ -257,7 +144,7 @@ class IQVIAEntity(Entity):
@property
def unique_id(self):
"""Return a unique, Home Assistant friendly identifier for this entity."""
return f"{self._zip_code}_{self._type}"
return f"{self._entry.data[CONF_ZIP_CODE]}_{self._type}"
@property
def unit_of_measurement(self):
@ -273,27 +160,17 @@ class IQVIAEntity(Entity):
self.update_from_latest_data()
self.async_write_ha_state()
self.async_on_remove(
async_dispatcher_connect(self.hass, TOPIC_DATA_UPDATE, update)
)
self.async_on_remove(self.coordinator.async_add_listener(update))
await self._iqvia.async_register_api_interest(self._type)
if self._type == TYPE_ALLERGY_FORECAST:
# Entities that express interest in allergy forecast data should also
# express interest in allergy outlook data:
await self._iqvia.async_register_api_interest(TYPE_ALLERGY_OUTLOOK)
outlook_coordinator = self.hass.data[DOMAIN][DATA_COORDINATOR][
self._entry.entry_id
][TYPE_ALLERGY_OUTLOOK]
self.async_on_remove(outlook_coordinator.async_add_listener(update))
self.update_from_latest_data()
async def async_will_remove_from_hass(self):
"""Disconnect dispatcher listener when removed."""
self._iqvia.async_deregister_api_interest(self._type)
if self._type == TYPE_ALLERGY_FORECAST:
# Entities that lose interest in allergy forecast data should also lose
# interest in allergy outlook data:
self._iqvia.async_deregister_api_interest(TYPE_ALLERGY_OUTLOOK)
@callback
def update_from_latest_data(self):
"""Update the entity's state."""
raise NotImplementedError()
"""Update the entity from the latest data."""
raise NotImplementedError

View File

@ -1,12 +1,13 @@
"""Define IQVIA constants."""
import logging
LOGGER = logging.getLogger(__package__)
DOMAIN = "iqvia"
CONF_ZIP_CODE = "zip_code"
DATA_CLIENT = "client"
DATA_LISTENER = "listener"
TOPIC_DATA_UPDATE = f"{DOMAIN}_data_update"
DATA_COORDINATOR = "coordinator"
TYPE_ALLERGY_FORECAST = "allergy_average_forecasted"
TYPE_ALLERGY_INDEX = "allergy_index"

View File

@ -3,9 +3,14 @@ from statistics import mean
import numpy as np
from homeassistant.components.iqvia import (
DATA_CLIENT,
from homeassistant.const import ATTR_STATE
from homeassistant.core import callback
from . import IQVIAEntity
from .const import (
DATA_COORDINATOR,
DOMAIN,
SENSORS,
TYPE_ALLERGY_FORECAST,
TYPE_ALLERGY_INDEX,
TYPE_ALLERGY_OUTLOOK,
@ -18,12 +23,7 @@ from homeassistant.components.iqvia import (
TYPE_DISEASE_FORECAST,
TYPE_DISEASE_INDEX,
TYPE_DISEASE_TODAY,
IQVIAEntity,
)
from homeassistant.const import ATTR_STATE
from homeassistant.core import callback
from .const import SENSORS
ATTR_ALLERGEN_AMOUNT = "allergen_amount"
ATTR_ALLERGEN_GENUS = "allergen_genus"
@ -36,6 +36,15 @@ ATTR_SEASON = "season"
ATTR_TREND = "trend"
ATTR_ZIP_CODE = "zip_code"
API_CATEGORY_MAPPING = {
TYPE_ALLERGY_TODAY: TYPE_ALLERGY_INDEX,
TYPE_ALLERGY_TOMORROW: TYPE_ALLERGY_INDEX,
TYPE_ALLERGY_TOMORROW: TYPE_ALLERGY_INDEX,
TYPE_ASTHMA_TODAY: TYPE_ASTHMA_INDEX,
TYPE_ASTHMA_TOMORROW: TYPE_ASTHMA_INDEX,
TYPE_DISEASE_TODAY: TYPE_DISEASE_INDEX,
}
RATING_MAPPING = [
{"label": "Low", "minimum": 0.0, "maximum": 2.4},
{"label": "Low/Medium", "minimum": 2.5, "maximum": 4.8},
@ -51,8 +60,6 @@ TREND_SUBSIDING = "Subsiding"
async def async_setup_entry(hass, entry, async_add_entities):
"""Set up IQVIA sensors based on a config entry."""
iqvia = hass.data[DOMAIN][DATA_CLIENT][entry.entry_id]
sensor_class_mapping = {
TYPE_ALLERGY_FORECAST: ForecastSensor,
TYPE_ALLERGY_TODAY: IndexSensor,
@ -64,14 +71,15 @@ async def async_setup_entry(hass, entry, async_add_entities):
TYPE_DISEASE_TODAY: IndexSensor,
}
async_add_entities(
[
sensor_class_mapping[sensor_type](
iqvia, sensor_type, name, icon, iqvia.zip_code
)
for sensor_type, (name, icon) in SENSORS.items()
]
)
sensors = []
for sensor_type, (name, icon) in SENSORS.items():
api_category = API_CATEGORY_MAPPING.get(sensor_type, sensor_type)
coordinator = hass.data[DOMAIN][DATA_COORDINATOR][entry.entry_id][api_category]
sensor_class = sensor_class_mapping[sensor_type]
sensors.append(sensor_class(coordinator, entry, sensor_type, name, icon))
async_add_entities(sensors)
def calculate_trend(indices):
@ -96,10 +104,7 @@ class ForecastSensor(IQVIAEntity):
@callback
def update_from_latest_data(self):
"""Update the sensor."""
if not self._iqvia.data.get(self._type):
return
data = self._iqvia.data[self._type].get("Location")
data = self.coordinator.data.get("Location")
if not data or not data.get("periods"):
return
@ -122,9 +127,11 @@ class ForecastSensor(IQVIAEntity):
)
if self._type == TYPE_ALLERGY_FORECAST:
outlook = self._iqvia.data[TYPE_ALLERGY_OUTLOOK]
self._attrs[ATTR_OUTLOOK] = outlook.get("Outlook")
self._attrs[ATTR_SEASON] = outlook.get("Season")
outlook_coordinator = self.hass.data[DOMAIN][DATA_COORDINATOR][
self._entry.entry_id
][TYPE_ALLERGY_OUTLOOK]
self._attrs[ATTR_OUTLOOK] = outlook_coordinator.data.get("Outlook")
self._attrs[ATTR_SEASON] = outlook_coordinator.data.get("Season")
self._state = average
@ -135,16 +142,13 @@ class IndexSensor(IQVIAEntity):
@callback
def update_from_latest_data(self):
"""Update the sensor."""
if not self._iqvia.data:
return
try:
if self._type in (TYPE_ALLERGY_TODAY, TYPE_ALLERGY_TOMORROW):
data = self._iqvia.data[TYPE_ALLERGY_INDEX].get("Location")
data = self.coordinator.data.get("Location")
elif self._type in (TYPE_ASTHMA_TODAY, TYPE_ASTHMA_TOMORROW):
data = self._iqvia.data[TYPE_ASTHMA_INDEX].get("Location")
data = self.coordinator.data.get("Location")
elif self._type == TYPE_DISEASE_TODAY:
data = self._iqvia.data[TYPE_DISEASE_INDEX].get("Location")
data = self.coordinator.data.get("Location")
except KeyError:
return