Add config flow to fritzbox_callmonitor (#40736)

This commit is contained in:
springstan 2021-01-27 16:53:45 +01:00 committed by GitHub
parent f14c4412b7
commit 566058f701
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1091 additions and 177 deletions

View File

@ -314,6 +314,9 @@ omit =
homeassistant/components/freebox/sensor.py
homeassistant/components/freebox/switch.py
homeassistant/components/fritz/device_tracker.py
homeassistant/components/fritzbox_callmonitor/__init__.py
homeassistant/components/fritzbox_callmonitor/const.py
homeassistant/components/fritzbox_callmonitor/base.py
homeassistant/components/fritzbox_callmonitor/sensor.py
homeassistant/components/fritzbox_netmonitor/sensor.py
homeassistant/components/fronius/sensor.py

View File

@ -1 +1,92 @@
"""The fritzbox_callmonitor component."""
"""The fritzbox_callmonitor integration."""
from asyncio import gather
import logging
from fritzconnection.core.exceptions import FritzConnectionException, FritzSecurityError
from requests.exceptions import ConnectionError as RequestsConnectionError
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from homeassistant.exceptions import ConfigEntryNotReady
from .base import FritzBoxPhonebook
from .const import (
CONF_PHONEBOOK,
CONF_PREFIXES,
DOMAIN,
FRITZBOX_PHONEBOOK,
PLATFORMS,
UNDO_UPDATE_LISTENER,
)
_LOGGER = logging.getLogger(__name__)
async def async_setup(hass, config):
"""Set up the fritzbox_callmonitor integration."""
return True
async def async_setup_entry(hass, config_entry):
"""Set up the fritzbox_callmonitor platforms."""
fritzbox_phonebook = FritzBoxPhonebook(
host=config_entry.data[CONF_HOST],
username=config_entry.data[CONF_USERNAME],
password=config_entry.data[CONF_PASSWORD],
phonebook_id=config_entry.data[CONF_PHONEBOOK],
prefixes=config_entry.options.get(CONF_PREFIXES),
)
try:
await hass.async_add_executor_job(fritzbox_phonebook.init_phonebook)
except FritzSecurityError as ex:
_LOGGER.error(
"User has insufficient permissions to access AVM FRITZ!Box settings and its phonebooks: %s",
ex,
)
return False
except FritzConnectionException as ex:
_LOGGER.error("Invalid authentication: %s", ex)
return False
except RequestsConnectionError as ex:
_LOGGER.error("Unable to connect to AVM FRITZ!Box call monitor: %s", ex)
raise ConfigEntryNotReady from ex
undo_listener = config_entry.add_update_listener(update_listener)
hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][config_entry.entry_id] = {
FRITZBOX_PHONEBOOK: fritzbox_phonebook,
UNDO_UPDATE_LISTENER: undo_listener,
}
for component in PLATFORMS:
hass.async_create_task(
hass.config_entries.async_forward_entry_setup(config_entry, component)
)
return True
async def async_unload_entry(hass, config_entry):
"""Unloading the fritzbox_callmonitor platforms."""
unload_ok = all(
await gather(
*[
hass.config_entries.async_forward_entry_unload(config_entry, component)
for component in PLATFORMS
]
)
)
hass.data[DOMAIN][config_entry.entry_id][UNDO_UPDATE_LISTENER]()
if unload_ok:
hass.data[DOMAIN].pop(config_entry.entry_id)
return unload_ok
async def update_listener(hass, config_entry):
"""Update listener to reload after option has changed."""
await hass.config_entries.async_reload(config_entry.entry_id)

View File

@ -0,0 +1,79 @@
"""Base class for fritzbox_callmonitor entities."""
from datetime import timedelta
import logging
import re
from fritzconnection.lib.fritzphonebook import FritzPhonebook
from homeassistant.util import Throttle
from .const import REGEX_NUMBER, UNKOWN_NAME
_LOGGER = logging.getLogger(__name__)
# Return cached results if phonebook was downloaded less then this time ago.
MIN_TIME_PHONEBOOK_UPDATE = timedelta(hours=6)
class FritzBoxPhonebook:
"""This connects to a FritzBox router and downloads its phone book."""
def __init__(self, host, username, password, phonebook_id, prefixes):
"""Initialize the class."""
self.host = host
self.username = username
self.password = password
self.phonebook_id = phonebook_id
self.phonebook_dict = None
self.number_dict = None
self.prefixes = prefixes
self.fph = None
def init_phonebook(self):
"""Establish a connection to the FRITZ!Box and check if phonebook_id is valid."""
self.fph = FritzPhonebook(
address=self.host,
user=self.username,
password=self.password,
)
self.update_phonebook()
@Throttle(MIN_TIME_PHONEBOOK_UPDATE)
def update_phonebook(self):
"""Update the phone book dictionary."""
if not self.phonebook_id:
return
self.phonebook_dict = self.fph.get_all_names(self.phonebook_id)
self.number_dict = {
re.sub(REGEX_NUMBER, "", nr): name
for name, nrs in self.phonebook_dict.items()
for nr in nrs
}
_LOGGER.info("Fritz!Box phone book successfully updated")
def get_phonebook_ids(self):
"""Return list of phonebook ids."""
return self.fph.phonebook_ids
def get_name(self, number):
"""Return a name for a given phone number."""
number = re.sub(REGEX_NUMBER, "", str(number))
if self.number_dict is None:
return UNKOWN_NAME
if number in self.number_dict:
return self.number_dict[number]
if not self.prefixes:
return UNKOWN_NAME
for prefix in self.prefixes:
try:
return self.number_dict[prefix + number]
except KeyError:
pass
try:
return self.number_dict[prefix + number.lstrip("0")]
except KeyError:
pass

View File

@ -0,0 +1,265 @@
"""Config flow for fritzbox_callmonitor."""
from fritzconnection import FritzConnection
from fritzconnection.core.exceptions import FritzConnectionException, FritzSecurityError
from requests.exceptions import ConnectionError as RequestsConnectionError
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import (
CONF_HOST,
CONF_NAME,
CONF_PASSWORD,
CONF_PORT,
CONF_USERNAME,
)
from homeassistant.core import callback
from .base import FritzBoxPhonebook
# pylint:disable=unused-import
from .const import (
CONF_PHONEBOOK,
CONF_PREFIXES,
DEFAULT_HOST,
DEFAULT_PHONEBOOK,
DEFAULT_PORT,
DEFAULT_USERNAME,
DOMAIN,
FRITZ_ACTION_GET_INFO,
FRITZ_ATTR_NAME,
FRITZ_ATTR_SERIAL_NUMBER,
FRITZ_SERVICE_DEVICE_INFO,
SERIAL_NUMBER,
)
DATA_SCHEMA_USER = vol.Schema(
{
vol.Required(CONF_HOST, default=DEFAULT_HOST): str,
vol.Required(CONF_PORT, default=DEFAULT_PORT): vol.Coerce(int),
vol.Required(CONF_USERNAME, default=DEFAULT_USERNAME): str,
vol.Required(CONF_PASSWORD): str,
}
)
RESULT_INVALID_AUTH = "invalid_auth"
RESULT_INSUFFICIENT_PERMISSIONS = "insufficient_permissions"
RESULT_MALFORMED_PREFIXES = "malformed_prefixes"
RESULT_NO_DEVIES_FOUND = "no_devices_found"
RESULT_SUCCESS = "success"
class FritzBoxCallMonitorConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a fritzbox_callmonitor config flow."""
VERSION = 1
CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_POLL
def __init__(self):
"""Initialize flow."""
self._host = None
self._port = None
self._username = None
self._password = None
self._phonebook_name = None
self._phonebook_names = None
self._phonebook_id = None
self._phonebook_ids = None
self._fritzbox_phonebook = None
self._prefixes = None
self._serial_number = None
def _get_config_entry(self):
"""Create and return an config entry."""
return self.async_create_entry(
title=self._phonebook_name,
data={
CONF_HOST: self._host,
CONF_PORT: self._port,
CONF_USERNAME: self._username,
CONF_PASSWORD: self._password,
CONF_PHONEBOOK: self._phonebook_id,
CONF_PREFIXES: self._prefixes,
SERIAL_NUMBER: self._serial_number,
},
)
def _try_connect(self):
"""Try to connect and check auth."""
self._fritzbox_phonebook = FritzBoxPhonebook(
host=self._host,
username=self._username,
password=self._password,
phonebook_id=self._phonebook_id,
prefixes=self._prefixes,
)
try:
self._fritzbox_phonebook.init_phonebook()
self._phonebook_ids = self._fritzbox_phonebook.get_phonebook_ids()
fritz_connection = FritzConnection(
address=self._host, user=self._username, password=self._password
)
device_info = fritz_connection.call_action(
FRITZ_SERVICE_DEVICE_INFO, FRITZ_ACTION_GET_INFO
)
self._serial_number = device_info[FRITZ_ATTR_SERIAL_NUMBER]
return RESULT_SUCCESS
except RequestsConnectionError:
return RESULT_NO_DEVIES_FOUND
except FritzSecurityError:
return RESULT_INSUFFICIENT_PERMISSIONS
except FritzConnectionException:
return RESULT_INVALID_AUTH
async def _get_name_of_phonebook(self, phonebook_id):
"""Return name of phonebook for given phonebook_id."""
phonebook_info = await self.hass.async_add_executor_job(
self._fritzbox_phonebook.fph.phonebook_info, phonebook_id
)
return phonebook_info[FRITZ_ATTR_NAME]
async def _get_list_of_phonebook_names(self):
"""Return list of names for all available phonebooks."""
phonebook_names = []
for phonebook_id in self._phonebook_ids:
phonebook_names.append(await self._get_name_of_phonebook(phonebook_id))
return phonebook_names
@staticmethod
@callback
def async_get_options_flow(config_entry):
"""Get the options flow for this handler."""
return FritzBoxCallMonitorOptionsFlowHandler(config_entry)
async def async_step_import(self, user_input=None):
"""Handle configuration by yaml file."""
return await self.async_step_user(user_input)
async def async_step_user(self, user_input=None):
"""Handle a flow initialized by the user."""
if user_input is None:
return self.async_show_form(
step_id="user", data_schema=DATA_SCHEMA_USER, errors={}
)
self._host = user_input[CONF_HOST]
self._port = user_input[CONF_PORT]
self._password = user_input[CONF_PASSWORD]
self._username = user_input[CONF_USERNAME]
result = await self.hass.async_add_executor_job(self._try_connect)
if result == RESULT_INVALID_AUTH:
return self.async_show_form(
step_id="user",
data_schema=DATA_SCHEMA_USER,
errors={"base": RESULT_INVALID_AUTH},
)
if result != RESULT_SUCCESS:
return self.async_abort(reason=result)
if ( # pylint: disable=no-member
self.context["source"] == config_entries.SOURCE_IMPORT
):
self._phonebook_id = user_input[CONF_PHONEBOOK]
self._phonebook_name = user_input[CONF_NAME]
elif len(self._phonebook_ids) > 1:
return await self.async_step_phonebook()
else:
self._phonebook_id = DEFAULT_PHONEBOOK
self._phonebook_name = await self._get_name_of_phonebook(self._phonebook_id)
await self.async_set_unique_id(f"{self._serial_number}-{self._phonebook_id}")
self._abort_if_unique_id_configured()
return self._get_config_entry()
async def async_step_phonebook(self, user_input=None):
"""Handle a flow to chose one of multiple available phonebooks."""
if self._phonebook_names is None:
self._phonebook_names = await self._get_list_of_phonebook_names()
if user_input is None:
return self.async_show_form(
step_id="phonebook",
data_schema=vol.Schema(
{vol.Required(CONF_PHONEBOOK): vol.In(self._phonebook_names)}
),
errors={},
)
self._phonebook_name = user_input[CONF_PHONEBOOK]
self._phonebook_id = self._phonebook_names.index(self._phonebook_name)
await self.async_set_unique_id(f"{self._serial_number}-{self._phonebook_id}")
self._abort_if_unique_id_configured()
return self._get_config_entry()
class FritzBoxCallMonitorOptionsFlowHandler(config_entries.OptionsFlow):
"""Handle a fritzbox_callmonitor options flow."""
def __init__(self, config_entry):
"""Initialize."""
self.config_entry = config_entry
@classmethod
def _are_prefixes_valid(cls, prefixes):
"""Check if prefixes are valid."""
return prefixes.strip() if prefixes else prefixes is None
@classmethod
def _get_list_of_prefixes(cls, prefixes):
"""Get list of prefixes."""
if prefixes is None:
return None
return [prefix.strip() for prefix in prefixes.split(",")]
def _get_option_schema_prefixes(self):
"""Get option schema for entering prefixes."""
return vol.Schema(
{
vol.Optional(
CONF_PREFIXES,
description={
"suggested_value": self.config_entry.options.get(CONF_PREFIXES)
},
): str
}
)
async def async_step_init(self, user_input=None):
"""Manage the options."""
option_schema_prefixes = self._get_option_schema_prefixes()
if user_input is None:
return self.async_show_form(
step_id="init",
data_schema=option_schema_prefixes,
errors={},
)
prefixes = user_input.get(CONF_PREFIXES)
if not self._are_prefixes_valid(prefixes):
return self.async_show_form(
step_id="init",
data_schema=option_schema_prefixes,
errors={"base": RESULT_MALFORMED_PREFIXES},
)
return self.async_create_entry(
title="", data={CONF_PREFIXES: self._get_list_of_prefixes(prefixes)}
)

View File

@ -0,0 +1,41 @@
"""Constants for the AVM Fritz!Box call monitor integration."""
STATE_RINGING = "ringing"
STATE_DIALING = "dialing"
STATE_TALKING = "talking"
STATE_IDLE = "idle"
FRITZ_STATE_RING = "RING"
FRITZ_STATE_CALL = "CALL"
FRITZ_STATE_CONNECT = "CONNECT"
FRITZ_STATE_DISCONNECT = "DISCONNECT"
ICON_PHONE = "mdi:phone"
ATTR_PREFIXES = "prefixes"
FRITZ_ACTION_GET_INFO = "GetInfo"
FRITZ_ATTR_NAME = "name"
FRITZ_ATTR_SERIAL_NUMBER = "NewSerialNumber"
FRITZ_SERVICE_DEVICE_INFO = "DeviceInfo"
UNKOWN_NAME = "unknown"
SERIAL_NUMBER = "serial_number"
REGEX_NUMBER = r"[^\d\+]"
CONF_PHONEBOOK = "phonebook"
CONF_PHONEBOOK_NAME = "phonebook_name"
CONF_PREFIXES = "prefixes"
DEFAULT_HOST = "169.254.1.1" # IP valid for all Fritz!Box routers
DEFAULT_PORT = 1012
DEFAULT_USERNAME = "admin"
DEFAULT_PHONEBOOK = 0
DEFAULT_NAME = "Phone"
DOMAIN = "fritzbox_callmonitor"
MANUFACTURER = "AVM"
PLATFORMS = ["sensor"]
UNDO_UPDATE_LISTENER = "undo_update_listener"
FRITZBOX_PHONEBOOK = "fritzbox_phonebook"

View File

@ -1,6 +1,7 @@
{
"domain": "fritzbox_callmonitor",
"name": "AVM FRITZ!Box Call Monitor",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/fritzbox_callmonitor",
"requirements": ["fritzconnection==1.4.0"],
"codeowners": []

View File

@ -1,15 +1,15 @@
"""Sensor to monitor incoming/outgoing phone calls on a Fritz!Box router."""
import datetime
from datetime import datetime, timedelta
import logging
import re
import socket
import threading
import time
import queue
from threading import Event as ThreadingEvent, Thread
from time import sleep
from fritzconnection.lib.fritzphonebook import FritzPhonebook
from fritzconnection.core.fritzmonitor import FritzMonitor
import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.config_entries import SOURCE_IMPORT
from homeassistant.const import (
CONF_HOST,
CONF_NAME,
@ -20,97 +20,125 @@ from homeassistant.const import (
)
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity
from homeassistant.util import Throttle
from .const import (
ATTR_PREFIXES,
CONF_PHONEBOOK,
CONF_PREFIXES,
DEFAULT_HOST,
DEFAULT_NAME,
DEFAULT_PHONEBOOK,
DEFAULT_PORT,
DEFAULT_USERNAME,
DOMAIN,
FRITZ_STATE_CALL,
FRITZ_STATE_CONNECT,
FRITZ_STATE_DISCONNECT,
FRITZ_STATE_RING,
FRITZBOX_PHONEBOOK,
ICON_PHONE,
MANUFACTURER,
SERIAL_NUMBER,
STATE_DIALING,
STATE_IDLE,
STATE_RINGING,
STATE_TALKING,
UNKOWN_NAME,
)
_LOGGER = logging.getLogger(__name__)
CONF_PHONEBOOK = "phonebook"
CONF_PREFIXES = "prefixes"
DEFAULT_HOST = "169.254.1.1" # IP valid for all Fritz!Box routers
DEFAULT_USERNAME = "admin"
DEFAULT_NAME = "Phone"
DEFAULT_PORT = 1012
DEFAULT_PHONEBOOK = 0
INTERVAL_RECONNECT = 60
VALUE_CALL = "dialing"
VALUE_CONNECT = "talking"
VALUE_DEFAULT = "idle"
VALUE_DISCONNECT = "idle"
VALUE_RING = "ringing"
# Return cached results if phonebook was downloaded less then this time ago.
MIN_TIME_PHONEBOOK_UPDATE = datetime.timedelta(hours=6)
SCAN_INTERVAL = datetime.timedelta(hours=3)
SCAN_INTERVAL = timedelta(hours=3)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_HOST, default=DEFAULT_HOST): cv.string,
vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
vol.Optional(CONF_USERNAME, default=DEFAULT_USERNAME): cv.string,
vol.Optional(CONF_PASSWORD): cv.string,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_PHONEBOOK, default=DEFAULT_PHONEBOOK): cv.positive_int,
vol.Optional(CONF_PREFIXES, default=[]): vol.All(cv.ensure_list, [cv.string]),
vol.Optional(CONF_PREFIXES): vol.All(cv.ensure_list, [cv.string]),
}
)
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up Fritz!Box call monitor sensor platform."""
name = config[CONF_NAME]
host = config[CONF_HOST]
# Try to resolve a hostname; if it is already an IP, it will be returned as-is
try:
host = socket.gethostbyname(host)
except OSError:
_LOGGER.error("Could not resolve hostname %s", host)
return
port = config[CONF_PORT]
username = config[CONF_USERNAME]
password = config.get(CONF_PASSWORD)
phonebook_id = config[CONF_PHONEBOOK]
prefixes = config[CONF_PREFIXES]
try:
phonebook = FritzBoxPhonebook(
host=host,
port=port,
username=username,
password=password,
phonebook_id=phonebook_id,
prefixes=prefixes,
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
"""Import the platform into a config entry."""
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_IMPORT}, data=config
)
except: # noqa: E722 pylint: disable=bare-except
phonebook = None
_LOGGER.warning("Phonebook with ID %s not found on Fritz!Box", phonebook_id)
)
sensor = FritzBoxCallSensor(name=name, phonebook=phonebook)
add_entities([sensor])
async def async_setup_entry(hass, config_entry, async_add_entities):
"""Set up the fritzbox_callmonitor sensor from config_entry."""
fritzbox_phonebook = hass.data[DOMAIN][config_entry.entry_id][FRITZBOX_PHONEBOOK]
monitor = FritzBoxCallMonitor(host=host, port=port, sensor=sensor)
monitor.connect()
phonebook_name = config_entry.title
phonebook_id = config_entry.data[CONF_PHONEBOOK]
prefixes = config_entry.options.get(CONF_PREFIXES)
serial_number = config_entry.data[SERIAL_NUMBER]
host = config_entry.data[CONF_HOST]
port = config_entry.data[CONF_PORT]
def _stop_listener(_event):
monitor.stopped.set()
name = f"{fritzbox_phonebook.fph.modelname} Call Monitor {phonebook_name}"
unique_id = f"{serial_number}-{phonebook_id}"
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, _stop_listener)
sensor = FritzBoxCallSensor(
name=name,
unique_id=unique_id,
fritzbox_phonebook=fritzbox_phonebook,
prefixes=prefixes,
host=host,
port=port,
)
return monitor.sock is not None
hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_STOP, sensor.async_will_remove_from_hass()
)
async_add_entities([sensor])
class FritzBoxCallSensor(Entity):
"""Implementation of a Fritz!Box call monitor."""
def __init__(self, name, phonebook):
def __init__(self, name, unique_id, fritzbox_phonebook, prefixes, host, port):
"""Initialize the sensor."""
self._state = VALUE_DEFAULT
self._state = STATE_IDLE
self._attributes = {}
self._name = name
self.phonebook = phonebook
self._name = name.title()
self._unique_id = unique_id
self._fritzbox_phonebook = fritzbox_phonebook
self._prefixes = prefixes
self._host = host
self._port = port
self._monitor = None
async def async_added_to_hass(self):
"""Connect to FRITZ!Box to monitor its call state."""
_LOGGER.debug("Starting monitor for: %s", self.entity_id)
self._monitor = FritzBoxCallMonitor(
host=self._host,
port=self._port,
sensor=self,
)
self._monitor.connect()
async def async_will_remove_from_hass(self):
"""Disconnect from FRITZ!Box by stopping monitor."""
if (
self._monitor
and self._monitor.stopped
and not self._monitor.stopped.is_set()
and self._monitor.connection
and self._monitor.connection.is_alive
):
self._monitor.stopped.set()
self._monitor.connection.stop()
_LOGGER.debug("Stopped monitor for: %s", self.entity_id)
def set_state(self, state):
"""Set the state."""
@ -120,10 +148,15 @@ class FritzBoxCallSensor(Entity):
"""Set the state attributes."""
self._attributes = attributes
@property
def name(self):
"""Return name of this sensor."""
return self._name
@property
def should_poll(self):
"""Only poll to update phonebook, if defined."""
return self.phonebook is not None
return self._fritzbox_phonebook is not None
@property
def state(self):
@ -131,25 +164,43 @@ class FritzBoxCallSensor(Entity):
return self._state
@property
def name(self):
"""Return the name of the sensor."""
return self._name
def icon(self):
"""Return the icon of the sensor."""
return ICON_PHONE
@property
def device_state_attributes(self):
"""Return the state attributes."""
if self._prefixes:
self._attributes[ATTR_PREFIXES] = self._prefixes
return self._attributes
@property
def device_info(self):
"""Return device specific attributes."""
return {
"name": self._fritzbox_phonebook.fph.modelname,
"identifiers": {(DOMAIN, self._unique_id)},
"manufacturer": MANUFACTURER,
"model": self._fritzbox_phonebook.fph.modelname,
"sw_version": self._fritzbox_phonebook.fph.fc.system_version,
}
@property
def unique_id(self):
"""Return the unique ID of the device."""
return self._unique_id
def number_to_name(self, number):
"""Return a name for a given phone number."""
if self.phonebook is None:
return "unknown"
return self.phonebook.get_name(number)
if self._fritzbox_phonebook is None:
return UNKOWN_NAME
return self._fritzbox_phonebook.get_name(number)
def update(self):
"""Update the phonebook if it is defined."""
if self.phonebook is not None:
self.phonebook.update_phonebook()
if self._fritzbox_phonebook is not None:
self._fritzbox_phonebook.update_phonebook()
class FritzBoxCallMonitor:
@ -159,142 +210,78 @@ class FritzBoxCallMonitor:
"""Initialize Fritz!Box monitor instance."""
self.host = host
self.port = port
self.sock = None
self.connection = None
self.stopped = ThreadingEvent()
self._sensor = sensor
self.stopped = threading.Event()
def connect(self):
"""Connect to the Fritz!Box."""
_LOGGER.debug("Setting up socket...")
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(10)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
_LOGGER.debug("Setting up socket connection")
try:
self.sock.connect((self.host, self.port))
threading.Thread(target=self._listen).start()
self.connection = FritzMonitor(address=self.host, port=self.port)
kwargs = {"event_queue": self.connection.start()}
Thread(target=self._process_events, kwargs=kwargs).start()
except OSError as err:
self.sock = None
self.connection = None
_LOGGER.error(
"Cannot connect to %s on port %s: %s", self.host, self.port, err
)
def _listen(self):
def _process_events(self, event_queue):
"""Listen to incoming or outgoing calls."""
_LOGGER.debug("Connection established, waiting for response...")
while not self.stopped.isSet():
_LOGGER.debug("Connection established, waiting for events")
while not self.stopped.is_set():
try:
response = self.sock.recv(2048)
except socket.timeout:
# if no response after 10 seconds, just recv again
event = event_queue.get(timeout=10)
except queue.Empty:
if not self.connection.is_alive and not self.stopped.is_set():
_LOGGER.error("Connection has abruptly ended")
_LOGGER.debug("Empty event queue")
continue
response = str(response, "utf-8")
_LOGGER.debug("Received %s", response)
if not response:
# if the response is empty, the connection has been lost.
# try to reconnect
_LOGGER.warning("Connection lost, reconnecting...")
self.sock = None
while self.sock is None:
self.connect()
time.sleep(INTERVAL_RECONNECT)
else:
line = response.split("\n", 1)[0]
self._parse(line)
time.sleep(1)
_LOGGER.debug("Received event: %s", event)
self._parse(event)
sleep(1)
def _parse(self, line):
"""Parse the call information and set the sensor states."""
line = line.split(";")
df_in = "%d.%m.%y %H:%M:%S"
df_out = "%Y-%m-%dT%H:%M:%S"
isotime = datetime.datetime.strptime(line[0], df_in).strftime(df_out)
if line[1] == "RING":
self._sensor.set_state(VALUE_RING)
isotime = datetime.strptime(line[0], df_in).strftime(df_out)
if line[1] == FRITZ_STATE_RING:
self._sensor.set_state(STATE_RINGING)
att = {
"type": "incoming",
"from": line[3],
"to": line[4],
"device": line[5],
"initiated": isotime,
"from_name": self._sensor.number_to_name(line[3]),
}
att["from_name"] = self._sensor.number_to_name(att["from"])
self._sensor.set_attributes(att)
elif line[1] == "CALL":
self._sensor.set_state(VALUE_CALL)
elif line[1] == FRITZ_STATE_CALL:
self._sensor.set_state(STATE_DIALING)
att = {
"type": "outgoing",
"from": line[4],
"to": line[5],
"device": line[6],
"initiated": isotime,
"to_name": self._sensor.number_to_name(line[5]),
}
att["to_name"] = self._sensor.number_to_name(att["to"])
self._sensor.set_attributes(att)
elif line[1] == "CONNECT":
self._sensor.set_state(VALUE_CONNECT)
att = {"with": line[4], "device": line[3], "accepted": isotime}
att["with_name"] = self._sensor.number_to_name(att["with"])
elif line[1] == FRITZ_STATE_CONNECT:
self._sensor.set_state(STATE_TALKING)
att = {
"with": line[4],
"device": line[3],
"accepted": isotime,
"with_name": self._sensor.number_to_name(line[4]),
}
self._sensor.set_attributes(att)
elif line[1] == "DISCONNECT":
self._sensor.set_state(VALUE_DISCONNECT)
elif line[1] == FRITZ_STATE_DISCONNECT:
self._sensor.set_state(STATE_IDLE)
att = {"duration": line[3], "closed": isotime}
self._sensor.set_attributes(att)
self._sensor.schedule_update_ha_state()
class FritzBoxPhonebook:
"""This connects to a FritzBox router and downloads its phone book."""
def __init__(self, host, port, username, password, phonebook_id=0, prefixes=None):
"""Initialize the class."""
self.host = host
self.username = username
self.password = password
self.port = port
self.phonebook_id = phonebook_id
self.phonebook_dict = None
self.number_dict = None
self.prefixes = prefixes or []
# Establish a connection to the FRITZ!Box.
self.fph = FritzPhonebook(
address=self.host, user=self.username, password=self.password
)
if self.phonebook_id not in self.fph.list_phonebooks:
raise ValueError("Phonebook with this ID not found.")
self.update_phonebook()
@Throttle(MIN_TIME_PHONEBOOK_UPDATE)
def update_phonebook(self):
"""Update the phone book dictionary."""
self.phonebook_dict = self.fph.get_all_names(self.phonebook_id)
self.number_dict = {
re.sub(r"[^\d\+]", "", nr): name
for name, nrs in self.phonebook_dict.items()
for nr in nrs
}
_LOGGER.info("Fritz!Box phone book successfully updated")
def get_name(self, number):
"""Return a name for a given phone number."""
number = re.sub(r"[^\d\+]", "", str(number))
if self.number_dict is None:
return "unknown"
try:
return self.number_dict[number]
except KeyError:
pass
if self.prefixes:
for prefix in self.prefixes:
try:
return self.number_dict[prefix + number]
except KeyError:
pass
try:
return self.number_dict[prefix + number.lstrip("0")]
except KeyError:
pass
return "unknown"

View File

@ -0,0 +1,41 @@
{
"config": {
"flow_title": "AVM FRITZ!Box call monitor: {name}",
"step": {
"user": {
"data": {
"host": "[%key:common::config_flow::data::host%]",
"port": "[%key:common::config_flow::data::port%]",
"username": "[%key:common::config_flow::data::username%]",
"password": "[%key:common::config_flow::data::password%]"
}
},
"phonebook": {
"data": {
"phonebook": "Phonebook"
}
}
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"no_devices_found": "[%key:common::config_flow::abort::no_devices_found%]",
"insufficient_permissions": "User has insufficient permissions to access AVM FRITZ!Box settings and its phonebooks."
},
"error": {
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]"
}
},
"options": {
"step": {
"init": {
"title": "Configure Prefixes",
"data": {
"prefixes": "Prefixes (comma separated list)"
}
}
},
"error": {
"malformed_prefixes": "Prefixes are malformed, please check their format."
}
}
}

View File

@ -0,0 +1,41 @@
{
"config": {
"abort": {
"already_configured": "Device is already configured",
"insufficient_permissions": "User has insufficient permissions to access AVM FRITZ!Box settings and its phonebooks.",
"no_devices_found": "No devices found on the network"
},
"error": {
"invalid_auth": "Invalid authentication"
},
"flow_title": "AVM FRITZ!Box call monitor: {name}",
"step": {
"phonebook": {
"data": {
"phonebook": "Phonebook"
}
},
"user": {
"data": {
"host": "Host",
"password": "Password",
"port": "Port",
"username": "Username"
}
}
}
},
"options": {
"error": {
"malformed_prefixes": "Prefixes are malformed, please check their format."
},
"step": {
"init": {
"data": {
"prefixes": "Prefixes (comma separated list)"
},
"title": "Configure Prefixes"
}
}
}
}

View File

@ -70,6 +70,7 @@ FLOWS = [
"foscam",
"freebox",
"fritzbox",
"fritzbox_callmonitor",
"garmin_connect",
"gdacs",
"geofency",

View File

@ -310,6 +310,11 @@ fnvhash==0.1.0
# homeassistant.components.foobot
foobot_async==1.0.0
# homeassistant.components.fritz
# homeassistant.components.fritzbox_callmonitor
# homeassistant.components.fritzbox_netmonitor
fritzconnection==1.4.0
# homeassistant.components.google_translate
gTTS==2.2.1

View File

@ -0,0 +1 @@
"""Tests for fritzbox_callmonitor."""

View File

@ -0,0 +1,358 @@
"""Tests for fritzbox_callmonitor config flow."""
from unittest.mock import PropertyMock
from fritzconnection.core.exceptions import FritzConnectionException, FritzSecurityError
from requests.exceptions import ConnectionError as RequestsConnectionError
from homeassistant.components.fritzbox_callmonitor.config_flow import (
RESULT_INSUFFICIENT_PERMISSIONS,
RESULT_INVALID_AUTH,
RESULT_MALFORMED_PREFIXES,
RESULT_NO_DEVIES_FOUND,
)
from homeassistant.components.fritzbox_callmonitor.const import (
CONF_PHONEBOOK,
CONF_PREFIXES,
DOMAIN,
FRITZ_ATTR_NAME,
FRITZ_ATTR_SERIAL_NUMBER,
SERIAL_NUMBER,
)
from homeassistant.config_entries import SOURCE_IMPORT, SOURCE_USER
from homeassistant.const import (
CONF_HOST,
CONF_NAME,
CONF_PASSWORD,
CONF_PORT,
CONF_USERNAME,
)
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import (
RESULT_TYPE_ABORT,
RESULT_TYPE_CREATE_ENTRY,
RESULT_TYPE_FORM,
)
from tests.common import MockConfigEntry, patch
MOCK_HOST = "fake_host"
MOCK_PORT = 1234
MOCK_USERNAME = "fake_username"
MOCK_PASSWORD = "fake_password"
MOCK_PHONEBOOK_NAME_1 = "fake_phonebook_name_1"
MOCK_PHONEBOOK_NAME_2 = "fake_phonebook_name_2"
MOCK_PHONEBOOK_ID = 0
MOCK_SERIAL_NUMBER = "fake_serial_number"
MOCK_NAME = "fake_call_monitor_name"
MOCK_USER_DATA = {
CONF_HOST: MOCK_HOST,
CONF_PORT: MOCK_PORT,
CONF_PASSWORD: MOCK_PASSWORD,
CONF_USERNAME: MOCK_USERNAME,
}
MOCK_CONFIG_ENTRY = {
CONF_HOST: MOCK_HOST,
CONF_PORT: MOCK_PORT,
CONF_PASSWORD: MOCK_PASSWORD,
CONF_USERNAME: MOCK_USERNAME,
CONF_PREFIXES: None,
CONF_PHONEBOOK: MOCK_PHONEBOOK_ID,
SERIAL_NUMBER: MOCK_SERIAL_NUMBER,
}
MOCK_YAML_CONFIG = {
CONF_HOST: MOCK_HOST,
CONF_PORT: MOCK_PORT,
CONF_PASSWORD: MOCK_PASSWORD,
CONF_USERNAME: MOCK_USERNAME,
CONF_PHONEBOOK: MOCK_PHONEBOOK_ID,
CONF_NAME: MOCK_NAME,
}
MOCK_DEVICE_INFO = {FRITZ_ATTR_SERIAL_NUMBER: MOCK_SERIAL_NUMBER}
MOCK_PHONEBOOK_INFO_1 = {FRITZ_ATTR_NAME: MOCK_PHONEBOOK_NAME_1}
MOCK_PHONEBOOK_INFO_2 = {FRITZ_ATTR_NAME: MOCK_PHONEBOOK_NAME_2}
MOCK_UNIQUE_ID = f"{MOCK_SERIAL_NUMBER}-{MOCK_PHONEBOOK_ID}"
async def test_yaml_import(hass: HomeAssistant) -> None:
"""Test configuration.yaml import."""
with patch(
"homeassistant.components.fritzbox_callmonitor.base.FritzPhonebook.__init__",
return_value=None,
), patch(
"homeassistant.components.fritzbox_callmonitor.base.FritzPhonebook.phonebook_ids",
new_callable=PropertyMock,
return_value=[0],
), patch(
"homeassistant.components.fritzbox_callmonitor.base.FritzPhonebook.phonebook_info",
return_value=MOCK_PHONEBOOK_INFO_1,
), patch(
"homeassistant.components.fritzbox_callmonitor.base.FritzPhonebook.modelname",
return_value=MOCK_PHONEBOOK_NAME_1,
), patch(
"homeassistant.components.fritzbox_callmonitor.config_flow.FritzConnection.__init__",
return_value=None,
), patch(
"homeassistant.components.fritzbox_callmonitor.config_flow.FritzConnection.call_action",
return_value=MOCK_DEVICE_INFO,
), patch(
"homeassistant.components.fritzbox_callmonitor.async_setup_entry",
return_value=True,
) as mock_setup_entry:
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data=MOCK_YAML_CONFIG,
)
assert result["type"] == RESULT_TYPE_CREATE_ENTRY
assert result["title"] == MOCK_NAME
assert result["data"] == MOCK_CONFIG_ENTRY
assert len(mock_setup_entry.mock_calls) == 1
async def test_setup_one_phonebook(hass: HomeAssistant) -> None:
"""Test setting up manually."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
assert result["type"] == RESULT_TYPE_FORM
assert result["step_id"] == "user"
with patch(
"homeassistant.components.fritzbox_callmonitor.base.FritzPhonebook.__init__",
return_value=None,
), patch(
"homeassistant.components.fritzbox_callmonitor.base.FritzPhonebook.phonebook_ids",
new_callable=PropertyMock,
return_value=[0],
), patch(
"homeassistant.components.fritzbox_callmonitor.base.FritzPhonebook.phonebook_info",
return_value=MOCK_PHONEBOOK_INFO_1,
), patch(
"homeassistant.components.fritzbox_callmonitor.base.FritzPhonebook.modelname",
return_value=MOCK_PHONEBOOK_NAME_1,
), patch(
"homeassistant.components.fritzbox_callmonitor.config_flow.FritzConnection.__init__",
return_value=None,
), patch(
"homeassistant.components.fritzbox_callmonitor.config_flow.FritzConnection.call_action",
return_value=MOCK_DEVICE_INFO,
), patch(
"homeassistant.components.fritzbox_callmonitor.async_setup_entry",
return_value=True,
) as mock_setup_entry:
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=MOCK_USER_DATA
)
assert result["type"] == RESULT_TYPE_CREATE_ENTRY
assert result["title"] == MOCK_PHONEBOOK_NAME_1
assert result["data"] == MOCK_CONFIG_ENTRY
assert len(mock_setup_entry.mock_calls) == 1
async def test_setup_multiple_phonebooks(hass: HomeAssistant) -> None:
"""Test setting up manually."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
assert result["type"] == RESULT_TYPE_FORM
assert result["step_id"] == "user"
with patch(
"homeassistant.components.fritzbox_callmonitor.base.FritzPhonebook.__init__",
return_value=None,
), patch(
"homeassistant.components.fritzbox_callmonitor.base.FritzPhonebook.phonebook_ids",
new_callable=PropertyMock,
return_value=[0, 1],
), patch(
"homeassistant.components.fritzbox_callmonitor.config_flow.FritzConnection.__init__",
return_value=None,
), patch(
"homeassistant.components.fritzbox_callmonitor.config_flow.FritzConnection.call_action",
return_value=MOCK_DEVICE_INFO,
), patch(
"homeassistant.components.fritzbox_callmonitor.base.FritzPhonebook.phonebook_info",
side_effect=[MOCK_PHONEBOOK_INFO_1, MOCK_PHONEBOOK_INFO_2],
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=MOCK_USER_DATA
)
assert result["type"] == RESULT_TYPE_FORM
assert result["step_id"] == "phonebook"
assert result["errors"] == {}
with patch(
"homeassistant.components.fritzbox_callmonitor.base.FritzPhonebook.modelname",
return_value=MOCK_PHONEBOOK_NAME_1,
), patch(
"homeassistant.components.fritzbox_callmonitor.async_setup_entry",
return_value=True,
) as mock_setup_entry:
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_PHONEBOOK: MOCK_PHONEBOOK_NAME_2},
)
assert result["type"] == RESULT_TYPE_CREATE_ENTRY
assert result["title"] == MOCK_PHONEBOOK_NAME_2
assert result["data"] == {
CONF_HOST: MOCK_HOST,
CONF_PORT: MOCK_PORT,
CONF_PASSWORD: MOCK_PASSWORD,
CONF_USERNAME: MOCK_USERNAME,
CONF_PREFIXES: None,
CONF_PHONEBOOK: 1,
SERIAL_NUMBER: MOCK_SERIAL_NUMBER,
}
assert len(mock_setup_entry.mock_calls) == 1
async def test_setup_cannot_connect(hass: HomeAssistant) -> None:
"""Test we handle cannot connect."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
with patch(
"homeassistant.components.fritzbox_callmonitor.base.FritzPhonebook.__init__",
side_effect=RequestsConnectionError,
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=MOCK_USER_DATA
)
assert result["type"] == RESULT_TYPE_ABORT
assert result["reason"] == RESULT_NO_DEVIES_FOUND
async def test_setup_insufficient_permissions(hass: HomeAssistant) -> None:
"""Test we handle insufficient permissions."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
with patch(
"homeassistant.components.fritzbox_callmonitor.base.FritzPhonebook.__init__",
side_effect=FritzSecurityError,
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=MOCK_USER_DATA
)
assert result["type"] == RESULT_TYPE_ABORT
assert result["reason"] == RESULT_INSUFFICIENT_PERMISSIONS
async def test_setup_invalid_auth(hass: HomeAssistant) -> None:
"""Test we handle invalid auth."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
with patch(
"homeassistant.components.fritzbox_callmonitor.base.FritzPhonebook.__init__",
side_effect=FritzConnectionException,
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=MOCK_USER_DATA
)
assert result["type"] == RESULT_TYPE_FORM
assert result["errors"] == {"base": RESULT_INVALID_AUTH}
async def test_options_flow_correct_prefixes(hass: HomeAssistant) -> None:
"""Test config flow options."""
config_entry = MockConfigEntry(
domain=DOMAIN,
unique_id=MOCK_UNIQUE_ID,
data=MOCK_CONFIG_ENTRY,
options={CONF_PREFIXES: None},
)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.fritzbox_callmonitor.async_setup_entry",
return_value=True,
):
await hass.config_entries.async_setup(config_entry.entry_id)
result = await hass.config_entries.options.async_init(config_entry.entry_id)
assert result["type"] == RESULT_TYPE_FORM
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"], user_input={CONF_PREFIXES: "+49, 491234"}
)
assert result["type"] == RESULT_TYPE_CREATE_ENTRY
assert config_entry.options == {CONF_PREFIXES: ["+49", "491234"]}
async def test_options_flow_incorrect_prefixes(hass: HomeAssistant) -> None:
"""Test config flow options."""
config_entry = MockConfigEntry(
domain=DOMAIN,
unique_id=MOCK_UNIQUE_ID,
data=MOCK_CONFIG_ENTRY,
options={CONF_PREFIXES: None},
)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.fritzbox_callmonitor.async_setup_entry",
return_value=True,
):
await hass.config_entries.async_setup(config_entry.entry_id)
result = await hass.config_entries.options.async_init(config_entry.entry_id)
assert result["type"] == RESULT_TYPE_FORM
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"], user_input={CONF_PREFIXES: ""}
)
assert result["type"] == RESULT_TYPE_FORM
assert result["errors"] == {"base": RESULT_MALFORMED_PREFIXES}
async def test_options_flow_no_prefixes(hass: HomeAssistant) -> None:
"""Test config flow options."""
config_entry = MockConfigEntry(
domain=DOMAIN,
unique_id=MOCK_UNIQUE_ID,
data=MOCK_CONFIG_ENTRY,
options={CONF_PREFIXES: None},
)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.fritzbox_callmonitor.async_setup_entry",
return_value=True,
):
await hass.config_entries.async_setup(config_entry.entry_id)
result = await hass.config_entries.options.async_init(config_entry.entry_id)
assert result["type"] == RESULT_TYPE_FORM
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"], user_input={}
)
assert result["type"] == RESULT_TYPE_CREATE_ENTRY
assert config_entry.options == {CONF_PREFIXES: None}