1
mirror of https://github.com/home-assistant/core synced 2024-07-18 12:02:20 +02:00

Fix filter sensor processing states that aren't numbers (#32453)

* lint

* only_numbers flag
This commit is contained in:
Paulus Schoutsen 2020-03-04 12:47:53 -08:00 committed by GitHub
parent e9978e77bd
commit 0763dc6089
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 5 deletions

View File

@ -364,6 +364,7 @@ class Filter:
self._skip_processing = False
self._window_size = window_size
self._store_raw = False
self._only_numbers = True
@property
def window_size(self):
@ -386,7 +387,11 @@ class Filter:
def filter_state(self, new_state):
"""Implement a common interface for filters."""
filtered = self._filter_state(FilterState(new_state))
fstate = FilterState(new_state)
if self._only_numbers and not isinstance(fstate.state, Number):
raise ValueError
filtered = self._filter_state(fstate)
filtered.set_precision(self.precision)
if self._store_raw:
self.states.append(copy(FilterState(new_state)))
@ -423,6 +428,7 @@ class RangeFilter(Filter):
def _filter_state(self, new_state):
"""Implement the range filter."""
if self._upper_bound is not None and new_state.state > self._upper_bound:
self._stats_internal["erasures_up"] += 1
@ -469,6 +475,7 @@ class OutlierFilter(Filter):
def _filter_state(self, new_state):
"""Implement the outlier filter."""
median = statistics.median([s.state for s in self.states]) if self.states else 0
if (
len(self.states) == self.states.maxlen
@ -498,6 +505,7 @@ class LowPassFilter(Filter):
def _filter_state(self, new_state):
"""Implement the low pass filter."""
if not self.states:
return new_state
@ -539,6 +547,7 @@ class TimeSMAFilter(Filter):
def _filter_state(self, new_state):
"""Implement the Simple Moving Average filter."""
self._leak(new_state.timestamp)
self.queue.append(copy(new_state))
@ -565,6 +574,7 @@ class ThrottleFilter(Filter):
def __init__(self, window_size, precision, entity):
"""Initialize Filter."""
super().__init__(FILTER_NAME_THROTTLE, window_size, precision, entity)
self._only_numbers = False
def _filter_state(self, new_state):
"""Implement the throttle filter."""
@ -589,6 +599,7 @@ class TimeThrottleFilter(Filter):
super().__init__(FILTER_NAME_TIME_THROTTLE, window_size, precision, entity)
self._time_window = window_size
self._last_emitted_at = None
self._only_numbers = False
def _filter_state(self, new_state):
"""Implement the filter."""

View File

@ -104,6 +104,7 @@ class TestFilterSensor(unittest.TestCase):
t_0 = dt_util.utcnow() - timedelta(minutes=1)
t_1 = dt_util.utcnow() - timedelta(minutes=2)
t_2 = dt_util.utcnow() - timedelta(minutes=3)
t_3 = dt_util.utcnow() - timedelta(minutes=4)
if missing:
fake_states = {}
@ -111,8 +112,9 @@ class TestFilterSensor(unittest.TestCase):
fake_states = {
"sensor.test_monitored": [
ha.State("sensor.test_monitored", 18.0, last_changed=t_0),
ha.State("sensor.test_monitored", 19.0, last_changed=t_1),
ha.State("sensor.test_monitored", 18.2, last_changed=t_2),
ha.State("sensor.test_monitored", "unknown", last_changed=t_1),
ha.State("sensor.test_monitored", 19.0, last_changed=t_2),
ha.State("sensor.test_monitored", 18.2, last_changed=t_3),
]
}
@ -208,6 +210,17 @@ class TestFilterSensor(unittest.TestCase):
filtered = filt.filter_state(state)
assert 21 == filtered.state
def test_unknown_state_outlier(self):
"""Test issue #32395."""
filt = OutlierFilter(window_size=3, precision=2, entity=None, radius=4.0)
out = ha.State("sensor.test_monitored", "unknown")
for state in [out] + self.values + [out]:
try:
filtered = filt.filter_state(state)
except ValueError:
assert state.state == "unknown"
assert 21 == filtered.state
def test_precision_zero(self):
"""Test if precision of zero returns an integer."""
filt = LowPassFilter(window_size=10, precision=0, entity=None, time_constant=10)
@ -218,8 +231,12 @@ class TestFilterSensor(unittest.TestCase):
def test_lowpass(self):
"""Test if lowpass filter works."""
filt = LowPassFilter(window_size=10, precision=2, entity=None, time_constant=10)
for state in self.values:
filtered = filt.filter_state(state)
out = ha.State("sensor.test_monitored", "unknown")
for state in [out] + self.values + [out]:
try:
filtered = filt.filter_state(state)
except ValueError:
assert state.state == "unknown"
assert 18.05 == filtered.state
def test_range(self):