1
mirror of https://github.com/home-assistant/core synced 2024-08-02 23:40:32 +02:00

Even more test coverage

This commit is contained in:
Paulus Schoutsen 2014-11-22 21:40:01 -08:00
parent 1069d79298
commit 5943f757a0
3 changed files with 134 additions and 35 deletions

View File

@ -107,7 +107,11 @@ class HomeAssistant(object):
def track_state_change(self, entity_ids, action,
from_state=None, to_state=None):
""" Track specific state changes. """
"""
Track specific state changes.
entity_ids, from_state and to_state can be string or list.
Use list to match multiple.
"""
from_state = _process_match_param(from_state)
to_state = _process_match_param(to_state)
@ -130,14 +134,16 @@ class HomeAssistant(object):
self.bus.listen(EVENT_STATE_CHANGED, state_listener)
def track_point_in_time(self, action, point_in_time):
""" Adds a listener that fires once after a spefic point in time. """
"""
Adds a listener that fires once at or after a spefic point in time.
"""
@ft.wraps(action)
def point_in_time_listener(event):
""" Listens for matching time_changed events. """
now = event.data[ATTR_NOW]
if now > point_in_time and \
if now >= point_in_time and \
not hasattr(point_in_time_listener, 'run'):
# Set variable so that we will never run twice.
@ -221,7 +227,7 @@ class HomeAssistant(object):
def _process_match_param(parameter):
""" Wraps parameter in a list if it is not one and returns it. """
if not parameter:
if not parameter or parameter == MATCH_ALL:
return MATCH_ALL
elif isinstance(parameter, list):
return parameter

View File

@ -5,10 +5,12 @@ homeassistant.test
Provides tests to verify that Home Assistant modules do what they should do.
"""
# pylint: disable=protected-access
import os
import unittest
import time
import json
from datetime import datetime
import requests
@ -48,9 +50,6 @@ def ensure_homeassistant_started():
hass.start()
# Give objects time to startup
time.sleep(1)
HAHelper.hass = hass
return HAHelper.hass
@ -72,9 +71,6 @@ def ensure_slave_started():
slave.start()
# Give objects time to startup
time.sleep(1)
HAHelper.slave = slave
return HAHelper.slave
@ -118,18 +114,114 @@ class TestHomeAssistant(unittest.TestCase):
def test_track_state_change(self):
""" Test track_state_change. """
# with with from_state, to_state and without
pass
# 2 lists to track how often our callbacks got called
specific_runs = []
wildcard_runs = []
self.hass.track_state_change(
'light.Bowl', lambda a, b, c: specific_runs.append(1), 'on', 'off')
self.hass.track_state_change(
'light.Bowl', lambda a, b, c: wildcard_runs.append(1),
ha.MATCH_ALL, ha.MATCH_ALL)
# Set same state should not trigger a state change/listener
self.hass.states.set('light.Bowl', 'on')
self.hass._pool.block_till_done()
self.assertEqual(0, len(specific_runs))
self.assertEqual(0, len(wildcard_runs))
# State change off -> on
self.hass.states.set('light.Bowl', 'off')
self.hass._pool.block_till_done()
self.assertEqual(1, len(specific_runs))
self.assertEqual(1, len(wildcard_runs))
# State change off -> off
self.hass.states.set('light.Bowl', 'off', {"some_attr": 1})
self.hass._pool.block_till_done()
self.assertEqual(1, len(specific_runs))
self.assertEqual(2, len(wildcard_runs))
# State change off -> on
self.hass.states.set('light.Bowl', 'on')
self.hass._pool.block_till_done()
self.assertEqual(1, len(specific_runs))
self.assertEqual(3, len(wildcard_runs))
def test_listen_once_event(self):
""" Test listen_once_event method. """
runs = []
self.hass.listen_once_event('test_event', lambda x: runs.append(1))
self.hass.bus.fire('test_event')
self.hass._pool.block_till_done()
self.assertEqual(1, len(runs))
# Second time it should not increase runs
self.hass.bus.fire('test_event')
self.hass._pool.block_till_done()
self.assertEqual(1, len(runs))
def test_track_point_in_time(self):
""" Test track point in time. """
pass
before_birthday = datetime(1985, 7, 9, 12, 0, 0)
birthday_paulus = datetime(1986, 7, 9, 12, 0, 0)
after_birthday = datetime(1987, 7, 9, 12, 0, 0)
runs = []
self.hass.track_point_in_time(
lambda x: runs.append(1), birthday_paulus)
self._send_time_changed(before_birthday)
self.hass._pool.block_till_done()
self.assertEqual(0, len(runs))
self._send_time_changed(birthday_paulus)
self.hass._pool.block_till_done()
self.assertEqual(1, len(runs))
# A point in time tracker will only fire once, this should do nothing
self._send_time_changed(birthday_paulus)
self.hass._pool.block_till_done()
self.assertEqual(1, len(runs))
self.hass.track_point_in_time(
lambda x: runs.append(1), birthday_paulus)
self._send_time_changed(after_birthday)
self.hass._pool.block_till_done()
self.assertEqual(2, len(runs))
def test_track_time_change(self):
""" Test tracking time change. """
# with paramters
# without parameters
pass
wildcard_runs = []
specific_runs = []
self.hass.track_time_change(lambda x: wildcard_runs.append(1))
self.hass.track_time_change(
lambda x: specific_runs.append(1), second=[0, 30])
self._send_time_changed(datetime(2014, 5, 24, 12, 0, 0))
self.hass._pool.block_till_done()
self.assertEqual(1, len(specific_runs))
self.assertEqual(1, len(wildcard_runs))
self._send_time_changed(datetime(2014, 5, 24, 12, 0, 15))
self.hass._pool.block_till_done()
self.assertEqual(1, len(specific_runs))
self.assertEqual(2, len(wildcard_runs))
self._send_time_changed(datetime(2014, 5, 24, 12, 0, 30))
self.hass._pool.block_till_done()
self.assertEqual(2, len(specific_runs))
self.assertEqual(3, len(wildcard_runs))
def _send_time_changed(self, now):
""" Send a time changed event. """
self.hass.bus.fire(ha.EVENT_TIME_CHANGED, {ha.ATTR_NOW: now})
# pylint: disable=too-many-public-methods
@ -311,8 +403,7 @@ class TestHTTP(unittest.TestCase):
_url(remote.URL_API_EVENTS_EVENT.format("test.event_no_data")),
headers=HA_HEADERS)
# Allow the event to take place
time.sleep(1)
self.hass._pool.block_till_done()
self.assertEqual(1, len(test_value))
@ -334,8 +425,7 @@ class TestHTTP(unittest.TestCase):
data=json.dumps({"test": 1}),
headers=HA_HEADERS)
# Allow the event to take place
time.sleep(1)
self.hass._pool.block_till_done()
self.assertEqual(1, len(test_value))
@ -355,8 +445,7 @@ class TestHTTP(unittest.TestCase):
data=json.dumps('not an object'),
headers=HA_HEADERS)
# It shouldn't but if it fires, allow the event to take place
time.sleep(1)
self.hass._pool.block_till_done()
self.assertEqual(422, req.status_code)
self.assertEqual(0, len(test_value))
@ -401,8 +490,7 @@ class TestHTTP(unittest.TestCase):
"test_domain", "test_service")),
headers=HA_HEADERS)
# Allow the event to take place
time.sleep(1)
self.hass._pool.block_till_done()
self.assertEqual(1, len(test_value))
@ -424,8 +512,7 @@ class TestHTTP(unittest.TestCase):
data=json.dumps({"test": 1}),
headers=HA_HEADERS)
# Allow the event to take place
time.sleep(1)
self.hass._pool.block_till_done()
self.assertEqual(1, len(test_value))
@ -463,8 +550,7 @@ class TestRemoteMethods(unittest.TestCase):
remote.fire_event(self.api, "test.event_no_data")
# Allow the event to take place
time.sleep(1)
self.hass._pool.block_till_done()
self.assertEqual(1, len(test_value))
@ -516,8 +602,7 @@ class TestRemoteMethods(unittest.TestCase):
remote.call_service(self.api, "test_domain", "test_service")
# Allow the event to take place
time.sleep(1)
self.hass._pool.block_till_done()
self.assertEqual(1, len(test_value))
@ -544,8 +629,10 @@ class TestRemoteClasses(unittest.TestCase):
""" Tests if setting the state on a slave is recorded. """
self.slave.states.set("remote.test", "remote.statemachine test")
# Allow interaction between 2 instances
time.sleep(1)
# Wait till slave tells master
self.slave._pool.block_till_done()
# Wait till master gives updated state
self.hass._pool.block_till_done()
self.assertEqual("remote.statemachine test",
self.slave.states.get("remote.test").state)
@ -562,7 +649,9 @@ class TestRemoteClasses(unittest.TestCase):
self.slave.bus.fire("test.event_no_data")
# Allow the event to take place
time.sleep(1)
# Wait till slave tells master
self.slave._pool.block_till_done()
# Wait till master gives updated event
self.hass._pool.block_till_done()
self.assertEqual(1, len(test_value))

View File

@ -246,6 +246,10 @@ class ThreadPool(object):
self.busy_callback(self.current_jobs, self.work_queue.qsize())
def block_till_done(self):
""" Blocks till all work is done. """
self.work_queue.join()
class PriorityQueueItem(object):
""" Holds a priority and a value. Used within PriorityQueue. """