mirror of https://github.com/home-assistant/core
Haveibeenpwned sensor platform (#3618)
* Initial version of "haveibeenpwned" sensor component * 2 flake8 fixes * remove debugging error message * Increase scan_interval as well as throttle to make sure that during initial startup of hass the request happens with 5 seconds delays and after startup with 15 minutes delays. Scan_interval is increased also to not call update as often * update .coveragerc * remove (ssl) verify=False * - use dict to keep the request values with email as key - use track_point_in_time system to make sure data updates initially at 5 seconds between each call until all sensor's email have a result in the dict. * fix a pylint error that happend on the py35 tests
This commit is contained in:
parent
7b40a641ec
commit
8f8bba4ad7
|
@ -235,6 +235,7 @@ omit =
|
|||
homeassistant/components/sensor/google_travel_time.py
|
||||
homeassistant/components/sensor/gpsd.py
|
||||
homeassistant/components/sensor/gtfs.py
|
||||
homeassistant/components/sensor/haveibeenpwned.py
|
||||
homeassistant/components/sensor/hp_ilo.py
|
||||
homeassistant/components/sensor/imap.py
|
||||
homeassistant/components/sensor/imap_email_content.py
|
||||
|
|
|
@ -0,0 +1,181 @@
|
|||
"""
|
||||
Support for haveibeenpwned (email breaches) sensor.
|
||||
|
||||
For more details about this platform, please refer to the documentation at
|
||||
https://home-assistant.io/components/sensor.haveibeenpwned/
|
||||
"""
|
||||
from datetime import timedelta
|
||||
import logging
|
||||
|
||||
import voluptuous as vol
|
||||
import requests
|
||||
|
||||
from homeassistant.components.sensor import PLATFORM_SCHEMA
|
||||
from homeassistant.const import (STATE_UNKNOWN, CONF_EMAIL)
|
||||
from homeassistant.helpers.entity import Entity
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
from homeassistant.util import Throttle
|
||||
import homeassistant.util.dt as dt_util
|
||||
from homeassistant.helpers.event import track_point_in_time
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
DATE_STR_FORMAT = "%Y-%m-%d %H:%M:%S"
|
||||
USER_AGENT = "Home Assistant HaveIBeenPwned Sensor Component"
|
||||
|
||||
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=15)
|
||||
MIN_TIME_BETWEEN_FORCED_UPDATES = timedelta(seconds=5)
|
||||
|
||||
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
|
||||
vol.Required(CONF_EMAIL): vol.All(cv.ensure_list, [cv.string]),
|
||||
})
|
||||
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def setup_platform(hass, config, add_devices, discovery_info=None):
|
||||
"""Setup the RESTful sensor."""
|
||||
emails = config.get(CONF_EMAIL)
|
||||
data = HaveIBeenPwnedData(emails)
|
||||
|
||||
devices = []
|
||||
for email in emails:
|
||||
devices.append(HaveIBeenPwnedSensor(data, hass, email))
|
||||
|
||||
add_devices(devices)
|
||||
|
||||
# To make sure we get initial data for the sensors
|
||||
# ignoring the normal throttle of 15 minutes but using
|
||||
# an update throttle of 5 seconds
|
||||
for sensor in devices:
|
||||
sensor.update_nothrottle()
|
||||
|
||||
|
||||
class HaveIBeenPwnedSensor(Entity):
|
||||
"""Implementation of HaveIBeenPwnedSensor."""
|
||||
|
||||
def __init__(self, data, hass, email):
|
||||
"""Initialize the HaveIBeenPwnedSensor sensor."""
|
||||
self._state = STATE_UNKNOWN
|
||||
self._data = data
|
||||
self._hass = hass
|
||||
self._email = email
|
||||
self._unit_of_measurement = "Breaches"
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
"""Return the name of the sensor."""
|
||||
return "Breaches {}".format(self._email)
|
||||
|
||||
@property
|
||||
def unit_of_measurement(self):
|
||||
"""Return the unit the value is expressed in."""
|
||||
return self._unit_of_measurement
|
||||
|
||||
@property
|
||||
def state(self):
|
||||
"""Return the state of the device."""
|
||||
return self._state
|
||||
|
||||
@property
|
||||
def state_attributes(self):
|
||||
"""Return the atrributes of the sensor."""
|
||||
val = {}
|
||||
if self._email not in self._data.data:
|
||||
return val
|
||||
|
||||
for idx, value in enumerate(self._data.data[self._email]):
|
||||
tmpname = "breach {}".format(idx+1)
|
||||
tmpvalue = "{} {}".format(
|
||||
value["Title"],
|
||||
dt_util.as_local(dt_util.parse_datetime(
|
||||
value["AddedDate"])).strftime(DATE_STR_FORMAT))
|
||||
val[tmpname] = tmpvalue
|
||||
|
||||
return val
|
||||
|
||||
def update_nothrottle(self, dummy=None):
|
||||
"""Update sensor without throttle."""
|
||||
self._data.update_no_throttle()
|
||||
|
||||
# Schedule a forced update 5 seconds in the future if the
|
||||
# update above returned no data for this sensors email.
|
||||
# this is mainly to make sure that we don't
|
||||
# get http error "too many requests" and to have initial
|
||||
# data after hass startup once we have the data it will
|
||||
# update as normal using update
|
||||
if self._email not in self._data.data:
|
||||
track_point_in_time(self._hass,
|
||||
self.update_nothrottle,
|
||||
dt_util.now() +
|
||||
MIN_TIME_BETWEEN_FORCED_UPDATES)
|
||||
return
|
||||
|
||||
if self._email in self._data.data:
|
||||
self._state = len(self._data.data[self._email])
|
||||
self.update_ha_state()
|
||||
|
||||
def update(self):
|
||||
"""Update data and see if it contains data for our email."""
|
||||
self._data.update()
|
||||
|
||||
if self._email in self._data.data:
|
||||
self._state = len(self._data.data[self._email])
|
||||
|
||||
|
||||
class HaveIBeenPwnedData(object):
|
||||
"""Class for handling the data retrieval."""
|
||||
|
||||
def __init__(self, emails):
|
||||
"""Initialize the data object."""
|
||||
self._email_count = len(emails)
|
||||
self._current_index = 0
|
||||
self.data = {}
|
||||
self._email = emails[0]
|
||||
self._emails = emails
|
||||
|
||||
def set_next_email(self):
|
||||
"""Set the next email to be looked up."""
|
||||
self._current_index = (self._current_index + 1) % self._email_count
|
||||
self._email = self._emails[self._current_index]
|
||||
|
||||
def update_no_throttle(self):
|
||||
"""Get the data for a specific email."""
|
||||
self.update(no_throttle=True)
|
||||
|
||||
@Throttle(MIN_TIME_BETWEEN_UPDATES, MIN_TIME_BETWEEN_FORCED_UPDATES)
|
||||
def update(self, **kwargs):
|
||||
"""Get the latest data for current email from REST service."""
|
||||
try:
|
||||
url = "https://haveibeenpwned.com/api/v2/breachedaccount/{}". \
|
||||
format(self._email)
|
||||
|
||||
_LOGGER.info("Checking for breaches for email %s", self._email)
|
||||
|
||||
req = requests.get(url, headers={"User-agent": USER_AGENT},
|
||||
allow_redirects=True, timeout=5)
|
||||
|
||||
except requests.exceptions.RequestException:
|
||||
_LOGGER.error("failed fetching HaveIBeenPwned Data for '%s'",
|
||||
self._email)
|
||||
return
|
||||
|
||||
if req.status_code == 200:
|
||||
self.data[self._email] = sorted(req.json(),
|
||||
key=lambda k: k["AddedDate"],
|
||||
reverse=True)
|
||||
|
||||
# only goto next email if we had data so that
|
||||
# the forced updates try this current email again
|
||||
self.set_next_email()
|
||||
|
||||
elif req.status_code == 404:
|
||||
self.data[self._email] = []
|
||||
|
||||
# only goto next email if we had data so that
|
||||
# the forced updates try this current email again
|
||||
self.set_next_email()
|
||||
|
||||
else:
|
||||
_LOGGER.error("failed fetching HaveIBeenPwned Data for '%s'"
|
||||
"(HTTP Status_code = %d)", self._email,
|
||||
req.status_code)
|
Loading…
Reference in New Issue