1
mirror of https://github.com/home-assistant/core synced 2024-08-02 23:40:32 +02:00

Convert CO2Signal to data update coordinator and add fossil fuel percentage (#53370)

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
Co-authored-by: Daniel Hjelseth Høyer <mail@dahoiv.net>
This commit is contained in:
Paulus Schoutsen 2021-07-23 14:35:11 -07:00 committed by GitHub
parent d0bef97453
commit c49decb7f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 227 additions and 137 deletions

View File

@ -1,16 +1,53 @@
"""The CO2 Signal integration."""
from __future__ import annotations
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from datetime import timedelta
import logging
from typing import TypedDict, cast
from .const import DOMAIN # noqa: F401
import CO2Signal
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_API_KEY, CONF_LATITUDE, CONF_LONGITUDE
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed, HomeAssistantError
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import CONF_COUNTRY_CODE, DOMAIN
from .util import get_extra_name
PLATFORMS = ["sensor"]
_LOGGER = logging.getLogger(__name__)
class CO2SignalData(TypedDict):
"""Data field."""
carbonIntensity: float
fossilFuelPercentage: float
class CO2SignalUnit(TypedDict):
"""Unit field."""
carbonIntensity: str
class CO2SignalResponse(TypedDict):
"""API response."""
status: str
countryCode: str
data: CO2SignalData
units: CO2SignalUnit
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up CO2 Signal from a config entry."""
coordinator = CO2SignalCoordinator(hass, entry)
await coordinator.async_config_entry_first_refresh()
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
return True
@ -18,3 +55,95 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
class CO2SignalCoordinator(DataUpdateCoordinator[CO2SignalResponse]):
"""Data update coordinator."""
def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Initialize the coordinator."""
super().__init__(
hass, _LOGGER, name=DOMAIN, update_interval=timedelta(minutes=15)
)
self._entry = entry
@property
def entry_id(self) -> str:
"""Return entry ID."""
return self._entry.entry_id
def get_extra_name(self) -> str | None:
"""Return the extra name describing the location if not home."""
return get_extra_name(self._entry.data)
async def _async_update_data(self) -> CO2SignalResponse:
"""Fetch the latest data from the source."""
try:
data = await self.hass.async_add_executor_job(
get_data, self.hass, self._entry.data
)
except InvalidAuth as err:
raise ConfigEntryAuthFailed from err
except CO2Error as err:
raise UpdateFailed(str(err)) from err
return data
class CO2Error(HomeAssistantError):
"""Base error."""
class InvalidAuth(CO2Error):
"""Raised when invalid authentication credentials are provided."""
class APIRatelimitExceeded(CO2Error):
"""Raised when the API rate limit is exceeded."""
class UnknownError(CO2Error):
"""Raised when an unknown error occurs."""
def get_data(hass: HomeAssistant, config: dict) -> CO2SignalResponse:
"""Get data from the API."""
if CONF_COUNTRY_CODE in config:
latitude = None
longitude = None
else:
latitude = config.get(CONF_LATITUDE, hass.config.latitude)
longitude = config.get(CONF_LONGITUDE, hass.config.longitude)
try:
data = CO2Signal.get_latest(
config[CONF_API_KEY],
config.get(CONF_COUNTRY_CODE),
latitude,
longitude,
wait=False,
)
except ValueError as err:
err_str = str(err)
if "Invalid authentication credentials" in err_str:
raise InvalidAuth from err
if "API rate limit exceeded." in err_str:
raise APIRatelimitExceeded from err
_LOGGER.exception("Unexpected exception")
raise UnknownError from err
except Exception as err: # pylint: disable=broad-except
_LOGGER.exception("Unexpected exception")
raise UnknownError from err
else:
if "error" in data:
raise UnknownError(data["error"])
if data.get("status") != "ok":
_LOGGER.exception("Unexpected response: %s", data)
raise UnknownError
return cast(CO2SignalResponse, data)

View File

@ -4,15 +4,14 @@ from __future__ import annotations
import logging
from typing import Any
import CO2Signal
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import CONF_API_KEY, CONF_LATITUDE, CONF_LONGITUDE, CONF_TOKEN
from homeassistant.data_entry_flow import FlowResult
from homeassistant.exceptions import HomeAssistantError
import homeassistant.helpers.config_validation as cv
from . import APIRatelimitExceeded, CO2Error, InvalidAuth, UnknownError, get_data
from .const import CONF_COUNTRY_CODE, DOMAIN
from .util import get_extra_name
@ -34,62 +33,6 @@ def _get_entry_type(config: dict) -> str:
return TYPE_USE_HOME
def _validate_info(hass, config: dict) -> dict:
"""Validate the passed in info."""
if CONF_COUNTRY_CODE in config:
latitude = None
longitude = None
else:
latitude = config.get(CONF_LATITUDE, hass.config.latitude)
longitude = config.get(CONF_LONGITUDE, hass.config.longitude)
try:
data = CO2Signal.get_latest(
config[CONF_API_KEY],
config.get(CONF_COUNTRY_CODE),
latitude,
longitude,
wait=False,
)
except ValueError as err:
err_str = str(err)
if "Invalid authentication credentials" in err_str:
raise InvalidAuth from err
if "API rate limit exceeded." in err_str:
raise APIRatelimitExceeded from err
_LOGGER.exception("Unexpected exception")
raise UnknownError from err
except Exception as err: # pylint: disable=broad-except
_LOGGER.exception("Unexpected exception")
raise UnknownError from err
else:
if data.get("status") != "ok":
_LOGGER.exception("Unexpected response: %s", data)
raise UnknownError
return data
class CO2Error(HomeAssistantError):
"""Base error."""
class InvalidAuth(CO2Error):
"""Raised when invalid authentication credentials are provided."""
class APIRatelimitExceeded(CO2Error):
"""Raised when the API rate limit is exceeded."""
class UnknownError(CO2Error):
"""Raised when an unknown error occurs."""
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Co2signal."""
@ -136,12 +79,12 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
return self.async_abort(reason="already_configured")
try:
await self.hass.async_add_executor_job(_validate_info, self.hass, data)
await self.hass.async_add_executor_job(get_data, self.hass, data)
except CO2Error:
return self.async_abort(reason="unknown")
return self.async_create_entry(
title=get_extra_name(self.hass, data) or "CO2 Signal", data=data
title=get_extra_name(data) or "CO2 Signal", data=data
)
async def async_step_user(
@ -227,7 +170,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
errors: dict[str, str] = {}
try:
await self.hass.async_add_executor_job(_validate_info, self.hass, data)
await self.hass.async_add_executor_job(get_data, self.hass, data)
except InvalidAuth:
errors["base"] = "invalid_auth"
except APIRatelimitExceeded:
@ -236,7 +179,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
errors["base"] = "unknown"
else:
return self.async_create_entry(
title=get_extra_name(self.hass, data) or "CO2 Signal",
title=get_extra_name(data) or "CO2 Signal",
data=data,
)

View File

@ -1,32 +1,36 @@
"""Support for the CO2signal platform."""
from datetime import timedelta
import logging
from __future__ import annotations
from dataclasses import dataclass
from datetime import timedelta
from typing import cast
import CO2Signal
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components.sensor import PLATFORM_SCHEMA, SensorEntity
from homeassistant.components.sensor import (
PLATFORM_SCHEMA,
STATE_CLASS_MEASUREMENT,
SensorEntity,
)
from homeassistant.const import (
ATTR_ATTRIBUTION,
ATTR_IDENTIFIERS,
ATTR_MANUFACTURER,
ATTR_NAME,
CONF_API_KEY,
CONF_LATITUDE,
CONF_LONGITUDE,
CONF_TOKEN,
ENERGY_KILO_WATT_HOUR,
PERCENTAGE,
)
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers import config_validation as cv, update_coordinator
from homeassistant.helpers.typing import StateType
from . import CO2SignalCoordinator, CO2SignalResponse
from .const import ATTRIBUTION, CONF_COUNTRY_CODE, DOMAIN, MSG_LOCATION
from .util import get_extra_name
_LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = timedelta(minutes=3)
CO2_INTENSITY_UNIT = f"CO2eq/{ENERGY_KILO_WATT_HOUR}"
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_TOKEN): cv.string,
@ -37,6 +41,32 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
)
@dataclass
class CO2SensorEntityDescription:
"""Provide a description of a CO2 sensor."""
key: str
name: str
unit_of_measurement: str | None = None
# For backwards compat, allow description to override unique ID key to use
unique_id: str | None = None
SENSORS = (
CO2SensorEntityDescription(
key="carbonIntensity",
name="CO2 intensity",
unique_id="co2intensity",
# No unit, it's extracted from response.
),
CO2SensorEntityDescription(
key="fossilFuelPercentage",
name="Grid fossil fuel percentage",
unit_of_measurement=PERCENTAGE,
),
)
async def async_setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the CO2signal sensor."""
await hass.config_entries.flow.async_init(
@ -48,59 +78,47 @@ async def async_setup_platform(hass, config, add_entities, discovery_info=None):
async def async_setup_entry(hass, entry, async_add_entities):
"""Set up the CO2signal sensor."""
name = "CO2 intensity"
if extra_name := get_extra_name(hass, entry.data):
name += f" - {extra_name}"
async_add_entities(
[
CO2Sensor(
name,
entry.data,
entry_id=entry.entry_id,
)
],
True,
)
coordinator: CO2SignalCoordinator = hass.data[DOMAIN][entry.entry_id]
async_add_entities(CO2Sensor(coordinator, description) for description in SENSORS)
class CO2Sensor(SensorEntity):
class CO2Sensor(update_coordinator.CoordinatorEntity[CO2SignalResponse], SensorEntity):
"""Implementation of the CO2Signal sensor."""
_attr_state_class = STATE_CLASS_MEASUREMENT
_attr_icon = "mdi:molecule-co2"
_attr_unit_of_measurement = CO2_INTENSITY_UNIT
def __init__(self, name, config, entry_id):
def __init__(
self, coordinator: CO2SignalCoordinator, description: CO2SensorEntityDescription
) -> None:
"""Initialize the sensor."""
self._config = config
super().__init__(coordinator)
self._description = description
name = description.name
if extra_name := coordinator.get_extra_name():
name = f"{extra_name} - {name}"
self._attr_name = name
self._attr_extra_state_attributes = {ATTR_ATTRIBUTION: ATTRIBUTION}
self._attr_device_info = {
ATTR_IDENTIFIERS: {(DOMAIN, entry_id)},
ATTR_IDENTIFIERS: {(DOMAIN, coordinator.entry_id)},
ATTR_NAME: "CO2 signal",
ATTR_MANUFACTURER: "Tmrow.com",
"entry_type": "service",
}
self._attr_unique_id = f"{entry_id}_co2intensity"
def update(self):
"""Get the latest data and updates the states."""
_LOGGER.debug("Update data for %s", self.name)
if CONF_COUNTRY_CODE in self._config:
kwargs = {"country_code": self._config[CONF_COUNTRY_CODE]}
elif CONF_LATITUDE in self._config:
kwargs = {
"latitude": self._config[CONF_LATITUDE],
"longitude": self._config[CONF_LONGITUDE],
}
else:
kwargs = {
"latitude": self.hass.config.latitude,
"longitude": self.hass.config.longitude,
}
self._attr_state = round(
CO2Signal.get_latest_carbon_intensity(self._config[CONF_API_KEY], **kwargs),
2,
self._attr_unique_id = (
f"{coordinator.entry_id}_{description.unique_id or description.key}"
)
@property
def state(self) -> StateType:
"""Return sensor state."""
return round(self.coordinator.data["data"][self._description.key], 2) # type: ignore[misc]
@property
def unit_of_measurement(self) -> str | None:
"""Return the unit of measurement."""
if self._description.unit_of_measurement:
return self._description.unit_of_measurement
return cast(str, self.coordinator.data["units"].get(self._description.key))

View File

@ -1,13 +1,14 @@
"""Utils for CO2 signal."""
from __future__ import annotations
from collections.abc import Mapping
from homeassistant.const import CONF_LATITUDE, CONF_LONGITUDE
from homeassistant.core import HomeAssistant
from .const import CONF_COUNTRY_CODE
def get_extra_name(hass: HomeAssistant, config: dict) -> str | None:
def get_extra_name(config: Mapping) -> str | None:
"""Return the extra name describing the location if not home."""
if CONF_COUNTRY_CODE in config:
return config[CONF_COUNTRY_CODE]

View File

@ -23,10 +23,7 @@ async def test_form_home(hass: HomeAssistant) -> None:
assert result["type"] == RESULT_TYPE_FORM
assert result["errors"] is None
with patch(
"homeassistant.components.co2signal.config_flow.CO2Signal.get_latest",
return_value=VALID_PAYLOAD,
), patch(
with patch("CO2Signal.get_latest", return_value=VALID_PAYLOAD,), patch(
"homeassistant.components.co2signal.async_setup_entry",
return_value=True,
) as mock_setup_entry:
@ -65,10 +62,7 @@ async def test_form_coordinates(hass: HomeAssistant) -> None:
)
assert result2["type"] == RESULT_TYPE_FORM
with patch(
"homeassistant.components.co2signal.config_flow.CO2Signal.get_latest",
return_value=VALID_PAYLOAD,
), patch(
with patch("CO2Signal.get_latest", return_value=VALID_PAYLOAD,), patch(
"homeassistant.components.co2signal.async_setup_entry",
return_value=True,
) as mock_setup_entry:
@ -109,10 +103,7 @@ async def test_form_country(hass: HomeAssistant) -> None:
)
assert result2["type"] == RESULT_TYPE_FORM
with patch(
"homeassistant.components.co2signal.config_flow.CO2Signal.get_latest",
return_value=VALID_PAYLOAD,
), patch(
with patch("CO2Signal.get_latest", return_value=VALID_PAYLOAD,), patch(
"homeassistant.components.co2signal.async_setup_entry",
return_value=True,
) as mock_setup_entry:
@ -148,7 +139,7 @@ async def test_form_error_handling(hass: HomeAssistant, err_str, err_code) -> No
)
with patch(
"homeassistant.components.co2signal.config_flow.CO2Signal.get_latest",
"CO2Signal.get_latest",
side_effect=ValueError(err_str),
):
result2 = await hass.config_entries.flow.async_configure(
@ -170,7 +161,7 @@ async def test_form_error_unexpected_error(hass: HomeAssistant) -> None:
)
with patch(
"homeassistant.components.co2signal.config_flow.CO2Signal.get_latest",
"CO2Signal.get_latest",
side_effect=Exception("Boom"),
):
result2 = await hass.config_entries.flow.async_configure(
@ -192,7 +183,7 @@ async def test_form_error_unexpected_data(hass: HomeAssistant) -> None:
)
with patch(
"homeassistant.components.co2signal.config_flow.CO2Signal.get_latest",
"CO2Signal.get_latest",
return_value={"status": "error"},
):
result2 = await hass.config_entries.flow.async_configure(
@ -212,7 +203,7 @@ async def test_import(hass: HomeAssistant) -> None:
await setup.async_setup_component(hass, "persistent_notification", {})
with patch(
"homeassistant.components.co2signal.config_flow.CO2Signal.get_latest",
"CO2Signal.get_latest",
return_value=VALID_PAYLOAD,
):
assert await async_setup_component(
@ -221,10 +212,18 @@ async def test_import(hass: HomeAssistant) -> None:
await hass.async_block_till_done()
assert len(hass.config_entries.async_entries("co2signal")) == 1
state = hass.states.get("sensor.co2_intensity")
assert state is not None
assert state.state == "45.99"
assert state.name == "CO2 intensity"
assert state.attributes["unit_of_measurement"] == "gCO2eq/kWh"
state = hass.states.get("sensor.grid_fossil_fuel_percentage")
assert state is not None
assert state.state == "5.46"
assert state.name == "Grid fossil fuel percentage"
assert state.attributes["unit_of_measurement"] == "%"
async def test_import_abort_existing_home(hass: HomeAssistant) -> None:
@ -233,7 +232,7 @@ async def test_import_abort_existing_home(hass: HomeAssistant) -> None:
MockConfigEntry(domain="co2signal", data={"api_key": "abcd"}).add_to_hass(hass)
with patch(
"homeassistant.components.co2signal.config_flow.CO2Signal.get_latest",
"CO2Signal.get_latest",
return_value=VALID_PAYLOAD,
):
assert await async_setup_component(
@ -252,7 +251,7 @@ async def test_import_abort_existing_country(hass: HomeAssistant) -> None:
).add_to_hass(hass)
with patch(
"homeassistant.components.co2signal.config_flow.CO2Signal.get_latest",
"CO2Signal.get_latest",
return_value=VALID_PAYLOAD,
):
assert await async_setup_component(
@ -279,7 +278,7 @@ async def test_import_abort_existing_coordinates(hass: HomeAssistant) -> None:
).add_to_hass(hass)
with patch(
"homeassistant.components.co2signal.config_flow.CO2Signal.get_latest",
"CO2Signal.get_latest",
return_value=VALID_PAYLOAD,
):
assert await async_setup_component(