mirror of
https://github.com/home-assistant/core
synced 2024-07-12 07:21:24 +02:00
Further Python 3 migration
This commit is contained in:
parent
7e06d535ab
commit
ef6d862671
@ -33,10 +33,6 @@ assert 60 % TIMER_INTERVAL == 0, "60 % TIMER_INTERVAL should be 0!"
|
||||
|
||||
BUS_NUM_THREAD = 4
|
||||
BUS_REPORT_BUSY_TIMEOUT = dt.timedelta(minutes=1)
|
||||
PRIO_SERVICE_DEFAULT = 1
|
||||
PRIO_EVENT_STATE = 2
|
||||
PRIO_EVENT_TIME = 3
|
||||
PRIO_EVENT_DEFAULT = 4
|
||||
|
||||
|
||||
def start_home_assistant(bus):
|
||||
@ -160,6 +156,32 @@ def track_time_change(bus, action,
|
||||
bus.listen_event(EVENT_TIME_CHANGED, time_listener)
|
||||
|
||||
|
||||
def listen_once_event(bus, event_type, listener):
|
||||
""" Listen once for event of a specific type.
|
||||
|
||||
To listen to all events specify the constant ``MATCH_ALL``
|
||||
as event_type.
|
||||
|
||||
Note: at the moment it is impossible to remove a one time listener.
|
||||
"""
|
||||
@ft.wraps(listener)
|
||||
def onetime_listener(event):
|
||||
""" Removes listener from eventbus and then fires listener. """
|
||||
if not hasattr(onetime_listener, 'run'):
|
||||
# Set variable so that we will never run twice.
|
||||
# Because the event bus might have to wait till a thread comes
|
||||
# available to execute this listener it might occur that the
|
||||
# listener gets lined up twice to be executed.
|
||||
# This will make sure the second time it does nothing.
|
||||
onetime_listener.run = True
|
||||
|
||||
bus.remove_event_listener(event_type, onetime_listener)
|
||||
|
||||
listener(event)
|
||||
|
||||
bus.listen_event(event_type, onetime_listener)
|
||||
|
||||
|
||||
def create_bus_job_handler(logger):
|
||||
""" Creates a job handler that logs errors to supplied `logger`. """
|
||||
|
||||
@ -176,6 +198,26 @@ def create_bus_job_handler(logger):
|
||||
return job_handler
|
||||
|
||||
|
||||
class BusPriority(util.OrderedEnum):
|
||||
""" Provides priorities for bus events. """
|
||||
# pylint: disable=no-init
|
||||
|
||||
SERVICE_DEFAULT = 1
|
||||
EVENT_STATE = 2
|
||||
EVENT_TIME = 3
|
||||
EVENT_DEFAULT = 4
|
||||
|
||||
@staticmethod
|
||||
def from_event_type(event_type):
|
||||
""" Returns a priority based on event type. """
|
||||
if event_type == EVENT_TIME_CHANGED:
|
||||
return BusPriority.EVENT_TIME
|
||||
elif event_type == EVENT_STATE_CHANGED:
|
||||
return BusPriority.EVENT_STATE
|
||||
else:
|
||||
return BusPriority.EVENT_DEFAULT
|
||||
|
||||
|
||||
# pylint: disable=too-few-public-methods
|
||||
class ServiceCall(object):
|
||||
""" Represents a call to a service. """
|
||||
@ -249,10 +291,7 @@ class Bus(object):
|
||||
|
||||
def has_service(self, domain, service):
|
||||
""" Returns True if specified service exists. """
|
||||
try:
|
||||
return service in self._services[domain]
|
||||
except KeyError: # if key 'domain' does not exist
|
||||
return False
|
||||
return service in self._services.get(domain, [])
|
||||
|
||||
def call_service(self, domain, service, service_data=None):
|
||||
""" Calls a service. """
|
||||
@ -260,7 +299,7 @@ class Bus(object):
|
||||
|
||||
with self.service_lock:
|
||||
try:
|
||||
self.pool.add_job(PRIO_SERVICE_DEFAULT,
|
||||
self.pool.add_job(BusPriority.SERVICE_DEFAULT,
|
||||
(self._services[domain][service],
|
||||
service_call))
|
||||
|
||||
@ -273,10 +312,9 @@ class Bus(object):
|
||||
def register_service(self, domain, service, service_func):
|
||||
""" Register a service. """
|
||||
with self.service_lock:
|
||||
try:
|
||||
if domain in self._services:
|
||||
self._services[domain][service] = service_func
|
||||
|
||||
except KeyError: # Domain does not exist yet in self._services
|
||||
else:
|
||||
self._services[domain] = {service: service_func}
|
||||
|
||||
def fire_event(self, event_type, event_data=None):
|
||||
@ -295,15 +333,9 @@ class Bus(object):
|
||||
if not listeners:
|
||||
return
|
||||
|
||||
if event_type == EVENT_TIME_CHANGED:
|
||||
prio = PRIO_EVENT_TIME
|
||||
elif event_type == EVENT_STATE_CHANGED:
|
||||
prio = PRIO_EVENT_STATE
|
||||
else:
|
||||
prio = PRIO_EVENT_DEFAULT
|
||||
|
||||
for func in listeners:
|
||||
self.pool.add_job(prio, (func, event))
|
||||
self.pool.add_job(BusPriority.from_event_type(event_type),
|
||||
(func, event))
|
||||
|
||||
self._check_busy()
|
||||
|
||||
@ -314,37 +346,11 @@ class Bus(object):
|
||||
as event_type.
|
||||
"""
|
||||
with self.event_lock:
|
||||
try:
|
||||
if event_type in self._event_listeners:
|
||||
self._event_listeners[event_type].append(listener)
|
||||
|
||||
except KeyError: # event_type did not exist
|
||||
else:
|
||||
self._event_listeners[event_type] = [listener]
|
||||
|
||||
def listen_once_event(self, event_type, listener):
|
||||
""" Listen once for event of a specific type.
|
||||
|
||||
To listen to all events specify the constant ``MATCH_ALL``
|
||||
as event_type.
|
||||
|
||||
Note: at the moment it is impossible to remove a one time listener.
|
||||
"""
|
||||
@ft.wraps(listener)
|
||||
def onetime_listener(event):
|
||||
""" Removes listener from eventbus and then fires listener. """
|
||||
if not hasattr(onetime_listener, 'run'):
|
||||
# Set variable so that we will never run twice.
|
||||
# Because the event bus might have to wait till a thread comes
|
||||
# available to execute this listener it might occur that the
|
||||
# listener gets lined up twice to be executed.
|
||||
# This will make sure the second time it does nothing.
|
||||
onetime_listener.run = True
|
||||
|
||||
self.remove_event_listener(event_type, onetime_listener)
|
||||
|
||||
listener(event)
|
||||
|
||||
self.listen_event(event_type, onetime_listener)
|
||||
|
||||
def remove_event_listener(self, event_type, listener):
|
||||
""" Removes a listener of a specific event_type. """
|
||||
with self.event_lock:
|
||||
@ -416,24 +422,27 @@ class State(object):
|
||||
'attributes': self.attributes,
|
||||
'last_changed': util.datetime_to_str(self.last_changed)}
|
||||
|
||||
@staticmethod
|
||||
def from_dict(json_dict):
|
||||
def __eq__(self, other):
|
||||
return (self.__class__ == other.__class__ and
|
||||
self.state == other.state and
|
||||
self.attributes == other.attributes)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, json_dict):
|
||||
""" Static method to create a state from a dict.
|
||||
Ensures: state == State.from_json_dict(state.to_json_dict()) """
|
||||
|
||||
try:
|
||||
last_changed = json_dict.get('last_changed')
|
||||
|
||||
if last_changed:
|
||||
last_changed = util.str_to_datetime(last_changed)
|
||||
|
||||
return State(json_dict['entity_id'],
|
||||
json_dict['state'],
|
||||
json_dict.get('attributes'),
|
||||
last_changed)
|
||||
except KeyError: # if key 'entity_id' or 'state' did not exist
|
||||
if 'entity_id' not in json_dict and 'state' not in json_dict:
|
||||
return None
|
||||
|
||||
last_changed = json_dict.get('last_changed')
|
||||
|
||||
if last_changed:
|
||||
last_changed = util.str_to_datetime(last_changed)
|
||||
|
||||
return cls(json_dict['entity_id'], json_dict['state'],
|
||||
json_dict.get('attributes'), last_changed)
|
||||
|
||||
def __repr__(self):
|
||||
if self.attributes:
|
||||
return "<state {}:{} @ {}>".format(
|
||||
@ -454,23 +463,27 @@ class StateMachine(object):
|
||||
|
||||
@property
|
||||
def entity_ids(self):
|
||||
""" List of entitie ids that are being tracked. """
|
||||
with self.lock:
|
||||
return list(self.states.keys())
|
||||
""" List of entity ids that are being tracked. """
|
||||
return self.states.keys()
|
||||
|
||||
def get_state(self, entity_id):
|
||||
""" Returns the state of the specified entity. """
|
||||
state = self.states.get(entity_id)
|
||||
|
||||
# Make a copy so people won't mutate the state
|
||||
return state.copy() if state else None
|
||||
|
||||
def is_state(self, entity_id, state):
|
||||
""" Returns True if entity exists and is specified state. """
|
||||
return (entity_id in self.states and
|
||||
self.states[entity_id].state == state)
|
||||
|
||||
def remove_entity(self, entity_id):
|
||||
""" Removes a entity from the state machine.
|
||||
|
||||
Returns boolean to indicate if a entity was removed. """
|
||||
with self.lock:
|
||||
try:
|
||||
del self.states[entity_id]
|
||||
|
||||
return True
|
||||
|
||||
except KeyError:
|
||||
# if entity does not exist
|
||||
return False
|
||||
return self.states.pop(entity_id, None) is not None
|
||||
|
||||
def set_state(self, entity_id, new_state, attributes=None):
|
||||
""" Set the state of an entity, add entity if it does not exist.
|
||||
@ -480,16 +493,9 @@ class StateMachine(object):
|
||||
attributes = attributes or {}
|
||||
|
||||
with self.lock:
|
||||
# Change state and fire listeners
|
||||
try:
|
||||
if entity_id in self.states:
|
||||
old_state = self.states[entity_id]
|
||||
|
||||
except KeyError:
|
||||
# If state did not exist yet
|
||||
self.states[entity_id] = State(entity_id, new_state,
|
||||
attributes)
|
||||
|
||||
else:
|
||||
if old_state.state != new_state or \
|
||||
old_state.attributes != attributes:
|
||||
|
||||
@ -501,24 +507,10 @@ class StateMachine(object):
|
||||
'old_state': old_state,
|
||||
'new_state': state})
|
||||
|
||||
def get_state(self, entity_id):
|
||||
""" Returns the state of the specified entity. """
|
||||
with self.lock:
|
||||
try:
|
||||
# Make a copy so people won't mutate the state
|
||||
return self.states[entity_id].copy()
|
||||
|
||||
except KeyError:
|
||||
# If entity does not exist
|
||||
return None
|
||||
|
||||
def is_state(self, entity_id, state):
|
||||
""" Returns True if entity exists and is specified state. """
|
||||
try:
|
||||
return self.states.get(entity_id).state == state
|
||||
except AttributeError:
|
||||
# states.get returned None
|
||||
return False
|
||||
else:
|
||||
# If state did not exist yet
|
||||
self.states[entity_id] = State(entity_id, new_state,
|
||||
attributes)
|
||||
|
||||
|
||||
class Timer(threading.Thread):
|
||||
@ -530,8 +522,8 @@ class Timer(threading.Thread):
|
||||
self.daemon = True
|
||||
self.bus = bus
|
||||
|
||||
bus.listen_once_event(EVENT_HOMEASSISTANT_START,
|
||||
lambda event: self.start())
|
||||
listen_once_event(bus, EVENT_HOMEASSISTANT_START,
|
||||
lambda event: self.start())
|
||||
|
||||
def run(self):
|
||||
""" Start the timer. """
|
||||
|
@ -33,7 +33,7 @@ def from_config_file(config_path):
|
||||
statusses = []
|
||||
|
||||
# Read config
|
||||
config = configparser.SafeConfigParser()
|
||||
config = configparser.ConfigParser()
|
||||
config.read(config_path)
|
||||
|
||||
# Init core
|
||||
|
@ -21,11 +21,12 @@ LIGHT_PROFILE = 'relax'
|
||||
|
||||
|
||||
# pylint: disable=too-many-branches
|
||||
def setup(bus, statemachine,
|
||||
light_group=light.GROUP_NAME_ALL_LIGHTS,
|
||||
light_profile=LIGHT_PROFILE):
|
||||
def setup(bus, statemachine, light_group=None, light_profile=None):
|
||||
""" Triggers to turn lights on or off based on device precense. """
|
||||
|
||||
light_group = light_group or light.GROUP_NAME_ALL_LIGHTS
|
||||
light_profile = light_profile or LIGHT_PROFILE
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
device_entity_ids = util.filter_entity_ids(statemachine.entity_ids,
|
||||
|
@ -39,13 +39,10 @@ def is_on(statemachine, entity_id):
|
||||
if state:
|
||||
group_type = _get_group_type(state.state)
|
||||
|
||||
if group_type:
|
||||
# We found group_type, compare to ON-state
|
||||
return state.state == _GROUP_TYPES[group_type][0]
|
||||
else:
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
# If we found a group_type, compare to ON-state
|
||||
return group_type and state.state == _GROUP_TYPES[group_type][0]
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def expand_entity_ids(statemachine, entity_ids):
|
||||
|
@ -113,12 +113,10 @@ class HTTPInterface(threading.Thread):
|
||||
|
||||
self.daemon = True
|
||||
|
||||
if not server_port:
|
||||
server_port = SERVER_PORT
|
||||
server_port = server_port or SERVER_PORT
|
||||
|
||||
# If no server host is given, accept all incoming requests
|
||||
if not server_host:
|
||||
server_host = '0.0.0.0'
|
||||
server_host = server_host or '0.0.0.0'
|
||||
|
||||
self.server = HTTPServer((server_host, server_port), RequestHandler)
|
||||
|
||||
@ -128,8 +126,8 @@ class HTTPInterface(threading.Thread):
|
||||
self.server.statemachine = statemachine
|
||||
self.server.api_password = api_password
|
||||
|
||||
bus.listen_once_event(ha.EVENT_HOMEASSISTANT_START,
|
||||
lambda event: self.start())
|
||||
ha.listen_once_event(bus, ha.EVENT_HOMEASSISTANT_START,
|
||||
lambda event: self.start())
|
||||
|
||||
def run(self):
|
||||
""" Start the HTTP interface. """
|
||||
@ -609,7 +607,8 @@ class RequestHandler(BaseHTTPRequestHandler):
|
||||
# pylint: disable=unused-argument
|
||||
def _handle_get_api_states(self, path_match, data):
|
||||
""" Returns the entitie ids which state are being tracked. """
|
||||
self._write_json({'entity_ids': self.server.statemachine.entity_ids})
|
||||
self._write_json(
|
||||
{'entity_ids': list(self.server.statemachine.entity_ids)})
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def _handle_get_api_states_entity(self, path_match, data):
|
||||
|
@ -265,11 +265,9 @@ def setup(bus, statemachine, light_control):
|
||||
profile = profiles.get(dat.get(ATTR_PROFILE))
|
||||
|
||||
if profile:
|
||||
color = profile[0:2]
|
||||
bright = profile[2]
|
||||
*color, bright = profile
|
||||
else:
|
||||
color = None
|
||||
bright = None
|
||||
color, bright = None, None
|
||||
|
||||
if ATTR_BRIGHTNESS in dat:
|
||||
bright = util.convert(dat.get(ATTR_BRIGHTNESS), int)
|
||||
@ -277,28 +275,28 @@ def setup(bus, statemachine, light_control):
|
||||
if ATTR_XY_COLOR in dat:
|
||||
try:
|
||||
# xy_color should be a list containing 2 floats
|
||||
xy_color = [float(val) for val in dat.get(ATTR_XY_COLOR)]
|
||||
xy_color = dat.get(ATTR_XY_COLOR)
|
||||
|
||||
if len(xy_color) == 2:
|
||||
color = xy_color
|
||||
color = [float(val) for val in xy_color]
|
||||
|
||||
except (TypeError, ValueError):
|
||||
# TypeError if dat[ATTR_XY_COLOR] is not iterable
|
||||
# TypeError if xy_color is not iterable
|
||||
# ValueError if value could not be converted to float
|
||||
pass
|
||||
|
||||
if ATTR_RGB_COLOR in dat:
|
||||
try:
|
||||
# rgb_color should be a list containing 3 ints
|
||||
rgb_color = [int(val) for val in dat.get(ATTR_RGB_COLOR)]
|
||||
rgb_color = dat.get(ATTR_RGB_COLOR)
|
||||
|
||||
if len(rgb_color) == 3:
|
||||
color = util.color_RGB_to_xy(rgb_color[0],
|
||||
rgb_color[1],
|
||||
rgb_color[2])
|
||||
color = util.color_RGB_to_xy(int(rgb_color[0]),
|
||||
int(rgb_color[1]),
|
||||
int(rgb_color[2]))
|
||||
|
||||
except (TypeError, ValueError):
|
||||
# TypeError if dat[ATTR_RGB_COLOR] is not iterable
|
||||
# TypeError if rgb_color is not iterable
|
||||
# ValueError if not all values can be converted to int
|
||||
pass
|
||||
|
||||
|
@ -27,6 +27,27 @@ ATTR_TODAY_STANDBY_TIME = "today_standby_time"
|
||||
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=10)
|
||||
|
||||
|
||||
def is_on(statemachine, entity_id=None):
|
||||
""" Returns if the wemo is on based on the statemachine. """
|
||||
entity_id = entity_id or ENTITY_ID_ALL_WEMOS
|
||||
|
||||
return statemachine.is_state(entity_id, STATE_ON)
|
||||
|
||||
|
||||
def turn_on(bus, entity_id=None):
|
||||
""" Turns all or specified wemo on. """
|
||||
data = {ATTR_ENTITY_ID: entity_id} if entity_id else None
|
||||
|
||||
bus.call_service(DOMAIN, SERVICE_TURN_ON, data)
|
||||
|
||||
|
||||
def turn_off(bus, entity_id=None):
|
||||
""" Turns all or specified wemo off. """
|
||||
data = {ATTR_ENTITY_ID: entity_id} if entity_id else None
|
||||
|
||||
bus.call_service(DOMAIN, SERVICE_TURN_OFF, data)
|
||||
|
||||
|
||||
# pylint: disable=too-many-branches
|
||||
def setup(bus, statemachine):
|
||||
""" Track states and offer events for WeMo switches. """
|
||||
|
4
homeassistant/external/__init__.py
vendored
4
homeassistant/external/__init__.py
vendored
@ -3,9 +3,9 @@ Not all external Git repositories that we depend on are
|
||||
available as a package for pip. That is why we include
|
||||
them here.
|
||||
|
||||
PyChromecast
|
||||
PyNetgear
|
||||
------------
|
||||
https://github.com/balloob/pychromecast
|
||||
https://github.com/balloob/pynetgear
|
||||
|
||||
|
||||
"""
|
||||
|
@ -182,12 +182,6 @@ class Bus(object):
|
||||
Will throw NotImplementedError. """
|
||||
raise NotImplementedError
|
||||
|
||||
def listen_once_event(self, event_type, listener):
|
||||
""" Not implemented for remote bus.
|
||||
|
||||
Will throw NotImplementedError. """
|
||||
raise NotImplementedError
|
||||
|
||||
def remove_event_listener(self, event_type, listener):
|
||||
""" Not implemented for remote bus.
|
||||
|
||||
|
@ -109,7 +109,7 @@ class TestHTTPInterface(unittest.TestCase):
|
||||
if "test" in event.data:
|
||||
test_value.append(1)
|
||||
|
||||
self.bus.listen_once_event("test_event_with_data", listener)
|
||||
ha.listen_once_event(self.bus, "test_event_with_data", listener)
|
||||
|
||||
requests.post(
|
||||
_url(hah.URL_FIRE_EVENT),
|
||||
@ -129,7 +129,7 @@ class TestHTTPInterface(unittest.TestCase):
|
||||
|
||||
data = req.json()
|
||||
|
||||
self.assertEqual(self.statemachine.entity_ids,
|
||||
self.assertEqual(list(self.statemachine.entity_ids),
|
||||
data['entity_ids'])
|
||||
|
||||
def test_api_get_state(self):
|
||||
@ -194,7 +194,7 @@ class TestHTTPInterface(unittest.TestCase):
|
||||
""" Helper method that will verify our event got called. """
|
||||
test_value.append(1)
|
||||
|
||||
self.bus.listen_once_event("test.event_no_data", listener)
|
||||
ha.listen_once_event(self.bus, "test.event_no_data", listener)
|
||||
|
||||
requests.post(
|
||||
_url(hah.URL_API_EVENTS_EVENT.format("test.event_no_data")),
|
||||
@ -216,7 +216,7 @@ class TestHTTPInterface(unittest.TestCase):
|
||||
if "test" in event.data:
|
||||
test_value.append(1)
|
||||
|
||||
self.bus.listen_once_event("test_event_with_data", listener)
|
||||
ha.listen_once_event(self.bus, "test_event_with_data", listener)
|
||||
|
||||
requests.post(
|
||||
_url(hah.URL_API_EVENTS_EVENT.format("test_event_with_data")),
|
||||
@ -237,7 +237,7 @@ class TestHTTPInterface(unittest.TestCase):
|
||||
""" Helper method that will verify our event got called. """
|
||||
test_value.append(1)
|
||||
|
||||
self.bus.listen_once_event("test_event_with_bad_data", listener)
|
||||
ha.listen_once_event(self.bus, "test_event_with_bad_data", listener)
|
||||
|
||||
req = requests.post(
|
||||
_url(hah.URL_API_EVENTS_EVENT.format("test_event")),
|
||||
@ -329,7 +329,7 @@ class TestRemote(unittest.TestCase):
|
||||
def test_remote_sm_list_state_entities(self):
|
||||
""" Test if the debug interface allows us to list state entity ids. """
|
||||
|
||||
self.assertEqual(self.statemachine.entity_ids,
|
||||
self.assertEqual(list(self.statemachine.entity_ids),
|
||||
self.remote_sm.entity_ids)
|
||||
|
||||
def test_remote_sm_get_state(self):
|
||||
@ -370,7 +370,7 @@ class TestRemote(unittest.TestCase):
|
||||
""" Helper method that will verify our event got called. """
|
||||
test_value.append(1)
|
||||
|
||||
self.bus.listen_once_event("test_event_no_data", listener)
|
||||
ha.listen_once_event(self.bus, "test_event_no_data", listener)
|
||||
|
||||
self.remote_eb.fire_event("test_event_no_data")
|
||||
|
||||
@ -389,7 +389,7 @@ class TestRemote(unittest.TestCase):
|
||||
if event.data["test"] == 1:
|
||||
test_value.append(1)
|
||||
|
||||
self.bus.listen_once_event("test_event_with_data", listener)
|
||||
ha.listen_once_event(self.bus, "test_event_with_data", listener)
|
||||
|
||||
self.remote_eb.fire_event("test_event_with_data", {"test": 1})
|
||||
|
||||
@ -444,7 +444,7 @@ class TestRemote(unittest.TestCase):
|
||||
""" Helper method that will verify our event got called. """
|
||||
test_value.append(1)
|
||||
|
||||
self.bus.listen_once_event(ha.EVENT_STATE_CHANGED, listener)
|
||||
ha.listen_once_event(self.bus, ha.EVENT_STATE_CHANGED, listener)
|
||||
|
||||
self.sm_with_remote_eb.set_state("test", "local sm with remote eb")
|
||||
|
||||
|
@ -8,6 +8,7 @@ import threading
|
||||
import queue
|
||||
import datetime
|
||||
import re
|
||||
import enum
|
||||
|
||||
RE_SANITIZE_FILENAME = re.compile(r'(~|\.\.|/|\\)')
|
||||
RE_SLUGIFY = re.compile(r'[^A-Za-z0-9_]+')
|
||||
@ -133,6 +134,31 @@ def ensure_unique_string(preferred_string, current_strings):
|
||||
return string
|
||||
|
||||
|
||||
class OrderedEnum(enum.Enum):
|
||||
""" Taken from Python 3.4.0 docs. """
|
||||
# pylint: disable=no-init
|
||||
|
||||
def __ge__(self, other):
|
||||
if self.__class__ is other.__class__:
|
||||
return self.value >= other.value
|
||||
return NotImplemented
|
||||
|
||||
def __gt__(self, other):
|
||||
if self.__class__ is other.__class__:
|
||||
return self.value > other.value
|
||||
return NotImplemented
|
||||
|
||||
def __le__(self, other):
|
||||
if self.__class__ is other.__class__:
|
||||
return self.value <= other.value
|
||||
return NotImplemented
|
||||
|
||||
def __lt__(self, other):
|
||||
if self.__class__ is other.__class__:
|
||||
return self.value < other.value
|
||||
return NotImplemented
|
||||
|
||||
|
||||
# Reason why I decided to roll my own ThreadPool instead of using
|
||||
# multiprocessing.dummy.pool or even better, use multiprocessing.pool and
|
||||
# not be hurt by the GIL in the cpython interpreter:
|
||||
|
Loading…
Reference in New Issue
Block a user