Support for encrypted payload (#3587)

This commit is contained in:
Erik Eriksson 2016-10-04 09:57:37 +02:00 committed by Paulus Schoutsen
parent 8592ba3cb9
commit 287a7e2720
3 changed files with 195 additions and 5 deletions

View File

@ -7,6 +7,7 @@ https://home-assistant.io/components/device_tracker.owntracks/
import json
import logging
import threading
import base64
from collections import defaultdict
import voluptuous as vol
@ -19,6 +20,7 @@ from homeassistant.components import zone as zone_comp
from homeassistant.components.device_tracker import PLATFORM_SCHEMA
DEPENDENCIES = ['mqtt']
REQUIREMENTS = ['libnacl==1.5.0']
REGIONS_ENTERED = defaultdict(list)
MOBILE_BEACONS_ACTIVE = defaultdict(list)
@ -36,6 +38,7 @@ LOCK = threading.Lock()
CONF_MAX_GPS_ACCURACY = 'max_gps_accuracy'
CONF_WAYPOINT_IMPORT = 'waypoints'
CONF_WAYPOINT_WHITELIST = 'waypoint_whitelist'
CONF_SECRET = 'secret'
VALIDATE_LOCATION = 'location'
VALIDATE_TRANSITION = 'transition'
@ -47,24 +50,88 @@ WAYPOINT_LON_KEY = 'lon'
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Optional(CONF_MAX_GPS_ACCURACY): vol.Coerce(float),
vol.Optional(CONF_WAYPOINT_IMPORT, default=True): cv.boolean,
vol.Optional(CONF_WAYPOINT_WHITELIST): vol.All(cv.ensure_list, [cv.string])
vol.Optional(CONF_WAYPOINT_WHITELIST): vol.All(
cv.ensure_list, [cv.string]),
vol.Optional(CONF_SECRET): vol.Any(
vol.Schema({vol.Optional(cv.string): cv.string}),
cv.string)
})
def get_cipher():
"""Return decryption function and length of key."""
from libnacl import crypto_secretbox_KEYBYTES as KEYLEN
from libnacl.secret import SecretBox
def decrypt(ciphertext, key):
"""Decrypt ciphertext using key."""
return SecretBox(key).decrypt(ciphertext)
return (KEYLEN, decrypt)
def setup_scanner(hass, config, see):
"""Setup an OwnTracks tracker."""
max_gps_accuracy = config.get(CONF_MAX_GPS_ACCURACY)
waypoint_import = config.get(CONF_WAYPOINT_IMPORT)
waypoint_whitelist = config.get(CONF_WAYPOINT_WHITELIST)
secret = config.get(CONF_SECRET)
def validate_payload(payload, data_type):
def decrypt_payload(topic, ciphertext):
"""Decrypt encrypted payload."""
try:
keylen, decrypt = get_cipher()
except OSError:
_LOGGER.warning('Ignoring encrypted payload '
'because libsodium not installed.')
return None
if isinstance(secret, dict):
key = secret.get(topic)
else:
key = secret
if key is None:
_LOGGER.warning('Ignoring encrypted payload '
'because no decryption key known '
'for topic %s.', topic)
return None
key = key.encode("utf-8")
key = key[:keylen]
key = key.ljust(keylen, b'\0')
try:
ciphertext = base64.b64decode(ciphertext)
message = decrypt(ciphertext, key)
message = message.decode("utf-8")
_LOGGER.debug("Decrypted payload: %s", message)
return message
except ValueError:
_LOGGER.warning('Ignoring encrypted payload '
'because unable to decrypt using key '
'for topic %s.', topic)
return None
def validate_payload(topic, payload, data_type):
"""Validate OwnTracks payload."""
# pylint: disable=too-many-return-statements
try:
data = json.loads(payload)
except ValueError:
# If invalid JSON
_LOGGER.error('Unable to parse payload as JSON: %s', payload)
return None
if isinstance(data, dict) and \
data.get('_type') == 'encrypted' and \
'data' in data:
plaintext_payload = decrypt_payload(topic, data['data'])
if plaintext_payload is None:
return None
else:
return validate_payload(topic, plaintext_payload, data_type)
if not isinstance(data, dict) or data.get('_type') != data_type:
_LOGGER.debug('Skipping %s update for following data '
'because of missing or malformatted data: %s',
@ -90,7 +157,7 @@ def setup_scanner(hass, config, see):
"""MQTT message received."""
# Docs on available data:
# http://owntracks.org/booklet/tech/json/#_typelocation
data = validate_payload(payload, VALIDATE_LOCATION)
data = validate_payload(topic, payload, VALIDATE_LOCATION)
if not data:
return
@ -111,7 +178,7 @@ def setup_scanner(hass, config, see):
"""MQTT event (geofences) received."""
# Docs on available data:
# http://owntracks.org/booklet/tech/json/#_typetransition
data = validate_payload(payload, VALIDATE_TRANSITION)
data = validate_payload(topic, payload, VALIDATE_TRANSITION)
if not data:
return
@ -206,7 +273,7 @@ def setup_scanner(hass, config, see):
"""List of waypoints published by a user."""
# Docs on available data:
# http://owntracks.org/booklet/tech/json/#_typewaypoints
data = validate_payload(payload, VALIDATE_WAYPOINTS)
data = validate_payload(topic, payload, VALIDATE_WAYPOINTS)
if not data:
return

View File

@ -235,6 +235,9 @@ keyring>=9.3,<10.0
# homeassistant.components.knx
knxip==0.3.3
# homeassistant.components.device_tracker.owntracks
libnacl==1.5.0
# homeassistant.components.light.lifx
liffylights==0.9.4

View File

@ -2,6 +2,7 @@
import json
import os
import unittest
from unittest.mock import patch
from collections import defaultdict
@ -31,6 +32,7 @@ REGION_TRACKER_STATE = "device_tracker.beacon_{}".format(IBEACON_DEVICE)
CONF_MAX_GPS_ACCURACY = 'max_gps_accuracy'
CONF_WAYPOINT_IMPORT = owntracks.CONF_WAYPOINT_IMPORT
CONF_WAYPOINT_WHITELIST = owntracks.CONF_WAYPOINT_WHITELIST
CONF_SECRET = owntracks.CONF_SECRET
LOCATION_MESSAGE = {
'batt': 92,
@ -184,6 +186,26 @@ REGION_LEAVE_ZERO_MESSAGE = {
BAD_JSON_PREFIX = '--$this is bad json#--'
BAD_JSON_SUFFIX = '** and it ends here ^^'
SECRET_KEY = "s3cretkey"
ENCRYPTED_LOCATION_MESSAGE = {
# Encrypted version of LOCATION_MESSAGE using libsodium and SECRET_KEY
'_type': 'encrypted',
'data': ('qm1A83I6TVFRmH5343xy+cbex8jBBxDFkHRuJhELVKVRA/DgXcyKtghw'
'9pOw75Lo4gHcyy2wV5CmkjrpKEBR7Qhye4AR0y7hOvlx6U/a3GuY1+W8'
'I4smrLkwMvGgBOzXSNdVTzbFTHDvG3gRRaNHFkt2+5MsbH2Dd6CXmpzq'
'DIfSN7QzwOevuvNIElii5MlFxI6ZnYIDYA/ZdnAXHEVsNIbyT2N0CXt3'
'fTPzgGtFzsufx40EEUkC06J7QTJl7lLG6qaLW1cCWp86Vp0eL3vtZ6xq')}
MOCK_ENCRYPTED_LOCATION_MESSAGE = {
# Mock-encrypted version of LOCATION_MESSAGE using pickle
'_type': 'encrypted',
'data': ('gANDCXMzY3JldGtleXEAQ6p7ImxvbiI6IDEuMCwgInQiOiAidSIsICJi'
'YXR0IjogOTIsICJhY2MiOiA2MCwgInZlbCI6IDAsICJfdHlwZSI6ICJs'
'b2NhdGlvbiIsICJ2YWMiOiA0LCAicCI6IDEwMS4zOTc3NTg0ODM4ODY3'
'LCAidHN0IjogMSwgImxhdCI6IDIuMCwgImFsdCI6IDI3LCAiY29nIjog'
'MjQ4LCAidGlkIjogInVzZXIifXEBhnECLg==')
}
class TestDeviceTrackerOwnTracks(unittest.TestCase):
"""Test the OwnTrack sensor."""
@ -650,3 +672,101 @@ class TestDeviceTrackerOwnTracks(unittest.TestCase):
self.send_message(WAYPOINT_TOPIC, waypoints_message)
new_wayp = self.hass.states.get(WAYPOINT_ENTITY_NAMES[0])
self.assertTrue(wayp == new_wayp)
try:
import libnacl
except (ImportError, OSError):
libnacl = None
@unittest.skipUnless(libnacl,
"libnacl/libsodium is not installed")
def test_encrypted_payload_libsodium(self):
"""Test sending encrypted message payload."""
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {
CONF_PLATFORM: 'owntracks',
CONF_SECRET: SECRET_KEY,
}}))
self.send_message(LOCATION_TOPIC, ENCRYPTED_LOCATION_MESSAGE)
self.assert_location_latitude(2.0)
def mock_cipher():
"""Return a dummy pickle-based cipher."""
def mock_decrypt(ciphertext, key):
"""Decrypt/unpickle."""
import pickle
(mkey, plaintext) = pickle.loads(ciphertext)
if key != mkey:
raise ValueError()
return plaintext
return (len(SECRET_KEY), mock_decrypt)
@patch('homeassistant.components.device_tracker.owntracks.get_cipher',
mock_cipher)
def test_encrypted_payload(self):
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {
CONF_PLATFORM: 'owntracks',
CONF_SECRET: SECRET_KEY,
}}))
self.send_message(LOCATION_TOPIC, MOCK_ENCRYPTED_LOCATION_MESSAGE)
self.assert_location_latitude(2.0)
@patch('homeassistant.components.device_tracker.owntracks.get_cipher',
mock_cipher)
def test_encrypted_payload_topic_key(self):
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {
CONF_PLATFORM: 'owntracks',
CONF_SECRET: {
LOCATION_TOPIC: SECRET_KEY,
}}}))
self.send_message(LOCATION_TOPIC, MOCK_ENCRYPTED_LOCATION_MESSAGE)
self.assert_location_latitude(2.0)
@patch('homeassistant.components.device_tracker.owntracks.get_cipher',
mock_cipher)
def test_encrypted_payload_no_key(self):
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {
CONF_PLATFORM: 'owntracks',
# key missing
}}))
self.send_message(LOCATION_TOPIC, MOCK_ENCRYPTED_LOCATION_MESSAGE)
self.assert_location_latitude(None)
@patch('homeassistant.components.device_tracker.owntracks.get_cipher',
mock_cipher)
def test_encrypted_payload_wrong_key(self):
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {
CONF_PLATFORM: 'owntracks',
CONF_SECRET: 'wrong key',
}}))
self.send_message(LOCATION_TOPIC, MOCK_ENCRYPTED_LOCATION_MESSAGE)
self.assert_location_latitude(None)
@patch('homeassistant.components.device_tracker.owntracks.get_cipher',
mock_cipher)
def test_encrypted_payload_wrong_topic_key(self):
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {
CONF_PLATFORM: 'owntracks',
CONF_SECRET: {
LOCATION_TOPIC: "wrong key"
}}}))
self.send_message(LOCATION_TOPIC, MOCK_ENCRYPTED_LOCATION_MESSAGE)
self.assert_location_latitude(None)
@patch('homeassistant.components.device_tracker.owntracks.get_cipher',
mock_cipher)
def test_encrypted_payload_no_topic_key(self):
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {
CONF_PLATFORM: 'owntracks',
CONF_SECRET: {
"owntracks/{}/{}".format(USER, "otherdevice"): "foobar"
}}}))
self.send_message(LOCATION_TOPIC, MOCK_ENCRYPTED_LOCATION_MESSAGE)
self.assert_location_latitude(None)