Move tankerkoenig to new aiotankerkoenig package (#108913)

* Move tankerkoenig to new aiotankerkoenig package

* Fix config flow coverage

* Process code review suggestions

* Process code review suggestions
This commit is contained in:
Jan-Philipp Benecke 2024-01-31 14:57:08 +01:00 committed by GitHub
parent 640463c559
commit 71c2460161
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 367 additions and 320 deletions

View File

@ -1336,8 +1336,8 @@ build.json @home-assistant/supervisor
/tests/components/tailwind/ @frenck
/homeassistant/components/tami4/ @Guy293
/tests/components/tami4/ @Guy293
/homeassistant/components/tankerkoenig/ @guillempages @mib1185
/tests/components/tankerkoenig/ @guillempages @mib1185
/homeassistant/components/tankerkoenig/ @guillempages @mib1185 @jpbede
/tests/components/tankerkoenig/ @guillempages @mib1185 @jpbede
/homeassistant/components/tapsaff/ @bazwilliams
/homeassistant/components/tasmota/ @emontnemery
/tests/components/tasmota/ @emontnemery

View File

@ -1,22 +1,14 @@
"""Ask tankerkoenig.de for petrol price information."""
from __future__ import annotations
import logging
from requests.exceptions import RequestException
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers import config_validation as cv
from .const import DEFAULT_SCAN_INTERVAL, DOMAIN
from .coordinator import TankerkoenigDataUpdateCoordinator
_LOGGER = logging.getLogger(__name__)
PLATFORMS = [Platform.BINARY_SENSOR, Platform.SENSOR]
CONFIG_SCHEMA = cv.removed(DOMAIN, raise_if_present=False)
@ -25,24 +17,18 @@ CONFIG_SCHEMA = cv.removed(DOMAIN, raise_if_present=False)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set a tankerkoenig configuration entry up."""
hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][entry.entry_id] = coordinator = TankerkoenigDataUpdateCoordinator(
coordinator = TankerkoenigDataUpdateCoordinator(
hass,
entry,
_LOGGER,
name=entry.unique_id or DOMAIN,
update_interval=DEFAULT_SCAN_INTERVAL,
)
try:
setup_ok = await hass.async_add_executor_job(coordinator.setup)
except RequestException as err:
raise ConfigEntryNotReady from err
if not setup_ok:
_LOGGER.error("Could not setup integration")
return False
await coordinator.async_setup()
await coordinator.async_config_entry_first_refresh()
hass.data[DOMAIN][entry.entry_id] = coordinator
entry.async_on_unload(entry.add_update_listener(_async_update_listener))
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)

View File

@ -3,6 +3,8 @@ from __future__ import annotations
import logging
from aiotankerkoenig import PriceInfo, Station, Status
from homeassistant.components.binary_sensor import (
BinarySensorDeviceClass,
BinarySensorEntity,
@ -23,21 +25,15 @@ async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
"""Set up the tankerkoenig binary sensors."""
coordinator: TankerkoenigDataUpdateCoordinator = hass.data[DOMAIN][entry.entry_id]
stations = coordinator.stations.values()
entities = []
for station in stations:
sensor = StationOpenBinarySensorEntity(
async_add_entities(
StationOpenBinarySensorEntity(
station,
coordinator,
coordinator.show_on_map,
)
entities.append(sensor)
_LOGGER.debug("Added sensors %s", entities)
async_add_entities(entities)
for station in coordinator.stations.values()
)
class StationOpenBinarySensorEntity(TankerkoenigCoordinatorEntity, BinarySensorEntity):
@ -48,22 +44,21 @@ class StationOpenBinarySensorEntity(TankerkoenigCoordinatorEntity, BinarySensorE
def __init__(
self,
station: dict,
station: Station,
coordinator: TankerkoenigDataUpdateCoordinator,
show_on_map: bool,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, station)
self._station_id = station["id"]
self._attr_unique_id = f"{station['id']}_status"
if show_on_map:
self._station_id = station.id
self._attr_unique_id = f"{station.id}_status"
if coordinator.show_on_map:
self._attr_extra_state_attributes = {
ATTR_LATITUDE: station["lat"],
ATTR_LONGITUDE: station["lng"],
ATTR_LATITUDE: station.lat,
ATTR_LONGITUDE: station.lng,
}
@property
def is_on(self) -> bool | None:
"""Return true if the station is open."""
data: dict = self.coordinator.data[self._station_id]
return data is not None and data.get("status") == "open"
data: PriceInfo = self.coordinator.data[self._station_id]
return data is not None and data.status == Status.OPEN

View File

@ -4,7 +4,13 @@ from __future__ import annotations
from collections.abc import Mapping
from typing import Any
from pytankerkoenig import customException, getNearbyStations
from aiotankerkoenig import (
GasType,
Sort,
Station,
Tankerkoenig,
TankerkoenigInvalidKeyError,
)
import voluptuous as vol
from homeassistant import config_entries
@ -18,8 +24,9 @@ from homeassistant.const import (
CONF_SHOW_ON_MAP,
UnitOfLength,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.core import callback
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.selector import (
LocationSelector,
@ -31,21 +38,18 @@ from .const import CONF_FUEL_TYPES, CONF_STATIONS, DEFAULT_RADIUS, DOMAIN, FUEL_
async def async_get_nearby_stations(
hass: HomeAssistant, data: Mapping[str, Any]
) -> dict[str, Any]:
tankerkoenig: Tankerkoenig, data: Mapping[str, Any]
) -> list[Station]:
"""Fetch nearby stations."""
try:
return await hass.async_add_executor_job(
getNearbyStations,
data[CONF_API_KEY],
return await tankerkoenig.nearby_stations(
coordinates=(
data[CONF_LOCATION][CONF_LATITUDE],
data[CONF_LOCATION][CONF_LONGITUDE],
data[CONF_RADIUS],
"all",
"dist",
)
except customException as err:
return {"ok": False, "message": err, "exception": True}
),
radius=data[CONF_RADIUS],
gas_type=GasType.ALL,
sort=Sort.DISTANCE,
)
class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
@ -53,11 +57,8 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
VERSION = 1
def __init__(self) -> None:
"""Init the FlowHandler."""
super().__init__()
self._data: dict[str, Any] = {}
self._stations: dict[str, str] = {}
_data: dict[str, Any] = {}
_stations: dict[str, str] = {}
@staticmethod
@callback
@ -79,17 +80,25 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
)
self._abort_if_unique_id_configured()
data = await async_get_nearby_stations(self.hass, user_input)
if not data.get("ok"):
tankerkoenig = Tankerkoenig(
api_key=user_input[CONF_API_KEY],
session=async_get_clientsession(self.hass),
)
try:
stations = await async_get_nearby_stations(tankerkoenig, user_input)
except TankerkoenigInvalidKeyError:
return self._show_form_user(
user_input, errors={CONF_API_KEY: "invalid_auth"}
)
if len(stations := data.get("stations", [])) == 0:
# no stations found
if len(stations) == 0:
return self._show_form_user(user_input, errors={CONF_RADIUS: "no_stations"})
for station in stations:
self._stations[station["id"]] = (
f"{station['brand']} {station['street']} {station['houseNumber']} -"
f" ({station['dist']}km)"
self._stations[station.id] = (
f"{station.brand} {station.street} {station.house_number} -"
f" ({station.distance}km)"
)
self._data = user_input
@ -128,8 +137,14 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
entry = self.hass.config_entries.async_get_entry(self.context["entry_id"])
assert entry
user_input = {**entry.data, **user_input}
data = await async_get_nearby_stations(self.hass, user_input)
if not data.get("ok"):
tankerkoenig = Tankerkoenig(
api_key=user_input[CONF_API_KEY],
session=async_get_clientsession(self.hass),
)
try:
await async_get_nearby_stations(tankerkoenig, user_input)
except TankerkoenigInvalidKeyError:
return self._show_form_reauth(user_input, {CONF_API_KEY: "invalid_auth"})
self.hass.config_entries.async_update_entry(entry, data=user_input)
@ -233,14 +248,22 @@ class OptionsFlowHandler(config_entries.OptionsFlow):
)
return self.async_create_entry(title="", data=user_input)
nearby_stations = await async_get_nearby_stations(
self.hass, self.config_entry.data
tankerkoenig = Tankerkoenig(
api_key=self.config_entry.data[CONF_API_KEY],
session=async_get_clientsession(self.hass),
)
if stations := nearby_stations.get("stations"):
try:
stations = await async_get_nearby_stations(
tankerkoenig, self.config_entry.data
)
except TankerkoenigInvalidKeyError:
return self.async_show_form(step_id="init", errors={"base": "invalid_auth"})
if stations:
for station in stations:
self._stations[station["id"]] = (
f"{station['brand']} {station['street']} {station['houseNumber']} -"
f" ({station['dist']}km)"
self._stations[station.id] = (
f"{station.brand} {station.street} {station.house_number} -"
f" ({station.distance}km)"
)
# add possible extra selected stations from import

View File

@ -5,13 +5,21 @@ from datetime import timedelta
import logging
from math import ceil
import pytankerkoenig
from aiotankerkoenig import (
PriceInfo,
Station,
Tankerkoenig,
TankerkoenigConnectionError,
TankerkoenigError,
TankerkoenigInvalidKeyError,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_API_KEY, CONF_SHOW_ON_MAP
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .const import CONF_FUEL_TYPES, CONF_STATIONS
@ -25,7 +33,6 @@ class TankerkoenigDataUpdateCoordinator(DataUpdateCoordinator):
self,
hass: HomeAssistant,
entry: ConfigEntry,
logger: logging.Logger,
name: str,
update_interval: int,
) -> None:
@ -33,50 +40,41 @@ class TankerkoenigDataUpdateCoordinator(DataUpdateCoordinator):
super().__init__(
hass=hass,
logger=logger,
logger=_LOGGER,
name=name,
update_interval=timedelta(minutes=update_interval),
)
self._api_key: str = entry.data[CONF_API_KEY]
self._selected_stations: list[str] = entry.data[CONF_STATIONS]
self.stations: dict[str, dict] = {}
self.stations: dict[str, Station] = {}
self.fuel_types: list[str] = entry.data[CONF_FUEL_TYPES]
self.show_on_map: bool = entry.options[CONF_SHOW_ON_MAP]
def setup(self) -> bool:
self._tankerkoenig = Tankerkoenig(
api_key=entry.data[CONF_API_KEY], session=async_get_clientsession(hass)
)
async def async_setup(self) -> None:
"""Set up the tankerkoenig API."""
for station_id in self._selected_stations:
try:
station_data = pytankerkoenig.getStationData(self._api_key, station_id)
except pytankerkoenig.customException as err:
if any(x in str(err).lower() for x in ("api-key", "apikey")):
raise ConfigEntryAuthFailed(err) from err
station_data = {
"ok": False,
"message": err,
"exception": True,
}
station = await self._tankerkoenig.station_details(station_id)
except TankerkoenigInvalidKeyError as err:
raise ConfigEntryAuthFailed(err) from err
except (TankerkoenigError, TankerkoenigConnectionError) as err:
raise ConfigEntryNotReady(err) from err
self.stations[station_id] = station
if not station_data["ok"]:
_LOGGER.error(
"Error when adding station %s:\n %s",
station_id,
station_data["message"],
)
continue
self.add_station(station_data["station"])
if len(self.stations) > 10:
_LOGGER.warning(
"Found more than 10 stations to check. "
"This might invalidate your api-key on the long run. "
"Try using a smaller radius"
)
return True
async def _async_update_data(self) -> dict:
async def _async_update_data(self) -> dict[str, PriceInfo]:
"""Get the latest data from tankerkoenig.de."""
_LOGGER.debug("Fetching new data from tankerkoenig.de")
station_ids = list(self.stations)
prices = {}
@ -84,30 +82,9 @@ class TankerkoenigDataUpdateCoordinator(DataUpdateCoordinator):
# The API seems to only return at most 10 results, so split the list in chunks of 10
# and merge it together.
for index in range(ceil(len(station_ids) / 10)):
data = await self.hass.async_add_executor_job(
pytankerkoenig.getPriceList,
self._api_key,
station_ids[index * 10 : (index + 1) * 10],
data = await self._tankerkoenig.prices(
station_ids[index * 10 : (index + 1) * 10]
)
prices.update(data)
_LOGGER.debug("Received data: %s", data)
if not data["ok"]:
raise UpdateFailed(data["message"])
if "prices" not in data:
raise UpdateFailed(
"Did not receive price information from tankerkoenig.de"
)
prices.update(data["prices"])
return prices
def add_station(self, station: dict):
"""Add fuel station to the entity list."""
station_id = station["id"]
if station_id in self.stations:
_LOGGER.warning(
"Sensor for station with id %s was already created", station_id
)
return
self.stations[station_id] = station
_LOGGER.debug("add_station called for station: %s", station)

View File

@ -1,4 +1,6 @@
"""The tankerkoenig base entity."""
from aiotankerkoenig import Station
from homeassistant.const import ATTR_ID
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.update_coordinator import CoordinatorEntity
@ -6,20 +8,22 @@ from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .coordinator import TankerkoenigDataUpdateCoordinator
class TankerkoenigCoordinatorEntity(CoordinatorEntity):
class TankerkoenigCoordinatorEntity(
CoordinatorEntity[TankerkoenigDataUpdateCoordinator]
):
"""Tankerkoenig base entity."""
_attr_has_entity_name = True
def __init__(
self, coordinator: TankerkoenigDataUpdateCoordinator, station: dict
self, coordinator: TankerkoenigDataUpdateCoordinator, station: Station
) -> None:
"""Initialize the Tankerkoenig base entity."""
super().__init__(coordinator)
self._attr_device_info = DeviceInfo(
identifiers={(ATTR_ID, station["id"])},
name=f"{station['brand']} {station['street']} {station['houseNumber']}",
model=station["brand"],
identifiers={(ATTR_ID, station.id)},
name=f"{station.brand} {station.street} {station.house_number}",
model=station.brand,
configuration_url="https://www.tankerkoenig.de",
entry_type=DeviceEntryType.SERVICE,
)

View File

@ -1,10 +1,10 @@
{
"domain": "tankerkoenig",
"name": "Tankerkoenig",
"codeowners": ["@guillempages", "@mib1185"],
"codeowners": ["@guillempages", "@mib1185", "@jpbede"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/tankerkoenig",
"iot_class": "cloud_polling",
"loggers": ["pytankerkoenig"],
"requirements": ["pytankerkoenig==0.0.6"]
"loggers": ["aiotankerkoenig"],
"requirements": ["aiotankerkoenig==0.2.0"]
}

View File

@ -3,6 +3,8 @@ from __future__ import annotations
import logging
from aiotankerkoenig import GasType, PriceInfo, Station
from homeassistant.components.sensor import SensorEntity, SensorStateClass
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import ATTR_LATITUDE, ATTR_LONGITUDE, CURRENCY_EURO
@ -30,26 +32,28 @@ async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
"""Set up the tankerkoenig sensors."""
coordinator: TankerkoenigDataUpdateCoordinator = hass.data[DOMAIN][entry.entry_id]
stations = coordinator.stations.values()
entities = []
for station in stations:
for fuel in coordinator.fuel_types:
if fuel not in station:
_LOGGER.warning(
"Station %s does not offer %s fuel", station["id"], fuel
for station in coordinator.stations.values():
for fuel in (GasType.E10, GasType.E5, GasType.DIESEL):
if getattr(station, fuel) is None:
_LOGGER.debug(
"Station %s %s (%s) does not offer %s fuel, skipping",
station.brand,
station.name,
station.id,
fuel,
)
continue
sensor = FuelPriceSensor(
fuel,
station,
coordinator,
coordinator.show_on_map,
entities.append(
FuelPriceSensor(
fuel,
station,
coordinator,
)
)
entities.append(sensor)
_LOGGER.debug("Added sensors %s", entities)
async_add_entities(entities)
@ -61,31 +65,35 @@ class FuelPriceSensor(TankerkoenigCoordinatorEntity, SensorEntity):
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_native_unit_of_measurement = CURRENCY_EURO
def __init__(self, fuel_type, station, coordinator, show_on_map):
def __init__(
self,
fuel_type: GasType,
station: Station,
coordinator: TankerkoenigDataUpdateCoordinator,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, station)
self._station_id = station["id"]
self._station_id = station.id
self._fuel_type = fuel_type
self._attr_translation_key = fuel_type
self._attr_unique_id = f"{station['id']}_{fuel_type}"
self._attr_unique_id = f"{station.id}_{fuel_type}"
attrs = {
ATTR_BRAND: station["brand"],
ATTR_BRAND: station.brand,
ATTR_FUEL_TYPE: fuel_type,
ATTR_STATION_NAME: station["name"],
ATTR_STREET: station["street"],
ATTR_HOUSE_NUMBER: station["houseNumber"],
ATTR_POSTCODE: station["postCode"],
ATTR_CITY: station["place"],
ATTR_STATION_NAME: station.name,
ATTR_STREET: station.street,
ATTR_HOUSE_NUMBER: station.house_number,
ATTR_POSTCODE: station.post_code,
ATTR_CITY: station.place,
}
if show_on_map:
attrs[ATTR_LATITUDE] = station["lat"]
attrs[ATTR_LONGITUDE] = station["lng"]
if coordinator.show_on_map:
attrs[ATTR_LATITUDE] = str(station.lat)
attrs[ATTR_LONGITUDE] = str(station.lng)
self._attr_extra_state_attributes = attrs
@property
def native_value(self):
"""Return the state of the device."""
# key Fuel_type is not available when the fuel station is closed,
# use "get" instead of "[]" to avoid exceptions
return self.coordinator.data[self._station_id].get(self._fuel_type)
def native_value(self) -> float:
"""Return the current price for the fuel type."""
info: PriceInfo = self.coordinator.data[self._station_id]
return getattr(info, self._fuel_type)

View File

@ -376,6 +376,9 @@ aioswitcher==3.4.1
# homeassistant.components.syncthing
aiosyncthing==0.5.1
# homeassistant.components.tankerkoenig
aiotankerkoenig==0.2.0
# homeassistant.components.tractive
aiotractive==0.5.6
@ -2156,9 +2159,6 @@ pysuez==0.2.0
# homeassistant.components.switchbee
pyswitchbee==1.8.0
# homeassistant.components.tankerkoenig
pytankerkoenig==0.0.6
# homeassistant.components.tautulli
pytautulli==23.1.1

View File

@ -349,6 +349,9 @@ aioswitcher==3.4.1
# homeassistant.components.syncthing
aiosyncthing==0.5.1
# homeassistant.components.tankerkoenig
aiotankerkoenig==0.2.0
# homeassistant.components.tractive
aiotractive==0.5.6
@ -1665,9 +1668,6 @@ pysuez==0.2.0
# homeassistant.components.switchbee
pyswitchbee==1.8.0
# homeassistant.components.tankerkoenig
pytankerkoenig==0.0.6
# homeassistant.components.tautulli
pytautulli==23.1.1

View File

@ -0,0 +1,75 @@
"""Fixtures for Tankerkoenig integration tests."""
from collections.abc import Generator
from unittest.mock import AsyncMock, patch
import pytest
from homeassistant.components.tankerkoenig import DOMAIN
from homeassistant.components.tankerkoenig.const import CONF_FUEL_TYPES, CONF_STATIONS
from homeassistant.const import (
CONF_API_KEY,
CONF_LATITUDE,
CONF_LOCATION,
CONF_LONGITUDE,
CONF_NAME,
CONF_RADIUS,
CONF_SHOW_ON_MAP,
)
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
from .const import NEARBY_STATIONS, PRICES, STATION
from tests.common import MockConfigEntry
@pytest.fixture(name="tankerkoenig")
def mock_tankerkoenig() -> Generator[AsyncMock, None, None]:
"""Mock the aiotankerkoenig client."""
with patch(
"homeassistant.components.tankerkoenig.coordinator.Tankerkoenig",
autospec=True,
) as mock_tankerkoenig, patch(
"homeassistant.components.tankerkoenig.config_flow.Tankerkoenig",
new=mock_tankerkoenig,
):
mock = mock_tankerkoenig.return_value
mock.station_details.return_value = STATION
mock.prices.return_value = PRICES
mock.nearby_stations.return_value = NEARBY_STATIONS
yield mock
@pytest.fixture(name="config_entry")
async def mock_config_entry(hass: HomeAssistant) -> MockConfigEntry:
"""Return a MockConfigEntry for testing."""
return MockConfigEntry(
domain=DOMAIN,
title="Mock Title",
unique_id="51.0_13.0",
entry_id="8036b4412f2fae6bb9dbab7fe8e37f87",
options={
CONF_SHOW_ON_MAP: True,
},
data={
CONF_NAME: "Home",
CONF_API_KEY: "269534f6-xxxx-xxxx-xxxx-yyyyzzzzxxxx",
CONF_FUEL_TYPES: ["e5"],
CONF_LOCATION: {CONF_LATITUDE: 51.0, CONF_LONGITUDE: 13.0},
CONF_RADIUS: 2.0,
CONF_STATIONS: [
"3bcd61da-xxxx-xxxx-xxxx-19d5523a7ae8",
],
},
)
@pytest.fixture(name="setup_integration")
async def mock_setup_integration(
hass: HomeAssistant, config_entry: MockConfigEntry, tankerkoenig: AsyncMock
) -> None:
"""Fixture for setting up the component."""
config_entry.add_to_hass(hass)
assert await async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()

View File

@ -0,0 +1,59 @@
"""Constants for the Tankerkoenig tests."""
from aiotankerkoenig import PriceInfo, Station, Status
NEARBY_STATIONS = [
Station(
id="3bcd61da-xxxx-xxxx-xxxx-19d5523a7ae8",
brand="BrandA",
place="CityA",
street="Main",
house_number="1",
distance=1,
lat=51.1,
lng=13.1,
name="Station ABC",
post_code=1234,
),
Station(
id="36b4b812-xxxx-xxxx-xxxx-c51735325858",
brand="BrandB",
place="CityB",
street="School",
house_number="2",
distance=2,
lat=51.2,
lng=13.2,
name="Station DEF",
post_code=2345,
),
]
STATION = Station(
id="3bcd61da-xxxx-xxxx-xxxx-19d5523a7ae8",
name="Station ABC",
brand="Station",
street="Somewhere Street",
house_number="1",
post_code=1234,
place="Somewhere",
opening_times=[],
overrides=[],
whole_day=True,
is_open=True,
e5=1.719,
e10=1.659,
diesel=1.659,
lat=51.1,
lng=13.1,
state="xxXX",
)
PRICES = {
"3bcd61da-xxxx-xxxx-xxxx-19d5523a7ae8": PriceInfo(
status=Status.OPEN,
e5=1.719,
e10=1.659,
diesel=1.659,
),
}

View File

@ -3,10 +3,8 @@
dict({
'data': dict({
'3bcd61da-xxxx-xxxx-xxxx-19d5523a7ae8': dict({
'diesel': 1.659,
'e10': 1.659,
'e5': 1.719,
'status': 'open',
'__type': "<class 'aiotankerkoenig.models.PriceInfo'>",
'repr': "PriceInfo(status=<Status.OPEN: 'open'>, e5=1.719, e10=1.659, diesel=1.659)",
}),
}),
'entry': dict({

View File

@ -1,7 +1,7 @@
"""Tests for Tankerkoenig config flow."""
from unittest.mock import patch
from pytankerkoenig import customException
from aiotankerkoenig.exceptions import TankerkoenigInvalidKeyError
from homeassistant.components.tankerkoenig.const import (
CONF_FUEL_TYPES,
@ -21,6 +21,8 @@ from homeassistant.const import (
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from .const import NEARBY_STATIONS
from tests.common import MockConfigEntry
MOCK_USER_DATA = {
@ -47,28 +49,6 @@ MOCK_OPTIONS_DATA = {
],
}
MOCK_NEARVY_STATIONS_OK = {
"ok": True,
"stations": [
{
"id": "3bcd61da-xxxx-xxxx-xxxx-19d5523a7ae8",
"brand": "BrandA",
"place": "CityA",
"street": "Main",
"houseNumber": "1",
"dist": 1,
},
{
"id": "36b4b812-xxxx-xxxx-xxxx-c51735325858",
"brand": "BrandB",
"place": "CityB",
"street": "School",
"houseNumber": "2",
"dist": 2,
},
],
}
async def test_user(hass: HomeAssistant) -> None:
"""Test starting a flow by user."""
@ -81,8 +61,8 @@ async def test_user(hass: HomeAssistant) -> None:
with patch(
"homeassistant.components.tankerkoenig.async_setup_entry", return_value=True
) as mock_setup_entry, patch(
"homeassistant.components.tankerkoenig.config_flow.getNearbyStations",
return_value=MOCK_NEARVY_STATIONS_OK,
"homeassistant.components.tankerkoenig.config_flow.Tankerkoenig.nearby_stations",
return_value=NEARBY_STATIONS,
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=MOCK_USER_DATA
@ -143,8 +123,8 @@ async def test_exception_security(hass: HomeAssistant) -> None:
assert result["step_id"] == "user"
with patch(
"homeassistant.components.tankerkoenig.config_flow.getNearbyStations",
side_effect=customException,
"homeassistant.components.tankerkoenig.config_flow.Tankerkoenig.nearby_stations",
side_effect=TankerkoenigInvalidKeyError,
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=MOCK_USER_DATA
@ -163,8 +143,8 @@ async def test_user_no_stations(hass: HomeAssistant) -> None:
assert result["step_id"] == "user"
with patch(
"homeassistant.components.tankerkoenig.config_flow.getNearbyStations",
return_value={"ok": True, "stations": []},
"homeassistant.components.tankerkoenig.config_flow.Tankerkoenig.nearby_stations",
return_value=[],
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=MOCK_USER_DATA
@ -174,32 +154,26 @@ async def test_user_no_stations(hass: HomeAssistant) -> None:
assert result["errors"][CONF_RADIUS] == "no_stations"
async def test_reauth(hass: HomeAssistant) -> None:
async def test_reauth(hass: HomeAssistant, config_entry: MockConfigEntry) -> None:
"""Test starting a flow by user to re-auth."""
mock_config = MockConfigEntry(
domain=DOMAIN,
data={**MOCK_USER_DATA, **MOCK_STATIONS_DATA},
unique_id=f"{MOCK_USER_DATA[CONF_LOCATION][CONF_LATITUDE]}_{MOCK_USER_DATA[CONF_LOCATION][CONF_LONGITUDE]}",
)
mock_config.add_to_hass(hass)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.tankerkoenig.async_setup_entry", return_value=True
) as mock_setup_entry, patch(
"homeassistant.components.tankerkoenig.config_flow.getNearbyStations",
"homeassistant.components.tankerkoenig.config_flow.Tankerkoenig.nearby_stations",
) as mock_nearby_stations:
# re-auth initialized
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_REAUTH, "entry_id": mock_config.entry_id},
data=mock_config.data,
context={"source": SOURCE_REAUTH, "entry_id": config_entry.entry_id},
data=config_entry.data,
)
assert result["type"] == FlowResultType.FORM
assert result["step_id"] == "reauth_confirm"
# re-auth unsuccessful
mock_nearby_stations.return_value = {"ok": False}
mock_nearby_stations.side_effect = TankerkoenigInvalidKeyError("Booom!")
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={
@ -211,7 +185,7 @@ async def test_reauth(hass: HomeAssistant) -> None:
assert result["errors"] == {CONF_API_KEY: "invalid_auth"}
# re-auth successful
mock_nearby_stations.return_value = MOCK_NEARVY_STATIONS_OK
mock_nearby_stations.side_effect = None
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={
@ -223,7 +197,7 @@ async def test_reauth(hass: HomeAssistant) -> None:
mock_setup_entry.assert_called()
entry = hass.config_entries.async_get_entry(mock_config.entry_id)
entry = hass.config_entries.async_get_entry(config_entry.entry_id)
assert entry.data[CONF_API_KEY] == "269534f6-aaaa-bbbb-cccc-yyyyzzzzxxxx"
@ -239,24 +213,52 @@ async def test_options_flow(hass: HomeAssistant) -> None:
mock_config.add_to_hass(hass)
with patch(
"homeassistant.components.tankerkoenig.async_setup_entry", return_value=True
) as mock_setup_entry, patch(
"homeassistant.components.tankerkoenig.config_flow.getNearbyStations",
return_value=MOCK_NEARVY_STATIONS_OK,
"homeassistant.components.tankerkoenig.config_flow.Tankerkoenig.nearby_stations",
return_value=NEARBY_STATIONS,
):
await hass.config_entries.async_setup(mock_config.entry_id)
await hass.async_block_till_done()
assert mock_setup_entry.called
result = await hass.config_entries.options.async_init(mock_config.entry_id)
assert result["type"] == FlowResultType.FORM
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
CONF_SHOW_ON_MAP: False,
CONF_STATIONS: MOCK_OPTIONS_DATA[CONF_STATIONS],
},
assert result["type"] == FlowResultType.FORM
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
CONF_SHOW_ON_MAP: False,
CONF_STATIONS: MOCK_OPTIONS_DATA[CONF_STATIONS],
},
)
assert result["type"] == FlowResultType.CREATE_ENTRY
assert not mock_config.options[CONF_SHOW_ON_MAP]
async def test_options_flow_error(hass: HomeAssistant) -> None:
"""Test options flow."""
mock_config = MockConfigEntry(
domain=DOMAIN,
data=MOCK_OPTIONS_DATA,
options={CONF_SHOW_ON_MAP: True},
unique_id=f"{DOMAIN}_{MOCK_USER_DATA[CONF_LOCATION][CONF_LATITUDE]}_{MOCK_USER_DATA[CONF_LOCATION][CONF_LONGITUDE]}",
)
assert result["type"] == FlowResultType.CREATE_ENTRY
assert not mock_config.options[CONF_SHOW_ON_MAP]
mock_config.add_to_hass(hass)
with patch(
"homeassistant.components.tankerkoenig.config_flow.Tankerkoenig.nearby_stations",
side_effect=TankerkoenigInvalidKeyError("Booom!"),
) as mock_nearby_stations:
result = await hass.config_entries.options.async_init(mock_config.entry_id)
assert result["type"] == FlowResultType.FORM
assert result["step_id"] == "init"
assert result["errors"] == {"base": "invalid_auth"}
mock_nearby_stations.return_value = NEARBY_STATIONS
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
CONF_SHOW_ON_MAP: False,
CONF_STATIONS: MOCK_OPTIONS_DATA[CONF_STATIONS],
},
)
assert result["type"] == FlowResultType.CREATE_ENTRY
assert not mock_config.options[CONF_SHOW_ON_MAP]

View File

@ -1,103 +1,23 @@
"""Tests for the Tankerkoening integration."""
from __future__ import annotations
from unittest.mock import patch
import pytest
from syrupy import SnapshotAssertion
from homeassistant.components.tankerkoenig.const import (
CONF_FUEL_TYPES,
CONF_STATIONS,
DOMAIN,
)
from homeassistant.const import (
CONF_API_KEY,
CONF_LATITUDE,
CONF_LOCATION,
CONF_LONGITUDE,
CONF_NAME,
CONF_RADIUS,
CONF_SHOW_ON_MAP,
)
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry
from tests.components.diagnostics import get_diagnostics_for_config_entry
from tests.typing import ClientSessionGenerator
MOCK_USER_DATA = {
CONF_NAME: "Home",
CONF_API_KEY: "269534f6-xxxx-xxxx-xxxx-yyyyzzzzxxxx",
CONF_FUEL_TYPES: ["e5"],
CONF_LOCATION: {CONF_LATITUDE: 51.0, CONF_LONGITUDE: 13.0},
CONF_RADIUS: 2.0,
CONF_STATIONS: [
"3bcd61da-xxxx-xxxx-xxxx-19d5523a7ae8",
],
}
MOCK_OPTIONS = {
CONF_SHOW_ON_MAP: True,
}
MOCK_STATION_DATA = {
"ok": True,
"station": {
"id": "3bcd61da-xxxx-xxxx-xxxx-19d5523a7ae8",
"name": "Station ABC",
"brand": "Station",
"street": "Somewhere Street",
"houseNumber": "1",
"postCode": "01234",
"place": "Somewhere",
"openingTimes": [],
"overrides": [],
"wholeDay": True,
"isOpen": True,
"e5": 1.719,
"e10": 1.659,
"diesel": 1.659,
"lat": 51.1,
"lng": 13.1,
"state": "xxXX",
},
}
MOCK_STATION_PRICES = {
"ok": True,
"prices": {
"3bcd61da-xxxx-xxxx-xxxx-19d5523a7ae8": {
"status": "open",
"e5": 1.719,
"e10": 1.659,
"diesel": 1.659,
},
},
}
@pytest.mark.usefixtures("setup_integration")
async def test_entry_diagnostics(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
config_entry: MockConfigEntry,
snapshot: SnapshotAssertion,
) -> None:
"""Test config entry diagnostics."""
with patch(
"homeassistant.components.tankerkoenig.coordinator.pytankerkoenig.getStationData",
return_value=MOCK_STATION_DATA,
), patch(
"homeassistant.components.tankerkoenig.coordinator.pytankerkoenig.getPriceList",
return_value=MOCK_STATION_PRICES,
):
entry = MockConfigEntry(
domain=DOMAIN,
data=MOCK_USER_DATA,
options=MOCK_OPTIONS,
unique_id="mock.tankerkoenig",
entry_id="8036b4412f2fae6bb9dbab7fe8e37f87",
)
entry.add_to_hass(hass)
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
result = await get_diagnostics_for_config_entry(hass, hass_client, entry)
result = await get_diagnostics_for_config_entry(hass, hass_client, config_entry)
assert result == snapshot