1
mirror of https://github.com/home-assistant/core synced 2024-08-02 23:40:32 +02:00

Add tests for history component

This commit is contained in:
Paulus Schoutsen 2015-04-30 21:03:01 -07:00
parent cf5278b7e9
commit 17cc9a43bb
4 changed files with 163 additions and 4 deletions

View File

@ -910,7 +910,11 @@ class Timer(threading.Thread):
# Event might have been set while sleeping
if not self._stop_event.isSet():
self.hass.bus.fire(EVENT_TIME_CHANGED, {ATTR_NOW: now})
try:
self.hass.bus.fire(EVENT_TIME_CHANGED, {ATTR_NOW: now})
except HomeAssistantError:
# HA raises error if firing event after it has shut down
break
class Config(object):

View File

@ -23,7 +23,7 @@ def last_5_states(entity_id):
query = """
SELECT * FROM states WHERE entity_id=? AND
last_changed=last_updated
ORDER BY last_changed DESC LIMIT 0, 5
ORDER BY state_id DESC LIMIT 0, 5
"""
return recorder.query_states(query, (entity_id, ))

View File

@ -11,7 +11,8 @@ import homeassistant as ha
import homeassistant.util.dt as dt_util
from homeassistant.helpers.entity import ToggleEntity
from homeassistant.const import (
STATE_ON, STATE_OFF, DEVICE_DEFAULT_NAME, EVENT_TIME_CHANGED)
STATE_ON, STATE_OFF, DEVICE_DEFAULT_NAME, EVENT_TIME_CHANGED,
EVENT_STATE_CHANGED)
from homeassistant.components import sun
@ -20,9 +21,17 @@ def get_test_config_dir():
return os.path.join(os.path.dirname(__file__), "config")
def get_test_home_assistant():
def get_test_home_assistant(num_threads=None):
""" Returns a Home Assistant object pointing at test config dir. """
if num_threads:
orig_num_threads = ha.MIN_WORKER_THREAD
ha.MIN_WORKER_THREAD = num_threads
hass = ha.HomeAssistant()
if num_threads:
ha.MIN_WORKER_THREAD = orig_num_threads
hass.config.config_dir = get_test_config_dir()
hass.config.latitude = 32.87336
hass.config.longitude = -117.22743
@ -69,6 +78,18 @@ def ensure_sun_set(hass):
sun.next_setting_utc(hass) + timedelta(seconds=10)})
def mock_state_change_event(hass, new_state, old_state=None):
event_data = {
'entity_id': new_state.entity_id,
'new_state': new_state,
}
if old_state:
event_data['old_state'] = old_state
hass.bus.fire(EVENT_STATE_CHANGED, event_data)
class MockModule(object):
""" Provides a fake module. """

View File

@ -0,0 +1,134 @@
"""
tests.test_component_history
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Tests the history component.
"""
# pylint: disable=protected-access,too-many-public-methods
import time
import os
import unittest
import homeassistant as ha
import homeassistant.util.dt as dt_util
from homeassistant.components import history, recorder, http
from helpers import get_test_home_assistant, mock_state_change_event
class TestComponentHistory(unittest.TestCase):
""" Tests homeassistant.components.history module. """
def setUp(self): # pylint: disable=invalid-name
""" Init needed objects. """
self.hass = get_test_home_assistant(1)
self.init_rec = False
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
self.hass.stop()
if self.init_rec:
recorder._INSTANCE.block_till_done()
os.remove(self.hass.config.path(recorder.DB_FILE))
def init_recorder(self):
recorder.setup(self.hass, {})
self.hass.start()
recorder._INSTANCE.block_till_done()
self.init_rec = True
def test_setup(self):
""" Test setup method of history. """
http.setup(self.hass, {http.DOMAIN: {}})
self.assertTrue(history.setup(self.hass, {}))
def test_last_5_states(self):
""" Test retrieving the last 5 states. """
self.init_recorder()
states = []
entity_id = 'test.last_5_states'
for i in range(7):
self.hass.states.set(entity_id, "State {}".format(i))
if i > 1:
states.append(self.hass.states.get(entity_id))
self.hass.pool.block_till_done()
recorder._INSTANCE.block_till_done()
self.assertEqual(
list(reversed(states)), history.last_5_states(entity_id))
def test_get_states(self):
""" Test getting states at a specific point in time. """
self.init_recorder()
states = []
# Create 10 states for 5 different entities
# After the first 5, sleep a second and save the time
# history.get_states takes the latest states BEFORE point X
for i in range(10):
state = ha.State(
'test.point_in_time_{}'.format(i % 5),
"State {}".format(i),
{'attribute_test': i})
mock_state_change_event(self.hass, state)
self.hass.pool.block_till_done()
recorder._INSTANCE.block_till_done()
if i < 5:
states.append(state)
if i == 4:
time.sleep(1)
point = dt_util.utcnow()
self.assertEqual(
states,
sorted(
history.get_states(point), key=lambda state: state.entity_id))
# Test get_state here because we have a DB setup
self.assertEqual(
states[0], history.get_state(point, states[0].entity_id))
def test_state_changes_during_period(self):
self.init_recorder()
entity_id = 'media_player.test'
def set_state(state):
self.hass.states.set(entity_id, state)
self.hass.pool.block_till_done()
recorder._INSTANCE.block_till_done()
return self.hass.states.get(entity_id)
set_state('idle')
set_state('YouTube')
start = dt_util.utcnow()
time.sleep(1)
states = [
set_state('idle'),
set_state('Netflix'),
set_state('Plex'),
set_state('YouTube'),
]
time.sleep(1)
end = dt_util.utcnow()
set_state('Netflix')
set_state('Plex')
self.assertEqual(
{entity_id: states},
history.state_changes_during_period(start, end, entity_id))