Adds SigFox sensor (#13731)

* Create sigfox.py

* Create test_sigfox.py

* Update .coveragerc

* Fix lints

* Fix logger message string

* More lints

* Address reviewer comments

* edit exception handling

* Update sigfox.py

* Update sigfox.py

* Update sigfox.py

* Update sigfox.py
This commit is contained in:
Robin 2018-04-17 12:08:32 +01:00 committed by Pascal Vizeli
parent 9487bd455a
commit 569f5c111f
3 changed files with 230 additions and 0 deletions

View File

@ -649,6 +649,7 @@ omit =
homeassistant/components/sensor/serial_pm.py
homeassistant/components/sensor/serial.py
homeassistant/components/sensor/shodan.py
homeassistant/components/sensor/sigfox.py
homeassistant/components/sensor/simulated.py
homeassistant/components/sensor/skybeacon.py
homeassistant/components/sensor/sma.py

View File

@ -0,0 +1,161 @@
"""
Sensor for SigFox devices.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.sigfox/
"""
import logging
import datetime
import json
from urllib.parse import urljoin
import requests
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import CONF_NAME
from homeassistant.helpers.entity import Entity
_LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = datetime.timedelta(seconds=30)
API_URL = 'https://backend.sigfox.com/api/'
CONF_API_LOGIN = 'api_login'
CONF_API_PASSWORD = 'api_password'
DEFAULT_NAME = 'sigfox'
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_API_LOGIN): cv.string,
vol.Required(CONF_API_PASSWORD): cv.string,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
})
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Set up the sigfox sensor."""
api_login = config[CONF_API_LOGIN]
api_password = config[CONF_API_PASSWORD]
name = config[CONF_NAME]
try:
sigfox = SigfoxAPI(api_login, api_password)
except ValueError:
return False
auth = sigfox.auth
devices = sigfox.devices
sensors = []
for device in devices:
sensors.append(SigfoxDevice(device, auth, name))
add_devices(sensors, True)
def epoch_to_datetime(epoch_time):
"""Take an ms since epoch and return datetime string."""
return datetime.datetime.fromtimestamp(epoch_time).isoformat()
class SigfoxAPI(object):
"""Class for interacting with the SigFox API."""
def __init__(self, api_login, api_password):
"""Initialise the API object."""
self._auth = requests.auth.HTTPBasicAuth(api_login, api_password)
if self.check_credentials():
device_types = self.get_device_types()
self._devices = self.get_devices(device_types)
def check_credentials(self):
""""Check API credentials are valid."""
url = urljoin(API_URL, 'devicetypes')
response = requests.get(url, auth=self._auth, timeout=10)
if response.status_code != 200:
if response.status_code == 401:
_LOGGER.error(
"Invalid credentials for Sigfox API")
else:
_LOGGER.error(
"Unable to login to Sigfox API, error code %s", str(
response.status_code))
raise ValueError('Sigfox component not setup')
return True
def get_device_types(self):
"""Get a list of device types."""
url = urljoin(API_URL, 'devicetypes')
response = requests.get(url, auth=self._auth, timeout=10)
device_types = []
for device in json.loads(response.text)['data']:
device_types.append(device['id'])
return device_types
def get_devices(self, device_types):
"""Get the device_id of each device registered."""
devices = []
for unique_type in device_types:
location_url = 'devicetypes/{}/devices'.format(unique_type)
url = urljoin(API_URL, location_url)
response = requests.get(url, auth=self._auth, timeout=10)
devices_data = json.loads(response.text)['data']
for device in devices_data:
devices.append(device['id'])
return devices
@property
def auth(self):
"""Return the API authentification."""
return self._auth
@property
def devices(self):
"""Return the list of device_id."""
return self._devices
class SigfoxDevice(Entity):
"""Class for single sigfox device."""
def __init__(self, device_id, auth, name):
"""Initialise the device object."""
self._device_id = device_id
self._auth = auth
self._message_data = {}
self._name = '{}_{}'.format(name, device_id)
self._state = None
def get_last_message(self):
"""Return the last message from a device."""
device_url = 'devices/{}/messages?limit=1'.format(self._device_id)
url = urljoin(API_URL, device_url)
response = requests.get(url, auth=self._auth, timeout=10)
data = json.loads(response.text)['data'][0]
payload = bytes.fromhex(data['data']).decode('utf-8')
lat = data['rinfos'][0]['lat']
lng = data['rinfos'][0]['lng']
snr = data['snr']
epoch_time = data['time']
return {'lat': lat,
'lng': lng,
'payload': payload,
'snr': snr,
'time': epoch_to_datetime(epoch_time)}
def update(self):
"""Fetch the latest device message."""
self._message_data = self.get_last_message()
self._state = self._message_data['payload']
@property
def name(self):
"""Return the HA name of the sensor."""
return self._name
@property
def state(self):
"""Return the payload of the last message."""
return self._state
@property
def device_state_attributes(self):
"""Return other details about the last message."""
return self._message_data

View File

@ -0,0 +1,68 @@
"""Tests for the sigfox sensor."""
import re
import requests_mock
import unittest
from homeassistant.components.sensor.sigfox import (
API_URL, CONF_API_LOGIN, CONF_API_PASSWORD)
from homeassistant.setup import setup_component
from tests.common import get_test_home_assistant
TEST_API_LOGIN = 'foo'
TEST_API_PASSWORD = 'ebcd1234'
VALID_CONFIG = {
'sensor': {
'platform': 'sigfox',
CONF_API_LOGIN: TEST_API_LOGIN,
CONF_API_PASSWORD: TEST_API_PASSWORD}}
VALID_MESSAGE = """
{"data":[{
"time":1521879720,
"data":"7061796c6f6164",
"rinfos":[{"lat":"0.0","lng":"0.0"}],
"snr":"50.0"}]}
"""
class TestSigfoxSensor(unittest.TestCase):
"""Test the sigfox platform."""
def setUp(self):
"""Initialize values for this testcase class."""
self.hass = get_test_home_assistant()
def tearDown(self):
"""Stop everything that was started."""
self.hass.stop()
def test_invalid_credentials(self):
"""Test for a invalid credentials."""
with requests_mock.Mocker() as mock_req:
url = re.compile(API_URL + 'devicetypes')
mock_req.get(url, text='{}', status_code=401)
self.assertTrue(
setup_component(self.hass, 'sensor', VALID_CONFIG))
assert len(self.hass.states.entity_ids()) == 0
def test_valid_credentials(self):
"""Test for a valid credentials."""
with requests_mock.Mocker() as mock_req:
url1 = re.compile(API_URL + 'devicetypes')
mock_req.get(url1, text='{"data":[{"id":"fake_type"}]}',
status_code=200)
url2 = re.compile(API_URL + 'devicetypes/fake_type/devices')
mock_req.get(url2, text='{"data":[{"id":"fake_id"}]}')
url3 = re.compile(API_URL + 'devices/fake_id/messages*')
mock_req.get(url3, text=VALID_MESSAGE)
self.assertTrue(
setup_component(self.hass, 'sensor', VALID_CONFIG))
assert len(self.hass.states.entity_ids()) == 1
state = self.hass.states.get('sensor.sigfox_fake_id')
assert state.state == 'payload'
assert state.attributes.get('snr') == '50.0'