Allow listing of HA users via admin CLI (#4912)

* Allow listing of HA users via admin CLI

* Filter out system generated users and fields
This commit is contained in:
Mike Degatano 2024-02-28 13:30:37 -05:00 committed by GitHub
parent 9c75996c40
commit 8b5c808e8c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 247 additions and 15 deletions

View File

@ -336,6 +336,7 @@ class RestAPI(CoreSysAttributes):
web.post("/auth", api_auth.auth),
web.post("/auth/reset", api_auth.reset),
web.delete("/auth/cache", api_auth.cache),
web.get("/auth/list", api_auth.list_users),
]
)

View File

@ -1,6 +1,7 @@
"""Init file for Supervisor auth/SSO RESTful API."""
import asyncio
import logging
from typing import Any
from aiohttp import BasicAuth, web
from aiohttp.hdrs import AUTHORIZATION, CONTENT_TYPE, WWW_AUTHENTICATE
@ -8,11 +9,19 @@ from aiohttp.web_exceptions import HTTPUnauthorized
import voluptuous as vol
from ..addons.addon import Addon
from ..const import ATTR_PASSWORD, ATTR_USERNAME, REQUEST_FROM
from ..const import ATTR_NAME, ATTR_PASSWORD, ATTR_USERNAME, REQUEST_FROM
from ..coresys import CoreSysAttributes
from ..exceptions import APIForbidden
from ..utils.json import json_loads
from .const import CONTENT_TYPE_JSON, CONTENT_TYPE_URL
from .const import (
ATTR_GROUP_IDS,
ATTR_IS_ACTIVE,
ATTR_IS_OWNER,
ATTR_LOCAL_ONLY,
ATTR_USERS,
CONTENT_TYPE_JSON,
CONTENT_TYPE_URL,
)
from .utils import api_process, api_validate
_LOGGER: logging.Logger = logging.getLogger(__name__)
@ -90,3 +99,21 @@ class APIAuth(CoreSysAttributes):
async def cache(self, request: web.Request) -> None:
"""Process cache reset request."""
self.sys_auth.reset_data()
@api_process
async def list_users(self, request: web.Request) -> dict[str, list[dict[str, Any]]]:
"""List users on the Home Assistant instance."""
return {
ATTR_USERS: [
{
ATTR_USERNAME: user[ATTR_USERNAME],
ATTR_NAME: user[ATTR_NAME],
ATTR_IS_OWNER: user[ATTR_IS_OWNER],
ATTR_IS_ACTIVE: user[ATTR_IS_ACTIVE],
ATTR_LOCAL_ONLY: user[ATTR_LOCAL_ONLY],
ATTR_GROUP_IDS: user[ATTR_GROUP_IDS],
}
for user in await self.sys_auth.list_users()
if user[ATTR_USERNAME]
]
}

View File

@ -31,11 +31,15 @@ ATTR_DT_UTC = "dt_utc"
ATTR_EJECTABLE = "ejectable"
ATTR_FALLBACK = "fallback"
ATTR_FILESYSTEMS = "filesystems"
ATTR_GROUP_IDS = "group_ids"
ATTR_IDENTIFIERS = "identifiers"
ATTR_IS_ACTIVE = "is_active"
ATTR_IS_OWNER = "is_owner"
ATTR_JOB_ID = "job_id"
ATTR_JOBS = "jobs"
ATTR_LLMNR = "llmnr"
ATTR_LLMNR_HOSTNAME = "llmnr_hostname"
ATTR_LOCAL_ONLY = "local_only"
ATTR_MDNS = "mdns"
ATTR_MODEL = "model"
ATTR_MOUNTS = "mounts"
@ -51,6 +55,7 @@ ATTR_SYSFS = "sysfs"
ATTR_SYSTEM_HEALTH_LED = "system_health_led"
ATTR_TIME_DETECTED = "time_detected"
ATTR_UPDATE_TYPE = "update_type"
ATTR_USE_NTP = "use_ntp"
ATTR_USAGE = "usage"
ATTR_USE_NTP = "use_ntp"
ATTR_USERS = "users"
ATTR_VENDOR = "vendor"

View File

@ -2,11 +2,18 @@
import asyncio
import hashlib
import logging
from typing import Any
from .addons.addon import Addon
from .const import ATTR_ADDON, ATTR_PASSWORD, ATTR_USERNAME, FILE_HASSIO_AUTH
from .const import ATTR_ADDON, ATTR_PASSWORD, ATTR_TYPE, ATTR_USERNAME, FILE_HASSIO_AUTH
from .coresys import CoreSys, CoreSysAttributes
from .exceptions import AuthError, AuthPasswordResetError, HomeAssistantAPIError
from .exceptions import (
AuthError,
AuthListUsersError,
AuthPasswordResetError,
HomeAssistantAPIError,
HomeAssistantWSError,
)
from .utils.common import FileConfiguration
from .validate import SCHEMA_AUTH_CONFIG
@ -132,6 +139,17 @@ class Auth(FileConfiguration, CoreSysAttributes):
raise AuthPasswordResetError()
async def list_users(self) -> list[dict[str, Any]]:
"""List users on the Home Assistant instance."""
try:
return await self.sys_homeassistant.websocket.async_send_command(
{ATTR_TYPE: "config/auth/list"}
)
except HomeAssistantWSError:
_LOGGER.error("Can't request listing users on Home Assistant!")
raise AuthListUsersError()
@staticmethod
def _rehash(value: str, salt2: str = "") -> str:
"""Rehash a value."""

View File

@ -267,6 +267,10 @@ class AuthPasswordResetError(HassioError):
"""Auth error if password reset failed."""
class AuthListUsersError(HassioError):
"""Auth error if listing users failed."""
# Host

View File

@ -4,11 +4,13 @@ from http import HTTPStatus
from unittest.mock import patch
from aiohttp import web
from aiohttp.test_utils import TestClient
import pytest
import urllib3
from supervisor.addons.addon import Addon
from supervisor.api import RestAPI
from supervisor.const import CoreState
from supervisor.const import ROLE_ALL, CoreState
from supervisor.coresys import CoreSys
# pylint: disable=redefined-outer-name
@ -20,7 +22,7 @@ async def mock_handler(request):
@pytest.fixture
async def api_system(aiohttp_client, run_dir, coresys: CoreSys):
async def api_system(aiohttp_client, run_dir, coresys: CoreSys) -> TestClient:
"""Fixture for RestAPI client."""
api = RestAPI(coresys)
api.webapp = web.Application()
@ -35,8 +37,25 @@ async def api_system(aiohttp_client, run_dir, coresys: CoreSys):
yield await aiohttp_client(api.webapp)
@pytest.fixture
async def api_token_validation(aiohttp_client, run_dir, coresys: CoreSys) -> TestClient:
"""Fixture for RestAPI client with token validation middleware."""
api = RestAPI(coresys)
api.webapp = web.Application()
with patch("supervisor.docker.supervisor.os") as os:
os.environ = {"SUPERVISOR_NAME": "hassio_supervisor"}
await api.start()
api.webapp.middlewares.append(api.security.token_validation)
api.webapp.router.add_get("/{all:.*}", mock_handler)
api.webapp.router.add_post("/{all:.*}", mock_handler)
api.webapp.router.add_delete("/{all:.*}", mock_handler)
yield await aiohttp_client(api.webapp)
@pytest.mark.asyncio
async def test_api_security_system_initialize(api_system, coresys: CoreSys):
async def test_api_security_system_initialize(api_system: TestClient, coresys: CoreSys):
"""Test security."""
coresys.core.state = CoreState.INITIALIZE
@ -47,7 +66,7 @@ async def test_api_security_system_initialize(api_system, coresys: CoreSys):
@pytest.mark.asyncio
async def test_api_security_system_setup(api_system, coresys: CoreSys):
async def test_api_security_system_setup(api_system: TestClient, coresys: CoreSys):
"""Test security."""
coresys.core.state = CoreState.SETUP
@ -58,7 +77,7 @@ async def test_api_security_system_setup(api_system, coresys: CoreSys):
@pytest.mark.asyncio
async def test_api_security_system_running(api_system, coresys: CoreSys):
async def test_api_security_system_running(api_system: TestClient, coresys: CoreSys):
"""Test security."""
coresys.core.state = CoreState.RUNNING
@ -67,7 +86,7 @@ async def test_api_security_system_running(api_system, coresys: CoreSys):
@pytest.mark.asyncio
async def test_api_security_system_startup(api_system, coresys: CoreSys):
async def test_api_security_system_startup(api_system: TestClient, coresys: CoreSys):
"""Test security."""
coresys.core.state = CoreState.STARTUP
@ -105,10 +124,10 @@ async def test_api_security_system_startup(api_system, coresys: CoreSys):
],
)
async def test_bad_requests(
request_path,
request_params,
fail_on_query_string,
api_system,
request_path: str,
request_params: dict[str, str],
fail_on_query_string: bool,
api_system: TestClient,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test request paths that should be filtered."""
@ -135,3 +154,53 @@ async def test_bad_requests(
if fail_on_query_string:
message = "Filtered a request with a potential harmful query string:"
assert message in caplog.text
@pytest.mark.parametrize(
"request_method,request_path,success_roles",
[
("post", "/auth/reset", {"admin"}),
("get", "/auth/list", {"admin"}),
("delete", "/auth/cache", {"admin", "manager"}),
("get", "/auth", set(ROLE_ALL)),
("post", "/auth", set(ROLE_ALL)),
("get", "/backups/info", set(ROLE_ALL)),
("get", "/backups/abc123/download", {"admin", "manager", "backup"}),
("post", "/backups/new/full", {"admin", "manager", "backup"}),
("post", "/backups/abc123/restore/full", {"admin", "manager", "backup"}),
("get", "/core/info", set(ROLE_ALL)),
("post", "/core/update", {"admin", "manager", "homeassistant"}),
("post", "/core/restart", {"admin", "manager", "homeassistant"}),
("get", "/addons/self/options/config", set(ROLE_ALL)),
("post", "/addons/self/options", set(ROLE_ALL)),
("post", "/addons/self/restart", set(ROLE_ALL)),
("post", "/addons/self/security", {"admin"}),
("get", "/addons/abc123/options/config", {"admin", "manager"}),
("post", "/addons/abc123/options", {"admin", "manager"}),
("post", "/addons/abc123/restart", {"admin", "manager"}),
("post", "/addons/abc123/security", {"admin"}),
],
)
async def test_token_validation(
api_token_validation: TestClient,
install_addon_example: Addon,
request_method: str,
request_path: str,
success_roles: set[str],
):
"""Test token validation paths."""
install_addon_example.persist["access_token"] = "abc123"
install_addon_example.data["hassio_api"] = True
for role in success_roles:
install_addon_example.data["hassio_role"] = role
resp = await getattr(api_token_validation, request_method)(
request_path, headers={"Authorization": "Bearer abc123"}
)
assert resp.status == 200
for role in set(ROLE_ALL) - success_roles:
install_addon_example.data["hassio_role"] = role
resp = await getattr(api_token_validation, request_method)(
request_path, headers={"Authorization": "Bearer abc123"}
)
assert resp.status == 403

108
tests/api/test_auth.py Normal file
View File

@ -0,0 +1,108 @@
"""Test auth API."""
from datetime import UTC, datetime, timedelta
from unittest.mock import AsyncMock, patch
from aiohttp.test_utils import TestClient
import pytest
from supervisor.coresys import CoreSys
LIST_USERS_RESPONSE = [
{
"id": "a1d90e114a3b4da4a487fe327918dcef",
"username": None,
"name": "Home Assistant Content",
"is_owner": False,
"is_active": True,
"local_only": False,
"system_generated": True,
"group_ids": ["system-read-only"],
"credentials": [],
},
{
"id": "d25a2ca897704a31ac9534b5324dc230",
"username": None,
"name": "Supervisor",
"is_owner": False,
"is_active": True,
"local_only": False,
"system_generated": True,
"group_ids": ["system-admin"],
"credentials": [],
},
{
"id": "0b39e9305ba64531a8fee9ed5b86876e",
"username": None,
"name": "Home Assistant Cast",
"is_owner": False,
"is_active": True,
"local_only": False,
"system_generated": True,
"group_ids": ["system-admin"],
"credentials": [],
},
{
"id": "514698a459cd4ce0b75f137a3d7df539",
"username": "test",
"name": "Test",
"is_owner": True,
"is_active": True,
"local_only": False,
"system_generated": False,
"group_ids": ["system-admin"],
"credentials": [{"type": "homeassistant"}],
},
{
"id": "7d5fac79097a4eb49aff83cdf20821b0",
"username": None,
"name": None,
"is_owner": False,
"is_active": True,
"local_only": False,
"system_generated": False,
"group_ids": ["system-admin"],
"credentials": [{"type": "command_line"}],
},
]
async def test_password_reset(
api_client: TestClient, coresys: CoreSys, caplog: pytest.LogCaptureFixture
):
"""Test password reset api."""
coresys.homeassistant.api.access_token = "abc123"
# pylint: disable-next=protected-access
coresys.homeassistant.api._access_token_expires = datetime.now(tz=UTC) + timedelta(
days=1
)
mock_websession = AsyncMock()
mock_websession.post.return_value.__aenter__.return_value.status = 200
with patch("supervisor.coresys.aiohttp.ClientSession.post") as post:
post.return_value.__aenter__.return_value.status = 200
resp = await api_client.post(
"/auth/reset", json={"username": "john", "password": "doe"}
)
assert resp.status == 200
assert "Successful password reset for 'john'" in caplog.text
async def test_list_users(
api_client: TestClient, coresys: CoreSys, ha_ws_client: AsyncMock
):
"""Test list users api."""
ha_ws_client.async_send_command.return_value = LIST_USERS_RESPONSE
resp = await api_client.get("/auth/list")
assert resp.status == 200
result = await resp.json()
assert result["data"]["users"] == [
{
"username": "test",
"name": "Test",
"is_owner": True,
"is_active": True,
"local_only": False,
"group_ids": ["system-admin"],
},
]