Statistics component typing (#60997)

* Implement optional manually defined uniqueid

* Fix test case via mocked environment

* Add typing to statistics component

* Fix minor inconsistency

* Fix linter issues

* Execute hassfest

* Fix stricter mypy warnings

* Fix maxsplit warning

* Make binary value range explicit check

* Add basic typing to statistics tests

* Add empty config testcase

* Minor improvements

* Improve after comments

* Remove unnecessary test case

* Fix changed type

* Remove dict.get default
This commit is contained in:
Thomas Dietrich 2021-12-20 14:53:51 +01:00 committed by GitHub
parent 2f0b73c4ad
commit 28af0b4092
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 214 additions and 169 deletions

View File

@ -121,6 +121,7 @@ homeassistant.components.slack.*
homeassistant.components.sonos.media_player
homeassistant.components.ssdp.*
homeassistant.components.stookalert.*
homeassistant.components.statistics.*
homeassistant.components.stream.*
homeassistant.components.sun.*
homeassistant.components.surepetcare.*

View File

@ -1,11 +1,17 @@
"""Support for statistics for sensor values."""
from __future__ import annotations
from collections import deque
from collections.abc import Callable
import contextlib
from datetime import datetime, timedelta
import logging
import statistics
from typing import Any, Literal, cast
import voluptuous as vol
from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR_DOMAIN
from homeassistant.components.recorder.models import States
from homeassistant.components.recorder.util import execute, session_scope
from homeassistant.components.sensor import (
@ -21,14 +27,23 @@ from homeassistant.const import (
STATE_UNAVAILABLE,
STATE_UNKNOWN,
)
from homeassistant.core import callback
from homeassistant.core import (
CALLBACK_TYPE,
Event,
HomeAssistant,
State,
callback,
split_entity_id,
)
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.event import (
async_track_point_in_utc_time,
async_track_state_change_event,
)
from homeassistant.helpers.reload import async_setup_reload_service
from homeassistant.helpers.start import async_at_start
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType, StateType
from homeassistant.util import dt as dt_util
from . import DOMAIN, PLATFORMS
@ -100,13 +115,13 @@ DEFAULT_QUANTILE_METHOD = "exclusive"
ICON = "mdi:calculator"
def valid_binary_characteristic_configuration(config):
def valid_binary_characteristic_configuration(config: dict[str, Any]) -> dict[str, Any]:
"""Validate that the characteristic selected is valid for the source sensor type, throw if it isn't."""
if config.get(CONF_ENTITY_ID).split(".")[0] == "binary_sensor":
if split_entity_id(str(config.get(CONF_ENTITY_ID)))[0] == BINARY_SENSOR_DOMAIN:
if config.get(CONF_STATE_CHARACTERISTIC) not in STATS_BINARY_SUPPORT:
raise ValueError(
"The configured characteristic '"
+ config.get(CONF_STATE_CHARACTERISTIC)
+ str(config.get(CONF_STATE_CHARACTERISTIC))
+ "' is not supported for a binary source sensor."
)
return config
@ -162,28 +177,32 @@ PLATFORM_SCHEMA = vol.All(
)
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the Statistics sensor."""
await async_setup_reload_service(hass, DOMAIN, PLATFORMS)
async_add_entities(
[
new_entities=[
StatisticsSensor(
source_entity_id=config.get(CONF_ENTITY_ID),
name=config.get(CONF_NAME),
source_entity_id=config[CONF_ENTITY_ID],
name=config[CONF_NAME],
unique_id=config.get(CONF_UNIQUE_ID),
state_characteristic=config.get(CONF_STATE_CHARACTERISTIC),
samples_max_buffer_size=config.get(CONF_SAMPLES_MAX_BUFFER_SIZE),
state_characteristic=config[CONF_STATE_CHARACTERISTIC],
samples_max_buffer_size=config[CONF_SAMPLES_MAX_BUFFER_SIZE],
samples_max_age=config.get(CONF_MAX_AGE),
precision=config.get(CONF_PRECISION),
quantile_intervals=config.get(CONF_QUANTILE_INTERVALS),
quantile_method=config.get(CONF_QUANTILE_METHOD),
precision=config[CONF_PRECISION],
quantile_intervals=config[CONF_QUANTILE_INTERVALS],
quantile_method=config[CONF_QUANTILE_METHOD],
)
],
True,
update_before_add=True,
)
return True
class StatisticsSensor(SensorEntity):
@ -191,41 +210,46 @@ class StatisticsSensor(SensorEntity):
def __init__(
self,
source_entity_id,
name,
unique_id,
state_characteristic,
samples_max_buffer_size,
samples_max_age,
precision,
quantile_intervals,
quantile_method,
):
source_entity_id: str,
name: str,
unique_id: str | None,
state_characteristic: str,
samples_max_buffer_size: int,
samples_max_age: timedelta | None,
precision: int,
quantile_intervals: int,
quantile_method: str,
) -> None:
"""Initialize the Statistics sensor."""
self._source_entity_id = source_entity_id
self.is_binary = self._source_entity_id.split(".")[0] == "binary_sensor"
self._name = name
self._unique_id = unique_id
self._state_characteristic = state_characteristic
self._attr_icon: str = ICON
self._attr_name: str = name
self._attr_should_poll: bool = False
self._attr_unique_id: str | None = unique_id
self._source_entity_id: str = source_entity_id
self.is_binary: bool = (
split_entity_id(self._source_entity_id)[0] == BINARY_SENSOR_DOMAIN
)
self._state_characteristic: str = state_characteristic
if self._state_characteristic == STAT_DEFAULT:
self._state_characteristic = STAT_COUNT if self.is_binary else STAT_MEAN
_LOGGER.warning(DEPRECATION_WARNING, self._state_characteristic, name)
self._samples_max_buffer_size = samples_max_buffer_size
self._samples_max_age = samples_max_age
self._precision = precision
self._quantile_intervals = quantile_intervals
self._quantile_method = quantile_method
self._value = None
self._unit_of_measurement = None
self._available = False
self.states = deque(maxlen=self._samples_max_buffer_size)
self.ages = deque(maxlen=self._samples_max_buffer_size)
self.attributes = {
self._samples_max_buffer_size: int = samples_max_buffer_size
self._samples_max_age: timedelta | None = samples_max_age
self._precision: int = precision
self._quantile_intervals: int = quantile_intervals
self._quantile_method: str = quantile_method
self._value: StateType | datetime = None
self._unit_of_measurement: str | None = None
self._available: bool = False
self.states: deque[float | bool] = deque(maxlen=self._samples_max_buffer_size)
self.ages: deque[datetime] = deque(maxlen=self._samples_max_buffer_size)
self.attributes: dict[str, StateType] = {
STAT_AGE_COVERAGE_RATIO: None,
STAT_BUFFER_USAGE_RATIO: None,
STAT_SOURCE_VALUE_VALID: None,
}
self._state_characteristic_fn: Callable[[], StateType | datetime]
if self.is_binary:
self._state_characteristic_fn = getattr(
self, f"_stat_binary_{self._state_characteristic}"
@ -235,20 +259,20 @@ class StatisticsSensor(SensorEntity):
self, f"_stat_{self._state_characteristic}"
)
self._update_listener = None
self._update_listener: CALLBACK_TYPE | None = None
async def async_added_to_hass(self):
async def async_added_to_hass(self) -> None:
"""Register callbacks."""
@callback
def async_stats_sensor_state_listener(event):
def async_stats_sensor_state_listener(event: Event) -> None:
"""Handle the sensor state changes."""
if (new_state := event.data.get("new_state")) is None:
return
self._add_state_to_queue(new_state)
self.async_schedule_update_ha_state(True)
async def async_stats_sensor_startup(_):
async def async_stats_sensor_startup(_: HomeAssistant) -> None:
"""Add listener and get recorded state."""
_LOGGER.debug("Startup for %s", self.entity_id)
@ -265,7 +289,7 @@ class StatisticsSensor(SensorEntity):
async_at_start(self.hass, async_stats_sensor_startup)
def _add_state_to_queue(self, new_state):
def _add_state_to_queue(self, new_state: State) -> None:
"""Add the state to the queue."""
self._available = new_state.state != STATE_UNAVAILABLE
if new_state.state == STATE_UNAVAILABLE:
@ -277,7 +301,8 @@ class StatisticsSensor(SensorEntity):
try:
if self.is_binary:
self.states.append(new_state.state)
assert new_state.state in ("on", "off")
self.states.append(new_state.state == "on")
else:
self.states.append(float(new_state.state))
self.ages.append(new_state.last_updated)
@ -293,8 +318,9 @@ class StatisticsSensor(SensorEntity):
self._unit_of_measurement = self._derive_unit_of_measurement(new_state)
def _derive_unit_of_measurement(self, new_state):
base_unit = new_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
def _derive_unit_of_measurement(self, new_state: State) -> str | None:
base_unit: str | None = new_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
unit: str | None
if self.is_binary and self._state_characteristic in (
STAT_AVERAGE_STEP,
STAT_AVERAGE_TIMELESS,
@ -336,66 +362,46 @@ class StatisticsSensor(SensorEntity):
return unit
@property
def name(self):
"""Return the name of the sensor."""
return self._name
@property
def unique_id(self):
"""Return the unique id of the sensor."""
return self._unique_id
@property
def state_class(self):
def state_class(self) -> Literal[SensorStateClass.MEASUREMENT] | None:
"""Return the state class of this entity."""
if self._state_characteristic in STATS_NOT_A_NUMBER:
return None
return SensorStateClass.MEASUREMENT
@property
def native_value(self):
def native_value(self) -> StateType | datetime:
"""Return the state of the sensor."""
return self._value
@property
def native_unit_of_measurement(self):
def native_unit_of_measurement(self) -> str | None:
"""Return the unit the value is expressed in."""
return self._unit_of_measurement
@property
def available(self):
def available(self) -> bool:
"""Return the availability of the sensor linked to the source sensor."""
return self._available
@property
def should_poll(self):
"""No polling needed."""
return False
@property
def extra_state_attributes(self):
def extra_state_attributes(self) -> dict[str, StateType] | None:
"""Return the state attributes of the sensor."""
return {
key: value for key, value in self.attributes.items() if value is not None
}
@property
def icon(self):
"""Return the icon to use in the frontend, if any."""
return ICON
def _purge_old(self):
"""Remove states which are older than self._samples_max_age."""
def _purge_old_states(self, max_age: timedelta) -> None:
"""Remove states which are older than a given age."""
now = dt_util.utcnow()
_LOGGER.debug(
"%s: purging records older then %s(%s)",
self.entity_id,
dt_util.as_local(now - self._samples_max_age),
dt_util.as_local(now - max_age),
self._samples_max_age,
)
while self.ages and (now - self.ages[0]) > self._samples_max_age:
while self.ages and (now - self.ages[0]) > max_age:
_LOGGER.debug(
"%s: purging record with datetime %s(%s)",
self.entity_id,
@ -405,7 +411,7 @@ class StatisticsSensor(SensorEntity):
self.ages.popleft()
self.states.popleft()
def _next_to_purge_timestamp(self):
def _next_to_purge_timestamp(self) -> datetime | None:
"""Find the timestamp when the next purge would occur."""
if self.ages and self._samples_max_age:
# Take the oldest entry from the ages list and add the configured max_age.
@ -414,11 +420,11 @@ class StatisticsSensor(SensorEntity):
return self.ages[0] + self._samples_max_age
return None
async def async_update(self):
async def async_update(self) -> None:
"""Get the latest data and updates the states."""
_LOGGER.debug("%s: updating statistics", self.entity_id)
if self._samples_max_age is not None:
self._purge_old()
self._purge_old_states(self._samples_max_age)
self._update_attributes()
self._update_value()
@ -434,7 +440,7 @@ class StatisticsSensor(SensorEntity):
self._update_listener = None
@callback
def _scheduled_update(now):
def _scheduled_update(now: datetime) -> None:
"""Timer callback for sensor update."""
_LOGGER.debug("%s: executing scheduled update", self.entity_id)
self.async_schedule_update_ha_state(True)
@ -444,7 +450,7 @@ class StatisticsSensor(SensorEntity):
self.hass, _scheduled_update, next_to_purge_timestamp
)
async def _initialize_from_database(self):
async def _initialize_from_database(self) -> None:
"""Initialize the list of states from the database.
The query will get the list of states in DESCENDING order so that we
@ -478,14 +484,15 @@ class StatisticsSensor(SensorEntity):
)
states = execute(query, to_native=True, validate_entity_ids=False)
for state in reversed(states):
self._add_state_to_queue(state)
if states:
for state in reversed(states):
self._add_state_to_queue(state)
self.async_schedule_update_ha_state(True)
_LOGGER.debug("%s: initializing from database completed", self.entity_id)
def _update_attributes(self):
def _update_attributes(self) -> None:
"""Calculate and update the various attributes."""
self.attributes[STAT_BUFFER_USAGE_RATIO] = round(
len(self.states) / self._samples_max_buffer_size, 2
@ -500,7 +507,7 @@ class StatisticsSensor(SensorEntity):
else:
self.attributes[STAT_AGE_COVERAGE_RATIO] = None
def _update_value(self):
def _update_value(self) -> None:
"""Front to call the right statistical characteristics functions.
One of the _stat_*() functions is represented by self._state_characteristic_fn().
@ -510,16 +517,16 @@ class StatisticsSensor(SensorEntity):
if self._state_characteristic not in STATS_NOT_A_NUMBER:
with contextlib.suppress(TypeError):
value = round(value, self._precision)
value = round(cast(float, value), self._precision)
if self._precision == 0:
value = int(value)
self._value = value
# Statistics for numeric sensor
def _stat_average_linear(self):
def _stat_average_linear(self) -> StateType:
if len(self.states) >= 2:
area = 0
area: float = 0
for i in range(1, len(self.states)):
area += (
0.5
@ -530,9 +537,9 @@ class StatisticsSensor(SensorEntity):
return area / age_range_seconds
return None
def _stat_average_step(self):
def _stat_average_step(self) -> StateType:
if len(self.states) >= 2:
area = 0
area: float = 0
for i in range(1, len(self.states)):
area += (
self.states[i - 1]
@ -542,65 +549,65 @@ class StatisticsSensor(SensorEntity):
return area / age_range_seconds
return None
def _stat_average_timeless(self):
def _stat_average_timeless(self) -> StateType:
return self._stat_mean()
def _stat_change(self):
def _stat_change(self) -> StateType:
if len(self.states) > 0:
return self.states[-1] - self.states[0]
return None
def _stat_change_sample(self):
def _stat_change_sample(self) -> StateType:
if len(self.states) > 1:
return (self.states[-1] - self.states[0]) / (len(self.states) - 1)
return None
def _stat_change_second(self):
def _stat_change_second(self) -> StateType:
if len(self.states) > 1:
age_range_seconds = (self.ages[-1] - self.ages[0]).total_seconds()
if age_range_seconds > 0:
return (self.states[-1] - self.states[0]) / age_range_seconds
return None
def _stat_count(self):
def _stat_count(self) -> StateType:
return len(self.states)
def _stat_datetime_newest(self):
def _stat_datetime_newest(self) -> datetime | None:
if len(self.states) > 0:
return self.ages[-1]
return None
def _stat_datetime_oldest(self):
def _stat_datetime_oldest(self) -> datetime | None:
if len(self.states) > 0:
return self.ages[0]
return None
def _stat_distance_95_percent_of_values(self):
def _stat_distance_95_percent_of_values(self) -> StateType:
if len(self.states) >= 2:
return 2 * 1.96 * self._stat_standard_deviation()
return 2 * 1.96 * cast(float, self._stat_standard_deviation())
return None
def _stat_distance_99_percent_of_values(self):
def _stat_distance_99_percent_of_values(self) -> StateType:
if len(self.states) >= 2:
return 2 * 2.58 * self._stat_standard_deviation()
return 2 * 2.58 * cast(float, self._stat_standard_deviation())
return None
def _stat_distance_absolute(self):
def _stat_distance_absolute(self) -> StateType:
if len(self.states) > 0:
return max(self.states) - min(self.states)
return None
def _stat_mean(self):
def _stat_mean(self) -> StateType:
if len(self.states) > 0:
return statistics.mean(self.states)
return None
def _stat_median(self):
def _stat_median(self) -> StateType:
if len(self.states) > 0:
return statistics.median(self.states)
return None
def _stat_noisiness(self):
def _stat_noisiness(self) -> StateType:
if len(self.states) >= 2:
diff_sum = sum(
abs(j - i) for i, j in zip(list(self.states), list(self.states)[1:])
@ -608,62 +615,64 @@ class StatisticsSensor(SensorEntity):
return diff_sum / (len(self.states) - 1)
return None
def _stat_quantiles(self):
def _stat_quantiles(self) -> StateType:
if len(self.states) > self._quantile_intervals:
return [
round(quantile, self._precision)
for quantile in statistics.quantiles(
self.states,
n=self._quantile_intervals,
method=self._quantile_method,
)
]
return str(
[
round(quantile, self._precision)
for quantile in statistics.quantiles(
self.states,
n=self._quantile_intervals,
method=self._quantile_method,
)
]
)
return None
def _stat_standard_deviation(self):
def _stat_standard_deviation(self) -> StateType:
if len(self.states) >= 2:
return statistics.stdev(self.states)
return None
def _stat_total(self):
def _stat_total(self) -> StateType:
if len(self.states) > 0:
return sum(self.states)
return None
def _stat_value_max(self):
def _stat_value_max(self) -> StateType:
if len(self.states) > 0:
return max(self.states)
return None
def _stat_value_min(self):
def _stat_value_min(self) -> StateType:
if len(self.states) > 0:
return min(self.states)
return None
def _stat_variance(self):
def _stat_variance(self) -> StateType:
if len(self.states) >= 2:
return statistics.variance(self.states)
return None
# Statistics for binary sensor
def _stat_binary_average_step(self):
def _stat_binary_average_step(self) -> StateType:
if len(self.states) >= 2:
on_seconds = 0
on_seconds: float = 0
for i in range(1, len(self.states)):
if self.states[i - 1] == "on":
if self.states[i - 1] is True:
on_seconds += (self.ages[i] - self.ages[i - 1]).total_seconds()
age_range_seconds = (self.ages[-1] - self.ages[0]).total_seconds()
return 100 / age_range_seconds * on_seconds
return None
def _stat_binary_average_timeless(self):
def _stat_binary_average_timeless(self) -> StateType:
return self._stat_binary_mean()
def _stat_binary_count(self):
def _stat_binary_count(self) -> StateType:
return len(self.states)
def _stat_binary_mean(self):
def _stat_binary_mean(self) -> StateType:
if len(self.states) > 0:
return 100.0 / len(self.states) * self.states.count("on")
return 100.0 / len(self.states) * self.states.count(True)
return None

View File

@ -1342,6 +1342,17 @@ no_implicit_optional = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.statistics.*]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
no_implicit_optional = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.stream.*]
check_untyped_defs = true
disallow_incomplete_defs = true

View File

@ -14,6 +14,7 @@ from homeassistant.const import (
STATE_UNKNOWN,
TEMP_CELSIUS,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
from homeassistant.setup import async_setup_component
from homeassistant.util import dt as dt_util
@ -29,7 +30,7 @@ VALUES_BINARY = ["on", "off", "on", "off", "on", "off", "on", "off", "on"]
VALUES_NUMERIC = [17, 20, 15.2, 5, 3.8, 9.2, 6.7, 14, 6]
async def test_unique_id(hass):
async def test_unique_id(hass: HomeAssistant):
"""Test configuration defined unique_id."""
assert await async_setup_component(
hass,
@ -54,7 +55,7 @@ async def test_unique_id(hass):
assert entity_id == "sensor.test"
async def test_sensor_defaults_numeric(hass):
async def test_sensor_defaults_numeric(hass: HomeAssistant):
"""Test the general behavior of the sensor, with numeric source sensor."""
assert await async_setup_component(
hass,
@ -74,12 +75,13 @@ async def test_sensor_defaults_numeric(hass):
for value in VALUES_NUMERIC:
hass.states.async_set(
"sensor.test_monitored",
value,
str(value),
{ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS},
)
await hass.async_block_till_done()
state = hass.states.get("sensor.test")
assert state is not None
assert state.state == str(round(sum(VALUES_NUMERIC) / len(VALUES_NUMERIC), 2))
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == TEMP_CELSIUS
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
@ -96,16 +98,18 @@ async def test_sensor_defaults_numeric(hass):
)
await hass.async_block_till_done()
new_state = hass.states.get("sensor.test")
assert new_state is not None
assert new_state.state == STATE_UNAVAILABLE
assert new_state.attributes.get("source_value_valid") is None
hass.states.async_set(
"sensor.test_monitored",
0,
"0",
{ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS},
)
await hass.async_block_till_done()
new_state = hass.states.get("sensor.test")
new_mean = round(sum(VALUES_NUMERIC) / (len(VALUES_NUMERIC) + 1), 2)
assert new_state is not None
assert new_state.state == str(new_mean)
assert new_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == TEMP_CELSIUS
assert new_state.attributes.get("buffer_usage_ratio") == round(10 / 20, 2)
@ -116,6 +120,7 @@ async def test_sensor_defaults_numeric(hass):
hass.states.async_set("sensor.test_monitored", "beer", {})
await hass.async_block_till_done()
new_state = hass.states.get("sensor.test")
assert new_state is not None
assert new_state.state == str(new_mean)
assert new_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == TEMP_CELSIUS
assert new_state.attributes.get("source_value_valid") is False
@ -125,6 +130,7 @@ async def test_sensor_defaults_numeric(hass):
hass.states.async_set("sensor.test_monitored", STATE_UNKNOWN, {})
await hass.async_block_till_done()
new_state = hass.states.get("sensor.test")
assert new_state is not None
assert new_state.state == str(new_mean)
assert new_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == TEMP_CELSIUS
assert new_state.attributes.get("source_value_valid") is False
@ -134,12 +140,13 @@ async def test_sensor_defaults_numeric(hass):
hass.states.async_remove("sensor.test_monitored")
await hass.async_block_till_done()
new_state = hass.states.get("sensor.test")
assert new_state is not None
assert new_state.state == str(new_mean)
assert new_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == TEMP_CELSIUS
assert new_state.attributes.get("source_value_valid") is False
async def test_sensor_defaults_binary(hass):
async def test_sensor_defaults_binary(hass: HomeAssistant):
"""Test the general behavior of the sensor, with binary source sensor."""
assert await async_setup_component(
hass,
@ -165,6 +172,7 @@ async def test_sensor_defaults_binary(hass):
await hass.async_block_till_done()
state = hass.states.get("sensor.test")
assert state is not None
assert state.state == str(len(VALUES_BINARY))
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) is None
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
@ -173,7 +181,7 @@ async def test_sensor_defaults_binary(hass):
assert "age_coverage_ratio" not in state.attributes
async def test_sensor_source_with_force_update(hass):
async def test_sensor_source_with_force_update(hass: HomeAssistant):
"""Test the behavior of the sensor when the source sensor force-updates with same value."""
repeating_values = [18, 0, 0, 0, 0, 0, 0, 0, 9]
assert await async_setup_component(
@ -201,12 +209,12 @@ async def test_sensor_source_with_force_update(hass):
for value in repeating_values:
hass.states.async_set(
"sensor.test_monitored_normal",
value,
str(value),
{ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS},
)
hass.states.async_set(
"sensor.test_monitored_force",
value,
str(value),
{ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS},
force_update=True,
)
@ -214,13 +222,14 @@ async def test_sensor_source_with_force_update(hass):
state_normal = hass.states.get("sensor.test_normal")
state_force = hass.states.get("sensor.test_force")
assert state_normal and state_force
assert state_normal.state == str(round(sum(repeating_values) / 3, 2))
assert state_force.state == str(round(sum(repeating_values) / 9, 2))
assert state_normal.attributes.get("buffer_usage_ratio") == round(3 / 20, 2)
assert state_force.attributes.get("buffer_usage_ratio") == round(9 / 20, 2)
async def test_sampling_size_non_default(hass):
async def test_sampling_size_non_default(hass: HomeAssistant):
"""Test rotation."""
assert await async_setup_component(
hass,
@ -242,18 +251,19 @@ async def test_sampling_size_non_default(hass):
for value in VALUES_NUMERIC:
hass.states.async_set(
"sensor.test_monitored",
value,
str(value),
{ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS},
)
await hass.async_block_till_done()
state = hass.states.get("sensor.test")
new_mean = round(sum(VALUES_NUMERIC[-5:]) / len(VALUES_NUMERIC[-5:]), 2)
assert state is not None
assert state.state == str(new_mean)
assert state.attributes.get("buffer_usage_ratio") == round(5 / 5, 2)
async def test_sampling_size_1(hass):
async def test_sampling_size_1(hass: HomeAssistant):
"""Test validity of stats requiring only one sample."""
assert await async_setup_component(
hass,
@ -275,18 +285,19 @@ async def test_sampling_size_1(hass):
for value in VALUES_NUMERIC[-3:]: # just the last 3 will do
hass.states.async_set(
"sensor.test_monitored",
value,
str(value),
{ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS},
)
await hass.async_block_till_done()
state = hass.states.get("sensor.test")
new_mean = float(VALUES_NUMERIC[-1])
assert state is not None
assert state.state == str(new_mean)
assert state.attributes.get("buffer_usage_ratio") == round(1 / 1, 2)
async def test_age_limit_expiry(hass):
async def test_age_limit_expiry(hass: HomeAssistant):
"""Test that values are removed after certain age."""
now = dt_util.utcnow()
mock_data = {
@ -321,7 +332,7 @@ async def test_age_limit_expiry(hass):
async_fire_time_changed(hass, mock_data["return_time"])
hass.states.async_set(
"sensor.test_monitored",
value,
str(value),
{ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS},
)
await hass.async_block_till_done()
@ -330,6 +341,7 @@ async def test_age_limit_expiry(hass):
state = hass.states.get("sensor.test")
new_mean = round(sum(VALUES_NUMERIC[-5:]) / len(VALUES_NUMERIC[-5:]), 2)
assert state is not None
assert state.state == str(new_mean)
assert state.attributes.get("buffer_usage_ratio") == round(5 / 20, 2)
assert state.attributes.get("age_coverage_ratio") == 1.0
@ -342,6 +354,7 @@ async def test_age_limit_expiry(hass):
state = hass.states.get("sensor.test")
new_mean = round(sum(VALUES_NUMERIC[-2:]) / len(VALUES_NUMERIC[-2:]), 2)
assert state is not None
assert state.state == str(new_mean)
assert state.attributes.get("buffer_usage_ratio") == round(2 / 20, 2)
assert state.attributes.get("age_coverage_ratio") == 1 / 4
@ -354,6 +367,7 @@ async def test_age_limit_expiry(hass):
state = hass.states.get("sensor.test")
new_mean = float(VALUES_NUMERIC[-1])
assert state is not None
assert state.state == str(new_mean)
assert state.attributes.get("buffer_usage_ratio") == round(1 / 20, 2)
assert state.attributes.get("age_coverage_ratio") == 0
@ -365,12 +379,13 @@ async def test_age_limit_expiry(hass):
await hass.async_block_till_done()
state = hass.states.get("sensor.test")
assert state is not None
assert state.state == STATE_UNKNOWN
assert state.attributes.get("buffer_usage_ratio") == round(0 / 20, 2)
assert state.attributes.get("age_coverage_ratio") is None
async def test_precision(hass):
async def test_precision(hass: HomeAssistant):
"""Test correct result with precision set."""
assert await async_setup_component(
hass,
@ -399,19 +414,21 @@ async def test_precision(hass):
for value in VALUES_NUMERIC:
hass.states.async_set(
"sensor.test_monitored",
value,
str(value),
{ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS},
)
await hass.async_block_till_done()
mean = sum(VALUES_NUMERIC) / len(VALUES_NUMERIC)
state = hass.states.get("sensor.test_precision_0")
assert state is not None
assert state.state == str(int(round(mean, 0)))
state = hass.states.get("sensor.test_precision_3")
assert state is not None
assert state.state == str(round(mean, 3))
async def test_state_class(hass):
async def test_state_class(hass: HomeAssistant):
"""Test state class, which depends on the characteristic configured."""
assert await async_setup_component(
hass,
@ -438,18 +455,20 @@ async def test_state_class(hass):
for value in VALUES_NUMERIC:
hass.states.async_set(
"sensor.test_monitored",
value,
str(value),
{ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS},
)
await hass.async_block_till_done()
state = hass.states.get("sensor.test_normal")
assert state is not None
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
state = hass.states.get("sensor.test_nan")
assert state is not None
assert state.attributes.get(ATTR_STATE_CLASS) is None
async def test_unitless_source_sensor(hass):
async def test_unitless_source_sensor(hass: HomeAssistant):
"""Statistics for a unitless source sensor should never have a unit."""
assert await async_setup_component(
hass,
@ -490,31 +509,31 @@ async def test_unitless_source_sensor(hass):
)
await hass.async_block_till_done()
for value in VALUES_NUMERIC:
for value_numeric in VALUES_NUMERIC:
hass.states.async_set(
"sensor.test_monitored_unitless",
value,
str(value_numeric),
)
for value in VALUES_BINARY:
for value_binary in VALUES_BINARY:
hass.states.async_set(
"binary_sensor.test_monitored_unitless",
value,
str(value_binary),
)
await hass.async_block_till_done()
state = hass.states.get("sensor.test_unitless_1")
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) is None
assert state and state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) is None
state = hass.states.get("sensor.test_unitless_2")
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) is None
assert state and state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) is None
state = hass.states.get("sensor.test_unitless_3")
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) is None
assert state and state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) is None
state = hass.states.get("sensor.test_unitless_4")
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) is None
assert state and state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) is None
state = hass.states.get("sensor.test_unitless_5")
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == "%"
assert state and state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == "%"
async def test_state_characteristics(hass):
async def test_state_characteristics(hass: HomeAssistant):
"""Test configured state characteristic for value and unit."""
now = dt_util.utcnow()
start_datetime = datetime(now.year + 1, 8, 2, 12, 23, 42, tzinfo=dt_util.UTC)
@ -772,12 +791,12 @@ async def test_state_characteristics(hass):
async_fire_time_changed(hass, mock_data["return_time"])
hass.states.async_set(
"sensor.test_monitored",
VALUES_NUMERIC[i],
str(VALUES_NUMERIC[i]),
{ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS},
)
hass.states.async_set(
"binary_sensor.test_monitored",
VALUES_BINARY[i],
str(VALUES_BINARY[i]),
{ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS},
)
await hass.async_block_till_done()
@ -786,6 +805,7 @@ async def test_state_characteristics(hass):
state = hass.states.get(
f"sensor.test_{characteristic['source_sensor_domain']}_{characteristic['name']}"
)
assert state is not None
assert state.state == str(characteristic["value_9"]), (
f"value mismatch for characteristic "
f"'{characteristic['source_sensor_domain']}/{characteristic['name']}' "
@ -806,6 +826,7 @@ async def test_state_characteristics(hass):
state = hass.states.get(
f"sensor.test_{characteristic['source_sensor_domain']}_{characteristic['name']}"
)
assert state is not None
assert state.state == str(characteristic["value_1"]), (
f"value mismatch for characteristic "
f"'{characteristic['source_sensor_domain']}/{characteristic['name']}' "
@ -823,6 +844,7 @@ async def test_state_characteristics(hass):
state = hass.states.get(
f"sensor.test_{characteristic['source_sensor_domain']}_{characteristic['name']}"
)
assert state is not None
assert state.state == str(characteristic["value_0"]), (
f"value mismatch for characteristic "
f"'{characteristic['source_sensor_domain']}/{characteristic['name']}' "
@ -831,7 +853,7 @@ async def test_state_characteristics(hass):
)
async def test_invalid_state_characteristic(hass):
async def test_invalid_state_characteristic(hass: HomeAssistant):
"""Test the detection of wrong state_characteristics selected."""
assert await async_setup_component(
hass,
@ -857,7 +879,7 @@ async def test_invalid_state_characteristic(hass):
hass.states.async_set(
"sensor.test_monitored",
VALUES_NUMERIC[0],
str(VALUES_NUMERIC[0]),
{ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS},
)
await hass.async_block_till_done()
@ -868,7 +890,7 @@ async def test_invalid_state_characteristic(hass):
assert state is None
async def test_initialize_from_database(hass):
async def test_initialize_from_database(hass: HomeAssistant):
"""Test initializing the statistics from the recorder database."""
# enable and pre-fill the recorder
await async_init_recorder_component(hass)
@ -878,7 +900,7 @@ async def test_initialize_from_database(hass):
for value in VALUES_NUMERIC:
hass.states.async_set(
"sensor.test_monitored",
value,
str(value),
{ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS},
)
await hass.async_block_till_done()
@ -903,11 +925,12 @@ async def test_initialize_from_database(hass):
await hass.async_block_till_done()
state = hass.states.get("sensor.test")
assert state is not None
assert state.state == str(round(sum(VALUES_NUMERIC) / len(VALUES_NUMERIC), 2))
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == TEMP_CELSIUS
async def test_initialize_from_database_with_maxage(hass):
async def test_initialize_from_database_with_maxage(hass: HomeAssistant):
"""Test initializing the statistics from the database."""
now = dt_util.utcnow()
mock_data = {
@ -919,7 +942,7 @@ async def test_initialize_from_database_with_maxage(hass):
# Testing correct retrieval from recorder, thus we do not
# want purging to occur within the class itself.
def mock_purge(self):
def mock_purge(self, *args):
return
# enable and pre-fill the recorder
@ -929,11 +952,11 @@ async def test_initialize_from_database_with_maxage(hass):
with patch(
"homeassistant.components.statistics.sensor.dt_util.utcnow", new=mock_now
), patch.object(StatisticsSensor, "_purge_old", mock_purge):
), patch.object(StatisticsSensor, "_purge_old_states", mock_purge):
for value in VALUES_NUMERIC:
hass.states.async_set(
"sensor.test_monitored",
value,
str(value),
{ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS},
)
await hass.async_block_till_done()
@ -959,6 +982,7 @@ async def test_initialize_from_database_with_maxage(hass):
await hass.async_block_till_done()
state = hass.states.get("sensor.test")
assert state is not None
assert state.attributes.get("age_coverage_ratio") == round(2 / 3, 2)
# The max_age timestamp should be 1 hour before what we have right
# now in mock_data['return_time'].
@ -967,7 +991,7 @@ async def test_initialize_from_database_with_maxage(hass):
) + timedelta(hours=1)
async def test_reload(hass):
async def test_reload(hass: HomeAssistant):
"""Verify we can reload statistics sensors."""
await async_init_recorder_component(hass)
@ -988,7 +1012,7 @@ async def test_reload(hass):
)
await hass.async_block_till_done()
hass.states.async_set("sensor.test_monitored", 12345)
hass.states.async_set("sensor.test_monitored", "0")
await hass.async_block_till_done()
assert len(hass.states.async_all()) == 2