Support for Unifi direct access device tracker (No unifi controller software) (#10097)

This commit is contained in:
William Scanlon 2017-11-17 14:47:40 -05:00 committed by GitHub
parent 5b44e83c0f
commit 2664ca498e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 309 additions and 0 deletions

View File

@ -0,0 +1,134 @@
"""
Support for Unifi AP direct access.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/device_tracker.unifi_direct/
"""
import logging
import json
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import (
CONF_HOST, CONF_PASSWORD, CONF_USERNAME,
CONF_PORT)
REQUIREMENTS = ['pexpect==4.0.1']
_LOGGER = logging.getLogger(__name__)
DEFAULT_SSH_PORT = 22
UNIFI_COMMAND = 'mca-dump | tr -d "\n"'
UNIFI_SSID_TABLE = "vap_table"
UNIFI_CLIENT_TABLE = "sta_table"
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Required(CONF_USERNAME): cv.string,
vol.Optional(CONF_PORT, default=DEFAULT_SSH_PORT): cv.port
})
# pylint: disable=unused-argument
def get_scanner(hass, config):
"""Validate the configuration and return a Unifi direct scanner."""
scanner = UnifiDeviceScanner(config[DOMAIN])
if not scanner.connected:
return False
return scanner
class UnifiDeviceScanner(DeviceScanner):
"""This class queries Unifi wireless access point."""
def __init__(self, config):
"""Initialize the scanner."""
self.host = config[CONF_HOST]
self.username = config[CONF_USERNAME]
self.password = config[CONF_PASSWORD]
self.port = config[CONF_PORT]
self.ssh = None
self.connected = False
self.last_results = {}
self._connect()
def scan_devices(self):
"""Scan for new devices and return a list with found device IDs."""
result = _response_to_json(self._get_update())
if result:
self.last_results = result
return self.last_results.keys()
def get_device_name(self, device):
"""Return the name of the given device or None if we don't know."""
hostname = next((
value.get('hostname') for key, value in self.last_results.items()
if key.upper() == device.upper()), None)
if hostname is not None:
hostname = str(hostname)
return hostname
def _connect(self):
"""Connect to the Unifi AP SSH server."""
from pexpect import pxssh, exceptions
self.ssh = pxssh.pxssh()
try:
self.ssh.login(self.host, self.username,
password=self.password, port=self.port)
self.connected = True
except exceptions.EOF:
_LOGGER.error("Connection refused. SSH enabled?")
self._disconnect()
def _disconnect(self):
"""Disconnect the current SSH connection."""
# pylint: disable=broad-except
try:
self.ssh.logout()
except Exception:
pass
finally:
self.ssh = None
self.connected = False
def _get_update(self):
from pexpect import pxssh
try:
if not self.connected:
self._connect()
self.ssh.sendline(UNIFI_COMMAND)
self.ssh.prompt()
return self.ssh.before
except pxssh.ExceptionPxssh as err:
_LOGGER.error("Unexpected SSH error: %s", str(err))
self._disconnect()
return None
except AssertionError as err:
_LOGGER.error("Connection to AP unavailable: %s", str(err))
self._disconnect()
return None
def _response_to_json(response):
try:
json_response = json.loads(str(response)[31:-1].replace("\\", ""))
_LOGGER.debug(str(json_response))
ssid_table = json_response.get(UNIFI_SSID_TABLE)
active_clients = {}
for ssid in ssid_table:
client_table = ssid.get(UNIFI_CLIENT_TABLE)
for client in client_table:
active_clients[client.get("mac")] = client
return active_clients
except ValueError:
_LOGGER.error("Failed to decode response from AP.")
return {}

View File

@ -523,6 +523,7 @@ pdunehd==1.3
# homeassistant.components.device_tracker.aruba
# homeassistant.components.device_tracker.asuswrt
# homeassistant.components.device_tracker.cisco_ios
# homeassistant.components.device_tracker.unifi_direct
# homeassistant.components.media_player.pandora
pexpect==4.0.1

View File

@ -101,6 +101,7 @@ paho-mqtt==1.3.1
# homeassistant.components.device_tracker.aruba
# homeassistant.components.device_tracker.asuswrt
# homeassistant.components.device_tracker.cisco_ios
# homeassistant.components.device_tracker.unifi_direct
# homeassistant.components.media_player.pandora
pexpect==4.0.1

View File

@ -0,0 +1,172 @@
"""The tests for the Unifi direct device tracker platform."""
import os
from datetime import timedelta
import unittest
from unittest import mock
from unittest.mock import patch
import pytest
import voluptuous as vol
from homeassistant.setup import setup_component
from homeassistant.components import device_tracker
from homeassistant.components.device_tracker import (
CONF_CONSIDER_HOME, CONF_TRACK_NEW)
from homeassistant.components.device_tracker.unifi_direct import (
DOMAIN, CONF_PORT, PLATFORM_SCHEMA, _response_to_json, get_scanner)
from homeassistant.const import (CONF_PLATFORM, CONF_PASSWORD, CONF_USERNAME,
CONF_HOST)
from tests.common import (
get_test_home_assistant, assert_setup_component,
mock_component, load_fixture)
class TestComponentsDeviceTrackerUnifiDirect(unittest.TestCase):
"""Tests for the Unifi direct device tracker platform."""
hass = None
scanner_path = 'homeassistant.components.device_tracker.' + \
'unifi_direct.UnifiDeviceScanner'
def setup_method(self, _):
"""Setup things to be run when tests are started."""
self.hass = get_test_home_assistant()
mock_component(self.hass, 'zone')
def teardown_method(self, _):
"""Stop everything that was started."""
self.hass.stop()
try:
os.remove(self.hass.config.path(device_tracker.YAML_DEVICES))
except FileNotFoundError:
pass
@mock.patch(scanner_path,
return_value=mock.MagicMock())
def test_get_scanner(self, unifi_mock): \
# pylint: disable=invalid-name
"""Test creating an Unifi direct scanner with a password."""
conf_dict = {
DOMAIN: {
CONF_PLATFORM: 'unifi_direct',
CONF_HOST: 'fake_host',
CONF_USERNAME: 'fake_user',
CONF_PASSWORD: 'fake_pass',
CONF_TRACK_NEW: True,
CONF_CONSIDER_HOME: timedelta(seconds=180)
}
}
with assert_setup_component(1, DOMAIN):
assert setup_component(self.hass, DOMAIN, conf_dict)
conf_dict[DOMAIN][CONF_PORT] = 22
self.assertEqual(unifi_mock.call_args, mock.call(conf_dict[DOMAIN]))
@patch('pexpect.pxssh.pxssh')
def test_get_device_name(self, mock_ssh):
""""Testing MAC matching."""
conf_dict = {
DOMAIN: {
CONF_PLATFORM: 'unifi_direct',
CONF_HOST: 'fake_host',
CONF_USERNAME: 'fake_user',
CONF_PASSWORD: 'fake_pass',
CONF_PORT: 22,
CONF_TRACK_NEW: True,
CONF_CONSIDER_HOME: timedelta(seconds=180)
}
}
mock_ssh.return_value.before = load_fixture('unifi_direct.txt')
scanner = get_scanner(self.hass, conf_dict)
devices = scanner.scan_devices()
self.assertEqual(23, len(devices))
self.assertEqual("iPhone",
scanner.get_device_name("98:00:c6:56:34:12"))
self.assertEqual("iPhone",
scanner.get_device_name("98:00:C6:56:34:12"))
@patch('pexpect.pxssh.pxssh.logout')
@patch('pexpect.pxssh.pxssh.login')
def test_failed_to_log_in(self, mock_login, mock_logout):
""""Testing exception at login results in False."""
from pexpect import exceptions
conf_dict = {
DOMAIN: {
CONF_PLATFORM: 'unifi_direct',
CONF_HOST: 'fake_host',
CONF_USERNAME: 'fake_user',
CONF_PASSWORD: 'fake_pass',
CONF_PORT: 22,
CONF_TRACK_NEW: True,
CONF_CONSIDER_HOME: timedelta(seconds=180)
}
}
mock_login.side_effect = exceptions.EOF("Test")
scanner = get_scanner(self.hass, conf_dict)
self.assertFalse(scanner)
@patch('pexpect.pxssh.pxssh.logout')
@patch('pexpect.pxssh.pxssh.login', autospec=True)
@patch('pexpect.pxssh.pxssh.prompt')
@patch('pexpect.pxssh.pxssh.sendline')
def test_to_get_update(self, mock_sendline, mock_prompt, mock_login,
mock_logout):
""""Testing exception in get_update matching."""
conf_dict = {
DOMAIN: {
CONF_PLATFORM: 'unifi_direct',
CONF_HOST: 'fake_host',
CONF_USERNAME: 'fake_user',
CONF_PASSWORD: 'fake_pass',
CONF_PORT: 22,
CONF_TRACK_NEW: True,
CONF_CONSIDER_HOME: timedelta(seconds=180)
}
}
scanner = get_scanner(self.hass, conf_dict)
# mock_sendline.side_effect = AssertionError("Test")
mock_prompt.side_effect = AssertionError("Test")
devices = scanner._get_update() # pylint: disable=protected-access
self.assertTrue(devices is None)
def test_good_reponse_parses(self):
"""Test that the response form the AP parses to JSON correctly."""
response = _response_to_json(load_fixture('unifi_direct.txt'))
self.assertTrue(response != {})
def test_bad_reponse_returns_none(self):
"""Test that a bad response form the AP parses to JSON correctly."""
self.assertTrue(_response_to_json("{(}") == {})
def test_config_error():
"""Test for configuration errors."""
with pytest.raises(vol.Invalid):
PLATFORM_SCHEMA({
# no username
CONF_PASSWORD: 'password',
CONF_PLATFORM: DOMAIN,
CONF_HOST: 'myhost',
'port': 123,
})
with pytest.raises(vol.Invalid):
PLATFORM_SCHEMA({
# no password
CONF_USERNAME: 'foo',
CONF_PLATFORM: DOMAIN,
CONF_HOST: 'myhost',
'port': 123,
})
with pytest.raises(vol.Invalid):
PLATFORM_SCHEMA({
CONF_PLATFORM: DOMAIN,
CONF_USERNAME: 'foo',
CONF_PASSWORD: 'password',
CONF_HOST: 'myhost',
'port': 'foo', # bad port!
})

1
tests/fixtures/unifi_direct.txt vendored Normal file

File diff suppressed because one or more lines are too long