Add Huawei LTE reauth flow (#78005)

* Add Huawei LTE reauth flow

* Upgrade huawei-lte-api to 1.6.3, use LoginErrorInvalidCredentialsException
This commit is contained in:
Ville Skyttä 2022-10-07 13:24:09 +03:00 committed by GitHub
parent aee82e2b3b
commit 9b44cf0127
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 295 additions and 61 deletions

View File

@ -15,6 +15,7 @@ from huawei_lte_api.Client import Client
from huawei_lte_api.Connection import Connection
from huawei_lte_api.enums.device import ControlModeEnum
from huawei_lte_api.exceptions import (
LoginErrorInvalidCredentialsException,
ResponseErrorException,
ResponseErrorLoginRequiredException,
ResponseErrorNotSupportedException,
@ -38,7 +39,7 @@ from homeassistant.const import (
Platform,
)
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers import (
config_validation as cv,
device_registry as dr,
@ -339,6 +340,8 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
try:
connection = await hass.async_add_executor_job(get_connection)
except LoginErrorInvalidCredentialsException as ex:
raise ConfigEntryAuthFailed from ex
except Timeout as ex:
raise ConfigEntryNotReady from ex

View File

@ -1,6 +1,7 @@
"""Config flow for the Huawei LTE platform."""
from __future__ import annotations
from collections.abc import Mapping
import logging
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse
@ -89,6 +90,70 @@ class ConfigFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
errors=errors or {},
)
async def _async_show_reauth_form(
self,
user_input: dict[str, Any],
errors: dict[str, str] | None = None,
) -> FlowResult:
return self.async_show_form(
step_id="reauth_confirm",
data_schema=vol.Schema(
{
vol.Optional(
CONF_USERNAME, default=user_input.get(CONF_USERNAME) or ""
): str,
vol.Optional(
CONF_PASSWORD, default=user_input.get(CONF_PASSWORD) or ""
): str,
}
),
errors=errors or {},
)
async def _try_connect(
self, user_input: dict[str, Any], errors: dict[str, str]
) -> Connection | None:
"""Try connecting with given data."""
username = user_input.get(CONF_USERNAME) or ""
password = user_input.get(CONF_PASSWORD) or ""
def _get_connection() -> Connection:
return Connection(
url=user_input[CONF_URL],
username=username,
password=password,
timeout=CONNECTION_TIMEOUT,
)
conn = None
try:
conn = await self.hass.async_add_executor_job(_get_connection)
except LoginErrorUsernameWrongException:
errors[CONF_USERNAME] = "incorrect_username"
except LoginErrorPasswordWrongException:
errors[CONF_PASSWORD] = "incorrect_password"
except LoginErrorUsernamePasswordWrongException:
errors[CONF_USERNAME] = "invalid_auth"
except LoginErrorUsernamePasswordOverrunException:
errors["base"] = "login_attempts_exceeded"
except ResponseErrorException:
_LOGGER.warning("Response error", exc_info=True)
errors["base"] = "response_error"
except Timeout:
_LOGGER.warning("Connection timeout", exc_info=True)
errors[CONF_URL] = "connection_timeout"
except Exception: # pylint: disable=broad-except
_LOGGER.warning("Unknown error connecting to device", exc_info=True)
errors[CONF_URL] = "unknown"
return conn
@staticmethod
def _logout(conn: Connection) -> None:
try:
conn.user_session.user.logout() # type: ignore[union-attr]
except Exception: # pylint: disable=broad-except
_LOGGER.debug("Could not logout", exc_info=True)
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
@ -108,25 +173,9 @@ class ConfigFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
user_input=user_input, errors=errors
)
def logout() -> None:
try:
conn.user_session.user.logout() # type: ignore[union-attr]
except Exception: # pylint: disable=broad-except
_LOGGER.debug("Could not logout", exc_info=True)
def try_connect(user_input: dict[str, Any]) -> Connection:
"""Try connecting with given credentials."""
username = user_input.get(CONF_USERNAME) or ""
password = user_input.get(CONF_PASSWORD) or ""
conn = Connection(
user_input[CONF_URL],
username=username,
password=password,
timeout=CONNECTION_TIMEOUT,
)
return conn
def get_device_info() -> tuple[GetResponseType, GetResponseType]:
def get_device_info(
conn: Connection,
) -> tuple[GetResponseType, GetResponseType]:
"""Get router info."""
client = Client(conn)
try:
@ -147,33 +196,17 @@ class ConfigFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
wlan_settings = {}
return device_info, wlan_settings
try:
conn = await self.hass.async_add_executor_job(try_connect, user_input)
except LoginErrorUsernameWrongException:
errors[CONF_USERNAME] = "incorrect_username"
except LoginErrorPasswordWrongException:
errors[CONF_PASSWORD] = "incorrect_password"
except LoginErrorUsernamePasswordWrongException:
errors[CONF_USERNAME] = "invalid_auth"
except LoginErrorUsernamePasswordOverrunException:
errors["base"] = "login_attempts_exceeded"
except ResponseErrorException:
_LOGGER.warning("Response error", exc_info=True)
errors["base"] = "response_error"
except Timeout:
_LOGGER.warning("Connection timeout", exc_info=True)
errors[CONF_URL] = "connection_timeout"
except Exception: # pylint: disable=broad-except
_LOGGER.warning("Unknown error connecting to device", exc_info=True)
errors[CONF_URL] = "unknown"
conn = await self._try_connect(user_input, errors)
if errors:
await self.hass.async_add_executor_job(logout)
return await self._async_show_user_form(
user_input=user_input, errors=errors
)
assert conn
info, wlan_settings = await self.hass.async_add_executor_job(get_device_info)
await self.hass.async_add_executor_job(logout)
info, wlan_settings = await self.hass.async_add_executor_job(
get_device_info, conn
)
await self.hass.async_add_executor_job(self._logout, conn)
user_input[CONF_MAC] = get_device_macs(info, wlan_settings)
@ -228,6 +261,38 @@ class ConfigFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
}
return await self._async_show_user_form(user_input)
async def async_step_reauth(self, entry_data: Mapping[str, Any]) -> FlowResult:
"""Perform reauth upon an API authentication error."""
return await self.async_step_reauth_confirm()
async def async_step_reauth_confirm(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Dialog that informs the user that reauth is required."""
entry = self.hass.config_entries.async_get_entry(self.context["entry_id"])
assert entry
if not user_input:
return await self._async_show_reauth_form(
user_input={
CONF_USERNAME: entry.data[CONF_USERNAME],
CONF_PASSWORD: entry.data[CONF_PASSWORD],
}
)
new_data = {**entry.data, **user_input}
errors: dict[str, str] = {}
conn = await self._try_connect(new_data, errors)
if conn:
await self.hass.async_add_executor_job(self._logout, conn)
if errors:
return await self._async_show_reauth_form(
user_input=user_input, errors=errors
)
self.hass.config_entries.async_update_entry(entry, data=new_data)
await self.hass.config_entries.async_reload(entry.entry_id)
return self.async_abort(reason="reauth_successful")
class OptionsFlowHandler(config_entries.OptionsFlow):
"""Huawei LTE options flow."""

View File

@ -4,7 +4,7 @@
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/huawei_lte",
"requirements": [
"huawei-lte-api==1.6.1",
"huawei-lte-api==1.6.3",
"stringcase==1.2.0",
"url-normalize==1.4.3"
],

View File

@ -1,7 +1,8 @@
{
"config": {
"abort": {
"not_huawei_lte": "Not a Huawei LTE device"
"not_huawei_lte": "Not a Huawei LTE device",
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]"
},
"error": {
"connection_timeout": "Connection timeout",
@ -15,6 +16,14 @@
},
"flow_title": "{name}",
"step": {
"reauth_confirm": {
"title": "[%key:common::config_flow::title::reauth%]",
"description": "Enter device access credentials.",
"data": {
"password": "[%key:common::config_flow::data::password%]",
"username": "[%key:common::config_flow::data::username%]"
}
},
"user": {
"data": {
"password": "[%key:common::config_flow::data::password%]",

View File

@ -1,7 +1,8 @@
{
"config": {
"abort": {
"not_huawei_lte": "Not a Huawei LTE device"
"not_huawei_lte": "Not a Huawei LTE device",
"reauth_successful": "Re-authentication was successful"
},
"error": {
"connection_timeout": "Connection timeout",
@ -15,6 +16,14 @@
},
"flow_title": "{name}",
"step": {
"reauth_confirm": {
"data": {
"password": "Password",
"username": "Username"
},
"description": "Enter device access credentials.",
"title": "Reauthenticate Integration"
},
"user": {
"data": {
"password": "Password",

View File

@ -886,7 +886,7 @@ horimote==0.4.1
httplib2==0.20.4
# homeassistant.components.huawei_lte
huawei-lte-api==1.6.1
huawei-lte-api==1.6.3
# homeassistant.components.hydrawise
hydrawiser==0.2

View File

@ -663,7 +663,7 @@ homepluscontrol==0.0.5
httplib2==0.20.4
# homeassistant.components.huawei_lte
huawei-lte-api==1.6.1
huawei-lte-api==1.6.3
# homeassistant.components.hyperion
hyperion-py==0.7.5

View File

@ -5,6 +5,7 @@ from unittest.mock import patch
from huawei_lte_api.enums.client import ResponseCodeEnum
from huawei_lte_api.enums.user import LoginErrorEnum, LoginStateEnum, PasswordTypeEnum
import pytest
import requests.exceptions
from requests.exceptions import ConnectionError
from requests_mock import ANY
@ -119,27 +120,66 @@ def login_requests_mock(requests_mock):
@pytest.mark.parametrize(
("code", "errors"),
("request_outcome", "fixture_override", "errors"),
(
(LoginErrorEnum.USERNAME_WRONG, {CONF_USERNAME: "incorrect_username"}),
(LoginErrorEnum.PASSWORD_WRONG, {CONF_PASSWORD: "incorrect_password"}),
(
LoginErrorEnum.USERNAME_PWD_WRONG,
{
"text": f"<error><code>{LoginErrorEnum.USERNAME_WRONG}</code><message/></error>",
},
{},
{CONF_USERNAME: "incorrect_username"},
),
(
{
"text": f"<error><code>{LoginErrorEnum.PASSWORD_WRONG}</code><message/></error>",
},
{},
{CONF_PASSWORD: "incorrect_password"},
),
(
{
"text": f"<error><code>{LoginErrorEnum.USERNAME_PWD_WRONG}</code><message/></error>",
},
{},
{CONF_USERNAME: "invalid_auth"},
),
(LoginErrorEnum.USERNAME_PWD_OVERRUN, {"base": "login_attempts_exceeded"}),
(ResponseCodeEnum.ERROR_SYSTEM_UNKNOWN, {"base": "response_error"}),
(
{
"text": f"<error><code>{LoginErrorEnum.USERNAME_PWD_OVERRUN}</code><message/></error>",
},
{},
{"base": "login_attempts_exceeded"},
),
(
{
"text": f"<error><code>{ResponseCodeEnum.ERROR_SYSTEM_UNKNOWN}</code><message/></error>",
},
{},
{"base": "response_error"},
),
({}, {CONF_URL: "/foo/bar"}, {CONF_URL: "invalid_url"}),
(
{
"exc": requests.exceptions.Timeout,
},
{},
{CONF_URL: "connection_timeout"},
),
),
)
async def test_login_error(hass, login_requests_mock, code, errors):
async def test_login_error(
hass, login_requests_mock, request_outcome, fixture_override, errors
):
"""Test we show user form with appropriate error on response failure."""
login_requests_mock.request(
ANY,
f"{FIXTURE_USER_INPUT[CONF_URL]}api/user/login",
text=f"<error><code>{code}</code><message/></error>",
**request_outcome,
)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}, data=FIXTURE_USER_INPUT
DOMAIN,
context={"source": config_entries.SOURCE_USER},
data={**FIXTURE_USER_INPUT, **fixture_override},
)
assert result["type"] == data_entry_flow.FlowResultType.FORM
@ -170,7 +210,43 @@ async def test_success(hass, login_requests_mock):
assert result["data"][CONF_PASSWORD] == FIXTURE_USER_INPUT[CONF_PASSWORD]
async def test_ssdp(hass):
@pytest.mark.parametrize(
("upnp_data", "expected_result"),
(
(
{
ssdp.ATTR_UPNP_FRIENDLY_NAME: "Mobile Wi-Fi",
ssdp.ATTR_UPNP_SERIAL: "00000000",
},
{
"type": data_entry_flow.FlowResultType.FORM,
"step_id": "user",
"errors": {},
},
),
(
{
ssdp.ATTR_UPNP_FRIENDLY_NAME: "Mobile Wi-Fi",
# No ssdp.ATTR_UPNP_SERIAL
},
{
"type": data_entry_flow.FlowResultType.FORM,
"step_id": "user",
"errors": {},
},
),
(
{
ssdp.ATTR_UPNP_FRIENDLY_NAME: "Some other device",
},
{
"type": data_entry_flow.FlowResultType.ABORT,
"reason": "not_huawei_lte",
},
),
),
)
async def test_ssdp(hass, upnp_data, expected_result):
"""Test SSDP discovery initiates config properly."""
url = "http://192.168.100.1/"
context = {"source": config_entries.SOURCE_SSDP}
@ -183,21 +259,93 @@ async def test_ssdp(hass):
ssdp_location="http://192.168.100.1:60957/rootDesc.xml",
upnp={
ssdp.ATTR_UPNP_DEVICE_TYPE: "urn:schemas-upnp-org:device:InternetGatewayDevice:1",
ssdp.ATTR_UPNP_FRIENDLY_NAME: "Mobile Wi-Fi",
ssdp.ATTR_UPNP_MANUFACTURER: "Huawei",
ssdp.ATTR_UPNP_MANUFACTURER_URL: "http://www.huawei.com/",
ssdp.ATTR_UPNP_MODEL_NAME: "Huawei router",
ssdp.ATTR_UPNP_MODEL_NUMBER: "12345678",
ssdp.ATTR_UPNP_PRESENTATION_URL: url,
ssdp.ATTR_UPNP_SERIAL: "00000000",
ssdp.ATTR_UPNP_UDN: "uuid:XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
**upnp_data,
},
),
)
for k, v in expected_result.items():
assert result[k] == v
if result.get("data_schema"):
result["data_schema"]({})[CONF_URL] == url
@pytest.mark.parametrize(
("login_response_text", "expected_result", "expected_entry_data"),
(
(
"<response>OK</response>",
{
"type": data_entry_flow.FlowResultType.ABORT,
"reason": "reauth_successful",
},
FIXTURE_USER_INPUT,
),
(
f"<error><code>{LoginErrorEnum.PASSWORD_WRONG}</code><message/></error>",
{
"type": data_entry_flow.FlowResultType.FORM,
"errors": {CONF_PASSWORD: "incorrect_password"},
"step_id": "reauth_confirm",
},
{**FIXTURE_USER_INPUT, CONF_PASSWORD: "invalid-password"},
),
),
)
async def test_reauth(
hass, login_requests_mock, login_response_text, expected_result, expected_entry_data
):
"""Test reauth."""
mock_entry_data = {**FIXTURE_USER_INPUT, CONF_PASSWORD: "invalid-password"}
entry = MockConfigEntry(
domain=DOMAIN,
unique_id=FIXTURE_UNIQUE_ID,
data=mock_entry_data,
title="Reauth canary",
)
entry.add_to_hass(hass)
context = {
"source": config_entries.SOURCE_REAUTH,
"unique_id": entry.unique_id,
"entry_id": entry.entry_id,
}
result = await hass.config_entries.flow.async_init(
DOMAIN, context=context, data=entry.data
)
assert result["type"] == data_entry_flow.FlowResultType.FORM
assert result["step_id"] == "user"
assert result["data_schema"]({})[CONF_URL] == url
assert result["step_id"] == "reauth_confirm"
assert result["data_schema"]({}) == {
CONF_USERNAME: mock_entry_data[CONF_USERNAME],
CONF_PASSWORD: mock_entry_data[CONF_PASSWORD],
}
assert not result["errors"]
login_requests_mock.request(
ANY,
f"{FIXTURE_USER_INPUT[CONF_URL]}api/user/login",
text=login_response_text,
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_USERNAME: FIXTURE_USER_INPUT[CONF_USERNAME],
CONF_PASSWORD: FIXTURE_USER_INPUT[CONF_PASSWORD],
},
)
await hass.async_block_till_done()
for k, v in expected_result.items():
assert result[k] == v
for k, v in expected_entry_data.items():
assert entry.data[k] == v
async def test_options(hass):