1
mirror of https://github.com/home-assistant/core synced 2024-08-02 23:40:32 +02:00

Some code cleanup

This commit is contained in:
Paulus Schoutsen 2014-01-04 13:48:17 -08:00
parent 328b9a84c0
commit 367433acb2
6 changed files with 117 additions and 114 deletions

View File

@ -47,13 +47,10 @@ def start_home_assistant(bus):
bus.fire_event(EVENT_HOMEASSISTANT_START)
while True:
while not request_shutdown.isSet():
try:
time.sleep(1)
if request_shutdown.isSet():
break
except KeyboardInterrupt:
break
@ -356,10 +353,10 @@ class StateMachine(object):
self.lock.release()
def get_state(self, category):
""" Returns a dict (state,last_changed, attributes) describing
""" Returns a dict (state, last_changed, attributes) describing
the state of the specified category. """
try:
# Make a copy so people won't accidently mutate the state
# Make a copy so people won't mutate the state
return dict(self.states[category])
except KeyError:
@ -393,16 +390,19 @@ class Timer(threading.Thread):
last_fired_on_second = -1
while True:
# Sleep till it is the next time that we have to fire an event.
# Aim for halfway through the second that matches TIMER_INTERVAL.
# So if TIMER_INTERVAL is 10 fire at .5, 10.5, 20.5, etc seconds.
# This will yield the best results because time.sleep() is not
# 100% accurate because of non-realtime OS's
now = datetime.now()
if now.second % TIMER_INTERVAL > 0 or \
# First check checks if we are not on a second matching the
# timer interval. Second check checks if we did not already fire
# this interval.
if now.second % TIMER_INTERVAL or \
now.second == last_fired_on_second:
# Sleep till it is the next time that we have to fire an event.
# Aim for halfway through the second that fits TIMER_INTERVAL.
# If TIMER_INTERVAL is 10 fire at .5, 10.5, 20.5, etc seconds.
# This will yield the best results because time.sleep() is not
# 100% accurate because of non-realtime OS's
slp_seconds = TIMER_INTERVAL - now.second % TIMER_INTERVAL + \
.5 - now.microsecond/1000000.0

View File

@ -12,11 +12,13 @@ from homeassistant.components import (general, chromecast,
browser, httpinterface)
# pylint: disable=too-many-branches
# pylint: disable=too-many-branches,too-many-locals,too-many-statements
def from_config_file(config_path):
""" Starts home assistant with all possible functionality
based on a config file. """
logger = logging.getLogger(__name__)
statusses = []
# Read config
@ -27,104 +29,113 @@ def from_config_file(config_path):
bus = ha.Bus()
statemachine = ha.StateMachine(bus)
has_opt = config.has_option
get_opt = config.get
has_section = config.has_section
add_status = lambda name, result: statusses.append((name, result))
# Device scanner
if config.has_option('tomato', 'host') and \
config.has_option('tomato', 'username') and \
config.has_option('tomato', 'password') and \
config.has_option('tomato', 'http_id'):
dev_scan = None
device_scanner = device.TomatoDeviceScanner(
config.get('tomato', 'host'),
config.get('tomato', 'username'),
config.get('tomato', 'password'),
config.get('tomato', 'http_id'))
try:
# For the error message if not all option fields exist
opt_fields = "host, username, password"
statusses.append(("Device Scanner - Tomato",
device_scanner.success_init))
if has_section('tomato'):
dev_scan_name = "Tomato"
opt_fields += ", http_id"
elif config.has_option('netgear', 'host') and \
config.has_option('netgear', 'username') and \
config.has_option('netgear', 'password'):
dev_scan = device.TomatoDeviceScanner(
get_opt('tomato', 'host'),
get_opt('tomato', 'username'),
get_opt('tomato', 'password'),
get_opt('tomato', 'http_id'))
device_scanner = device.NetgearDeviceScanner(
config.get('netgear', 'host'),
config.get('netgear', 'username'),
config.get('netgear', 'password'))
elif has_section('netgear'):
dev_scan_name = "Netgear"
statusses.append(("Device Scanner - Netgear",
device_scanner.success_init))
dev_scan = device.NetgearDeviceScanner(
get_opt('netgear', 'host'),
get_opt('netgear', 'username'),
get_opt('netgear', 'password'))
else:
device_scanner = None
except ConfigParser.NoOptionError:
# If one of the options didn't exist
logger.exception(("Error initializing {}DeviceScanner, "
"could not find one of the following config "
"options: {}".format(dev_scan_name, opt_fields)))
if device_scanner and not device_scanner.success_init:
device_scanner = None
add_status("Device Scanner - {}".format(dev_scan_name), False)
if dev_scan:
add_status("Device Scanner - {}".format(dev_scan_name),
dev_scan.success_init)
if not dev_scan.success_init:
dev_scan = None
# Device Tracker
if device_scanner:
device.DeviceTracker(bus, statemachine, device_scanner)
if dev_scan:
device.DeviceTracker(bus, statemachine, dev_scan)
statusses.append(("Device Tracker", True))
add_status("Device Tracker", True)
# Sun tracker
if config.has_option("common", "latitude") and \
config.has_option("common", "longitude"):
if has_opt("common", "latitude") and \
has_opt("common", "longitude"):
statusses.append(("Weather - Ephem",
sun.setup(
bus, statemachine,
config.get("common", "latitude"),
config.get("common", "longitude"))))
add_status("Weather - Ephem",
sun.setup(
bus, statemachine,
get_opt("common", "latitude"),
get_opt("common", "longitude")))
# Chromecast
if config.has_option("chromecast", "host"):
if has_opt("chromecast", "host"):
chromecast_started = chromecast.setup(bus, statemachine,
config.get("chromecast", "host"))
get_opt("chromecast", "host"))
statusses.append(("Chromecast", chromecast_started))
add_status("Chromecast", chromecast_started)
else:
chromecast_started = False
# Light control
if config.has_section("hue"):
if config.has_option("hue", "host"):
light_control = light.HueLightControl(config.get("hue", "host"))
if has_section("hue"):
if has_opt("hue", "host"):
light_control = light.HueLightControl(get_opt("hue", "host"))
else:
light_control = light.HueLightControl()
statusses.append(("Light Control - Hue", light_control.success_init))
add_status("Light Control - Hue", light_control.success_init)
light.setup(bus, statemachine, light_control)
else:
light_control = None
# Light trigger
if light_control:
light.setup(bus, statemachine, light_control)
add_status("Light Trigger",
device_sun_light_trigger.setup(bus, statemachine))
statusses.append(("Light Trigger", device_sun_light_trigger.setup(
bus, statemachine)))
if config.has_option("downloader", "download_dir"):
statusses.append(("Downloader", downloader.setup(
bus, config.get("downloader", "download_dir"))))
if has_opt("downloader", "download_dir"):
add_status("Downloader", downloader.setup(
bus, get_opt("downloader", "download_dir")))
# Currently only works with Chromecast or Light_Control
if chromecast_started or light_control:
statusses.append(("General", general.setup(bus, statemachine)))
add_status("General", general.setup(bus, statemachine))
statusses.append(("Browser", browser.setup(bus)))
add_status("Browser", browser.setup(bus))
statusses.append(("Media Buttons", keyboard.setup(bus)))
add_status("Media Buttons", keyboard.setup(bus))
# Init HTTP interface
if config.has_option("httpinterface", "api_password"):
if has_opt("httpinterface", "api_password"):
httpinterface.HTTPInterface(
bus, statemachine,
config.get("httpinterface", "api_password"))
get_opt("httpinterface", "api_password"))
statusses.append(("HTTPInterface", True))
logger = logging.getLogger(__name__)
add_status("HTTPInterface", True)
for component, success_init in statusses:
status = "initialized" if success_init else "Failed to initialize"

View File

@ -94,7 +94,7 @@ def setup(bus, statemachine, host):
ATTR_STATE: status.state,
ATTR_OPTIONS: status.options})
else:
statemachine.set_state(category, STATE_NO_APP)
statemachine.set_state(category, STATE_NO_APP, {ATTR_HOST: host})
ha.track_time_change(bus, update_chromecast_state)

View File

@ -64,10 +64,13 @@ def is_home(statemachine, device_id=None):
class DeviceTracker(object):
""" Class that tracks which devices are home and which are not. """
def __init__(self, bus, statemachine, device_scanner):
def __init__(self, bus, statemachine, device_scanner, error_scanning=None):
self.statemachine = statemachine
self.bus = bus
self.device_scanner = device_scanner
self.error_scanning = error_scanning or TIME_SPAN_FOR_ERROR_IN_SCANNING
self.logger = logging.getLogger(__name__)
self.lock = threading.Lock()
@ -75,7 +78,7 @@ class DeviceTracker(object):
# Dictionary to keep track of known devices and devices we track
self.known_devices = {}
# Did we encounter a valid known devices file
# Did we encounter an invalid known devices file
self.invalid_known_devices_file = False
self._read_known_devices_file()
@ -124,7 +127,7 @@ class DeviceTracker(object):
# not show up for 1 scan beacuse of reboot etc
for device in temp_tracking_devices:
if (now - self.known_devices[device]['last_seen'] >
TIME_SPAN_FOR_ERROR_IN_SCANNING):
self.error_scanning):
self.statemachine.set_state(
self.known_devices[device]['category'],
@ -149,7 +152,7 @@ class DeviceTracker(object):
unknown_devices = [device for device in found_devices
if device not in self.known_devices]
if len(unknown_devices) > 0:
if unknown_devices:
try:
# If file does not exist we will write the header too
is_new_file = not os.path.isfile(KNOWN_DEVICES_FILE)
@ -215,26 +218,23 @@ class DeviceTracker(object):
name = util.slugify(row['name']) if row['name'] \
else "unnamed_device"
tries = 0
suffix = ""
while True:
category = STATE_CATEGORY_FORMAT.format(name)
tries = 1
while category in used_categories:
tries += 1
if tries > 1:
suffix = "_{}".format(tries)
suffix = "_{}".format(tries)
category = STATE_CATEGORY_FORMAT.format(
name + suffix)
if category not in used_categories:
break
row['category'] = category
used_categories.append(category)
known_devices[device] = row
if len(known_devices) == 0:
if not known_devices:
self.logger.warning(
"No devices to track. Please update {}.".format(
KNOWN_DEVICES_FILE))
@ -247,7 +247,9 @@ class DeviceTracker(object):
for category in \
self.device_state_categories - new_categories:
print "Removing ", category
self.logger.info(
"DeviceTracker:Removing category {}".format(
category))
self.statemachine.remove_category(category)
# File parsed, warnings given if necessary
@ -313,7 +315,7 @@ class TomatoDeviceScanner(object):
filter_named = [item[0] for item in self.last_results['dhcpd_lease']
if item[2] == device]
if len(filter_named) == 0 or filter_named[0] == "":
if not filter_named or not filter_named[0]:
return None
else:
return filter_named[0]
@ -399,7 +401,6 @@ class NetgearDeviceScanner(object):
def __init__(self, host, username, password):
self._api = pynetgear.Netgear(host, username, password)
self.logger = logging.getLogger(__name__)
self.lock = threading.Lock()
@ -426,32 +427,26 @@ class NetgearDeviceScanner(object):
filter_named = [device.name for device in self.last_results
if device.mac == mac]
if len(filter_named) == 0:
return None
else:
if filter_named:
return filter_named[0]
else:
return None
def _update_info(self):
""" Retrieves latest information from the Netgear router.
Returns boolean if scanning successful. """
self.lock.acquire()
with self.lock:
# if date_updated is None or the date is too old we scan for
# new data
if (not self.date_updated or datetime.now() - self.date_updated >
MIN_TIME_BETWEEN_SCANS):
# if date_updated is None or the date is too old we scan for new data
if (not self.date_updated or datetime.now() - self.date_updated >
MIN_TIME_BETWEEN_SCANS):
self.logger.info("Netgear:Scanning")
self.logger.info("Netgear:Scanning")
self.last_results = self._api.get_attached_devices()
self.last_results = self._api.get_attached_devices()
return True
self.lock.release()
return True
else:
# We acquired the lock before the IF check,
# release it before we return True
self.lock.release()
return True
else:
return True

View File

@ -59,15 +59,12 @@ def setup(bus, download_path):
filename))
# If file exist append a number. We test filename, filename_2..
tries = 0
while True:
tries = 1
final_path = path + ext
while os.path.isfile(final_path):
tries += 1
name_suffix = "" if tries == 1 else "_{}".format(tries)
final_path = path + name_suffix + ext
if not os.path.isfile(final_path):
break
final_path = path + "_{}".format(tries) + ext
logger.info("FileDownloader:{} -> {}".format(
service.data['url'], final_path))

View File

@ -145,8 +145,8 @@ class HueLightControl(object):
def is_light_on(self, light_id=None):
""" Returns if specified or all light are on. """
if not light_id:
return sum(
[1 for light in self._light_map.values() if light.on]) > 0
return any(
[True for light in self._light_map.values() if light.on])
else:
return self._bridge.get_light(self._convert_id(light_id), 'on')