From ca1f764080e93e5342075a7a4e83a0518130d284 Mon Sep 17 00:00:00 2001 From: Pascal Vizeli Date: Sat, 30 Apr 2022 10:14:43 +0200 Subject: [PATCH] Add integrity check (#3608) * Add integrity check * add API test * add tests * tests for add-ons --- setup.py | 1 + supervisor/addons/addon.py | 7 + supervisor/api/__init__.py | 1 + supervisor/api/security.py | 8 + supervisor/bootstrap.py | 2 +- supervisor/coresys.py | 2 +- supervisor/exceptions.py | 11 ++ supervisor/resolution/checks/core_trust.py | 59 ------ supervisor/resolution/checks/plugin_trust.py | 65 ------- supervisor/security.py | 88 --------- supervisor/security/__init__.py | 1 + supervisor/security/const.py | 23 +++ supervisor/security/module.py | 169 ++++++++++++++++++ supervisor/updater.py | 4 +- tests/api/test_security.py | 12 ++ .../resolution/check/test_check_core_trust.py | 93 ---------- .../check/test_check_plugin_trust.py | 120 ------------- tests/security/__init__.py | 1 + tests/security/test_module.py | 124 +++++++++++++ tests/test_security.py | 55 ------ 20 files changed, 361 insertions(+), 485 deletions(-) delete mode 100644 supervisor/resolution/checks/core_trust.py delete mode 100644 supervisor/resolution/checks/plugin_trust.py delete mode 100644 supervisor/security.py create mode 100644 supervisor/security/__init__.py create mode 100644 supervisor/security/const.py create mode 100644 supervisor/security/module.py delete mode 100644 tests/resolution/check/test_check_core_trust.py delete mode 100644 tests/resolution/check/test_check_plugin_trust.py create mode 100644 tests/security/__init__.py create mode 100644 tests/security/test_module.py delete mode 100644 tests/test_security.py diff --git a/setup.py b/setup.py index 9f2bd2431..90df0c0a1 100644 --- a/setup.py +++ b/setup.py @@ -49,6 +49,7 @@ setup( "supervisor.resolution.evaluations", "supervisor.resolution.fixups", "supervisor.resolution", + "supervisor.security", "supervisor.services.modules", "supervisor.services", "supervisor.store", diff --git a/supervisor/addons/addon.py b/supervisor/addons/addon.py index dc1e3e541..e21dbc199 100644 --- a/supervisor/addons/addon.py +++ b/supervisor/addons/addon.py @@ -890,3 +890,10 @@ class Addon(AddonModel): return await self.start() _LOGGER.info("Finished restore for add-on %s", self.slug) + + def check_trust(self) -> Awaitable[None]: + """Calculate Addon docker content trust. + + Return Coroutine. + """ + return self.instance.check_trust() diff --git a/supervisor/api/__init__.py b/supervisor/api/__init__.py index 3f79ebae2..4c3c0ad36 100644 --- a/supervisor/api/__init__.py +++ b/supervisor/api/__init__.py @@ -159,6 +159,7 @@ class RestAPI(CoreSysAttributes): [ web.get("/security/info", api_security.info), web.post("/security/options", api_security.options), + web.post("/security/integrity", api_security.integrity_check), ] ) diff --git a/supervisor/api/security.py b/supervisor/api/security.py index 681be2061..ec7bebc25 100644 --- a/supervisor/api/security.py +++ b/supervisor/api/security.py @@ -1,8 +1,10 @@ """Init file for Supervisor Security RESTful API.""" +import asyncio import logging from typing import Any from aiohttp import web +import attr import voluptuous as vol from ..const import ATTR_CONTENT_TRUST, ATTR_FORCE_SECURITY, ATTR_PWNED @@ -48,3 +50,9 @@ class APISecurity(CoreSysAttributes): self.sys_security.save_data() await self.sys_resolution.evaluate.evaluate_system() + + @api_process + async def integrity_check(self, request: web.Request) -> dict[str, Any]: + """Run backend integrity check.""" + result = await asyncio.shield(self.sys_security.integrity_check()) + return attr.asdict(result) diff --git a/supervisor/bootstrap.py b/supervisor/bootstrap.py index 40264ef00..d7f693b48 100644 --- a/supervisor/bootstrap.py +++ b/supervisor/bootstrap.py @@ -46,7 +46,7 @@ from .misc.tasks import Tasks from .os.manager import OSManager from .plugins.manager import PluginManager from .resolution.module import ResolutionManager -from .security import Security +from .security.module import Security from .services import ServiceManager from .store import StoreManager from .supervisor import Supervisor diff --git a/supervisor/coresys.py b/supervisor/coresys.py index 946154cb2..9a3969ba6 100644 --- a/supervisor/coresys.py +++ b/supervisor/coresys.py @@ -36,7 +36,7 @@ if TYPE_CHECKING: from .os.manager import OSManager from .plugins.manager import PluginManager from .resolution.module import ResolutionManager - from .security import Security + from .security.module import Security from .services import ServiceManager from .store import StoreManager from .supervisor import Supervisor diff --git a/supervisor/exceptions.py b/supervisor/exceptions.py index 44e414848..8c2dcbb6e 100644 --- a/supervisor/exceptions.py +++ b/supervisor/exceptions.py @@ -471,3 +471,14 @@ class BackupError(HassioError): class HomeAssistantBackupError(BackupError, HomeAssistantError): """Raise if an error during Home Assistant Core backup is happening.""" + + +# Security + + +class SecurityError(HassioError): + """Raise if an error during security checks are happening.""" + + +class SecurityJobError(SecurityError, JobException): + """Raise on Security job error.""" diff --git a/supervisor/resolution/checks/core_trust.py b/supervisor/resolution/checks/core_trust.py deleted file mode 100644 index 9f636f2fe..000000000 --- a/supervisor/resolution/checks/core_trust.py +++ /dev/null @@ -1,59 +0,0 @@ -"""Helpers to check core trust.""" -import logging -from typing import Optional - -from ...const import CoreState -from ...coresys import CoreSys -from ...exceptions import CodeNotaryError, CodeNotaryUntrusted -from ..const import ContextType, IssueType, UnhealthyReason -from .base import CheckBase - -_LOGGER: logging.Logger = logging.getLogger(__name__) - - -def setup(coresys: CoreSys) -> CheckBase: - """Check setup function.""" - return CheckCoreTrust(coresys) - - -class CheckCoreTrust(CheckBase): - """CheckCoreTrust class for check.""" - - async def run_check(self) -> None: - """Run check if not affected by issue.""" - if not self.sys_security.content_trust: - _LOGGER.warning( - "Skipping %s, content_trust is globally disabled", self.slug - ) - return - - try: - await self.sys_homeassistant.core.check_trust() - except CodeNotaryUntrusted: - self.sys_resolution.unhealthy = UnhealthyReason.UNTRUSTED - self.sys_resolution.create_issue(IssueType.TRUST, ContextType.CORE) - except CodeNotaryError: - pass - - async def approve_check(self, reference: Optional[str] = None) -> bool: - """Approve check if it is affected by issue.""" - try: - await self.sys_homeassistant.core.check_trust() - except CodeNotaryError: - return True - return False - - @property - def issue(self) -> IssueType: - """Return a IssueType enum.""" - return IssueType.TRUST - - @property - def context(self) -> ContextType: - """Return a ContextType enum.""" - return ContextType.CORE - - @property - def states(self) -> list[CoreState]: - """Return a list of valid states when this check can run.""" - return [CoreState.RUNNING, CoreState.STARTUP] diff --git a/supervisor/resolution/checks/plugin_trust.py b/supervisor/resolution/checks/plugin_trust.py deleted file mode 100644 index 1588db0d7..000000000 --- a/supervisor/resolution/checks/plugin_trust.py +++ /dev/null @@ -1,65 +0,0 @@ -"""Helpers to check plugin trust.""" -import logging -from typing import Optional - -from ...const import CoreState -from ...coresys import CoreSys -from ...exceptions import CodeNotaryError, CodeNotaryUntrusted -from ..const import ContextType, IssueType, UnhealthyReason -from .base import CheckBase - -_LOGGER: logging.Logger = logging.getLogger(__name__) - - -def setup(coresys: CoreSys) -> CheckBase: - """Check setup function.""" - return CheckPluginTrust(coresys) - - -class CheckPluginTrust(CheckBase): - """CheckPluginTrust class for check.""" - - async def run_check(self) -> None: - """Run check if not affected by issue.""" - if not self.sys_security.content_trust: - _LOGGER.warning( - "Skipping %s, content_trust is globally disabled", self.slug - ) - return - - for plugin in self.sys_plugins.all_plugins: - try: - await plugin.check_trust() - except CodeNotaryUntrusted: - self.sys_resolution.unhealthy = UnhealthyReason.UNTRUSTED - self.sys_resolution.create_issue( - IssueType.TRUST, ContextType.PLUGIN, reference=plugin.slug - ) - except CodeNotaryError: - pass - - async def approve_check(self, reference: Optional[str] = None) -> bool: - """Approve check if it is affected by issue.""" - for plugin in self.sys_plugins.all_plugins: - if reference != plugin.slug: - continue - try: - await plugin.check_trust() - except CodeNotaryError: - return True - return False - - @property - def issue(self) -> IssueType: - """Return a IssueType enum.""" - return IssueType.TRUST - - @property - def context(self) -> ContextType: - """Return a ContextType enum.""" - return ContextType.PLUGIN - - @property - def states(self) -> list[CoreState]: - """Return a list of valid states when this check can run.""" - return [CoreState.RUNNING, CoreState.STARTUP] diff --git a/supervisor/security.py b/supervisor/security.py deleted file mode 100644 index 0f0f1522a..000000000 --- a/supervisor/security.py +++ /dev/null @@ -1,88 +0,0 @@ -"""Fetch last versions from webserver.""" -import logging - -from .const import ( - ATTR_CONTENT_TRUST, - ATTR_FORCE_SECURITY, - ATTR_PWNED, - FILE_HASSIO_SECURITY, -) -from .coresys import CoreSys, CoreSysAttributes -from .exceptions import CodeNotaryError, CodeNotaryUntrusted, PwnedError -from .utils.codenotary import cas_validate -from .utils.common import FileConfiguration -from .utils.pwned import check_pwned_password -from .validate import SCHEMA_SECURITY_CONFIG - -_LOGGER: logging.Logger = logging.getLogger(__name__) - - -class Security(FileConfiguration, CoreSysAttributes): - """Handle Security properties.""" - - def __init__(self, coresys: CoreSys): - """Initialize updater.""" - super().__init__(FILE_HASSIO_SECURITY, SCHEMA_SECURITY_CONFIG) - self.coresys = coresys - - @property - def content_trust(self) -> bool: - """Return if content trust is enabled/disabled.""" - return self._data[ATTR_CONTENT_TRUST] - - @content_trust.setter - def content_trust(self, value: bool) -> None: - """Set content trust is enabled/disabled.""" - self._data[ATTR_CONTENT_TRUST] = value - - @property - def force(self) -> bool: - """Return if force security is enabled/disabled.""" - return self._data[ATTR_FORCE_SECURITY] - - @force.setter - def force(self, value: bool) -> None: - """Set force security is enabled/disabled.""" - self._data[ATTR_FORCE_SECURITY] = value - - @property - def pwned(self) -> bool: - """Return if pwned is enabled/disabled.""" - return self._data[ATTR_PWNED] - - @pwned.setter - def pwned(self, value: bool) -> None: - """Set pwned is enabled/disabled.""" - self._data[ATTR_PWNED] = value - - async def verify_content(self, signer: str, checksum: str) -> None: - """Verify content on CAS.""" - if not self.content_trust: - _LOGGER.warning("Disabled content-trust, skip validation") - return - - try: - await cas_validate(signer, checksum) - except CodeNotaryUntrusted: - raise - except CodeNotaryError: - if self.force: - raise - return - - async def verify_own_content(self, checksum: str) -> None: - """Verify content from HA org.""" - return await self.verify_content("notary@home-assistant.io", checksum) - - async def verify_secret(self, pwned_hash: str) -> None: - """Verify pwned state of a secret.""" - if not self.pwned: - _LOGGER.warning("Disabled pwned, skip validation") - return - - try: - await check_pwned_password(self.sys_websession, pwned_hash) - except PwnedError: - if self.force: - raise - return diff --git a/supervisor/security/__init__.py b/supervisor/security/__init__.py new file mode 100644 index 000000000..ebca777db --- /dev/null +++ b/supervisor/security/__init__.py @@ -0,0 +1 @@ +"""Security feature of Supervisor.""" diff --git a/supervisor/security/const.py b/supervisor/security/const.py new file mode 100644 index 000000000..40306ac68 --- /dev/null +++ b/supervisor/security/const.py @@ -0,0 +1,23 @@ +"""Security constants.""" +from enum import Enum + +import attr + + +class ContentTrustResult(str, Enum): + """Content trust result enum.""" + + PASS = "pass" + ERROR = "error" + FAILED = "failed" + UNTESTED = "untested" + + +@attr.s +class IntegrityResult: + """Result of a full integrity check.""" + + supervisor: ContentTrustResult = attr.ib(default=ContentTrustResult.UNTESTED) + core: ContentTrustResult = attr.ib(default=ContentTrustResult.UNTESTED) + plugins: dict[str, ContentTrustResult] = attr.ib(default={}) + addons: dict[str, ContentTrustResult] = attr.ib(default={}) diff --git a/supervisor/security/module.py b/supervisor/security/module.py new file mode 100644 index 000000000..32a9d2b49 --- /dev/null +++ b/supervisor/security/module.py @@ -0,0 +1,169 @@ +"""Fetch last versions from webserver.""" +from __future__ import annotations + +from datetime import timedelta +import logging + +from ..const import ( + ATTR_CONTENT_TRUST, + ATTR_FORCE_SECURITY, + ATTR_PWNED, + FILE_HASSIO_SECURITY, +) +from ..coresys import CoreSys, CoreSysAttributes +from ..exceptions import ( + CodeNotaryError, + CodeNotaryUntrusted, + PwnedError, + SecurityJobError, +) +from ..jobs.decorator import Job, JobCondition, JobExecutionLimit +from ..resolution.const import ContextType, IssueType +from ..utils.codenotary import cas_validate +from ..utils.common import FileConfiguration +from ..utils.pwned import check_pwned_password +from ..validate import SCHEMA_SECURITY_CONFIG +from .const import ContentTrustResult, IntegrityResult + +_LOGGER: logging.Logger = logging.getLogger(__name__) + + +class Security(FileConfiguration, CoreSysAttributes): + """Handle Security properties.""" + + def __init__(self, coresys: CoreSys): + """Initialize updater.""" + super().__init__(FILE_HASSIO_SECURITY, SCHEMA_SECURITY_CONFIG) + self.coresys = coresys + + @property + def content_trust(self) -> bool: + """Return if content trust is enabled/disabled.""" + return self._data[ATTR_CONTENT_TRUST] + + @content_trust.setter + def content_trust(self, value: bool) -> None: + """Set content trust is enabled/disabled.""" + self._data[ATTR_CONTENT_TRUST] = value + + @property + def force(self) -> bool: + """Return if force security is enabled/disabled.""" + return self._data[ATTR_FORCE_SECURITY] + + @force.setter + def force(self, value: bool) -> None: + """Set force security is enabled/disabled.""" + self._data[ATTR_FORCE_SECURITY] = value + + @property + def pwned(self) -> bool: + """Return if pwned is enabled/disabled.""" + return self._data[ATTR_PWNED] + + @pwned.setter + def pwned(self, value: bool) -> None: + """Set pwned is enabled/disabled.""" + self._data[ATTR_PWNED] = value + + async def verify_content(self, signer: str, checksum: str) -> None: + """Verify content on CAS.""" + if not self.content_trust: + _LOGGER.warning("Disabled content-trust, skip validation") + return + + try: + await cas_validate(signer, checksum) + except CodeNotaryUntrusted: + raise + except CodeNotaryError: + if self.force: + raise + return + + async def verify_own_content(self, checksum: str) -> None: + """Verify content from HA org.""" + return await self.verify_content("notary@home-assistant.io", checksum) + + async def verify_secret(self, pwned_hash: str) -> None: + """Verify pwned state of a secret.""" + if not self.pwned: + _LOGGER.warning("Disabled pwned, skip validation") + return + + try: + await check_pwned_password(self.sys_websession, pwned_hash) + except PwnedError: + if self.force: + raise + return + + @Job( + conditions=[JobCondition.INTERNET_SYSTEM], + on_condition=SecurityJobError, + limit=JobExecutionLimit.THROTTLE_WAIT, + throttle_period=timedelta(seconds=300), + ) + async def integrity_check(self) -> IntegrityResult: + """Run a full system integrity check of the platform. + + We only allow to install trusted content. + This is a out of the band manual check. + """ + result: IntegrityResult = IntegrityResult() + if not self.content_trust: + _LOGGER.warning( + "Skipping integrity check, content_trust is globally disabled" + ) + return result + + # Supervisor + try: + await self.sys_supervisor.check_trust() + result.supervisor = ContentTrustResult.PASS + except CodeNotaryUntrusted: + result.supervisor = ContentTrustResult.ERROR + self.sys_resolution.create_issue(IssueType.TRUST, ContextType.SUPERVISOR) + except CodeNotaryError: + result.supervisor = ContentTrustResult.FAILED + + # Core + try: + await self.sys_homeassistant.core.check_trust() + result.core = ContentTrustResult.PASS + except CodeNotaryUntrusted: + result.core = ContentTrustResult.ERROR + self.sys_resolution.create_issue(IssueType.TRUST, ContextType.CORE) + except CodeNotaryError: + result.core = ContentTrustResult.FAILED + + # Plugins + for plugin in self.sys_plugins.all_plugins: + try: + await plugin.check_trust() + result.plugins[plugin.slug] = ContentTrustResult.PASS + except CodeNotaryUntrusted: + result.plugins[plugin.slug] = ContentTrustResult.ERROR + self.sys_resolution.create_issue( + IssueType.TRUST, ContextType.PLUGIN, reference=plugin.slug + ) + except CodeNotaryError: + result.plugins[plugin.slug] = ContentTrustResult.FAILED + + # Add-ons + for addon in self.sys_addons.installed: + if not addon.signed: + result.addons[addon.slug] = ContentTrustResult.UNTESTED + continue + try: + await addon.check_trust() + result.addons[addon.slug] = ContentTrustResult.PASS + except CodeNotaryUntrusted: + result.addons[addon.slug] = ContentTrustResult.ERROR + self.sys_resolution.create_issue( + IssueType.TRUST, ContextType.ADDON, reference=addon.slug + ) + except CodeNotaryError: + result.addons[addon.slug] = ContentTrustResult.FAILED + + return result diff --git a/supervisor/updater.py b/supervisor/updater.py index 82135753e..e383d1d50 100644 --- a/supervisor/updater.py +++ b/supervisor/updater.py @@ -9,8 +9,6 @@ from typing import Optional import aiohttp from awesomeversion import AwesomeVersion -from supervisor.jobs.const import JobExecutionLimit - from .const import ( ATTR_AUDIO, ATTR_CHANNEL, @@ -34,7 +32,7 @@ from .exceptions import ( UpdaterError, UpdaterJobError, ) -from .jobs.decorator import Job, JobCondition +from .jobs.decorator import Job, JobCondition, JobExecutionLimit from .utils.codenotary import calc_checksum from .utils.common import FileConfiguration from .validate import SCHEMA_UPDATER_CONFIG diff --git a/tests/api/test_security.py b/tests/api/test_security.py index adfac6366..e0bf9344f 100644 --- a/tests/api/test_security.py +++ b/tests/api/test_security.py @@ -33,3 +33,15 @@ async def test_api_security_options_pwned(api_client, coresys: CoreSys): await api_client.post("/security/options", json={"pwned": False}) assert not coresys.security.pwned + + +@pytest.mark.asyncio +async def test_api_integrity_check(api_client, coresys: CoreSys): + """Test security integrity check.""" + coresys.security.content_trust = False + + resp = await api_client.post("/security/integrity") + result = await resp.json() + + assert result["data"]["core"] == "untested" + assert result["data"]["supervisor"] == "untested" diff --git a/tests/resolution/check/test_check_core_trust.py b/tests/resolution/check/test_check_core_trust.py deleted file mode 100644 index 7667f4d1e..000000000 --- a/tests/resolution/check/test_check_core_trust.py +++ /dev/null @@ -1,93 +0,0 @@ -"""Test Check Core trust.""" -# pylint: disable=import-error,protected-access -from unittest.mock import AsyncMock, patch - -from supervisor.const import CoreState -from supervisor.coresys import CoreSys -from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted -from supervisor.resolution.checks.core_trust import CheckCoreTrust -from supervisor.resolution.const import IssueType, UnhealthyReason - - -async def test_base(coresys: CoreSys): - """Test check basics.""" - core_trust = CheckCoreTrust(coresys) - assert core_trust.slug == "core_trust" - assert core_trust.enabled - - -async def test_check(coresys: CoreSys): - """Test check.""" - core_trust = CheckCoreTrust(coresys) - coresys.core.state = CoreState.RUNNING - - assert len(coresys.resolution.issues) == 0 - - coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryError) - await core_trust.run_check() - assert coresys.homeassistant.core.check_trust.called - - coresys.homeassistant.core.check_trust = AsyncMock(return_value=None) - await core_trust.run_check() - assert coresys.homeassistant.core.check_trust.called - - assert len(coresys.resolution.issues) == 0 - - coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) - await core_trust.run_check() - assert coresys.homeassistant.core.check_trust.called - - assert len(coresys.resolution.issues) == 1 - assert coresys.resolution.issues[-1].type == IssueType.TRUST - - assert UnhealthyReason.UNTRUSTED in coresys.resolution.unhealthy - - -async def test_approve(coresys: CoreSys): - """Test check.""" - core_trust = CheckCoreTrust(coresys) - coresys.core.state = CoreState.RUNNING - - coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) - assert await core_trust.approve_check() - - coresys.homeassistant.core.check_trust = AsyncMock(return_value=None) - assert not await core_trust.approve_check() - - -async def test_with_global_disable(coresys: CoreSys, caplog): - """Test when pwned is globally disabled.""" - coresys.security.content_trust = False - core_trust = CheckCoreTrust(coresys) - coresys.core.state = CoreState.RUNNING - - assert len(coresys.resolution.issues) == 0 - coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted) - await core_trust.run_check() - assert not coresys.security.verify_own_content.called - assert "Skipping core_trust, content_trust is globally disabled" in caplog.text - - -async def test_did_run(coresys: CoreSys): - """Test that the check ran as expected.""" - core_trust = CheckCoreTrust(coresys) - should_run = core_trust.states - should_not_run = [state for state in CoreState if state not in should_run] - assert len(should_run) != 0 - assert len(should_not_run) != 0 - - with patch( - "supervisor.resolution.checks.core_trust.CheckCoreTrust.run_check", - return_value=None, - ) as check: - for state in should_run: - coresys.core.state = state - await core_trust() - check.assert_called_once() - check.reset_mock() - - for state in should_not_run: - coresys.core.state = state - await core_trust() - check.assert_not_called() - check.reset_mock() diff --git a/tests/resolution/check/test_check_plugin_trust.py b/tests/resolution/check/test_check_plugin_trust.py deleted file mode 100644 index 1231f082d..000000000 --- a/tests/resolution/check/test_check_plugin_trust.py +++ /dev/null @@ -1,120 +0,0 @@ -"""Test Check Plugin trust.""" -# pylint: disable=import-error,protected-access -from unittest.mock import AsyncMock, patch - -from supervisor.const import CoreState -from supervisor.coresys import CoreSys -from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted -from supervisor.resolution.checks.plugin_trust import CheckPluginTrust -from supervisor.resolution.const import IssueType, UnhealthyReason - - -async def test_base(coresys: CoreSys): - """Test check basics.""" - plugin_trust = CheckPluginTrust(coresys) - assert plugin_trust.slug == "plugin_trust" - assert plugin_trust.enabled - - -async def test_check(coresys: CoreSys): - """Test check.""" - plugin_trust = CheckPluginTrust(coresys) - coresys.core.state = CoreState.RUNNING - - assert len(coresys.resolution.issues) == 0 - - coresys.plugins.audio.check_trust = AsyncMock(side_effect=CodeNotaryError) - coresys.plugins.dns.check_trust = AsyncMock(side_effect=CodeNotaryError) - coresys.plugins.cli.check_trust = AsyncMock(side_effect=CodeNotaryError) - coresys.plugins.multicast.check_trust = AsyncMock(side_effect=CodeNotaryError) - coresys.plugins.observer.check_trust = AsyncMock(side_effect=CodeNotaryError) - - await plugin_trust.run_check() - assert coresys.plugins.audio.check_trust.called - assert coresys.plugins.dns.check_trust.called - assert coresys.plugins.cli.check_trust.called - assert coresys.plugins.multicast.check_trust.called - assert coresys.plugins.observer.check_trust.called - - coresys.plugins.audio.check_trust = AsyncMock(return_value=None) - coresys.plugins.dns.check_trust = AsyncMock(return_value=None) - coresys.plugins.cli.check_trust = AsyncMock(return_value=None) - coresys.plugins.multicast.check_trust = AsyncMock(return_value=None) - coresys.plugins.observer.check_trust = AsyncMock(return_value=None) - - await plugin_trust.run_check() - assert coresys.plugins.audio.check_trust.called - assert coresys.plugins.dns.check_trust.called - assert coresys.plugins.cli.check_trust.called - assert coresys.plugins.multicast.check_trust.called - assert coresys.plugins.observer.check_trust.called - - assert len(coresys.resolution.issues) == 0 - - coresys.plugins.audio.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) - coresys.plugins.dns.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) - coresys.plugins.cli.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) - coresys.plugins.multicast.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) - coresys.plugins.observer.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) - - await plugin_trust.run_check() - assert coresys.plugins.audio.check_trust.called - assert coresys.plugins.dns.check_trust.called - assert coresys.plugins.cli.check_trust.called - assert coresys.plugins.multicast.check_trust.called - assert coresys.plugins.observer.check_trust.called - - assert len(coresys.resolution.issues) == 5 - assert coresys.resolution.issues[-1].type == IssueType.TRUST - - assert UnhealthyReason.UNTRUSTED in coresys.resolution.unhealthy - - -async def test_approve(coresys: CoreSys): - """Test check.""" - plugin_trust = CheckPluginTrust(coresys) - coresys.core.state = CoreState.RUNNING - - coresys.plugins.audio.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) - assert await plugin_trust.approve_check(reference="audio") - - coresys.plugins.audio.check_trust = AsyncMock(return_value=None) - assert not await plugin_trust.approve_check(reference="audio") - - -async def test_with_global_disable(coresys: CoreSys, caplog): - """Test when pwned is globally disabled.""" - coresys.security.content_trust = False - plugin_trust = CheckPluginTrust(coresys) - coresys.core.state = CoreState.RUNNING - - assert len(coresys.resolution.issues) == 0 - coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted) - await plugin_trust.run_check() - assert not coresys.security.verify_own_content.called - assert "Skipping plugin_trust, content_trust is globally disabled" in caplog.text - - -async def test_did_run(coresys: CoreSys): - """Test that the check ran as expected.""" - plugin_trust = CheckPluginTrust(coresys) - should_run = plugin_trust.states - should_not_run = [state for state in CoreState if state not in should_run] - assert len(should_run) != 0 - assert len(should_not_run) != 0 - - with patch( - "supervisor.resolution.checks.plugin_trust.CheckPluginTrust.run_check", - return_value=None, - ) as check: - for state in should_run: - coresys.core.state = state - await plugin_trust() - check.assert_called_once() - check.reset_mock() - - for state in should_not_run: - coresys.core.state = state - await plugin_trust() - check.assert_not_called() - check.reset_mock() diff --git a/tests/security/__init__.py b/tests/security/__init__.py new file mode 100644 index 000000000..f90200e72 --- /dev/null +++ b/tests/security/__init__.py @@ -0,0 +1 @@ +"""Test for Security.""" diff --git a/tests/security/test_module.py b/tests/security/test_module.py new file mode 100644 index 000000000..89d45f2ed --- /dev/null +++ b/tests/security/test_module.py @@ -0,0 +1,124 @@ +"""Testing handling with Security.""" +from unittest.mock import AsyncMock, patch + +import pytest + +from supervisor.coresys import CoreSys +from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted +from supervisor.security.const import ContentTrustResult + + +async def test_content_trust(coresys: CoreSys): + """Test Content-Trust.""" + + with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate: + await coresys.security.verify_content("test@mail.com", "ffffffffffffff") + assert cas_validate.called + cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff") + + with patch( + "supervisor.security.module.cas_validate", AsyncMock() + ) as cas_validate: + await coresys.security.verify_own_content("ffffffffffffff") + assert cas_validate.called + cas_validate.assert_called_once_with( + "notary@home-assistant.io", "ffffffffffffff" + ) + + +async def test_disabled_content_trust(coresys: CoreSys): + """Test Content-Trust.""" + coresys.security.content_trust = False + + with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate: + await coresys.security.verify_content("test@mail.com", "ffffffffffffff") + assert not cas_validate.called + + with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate: + await coresys.security.verify_own_content("ffffffffffffff") + assert not cas_validate.called + + +async def test_force_content_trust(coresys: CoreSys): + """Force Content-Trust tests.""" + + with patch( + "supervisor.security.module.cas_validate", + AsyncMock(side_effect=CodeNotaryError), + ) as cas_validate: + await coresys.security.verify_content("test@mail.com", "ffffffffffffff") + assert cas_validate.called + cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff") + + coresys.security.force = True + + with patch( + "supervisor.security.module.cas_validate", + AsyncMock(side_effect=CodeNotaryError), + ) as cas_validate: + with pytest.raises(CodeNotaryError): + await coresys.security.verify_content("test@mail.com", "ffffffffffffff") + + +async def test_integrity_check_disabled(coresys: CoreSys): + """Test integrity check with disabled content trust.""" + coresys.security.content_trust = False + + result = await coresys.security.integrity_check.__wrapped__(coresys.security) + + assert result.core == ContentTrustResult.UNTESTED + assert result.supervisor == ContentTrustResult.UNTESTED + + +async def test_integrity_check(coresys: CoreSys, install_addon_ssh): + """Test integrity check with content trust.""" + coresys.homeassistant.core.check_trust = AsyncMock() + coresys.supervisor.check_trust = AsyncMock() + install_addon_ssh.check_trust = AsyncMock() + install_addon_ssh.data["codenotary"] = "test@example.com" + + result = await coresys.security.integrity_check.__wrapped__(coresys.security) + + assert result.core == ContentTrustResult.PASS + assert result.supervisor == ContentTrustResult.PASS + assert result.addons[install_addon_ssh.slug] == ContentTrustResult.PASS + + +async def test_integrity_check_error(coresys: CoreSys, install_addon_ssh): + """Test integrity check with content trust issues.""" + coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) + coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) + install_addon_ssh.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) + install_addon_ssh.data["codenotary"] = "test@example.com" + + result = await coresys.security.integrity_check.__wrapped__(coresys.security) + + assert result.core == ContentTrustResult.ERROR + assert result.supervisor == ContentTrustResult.ERROR + assert result.addons[install_addon_ssh.slug] == ContentTrustResult.ERROR + + +async def test_integrity_check_failed(coresys: CoreSys, install_addon_ssh): + """Test integrity check with content trust failed.""" + coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryError) + coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryError) + install_addon_ssh.check_trust = AsyncMock(side_effect=CodeNotaryError) + install_addon_ssh.data["codenotary"] = "test@example.com" + + result = await coresys.security.integrity_check.__wrapped__(coresys.security) + + assert result.core == ContentTrustResult.FAILED + assert result.supervisor == ContentTrustResult.FAILED + assert result.addons[install_addon_ssh.slug] == ContentTrustResult.FAILED + + +async def test_integrity_check_addon(coresys: CoreSys, install_addon_ssh): + """Test integrity check with content trust but no signed add-ons.""" + coresys.homeassistant.core.check_trust = AsyncMock() + coresys.supervisor.check_trust = AsyncMock() + + result = await coresys.security.integrity_check.__wrapped__(coresys.security) + + assert result.core == ContentTrustResult.PASS + assert result.supervisor == ContentTrustResult.PASS + assert result.addons[install_addon_ssh.slug] == ContentTrustResult.UNTESTED diff --git a/tests/test_security.py b/tests/test_security.py deleted file mode 100644 index 7297dd695..000000000 --- a/tests/test_security.py +++ /dev/null @@ -1,55 +0,0 @@ -"""Testing handling with Security.""" -from unittest.mock import AsyncMock, patch - -import pytest - -from supervisor.coresys import CoreSys -from supervisor.exceptions import CodeNotaryError - - -async def test_content_trust(coresys: CoreSys): - """Test Content-Trust.""" - - with patch("supervisor.security.cas_validate", AsyncMock()) as cas_validate: - await coresys.security.verify_content("test@mail.com", "ffffffffffffff") - assert cas_validate.called - cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff") - - with patch("supervisor.security.cas_validate", AsyncMock()) as cas_validate: - await coresys.security.verify_own_content("ffffffffffffff") - assert cas_validate.called - cas_validate.assert_called_once_with( - "notary@home-assistant.io", "ffffffffffffff" - ) - - -async def test_disabled_content_trust(coresys: CoreSys): - """Test Content-Trust.""" - coresys.security.content_trust = False - - with patch("supervisor.security.cas_validate", AsyncMock()) as cas_validate: - await coresys.security.verify_content("test@mail.com", "ffffffffffffff") - assert not cas_validate.called - - with patch("supervisor.security.cas_validate", AsyncMock()) as cas_validate: - await coresys.security.verify_own_content("ffffffffffffff") - assert not cas_validate.called - - -async def test_force_content_trust(coresys: CoreSys): - """Force Content-Trust tests.""" - - with patch( - "supervisor.security.cas_validate", AsyncMock(side_effect=CodeNotaryError) - ) as cas_validate: - await coresys.security.verify_content("test@mail.com", "ffffffffffffff") - assert cas_validate.called - cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff") - - coresys.security.force = True - - with patch( - "supervisor.security.cas_validate", AsyncMock(side_effect=CodeNotaryError) - ) as cas_validate: - with pytest.raises(CodeNotaryError): - await coresys.security.verify_content("test@mail.com", "ffffffffffffff")