Add config flow to zamg (#66469)

This commit is contained in:
Daniel Gangl 2022-10-26 18:35:12 +02:00 committed by GitHub
parent a1c18b06fb
commit b2b3c47917
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 821 additions and 311 deletions

View File

@ -1582,6 +1582,9 @@ omit =
homeassistant/components/youless/const.py
homeassistant/components/youless/sensor.py
homeassistant/components/zabbix/*
homeassistant/components/zamg/__init__.py
homeassistant/components/zamg/const.py
homeassistant/components/zamg/coordinator.py
homeassistant/components/zamg/sensor.py
homeassistant/components/zamg/weather.py
homeassistant/components/zengge/light.py

View File

@ -1318,6 +1318,8 @@ build.json @home-assistant/supervisor
/tests/components/yolink/ @matrixd2
/homeassistant/components/youless/ @gjong
/tests/components/youless/ @gjong
/homeassistant/components/zamg/ @killer0071234
/tests/components/zamg/ @killer0071234
/homeassistant/components/zengge/ @emontnemery
/homeassistant/components/zeroconf/ @bdraco
/tests/components/zeroconf/ @bdraco

View File

@ -1 +1,33 @@
"""The zamg component."""
from __future__ import annotations
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from .const import CONF_STATION_ID, DOMAIN
from .coordinator import ZamgDataUpdateCoordinator
PLATFORMS = (Platform.WEATHER, Platform.SENSOR)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Zamg from config entry."""
coordinator = ZamgDataUpdateCoordinator(hass, entry=entry)
station_id = entry.data[CONF_STATION_ID]
coordinator.zamg.set_default_station(station_id)
await coordinator.async_config_entry_first_refresh()
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator
# Set up all platforms for this device/entry.
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload ZAMG config entry."""
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok

View File

@ -0,0 +1,136 @@
"""Config Flow for zamg the Austrian "Zentralanstalt für Meteorologie und Geodynamik" integration."""
from __future__ import annotations
from typing import Any
import voluptuous as vol
from zamg import ZamgData
from homeassistant import config_entries
from homeassistant.const import CONF_LATITUDE, CONF_LONGITUDE, CONF_NAME
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
from .const import CONF_STATION_ID, DOMAIN, LOGGER
class ZamgConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Config flow for zamg integration."""
VERSION = 1
_client: ZamgData | None = None
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle a flow initiated by the user."""
errors: dict[str, Any] = {}
if self._client is None:
self._client = ZamgData()
self._client.session = async_get_clientsession(self.hass)
if user_input is None:
closest_station_id = await self._client.closest_station(
self.hass.config.latitude,
self.hass.config.longitude,
)
LOGGER.debug("config_flow: closest station = %s", str(closest_station_id))
stations = await self._client.zamg_stations()
user_input = {}
schema = vol.Schema(
{
vol.Required(
CONF_STATION_ID, default=int(closest_station_id)
): vol.In(
{
int(station): f"{stations[station][2]} ({station})"
for station in stations
}
)
}
)
return self.async_show_form(step_id="user", data_schema=schema)
station_id = str(user_input[CONF_STATION_ID])
# Check if already configured
await self.async_set_unique_id(station_id)
self._abort_if_unique_id_configured()
try:
self._client.set_default_station(station_id)
await self._client.update()
except (ValueError, TypeError) as err:
LOGGER.error("Config_flow: Received error from ZAMG: %s", err)
errors["base"] = "cannot_connect"
return self.async_abort(
reason="cannot_connect", description_placeholders=errors
)
return self.async_create_entry(
title=user_input.get(CONF_NAME) or self._client.get_station_name,
data={CONF_STATION_ID: station_id},
)
async def async_step_import(self, config: dict[str, Any]) -> FlowResult:
"""Handle ZAMG configuration import."""
station_id = str(config.get(CONF_STATION_ID))
station_name = config.get(CONF_NAME)
# create issue every time after restart
# parameter is_persistent seems not working
async_create_issue(
self.hass,
DOMAIN,
"deprecated_yaml",
breaks_in_ha_version="2023.1.0",
is_fixable=False,
severity=IssueSeverity.WARNING,
translation_key="deprecated_yaml",
)
for entry in self.hass.config_entries.async_entries(DOMAIN):
if station_id in entry.data[CONF_STATION_ID]:
return self.async_abort(
reason="already_configured",
)
if self._client is None:
self._client = ZamgData()
self._client.session = async_get_clientsession(self.hass)
if station_id not in await self._client.zamg_stations():
LOGGER.warning(
"Configured station_id %s could not be found at zamg, adding the nearest weather station instead",
station_id,
)
latitude = config.get(CONF_LATITUDE) or self.hass.config.latitude
longitude = config.get(CONF_LONGITUDE) or self.hass.config.longitude
station_id = await self._client.closest_station(latitude, longitude)
if not station_name:
await self._client.zamg_stations()
self._client.set_default_station(station_id)
station_name = self._client.get_station_name
for entry in self.hass.config_entries.async_entries(DOMAIN):
if station_id in entry.data[CONF_STATION_ID]:
return self.async_abort(
reason="already_configured",
)
LOGGER.debug(
"importing zamg station from configuration.yaml: station_id = %s, name = %s",
station_id,
station_name,
)
return await self.async_step_user(
user_input={
CONF_STATION_ID: int(station_id),
CONF_NAME: station_name,
}
)

View File

@ -0,0 +1,26 @@
"""Constants for zamg the Austrian "Zentralanstalt für Meteorologie und Geodynamik" integration."""
from datetime import timedelta
import logging
from homeassistant.const import Platform
from homeassistant.util import dt as dt_util
DOMAIN = "zamg"
PLATFORMS = [Platform.SENSOR, Platform.WEATHER]
LOGGER = logging.getLogger(__package__)
ATTR_STATION = "station"
ATTR_UPDATED = "updated"
ATTRIBUTION = "Data provided by ZAMG"
CONF_STATION_ID = "station_id"
DEFAULT_NAME = "zamg"
MANUFACTURER_URL = "https://www.zamg.ac.at"
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=10)
VIENNA_TIME_ZONE = dt_util.get_time_zone("Europe/Vienna")

View File

@ -0,0 +1,46 @@
"""Data Update coordinator for ZAMG weather data."""
from __future__ import annotations
from zamg import ZamgData as ZamgDevice
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import CONF_STATION_ID, DOMAIN, LOGGER, MIN_TIME_BETWEEN_UPDATES
class ZamgDataUpdateCoordinator(DataUpdateCoordinator[ZamgDevice]):
"""Class to manage fetching ZAMG weather data."""
config_entry: ConfigEntry
data: dict = {}
def __init__(
self,
hass: HomeAssistant,
*,
entry: ConfigEntry,
) -> None:
"""Initialize global ZAMG data updater."""
self.zamg = ZamgDevice(session=async_get_clientsession(hass))
self.zamg.set_default_station(entry.data[CONF_STATION_ID])
super().__init__(
hass,
LOGGER,
name=DOMAIN,
update_interval=MIN_TIME_BETWEEN_UPDATES,
)
async def _async_update_data(self) -> ZamgDevice:
"""Fetch data from ZAMG api."""
try:
await self.zamg.zamg_stations()
device = await self.zamg.update()
except ValueError as error:
raise UpdateFailed(f"Invalid response from API: {error}") from error
self.data = device
self.data["last_update"] = self.zamg.last_update
self.data["Name"] = self.zamg.get_station_name
return device

View File

@ -2,6 +2,8 @@
"domain": "zamg",
"name": "Zentralanstalt f\u00fcr Meteorologie und Geodynamik (ZAMG)",
"documentation": "https://www.home-assistant.io/integrations/zamg",
"codeowners": [],
"requirements": ["zamg==0.1.1"],
"codeowners": ["@killer0071234"],
"config_flow": true,
"iot_class": "cloud_polling"
}

View File

@ -1,55 +1,51 @@
"""Sensor for the Austrian "Zentralanstalt für Meteorologie und Geodynamik"."""
"""Sensor for zamg the Austrian "Zentralanstalt für Meteorologie und Geodynamik" integration."""
from __future__ import annotations
import csv
from collections.abc import Mapping
from dataclasses import dataclass
from datetime import datetime, timedelta
import gzip
import json
import logging
import os
from typing import Union
import requests
import voluptuous as vol
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
SensorEntityDescription,
SensorStateClass,
)
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import (
AREA_SQUARE_METERS,
ATTR_ATTRIBUTION,
CONF_LATITUDE,
CONF_LONGITUDE,
CONF_MONITORED_CONDITIONS,
CONF_NAME,
DEGREE,
LENGTH_METERS,
LENGTH_CENTIMETERS,
LENGTH_MILLIMETERS,
PERCENTAGE,
PRESSURE_HPA,
SPEED_KILOMETERS_PER_HOUR,
SPEED_METERS_PER_SECOND,
TEMP_CELSIUS,
__version__,
TIME_SECONDS,
)
from homeassistant.core import HomeAssistant
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.device_registry import DeviceEntryType
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.util import Throttle, dt as dt_util
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType, StateType
from homeassistant.helpers.update_coordinator import CoordinatorEntity
_LOGGER = logging.getLogger(__name__)
ATTR_STATION = "station"
ATTR_UPDATED = "updated"
ATTRIBUTION = "Data provided by ZAMG"
CONF_STATION_ID = "station_id"
DEFAULT_NAME = "zamg"
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=10)
VIENNA_TIME_ZONE = dt_util.get_time_zone("Europe/Vienna")
from .const import (
ATTR_STATION,
ATTR_UPDATED,
ATTRIBUTION,
CONF_STATION_ID,
DEFAULT_NAME,
DOMAIN,
MANUFACTURER_URL,
)
_DType = Union[type[int], type[float], type[str]]
@ -58,7 +54,7 @@ _DType = Union[type[int], type[float], type[str]]
class ZamgRequiredKeysMixin:
"""Mixin for required keys."""
col_heading: str
para_name: str
dtype: _DType
@ -72,56 +68,67 @@ SENSOR_TYPES: tuple[ZamgSensorEntityDescription, ...] = (
key="pressure",
name="Pressure",
native_unit_of_measurement=PRESSURE_HPA,
col_heading="LDstat hPa",
device_class=SensorDeviceClass.PRESSURE,
state_class=SensorStateClass.MEASUREMENT,
para_name="P",
dtype=float,
),
ZamgSensorEntityDescription(
key="pressure_sealevel",
name="Pressure at Sea Level",
native_unit_of_measurement=PRESSURE_HPA,
col_heading="LDred hPa",
device_class=SensorDeviceClass.PRESSURE,
state_class=SensorStateClass.MEASUREMENT,
para_name="PRED",
dtype=float,
),
ZamgSensorEntityDescription(
key="humidity",
name="Humidity",
native_unit_of_measurement=PERCENTAGE,
col_heading="RF %",
device_class=SensorDeviceClass.HUMIDITY,
state_class=SensorStateClass.MEASUREMENT,
para_name="RFAM",
dtype=int,
),
ZamgSensorEntityDescription(
key="wind_speed",
name="Wind Speed",
native_unit_of_measurement=SPEED_KILOMETERS_PER_HOUR,
col_heading=f"WG {SPEED_KILOMETERS_PER_HOUR}",
native_unit_of_measurement=SPEED_METERS_PER_SECOND,
state_class=SensorStateClass.MEASUREMENT,
para_name="FFAM",
dtype=float,
),
ZamgSensorEntityDescription(
key="wind_bearing",
name="Wind Bearing",
native_unit_of_measurement=DEGREE,
col_heading=f"WR {DEGREE}",
state_class=SensorStateClass.MEASUREMENT,
para_name="DD",
dtype=int,
),
ZamgSensorEntityDescription(
key="wind_max_speed",
name="Top Wind Speed",
native_unit_of_measurement=SPEED_KILOMETERS_PER_HOUR,
col_heading=f"WSG {SPEED_KILOMETERS_PER_HOUR}",
native_unit_of_measurement=SPEED_METERS_PER_SECOND,
state_class=SensorStateClass.MEASUREMENT,
para_name="FFX",
dtype=float,
),
ZamgSensorEntityDescription(
key="wind_max_bearing",
name="Top Wind Bearing",
native_unit_of_measurement=DEGREE,
col_heading=f"WSR {DEGREE}",
state_class=SensorStateClass.MEASUREMENT,
para_name="DDX",
dtype=int,
),
ZamgSensorEntityDescription(
key="sun_last_hour",
name="Sun Last Hour",
native_unit_of_measurement=PERCENTAGE,
col_heading=f"SO {PERCENTAGE}",
key="sun_last_10min",
name="Sun Last 10 Minutes",
native_unit_of_measurement=TIME_SECONDS,
state_class=SensorStateClass.MEASUREMENT,
para_name="SO",
dtype=int,
),
ZamgSensorEntityDescription(
@ -129,14 +136,33 @@ SENSOR_TYPES: tuple[ZamgSensorEntityDescription, ...] = (
name="Temperature",
native_unit_of_measurement=TEMP_CELSIUS,
device_class=SensorDeviceClass.TEMPERATURE,
col_heading=f"T {TEMP_CELSIUS}",
state_class=SensorStateClass.MEASUREMENT,
para_name="TL",
dtype=float,
),
ZamgSensorEntityDescription(
key="temperature_average",
name="Temperature Average",
native_unit_of_measurement=TEMP_CELSIUS,
device_class=SensorDeviceClass.TEMPERATURE,
state_class=SensorStateClass.MEASUREMENT,
para_name="TLAM",
dtype=float,
),
ZamgSensorEntityDescription(
key="precipitation",
name="Precipitation",
native_unit_of_measurement=f"l/{AREA_SQUARE_METERS}",
col_heading=f"N l/{AREA_SQUARE_METERS}",
native_unit_of_measurement=LENGTH_MILLIMETERS,
state_class=SensorStateClass.MEASUREMENT,
para_name="RR",
dtype=float,
),
ZamgSensorEntityDescription(
key="snow",
name="Snow",
native_unit_of_measurement=LENGTH_CENTIMETERS,
state_class=SensorStateClass.MEASUREMENT,
para_name="SCHNEE",
dtype=float,
),
ZamgSensorEntityDescription(
@ -144,42 +170,25 @@ SENSOR_TYPES: tuple[ZamgSensorEntityDescription, ...] = (
name="Dew Point",
native_unit_of_measurement=TEMP_CELSIUS,
device_class=SensorDeviceClass.TEMPERATURE,
col_heading=f"TP {TEMP_CELSIUS}",
state_class=SensorStateClass.MEASUREMENT,
para_name="TP",
dtype=float,
),
# The following probably not useful for general consumption,
# but we need them to fill in internal attributes
ZamgSensorEntityDescription(
key="station_name",
name="Station Name",
col_heading="Name",
dtype=str,
),
ZamgSensorEntityDescription(
key="station_elevation",
name="Station Elevation",
native_unit_of_measurement=LENGTH_METERS,
col_heading=f"Höhe {LENGTH_METERS}",
dtype=int,
),
ZamgSensorEntityDescription(
key="update_date",
name="Update Date",
col_heading="Datum",
dtype=str,
),
ZamgSensorEntityDescription(
key="update_time",
name="Update Time",
col_heading="Zeit",
dtype=str,
key="dewpoint_average",
name="Dew Point Average",
native_unit_of_measurement=TEMP_CELSIUS,
device_class=SensorDeviceClass.TEMPERATURE,
state_class=SensorStateClass.MEASUREMENT,
para_name="TPAM",
dtype=float,
),
)
SENSOR_KEYS: list[str] = [desc.key for desc in SENSOR_TYPES]
API_FIELDS: dict[str, tuple[str, _DType]] = {
desc.col_heading: (desc.key, desc.dtype) for desc in SENSOR_TYPES
desc.para_name: (desc.key, desc.dtype) for desc in SENSOR_TYPES
}
PLATFORM_SCHEMA = cv.PLATFORM_SCHEMA.extend(
@ -199,187 +208,70 @@ PLATFORM_SCHEMA = cv.PLATFORM_SCHEMA.extend(
)
def setup_platform(
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
add_entities: AddEntitiesCallback,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the ZAMG sensor platform."""
name = config[CONF_NAME]
latitude = config.get(CONF_LATITUDE, hass.config.latitude)
longitude = config.get(CONF_LONGITUDE, hass.config.longitude)
station_id = config.get(CONF_STATION_ID) or closest_station(
latitude, longitude, hass.config.config_dir
)
if station_id not in _get_ogd_stations():
_LOGGER.error(
"Configured ZAMG %s (%s) is not a known station",
CONF_STATION_ID,
station_id,
)
return
probe = ZamgData(station_id=station_id)
try:
probe.update()
except (ValueError, TypeError) as err:
_LOGGER.error("Received error from ZAMG: %s", err)
return
monitored_conditions = config[CONF_MONITORED_CONDITIONS]
add_entities(
[
ZamgSensor(probe, name, description)
for description in SENSOR_TYPES
if description.key in monitored_conditions
],
True,
# trigger import flow
await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data=config,
)
class ZamgSensor(SensorEntity):
async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
"""Set up the ZAMG sensor platform."""
coordinator = hass.data[DOMAIN][entry.entry_id]
async_add_entities(
ZamgSensor(coordinator, entry.title, entry.data[CONF_STATION_ID], description)
for description in SENSOR_TYPES
)
class ZamgSensor(CoordinatorEntity, SensorEntity):
"""Implementation of a ZAMG sensor."""
_attr_attribution = ATTRIBUTION
entity_description: ZamgSensorEntityDescription
def __init__(self, probe, name, description: ZamgSensorEntityDescription):
def __init__(
self, coordinator, name, station_id, description: ZamgSensorEntityDescription
):
"""Initialize the sensor."""
super().__init__(coordinator)
self.entity_description = description
self.probe = probe
self._attr_name = f"{name} {description.key}"
self._attr_name = f"{name} {description.name}"
self._attr_unique_id = f"{station_id}_{description.key}"
self.station_id = f"{station_id}"
self._attr_device_info = DeviceInfo(
entry_type=DeviceEntryType.SERVICE,
identifiers={(DOMAIN, station_id)},
manufacturer=ATTRIBUTION,
configuration_url=MANUFACTURER_URL,
name=coordinator.name,
)
@property
def native_value(self):
def native_value(self) -> StateType:
"""Return the state of the sensor."""
return self.probe.get_data(self.entity_description.key)
return self.coordinator.data[self.station_id].get(
self.entity_description.para_name
)["data"]
@property
def extra_state_attributes(self):
def extra_state_attributes(self) -> Mapping[str, str]:
"""Return the state attributes."""
update_time = self.coordinator.data.get("last_update", "")
return {
ATTR_STATION: self.probe.get_data("station_name"),
ATTR_UPDATED: self.probe.last_update.isoformat(),
ATTR_ATTRIBUTION: ATTRIBUTION,
ATTR_STATION: self.coordinator.data.get("Name"),
CONF_STATION_ID: self.station_id,
ATTR_UPDATED: update_time.isoformat(),
}
def update(self) -> None:
"""Delegate update to probe."""
self.probe.update()
class ZamgData:
"""The class for handling the data retrieval."""
API_URL = "http://www.zamg.ac.at/ogd/"
API_HEADERS = {"User-Agent": f"home-assistant.zamg/ {__version__}"}
def __init__(self, station_id):
"""Initialize the probe."""
self._station_id = station_id
self.data = {}
@property
def last_update(self):
"""Return the timestamp of the most recent data."""
date, time = self.data.get("update_date"), self.data.get("update_time")
if date is not None and time is not None:
return datetime.strptime(date + time, "%d-%m-%Y%H:%M").replace(
tzinfo=VIENNA_TIME_ZONE
)
@classmethod
def current_observations(cls):
"""Fetch the latest CSV data."""
try:
response = requests.get(cls.API_URL, headers=cls.API_HEADERS, timeout=15)
response.raise_for_status()
response.encoding = "UTF8"
return csv.DictReader(
response.text.splitlines(), delimiter=";", quotechar='"'
)
except requests.exceptions.HTTPError:
_LOGGER.error("While fetching data")
@Throttle(MIN_TIME_BETWEEN_UPDATES)
def update(self):
"""Get the latest data from ZAMG."""
if self.last_update and (
self.last_update + timedelta(hours=1)
> datetime.utcnow().replace(tzinfo=dt_util.UTC)
):
return # Not time to update yet; data is only hourly
for row in self.current_observations():
if row.get("Station") == self._station_id:
self.data = {
API_FIELDS[col_heading][0]: API_FIELDS[col_heading][1](
v.replace(",", ".")
)
for col_heading, v in row.items()
if col_heading in API_FIELDS and v
}
break
else:
raise ValueError(f"No weather data for station {self._station_id}")
def get_data(self, variable):
"""Get the data."""
return self.data.get(variable)
def _get_ogd_stations():
"""Return all stations in the OGD dataset."""
return {r["Station"] for r in ZamgData.current_observations()}
def _get_zamg_stations():
"""Return {CONF_STATION: (lat, lon)} for all stations, for auto-config."""
capital_stations = _get_ogd_stations()
req = requests.get(
"https://www.zamg.ac.at/cms/en/documents/climate/"
"doc_metnetwork/zamg-observation-points",
timeout=15,
)
stations = {}
for row in csv.DictReader(req.text.splitlines(), delimiter=";", quotechar='"'):
if row.get("synnr") in capital_stations:
try:
stations[row["synnr"]] = tuple(
float(row[coord].replace(",", "."))
for coord in ("breite_dezi", "länge_dezi")
)
except KeyError:
_LOGGER.error("ZAMG schema changed again, cannot autodetect station")
return stations
def zamg_stations(cache_dir):
"""Return {CONF_STATION: (lat, lon)} for all stations, for auto-config.
Results from internet requests are cached as compressed json, making
subsequent calls very much faster.
"""
cache_file = os.path.join(cache_dir, ".zamg-stations.json.gz")
if not os.path.isfile(cache_file):
stations = _get_zamg_stations()
with gzip.open(cache_file, "wt") as cache:
json.dump(stations, cache, sort_keys=True)
return stations
with gzip.open(cache_file, "rt") as cache:
return {k: tuple(v) for k, v in json.load(cache).items()}
def closest_station(lat, lon, cache_dir):
"""Return the ZONE_ID.WMO_ID of the closest station to our lat/lon."""
if lat is None or lon is None or not os.path.isdir(cache_dir):
return
stations = zamg_stations(cache_dir)
def comparable_dist(zamg_id):
"""Calculate the pseudo-distance from lat/lon."""
station_lat, station_lon = stations[zamg_id]
return (lat - station_lat) ** 2 + (lon - station_lon) ** 2
return min(stations, key=comparable_dist)

View File

@ -0,0 +1,26 @@
{
"config": {
"flow_title": "{name}",
"step": {
"user": {
"description": "Set up ZAMG to integrate with Home Assistant.",
"data": {
"station_id": "Station ID (Defaults to nearest station)"
}
}
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]"
}
},
"issues": {
"deprecated_yaml": {
"title": "The ZAMG YAML configuration is being removed",
"description": "Configuring ZAMG using YAML is being removed.\n\nYour existing YAML configuration has been imported into the UI automatically.\n\nRemove the ZAMG YAML configuration from your configuration.yaml file and restart Home Assistant to fix this issue."
}
}
}

View File

@ -0,0 +1,21 @@
{
"config": {
"abort": {
"already_configured": "Wetterstation ist bereits konfiguriert",
"cannot_connect": "Verbindung fehlgeschlagen"
},
"error": {
"unknown": "ID der Wetterstation ist unbekannt",
"cannot_connect": "Verbindung fehlgeschlagen"
},
"flow_title": "{name}",
"step": {
"user": {
"data": {
"station_id": "ID der Wetterstation (nächstgelegene Station as Defaultwert)"
},
"description": "Richte zamg f\u00fcr die Integration mit Home Assistant ein."
}
}
}
}

View File

@ -0,0 +1,26 @@
{
"config": {
"abort": {
"already_configured": "Device is already configured",
"cannot_connect": "Failed to connect"
},
"error": {
"cannot_connect": "Failed to connect"
},
"flow_title": "{name}",
"step": {
"user": {
"data": {
"station_id": "Station ID (Defaults to nearest station)"
},
"description": "Set up ZAMG to integrate with Home Assistant."
}
}
},
"issues": {
"deprecated_yaml": {
"description": "Configuring ZAMG using YAML is being removed.\n\nYour existing YAML configuration has been imported into the UI automatically.\n\nRemove the ZAMG YAML configuration from your configuration.yaml file and restart Home Assistant to fix this issue.",
"title": "The ZAMG YAML configuration is being removed"
}
}
}

View File

@ -1,42 +1,29 @@
"""Sensor for data from Austrian Zentralanstalt für Meteorologie."""
"""Sensor for zamg the Austrian "Zentralanstalt für Meteorologie und Geodynamik" integration."""
from __future__ import annotations
import logging
import voluptuous as vol
from homeassistant.components.weather import (
ATTR_WEATHER_HUMIDITY,
ATTR_WEATHER_PRESSURE,
ATTR_WEATHER_TEMPERATURE,
ATTR_WEATHER_WIND_BEARING,
ATTR_WEATHER_WIND_SPEED,
PLATFORM_SCHEMA,
WeatherEntity,
)
from homeassistant.components.weather import PLATFORM_SCHEMA, WeatherEntity
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import (
CONF_LATITUDE,
CONF_LONGITUDE,
CONF_NAME,
LENGTH_MILLIMETERS,
PRESSURE_HPA,
SPEED_KILOMETERS_PER_HOUR,
SPEED_METERS_PER_SECOND,
TEMP_CELSIUS,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.device_registry import DeviceEntryType
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.helpers.update_coordinator import CoordinatorEntity
# Reuse data and API logic from the sensor implementation
from .sensor import (
ATTRIBUTION,
CONF_STATION_ID,
ZamgData,
closest_station,
zamg_stations,
)
_LOGGER = logging.getLogger(__name__)
from .const import ATTRIBUTION, CONF_STATION_ID, DOMAIN, MANUFACTURER_URL
from .coordinator import ZamgDataUpdateCoordinator
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
@ -52,93 +39,101 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
)
def setup_platform(
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
add_entities: AddEntitiesCallback,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the ZAMG weather platform."""
name = config.get(CONF_NAME)
latitude = config.get(CONF_LATITUDE, hass.config.latitude)
longitude = config.get(CONF_LONGITUDE, hass.config.longitude)
station_id = config.get(CONF_STATION_ID) or closest_station(
latitude, longitude, hass.config.config_dir
# trigger import flow
await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data=config,
)
if station_id not in zamg_stations(hass.config.config_dir):
_LOGGER.error(
"Configured ZAMG %s (%s) is not a known station",
CONF_STATION_ID,
station_id,
)
return
probe = ZamgData(station_id=station_id)
try:
probe.update()
except (ValueError, TypeError) as err:
_LOGGER.error("Received error from ZAMG: %s", err)
return
add_entities([ZamgWeather(probe, name)], True)
class ZamgWeather(WeatherEntity):
async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
"""Set up the ZAMG weather platform."""
coordinator = hass.data[DOMAIN][entry.entry_id]
async_add_entities(
[ZamgWeather(coordinator, entry.title, entry.data[CONF_STATION_ID])]
)
class ZamgWeather(CoordinatorEntity, WeatherEntity):
"""Representation of a weather condition."""
_attr_native_pressure_unit = PRESSURE_HPA
_attr_native_temperature_unit = TEMP_CELSIUS
_attr_native_wind_speed_unit = SPEED_KILOMETERS_PER_HOUR
def __init__(self, zamg_data, stationname=None):
def __init__(
self, coordinator: ZamgDataUpdateCoordinator, name, station_id
) -> None:
"""Initialise the platform with a data instance and station name."""
self.zamg_data = zamg_data
self.stationname = stationname
@property
def name(self):
"""Return the name of the sensor."""
return (
self.stationname
or f"ZAMG {self.zamg_data.data.get('Name') or '(unknown station)'}"
super().__init__(coordinator)
self._attr_unique_id = f"{name}_{station_id}"
self._attr_name = f"ZAMG {name}"
self.station_id = f"{station_id}"
self._attr_device_info = DeviceInfo(
entry_type=DeviceEntryType.SERVICE,
identifiers={(DOMAIN, station_id)},
manufacturer=ATTRIBUTION,
configuration_url=MANUFACTURER_URL,
name=coordinator.name,
)
# set units of ZAMG API
self._attr_native_temperature_unit = TEMP_CELSIUS
self._attr_native_pressure_unit = PRESSURE_HPA
self._attr_native_wind_speed_unit = SPEED_METERS_PER_SECOND
self._attr_native_precipitation_unit = LENGTH_MILLIMETERS
@property
def condition(self):
def condition(self) -> str | None:
"""Return the current condition."""
return None
@property
def attribution(self):
def attribution(self) -> str | None:
"""Return the attribution."""
return ATTRIBUTION
@property
def native_temperature(self):
def native_temperature(self) -> float | None:
"""Return the platform temperature."""
return self.zamg_data.get_data(ATTR_WEATHER_TEMPERATURE)
try:
return float(self.coordinator.data[self.station_id].get("TL")["data"])
except (TypeError, ValueError):
return None
@property
def native_pressure(self):
def native_pressure(self) -> float | None:
"""Return the pressure."""
return self.zamg_data.get_data(ATTR_WEATHER_PRESSURE)
try:
return float(self.coordinator.data[self.station_id].get("P")["data"])
except (TypeError, ValueError):
return None
@property
def humidity(self):
def humidity(self) -> float | None:
"""Return the humidity."""
return self.zamg_data.get_data(ATTR_WEATHER_HUMIDITY)
try:
return float(self.coordinator.data[self.station_id].get("RFAM")["data"])
except (TypeError, ValueError):
return None
@property
def native_wind_speed(self):
def native_wind_speed(self) -> float | None:
"""Return the wind speed."""
return self.zamg_data.get_data(ATTR_WEATHER_WIND_SPEED)
try:
return float(self.coordinator.data[self.station_id].get("FF")["data"])
except (TypeError, ValueError):
return None
@property
def wind_bearing(self):
def wind_bearing(self) -> float | str | None:
"""Return the wind bearing."""
return self.zamg_data.get_data(ATTR_WEATHER_WIND_BEARING)
def update(self) -> None:
"""Update current conditions."""
self.zamg_data.update()
try:
return self.coordinator.data[self.station_id].get("DD")["data"]
except (TypeError, ValueError):
return None

View File

@ -461,6 +461,7 @@ FLOWS = {
"yeelight",
"yolink",
"youless",
"zamg",
"zerproc",
"zha",
"zwave_js",

View File

@ -6093,7 +6093,7 @@
"zamg": {
"name": "Zentralanstalt f\u00fcr Meteorologie und Geodynamik (ZAMG)",
"integration_type": "hub",
"config_flow": false,
"config_flow": true,
"iot_class": "cloud_polling"
},
"zengge": {

View File

@ -2597,6 +2597,9 @@ youless-api==0.16
# homeassistant.components.media_extractor
youtube_dl==2021.12.17
# homeassistant.components.zamg
zamg==0.1.1
# homeassistant.components.zengge
zengge==0.2

View File

@ -1801,6 +1801,9 @@ yolink-api==0.1.0
# homeassistant.components.youless
youless-api==0.16
# homeassistant.components.zamg
zamg==0.1.1
# homeassistant.components.zeroconf
zeroconf==0.39.2

View File

@ -0,0 +1 @@
"""Tests for the ZAMG component."""

View File

@ -0,0 +1,95 @@
"""Fixtures for Zamg integration tests."""
from collections.abc import Generator
import json
from unittest.mock import MagicMock, patch
import pytest
from zamg import ZamgData as ZamgDevice
from homeassistant.components.zamg.const import CONF_STATION_ID, DOMAIN
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry, load_fixture
TEST_STATION_ID = "11240"
TEST_STATION_NAME = "Graz/Flughafen"
@pytest.fixture
def mock_config_entry() -> MockConfigEntry:
"""Return the default mocked config entry."""
return MockConfigEntry(
domain=DOMAIN,
data={CONF_STATION_ID: TEST_STATION_ID},
unique_id=TEST_STATION_ID,
)
@pytest.fixture
def mock_setup_entry() -> Generator[None, None, None]:
"""Mock setting up a config entry."""
with patch("homeassistant.components.zamg.async_setup_entry", return_value=True):
yield
@pytest.fixture
def mock_zamg_config_flow(
request: pytest.FixtureRequest,
) -> Generator[None, MagicMock, None]:
"""Return a mocked Zamg client."""
with patch(
"homeassistant.components.zamg.sensor.ZamgData", autospec=True
) as zamg_mock:
zamg = zamg_mock.return_value
zamg.update.return_value = ZamgDevice(
json.loads(load_fixture("zamg/data.json"))
)
zamg.get_data.return_value = zamg.get_data(TEST_STATION_ID)
yield zamg
@pytest.fixture
def mock_zamg(request: pytest.FixtureRequest) -> Generator[None, MagicMock, None]:
"""Return a mocked Zamg client."""
with patch(
"homeassistant.components.zamg.config_flow.ZamgData", autospec=True
) as zamg_mock:
zamg = zamg_mock.return_value
zamg.update.return_value = {TEST_STATION_ID: {"Name": TEST_STATION_NAME}}
zamg.zamg_stations.return_value = {
TEST_STATION_ID: (46.99305556, 15.43916667, TEST_STATION_NAME),
"11244": (46.8722229, 15.90361118, "BAD GLEICHENBERG"),
}
zamg.closest_station.return_value = TEST_STATION_ID
zamg.get_data.return_value = TEST_STATION_ID
zamg.get_station_name = TEST_STATION_NAME
yield zamg
@pytest.fixture
def mock_zamg_stations(
request: pytest.FixtureRequest,
) -> Generator[None, MagicMock, None]:
"""Return a mocked Zamg client."""
with patch(
"homeassistant.components.zamg.config_flow.ZamgData.zamg_stations"
) as zamg_mock:
zamg_mock.return_value = {
"11240": (46.99305556, 15.43916667, "GRAZ-FLUGHAFEN"),
"11244": (46.87222222, 15.90361111, "BAD GLEICHENBERG"),
}
yield zamg_mock
@pytest.fixture
async def init_integration(
hass: HomeAssistant,
) -> MockConfigEntry:
"""Set up the Zamg integration for testing."""
mock_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
return mock_config_entry

View File

@ -0,0 +1,6 @@
{
"data": {
"station_id": "11240",
"station_name": "Graz/Flughafen"
}
}

View File

@ -0,0 +1,194 @@
"""Tests for the Zamg config flow."""
from unittest.mock import MagicMock
from homeassistant.components.zamg.const import CONF_STATION_ID, DOMAIN, LOGGER
from homeassistant.config_entries import SOURCE_IMPORT, SOURCE_USER
from homeassistant.const import CONF_NAME
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from .conftest import TEST_STATION_ID, TEST_STATION_NAME
async def test_full_user_flow_implementation(
hass: HomeAssistant,
mock_zamg: MagicMock,
mock_setup_entry: None,
) -> None:
"""Test the full manual user flow from start to finish."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
assert result.get("step_id") == "user"
assert result.get("type") == FlowResultType.FORM
LOGGER.debug(result)
assert result.get("data_schema") != ""
assert "flow_id" in result
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={CONF_STATION_ID: int(TEST_STATION_ID)},
)
assert result.get("type") == FlowResultType.CREATE_ENTRY
assert "data" in result
assert result["data"][CONF_STATION_ID] == TEST_STATION_ID
assert "result" in result
assert result["result"].unique_id == TEST_STATION_ID
async def test_error_update(
hass: HomeAssistant,
mock_zamg: MagicMock,
mock_setup_entry: None,
) -> None:
"""Test with error of reading from Zamg."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
assert result.get("step_id") == "user"
assert result.get("type") == FlowResultType.FORM
LOGGER.debug(result)
assert result.get("data_schema") != ""
mock_zamg.update.side_effect = ValueError
assert "flow_id" in result
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={CONF_STATION_ID: int(TEST_STATION_ID)},
)
assert result.get("type") == FlowResultType.ABORT
assert result.get("reason") == "cannot_connect"
async def test_full_import_flow_implementation(
hass: HomeAssistant,
mock_zamg: MagicMock,
mock_setup_entry: None,
) -> None:
"""Test the full import flow from start to finish."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data={CONF_STATION_ID: TEST_STATION_ID, CONF_NAME: TEST_STATION_NAME},
)
assert result.get("type") == FlowResultType.CREATE_ENTRY
assert result.get("data") == {CONF_STATION_ID: TEST_STATION_ID}
async def test_user_flow_duplicate(
hass: HomeAssistant,
mock_zamg: MagicMock,
mock_setup_entry: None,
) -> None:
"""Test the full manual user flow from start to finish."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
assert result.get("step_id") == "user"
assert result.get("type") == FlowResultType.FORM
assert "flow_id" in result
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={CONF_STATION_ID: int(TEST_STATION_ID)},
)
assert result.get("type") == FlowResultType.CREATE_ENTRY
assert "data" in result
assert result["data"][CONF_STATION_ID] == TEST_STATION_ID
assert "result" in result
assert result["result"].unique_id == TEST_STATION_ID
# try to add another instance
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
assert result.get("step_id") == "user"
assert result.get("type") == FlowResultType.FORM
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={CONF_STATION_ID: int(TEST_STATION_ID)},
)
assert result.get("type") == FlowResultType.ABORT
assert result.get("reason") == "already_configured"
async def test_import_flow_duplicate(
hass: HomeAssistant,
mock_zamg: MagicMock,
mock_setup_entry: None,
) -> None:
"""Test import flow with duplicate entry."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
assert result.get("step_id") == "user"
assert result.get("type") == FlowResultType.FORM
assert "flow_id" in result
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={CONF_STATION_ID: int(TEST_STATION_ID)},
)
assert result.get("type") == FlowResultType.CREATE_ENTRY
assert "data" in result
assert result["data"][CONF_STATION_ID] == TEST_STATION_ID
assert "result" in result
assert result["result"].unique_id == TEST_STATION_ID
# try to add another instance
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data={CONF_STATION_ID: TEST_STATION_ID, CONF_NAME: TEST_STATION_NAME},
)
assert result.get("type") == FlowResultType.ABORT
assert result.get("reason") == "already_configured"
async def test_import_flow_duplicate_after_position(
hass: HomeAssistant,
mock_zamg: MagicMock,
mock_setup_entry: None,
) -> None:
"""Test import flow with duplicate entry."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
assert result.get("step_id") == "user"
assert result.get("type") == FlowResultType.FORM
assert "flow_id" in result
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={CONF_STATION_ID: int(TEST_STATION_ID)},
)
assert result.get("type") == FlowResultType.CREATE_ENTRY
assert "data" in result
assert result["data"][CONF_STATION_ID] == TEST_STATION_ID
assert "result" in result
assert result["result"].unique_id == TEST_STATION_ID
# try to add another instance
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data={CONF_STATION_ID: "123", CONF_NAME: TEST_STATION_NAME},
)
assert result.get("type") == FlowResultType.ABORT
assert result.get("reason") == "already_configured"
async def test_import_flow_no_name(
hass: HomeAssistant,
mock_zamg: MagicMock,
mock_setup_entry: None,
) -> None:
"""Test the full import flow from start to finish."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data={CONF_STATION_ID: TEST_STATION_ID},
)
assert result.get("type") == FlowResultType.CREATE_ENTRY
assert result.get("data") == {CONF_STATION_ID: TEST_STATION_ID}