1
mirror of https://github.com/home-assistant/core synced 2024-10-04 07:58:43 +02:00

Fix throttle to work on instance-level

This commit is contained in:
Paulus Schoutsen 2015-10-08 23:49:55 -07:00
parent 8a04e1f5f4
commit 47fc1deecb
2 changed files with 30 additions and 12 deletions

View File

@ -233,35 +233,42 @@ class Throttle(object):
self.limit_no_throttle = limit_no_throttle
def __call__(self, method):
lock = threading.Lock()
if self.limit_no_throttle is not None:
method = Throttle(self.limit_no_throttle)(method)
# We want to be able to differentiate between function and method calls
# All methods have the classname in their qualname seperated by a '.'
# Functions have a '.' in their qualname if defined inline, but will
# be prefixed by '.<locals>.' so we strip that out.
is_func = '.' not in method.__qualname__.split('.<locals>.')[-1]
@wraps(method)
def wrapper(*args, **kwargs):
"""
Wrapper that allows wrapped to be called only once per min_time.
If we cannot acquire the lock, it is running so return None.
"""
if not lock.acquire(False):
# pylint: disable=protected-access
host = wrapper if is_func else args[0]
if not hasattr(host, '_throttle_lock'):
host._throttle_lock = threading.Lock()
if not host._throttle_lock.acquire(False):
return None
last_call = getattr(host, '_throttle_last_call', None)
# Check if method is never called or no_throttle is given
force = not last_call or kwargs.pop('no_throttle', False)
try:
last_call = wrapper.last_call
# Check if method is never called or no_throttle is given
force = not last_call or kwargs.pop('no_throttle', False)
if force or utcnow() - last_call > self.min_time:
result = method(*args, **kwargs)
wrapper.last_call = utcnow()
host._throttle_last_call = utcnow()
return result
else:
return None
finally:
lock.release()
wrapper.last_call = None
host._throttle_lock.release()
return wrapper

View File

@ -218,3 +218,14 @@ class TestUtil(unittest.TestCase):
self.assertEqual(3, len(calls1))
self.assertEqual(2, len(calls2))
def test_throttle_per_instance(self):
""" Test that the throttle method is done per instance of a class. """
class Tester(object):
@util.Throttle(timedelta(seconds=1))
def hello(self):
return True
self.assertTrue(Tester().hello())
self.assertTrue(Tester().hello())