Add integrity check (#3608)

* Add integrity check

* add API test

* add tests

* tests for add-ons
This commit is contained in:
Pascal Vizeli 2022-04-30 10:14:43 +02:00 committed by GitHub
parent 1c75b515e0
commit ca1f764080
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 361 additions and 485 deletions

View File

@ -49,6 +49,7 @@ setup(
"supervisor.resolution.evaluations",
"supervisor.resolution.fixups",
"supervisor.resolution",
"supervisor.security",
"supervisor.services.modules",
"supervisor.services",
"supervisor.store",

View File

@ -890,3 +890,10 @@ class Addon(AddonModel):
return await self.start()
_LOGGER.info("Finished restore for add-on %s", self.slug)
def check_trust(self) -> Awaitable[None]:
"""Calculate Addon docker content trust.
Return Coroutine.
"""
return self.instance.check_trust()

View File

@ -159,6 +159,7 @@ class RestAPI(CoreSysAttributes):
[
web.get("/security/info", api_security.info),
web.post("/security/options", api_security.options),
web.post("/security/integrity", api_security.integrity_check),
]
)

View File

@ -1,8 +1,10 @@
"""Init file for Supervisor Security RESTful API."""
import asyncio
import logging
from typing import Any
from aiohttp import web
import attr
import voluptuous as vol
from ..const import ATTR_CONTENT_TRUST, ATTR_FORCE_SECURITY, ATTR_PWNED
@ -48,3 +50,9 @@ class APISecurity(CoreSysAttributes):
self.sys_security.save_data()
await self.sys_resolution.evaluate.evaluate_system()
@api_process
async def integrity_check(self, request: web.Request) -> dict[str, Any]:
"""Run backend integrity check."""
result = await asyncio.shield(self.sys_security.integrity_check())
return attr.asdict(result)

View File

@ -46,7 +46,7 @@ from .misc.tasks import Tasks
from .os.manager import OSManager
from .plugins.manager import PluginManager
from .resolution.module import ResolutionManager
from .security import Security
from .security.module import Security
from .services import ServiceManager
from .store import StoreManager
from .supervisor import Supervisor

View File

@ -36,7 +36,7 @@ if TYPE_CHECKING:
from .os.manager import OSManager
from .plugins.manager import PluginManager
from .resolution.module import ResolutionManager
from .security import Security
from .security.module import Security
from .services import ServiceManager
from .store import StoreManager
from .supervisor import Supervisor

View File

@ -471,3 +471,14 @@ class BackupError(HassioError):
class HomeAssistantBackupError(BackupError, HomeAssistantError):
"""Raise if an error during Home Assistant Core backup is happening."""
# Security
class SecurityError(HassioError):
"""Raise if an error during security checks are happening."""
class SecurityJobError(SecurityError, JobException):
"""Raise on Security job error."""

View File

@ -1,59 +0,0 @@
"""Helpers to check core trust."""
import logging
from typing import Optional
from ...const import CoreState
from ...coresys import CoreSys
from ...exceptions import CodeNotaryError, CodeNotaryUntrusted
from ..const import ContextType, IssueType, UnhealthyReason
from .base import CheckBase
_LOGGER: logging.Logger = logging.getLogger(__name__)
def setup(coresys: CoreSys) -> CheckBase:
"""Check setup function."""
return CheckCoreTrust(coresys)
class CheckCoreTrust(CheckBase):
"""CheckCoreTrust class for check."""
async def run_check(self) -> None:
"""Run check if not affected by issue."""
if not self.sys_security.content_trust:
_LOGGER.warning(
"Skipping %s, content_trust is globally disabled", self.slug
)
return
try:
await self.sys_homeassistant.core.check_trust()
except CodeNotaryUntrusted:
self.sys_resolution.unhealthy = UnhealthyReason.UNTRUSTED
self.sys_resolution.create_issue(IssueType.TRUST, ContextType.CORE)
except CodeNotaryError:
pass
async def approve_check(self, reference: Optional[str] = None) -> bool:
"""Approve check if it is affected by issue."""
try:
await self.sys_homeassistant.core.check_trust()
except CodeNotaryError:
return True
return False
@property
def issue(self) -> IssueType:
"""Return a IssueType enum."""
return IssueType.TRUST
@property
def context(self) -> ContextType:
"""Return a ContextType enum."""
return ContextType.CORE
@property
def states(self) -> list[CoreState]:
"""Return a list of valid states when this check can run."""
return [CoreState.RUNNING, CoreState.STARTUP]

View File

@ -1,65 +0,0 @@
"""Helpers to check plugin trust."""
import logging
from typing import Optional
from ...const import CoreState
from ...coresys import CoreSys
from ...exceptions import CodeNotaryError, CodeNotaryUntrusted
from ..const import ContextType, IssueType, UnhealthyReason
from .base import CheckBase
_LOGGER: logging.Logger = logging.getLogger(__name__)
def setup(coresys: CoreSys) -> CheckBase:
"""Check setup function."""
return CheckPluginTrust(coresys)
class CheckPluginTrust(CheckBase):
"""CheckPluginTrust class for check."""
async def run_check(self) -> None:
"""Run check if not affected by issue."""
if not self.sys_security.content_trust:
_LOGGER.warning(
"Skipping %s, content_trust is globally disabled", self.slug
)
return
for plugin in self.sys_plugins.all_plugins:
try:
await plugin.check_trust()
except CodeNotaryUntrusted:
self.sys_resolution.unhealthy = UnhealthyReason.UNTRUSTED
self.sys_resolution.create_issue(
IssueType.TRUST, ContextType.PLUGIN, reference=plugin.slug
)
except CodeNotaryError:
pass
async def approve_check(self, reference: Optional[str] = None) -> bool:
"""Approve check if it is affected by issue."""
for plugin in self.sys_plugins.all_plugins:
if reference != plugin.slug:
continue
try:
await plugin.check_trust()
except CodeNotaryError:
return True
return False
@property
def issue(self) -> IssueType:
"""Return a IssueType enum."""
return IssueType.TRUST
@property
def context(self) -> ContextType:
"""Return a ContextType enum."""
return ContextType.PLUGIN
@property
def states(self) -> list[CoreState]:
"""Return a list of valid states when this check can run."""
return [CoreState.RUNNING, CoreState.STARTUP]

View File

@ -1,88 +0,0 @@
"""Fetch last versions from webserver."""
import logging
from .const import (
ATTR_CONTENT_TRUST,
ATTR_FORCE_SECURITY,
ATTR_PWNED,
FILE_HASSIO_SECURITY,
)
from .coresys import CoreSys, CoreSysAttributes
from .exceptions import CodeNotaryError, CodeNotaryUntrusted, PwnedError
from .utils.codenotary import cas_validate
from .utils.common import FileConfiguration
from .utils.pwned import check_pwned_password
from .validate import SCHEMA_SECURITY_CONFIG
_LOGGER: logging.Logger = logging.getLogger(__name__)
class Security(FileConfiguration, CoreSysAttributes):
"""Handle Security properties."""
def __init__(self, coresys: CoreSys):
"""Initialize updater."""
super().__init__(FILE_HASSIO_SECURITY, SCHEMA_SECURITY_CONFIG)
self.coresys = coresys
@property
def content_trust(self) -> bool:
"""Return if content trust is enabled/disabled."""
return self._data[ATTR_CONTENT_TRUST]
@content_trust.setter
def content_trust(self, value: bool) -> None:
"""Set content trust is enabled/disabled."""
self._data[ATTR_CONTENT_TRUST] = value
@property
def force(self) -> bool:
"""Return if force security is enabled/disabled."""
return self._data[ATTR_FORCE_SECURITY]
@force.setter
def force(self, value: bool) -> None:
"""Set force security is enabled/disabled."""
self._data[ATTR_FORCE_SECURITY] = value
@property
def pwned(self) -> bool:
"""Return if pwned is enabled/disabled."""
return self._data[ATTR_PWNED]
@pwned.setter
def pwned(self, value: bool) -> None:
"""Set pwned is enabled/disabled."""
self._data[ATTR_PWNED] = value
async def verify_content(self, signer: str, checksum: str) -> None:
"""Verify content on CAS."""
if not self.content_trust:
_LOGGER.warning("Disabled content-trust, skip validation")
return
try:
await cas_validate(signer, checksum)
except CodeNotaryUntrusted:
raise
except CodeNotaryError:
if self.force:
raise
return
async def verify_own_content(self, checksum: str) -> None:
"""Verify content from HA org."""
return await self.verify_content("notary@home-assistant.io", checksum)
async def verify_secret(self, pwned_hash: str) -> None:
"""Verify pwned state of a secret."""
if not self.pwned:
_LOGGER.warning("Disabled pwned, skip validation")
return
try:
await check_pwned_password(self.sys_websession, pwned_hash)
except PwnedError:
if self.force:
raise
return

View File

@ -0,0 +1 @@
"""Security feature of Supervisor."""

View File

@ -0,0 +1,23 @@
"""Security constants."""
from enum import Enum
import attr
class ContentTrustResult(str, Enum):
"""Content trust result enum."""
PASS = "pass"
ERROR = "error"
FAILED = "failed"
UNTESTED = "untested"
@attr.s
class IntegrityResult:
"""Result of a full integrity check."""
supervisor: ContentTrustResult = attr.ib(default=ContentTrustResult.UNTESTED)
core: ContentTrustResult = attr.ib(default=ContentTrustResult.UNTESTED)
plugins: dict[str, ContentTrustResult] = attr.ib(default={})
addons: dict[str, ContentTrustResult] = attr.ib(default={})

View File

@ -0,0 +1,169 @@
"""Fetch last versions from webserver."""
from __future__ import annotations
from datetime import timedelta
import logging
from ..const import (
ATTR_CONTENT_TRUST,
ATTR_FORCE_SECURITY,
ATTR_PWNED,
FILE_HASSIO_SECURITY,
)
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import (
CodeNotaryError,
CodeNotaryUntrusted,
PwnedError,
SecurityJobError,
)
from ..jobs.decorator import Job, JobCondition, JobExecutionLimit
from ..resolution.const import ContextType, IssueType
from ..utils.codenotary import cas_validate
from ..utils.common import FileConfiguration
from ..utils.pwned import check_pwned_password
from ..validate import SCHEMA_SECURITY_CONFIG
from .const import ContentTrustResult, IntegrityResult
_LOGGER: logging.Logger = logging.getLogger(__name__)
class Security(FileConfiguration, CoreSysAttributes):
"""Handle Security properties."""
def __init__(self, coresys: CoreSys):
"""Initialize updater."""
super().__init__(FILE_HASSIO_SECURITY, SCHEMA_SECURITY_CONFIG)
self.coresys = coresys
@property
def content_trust(self) -> bool:
"""Return if content trust is enabled/disabled."""
return self._data[ATTR_CONTENT_TRUST]
@content_trust.setter
def content_trust(self, value: bool) -> None:
"""Set content trust is enabled/disabled."""
self._data[ATTR_CONTENT_TRUST] = value
@property
def force(self) -> bool:
"""Return if force security is enabled/disabled."""
return self._data[ATTR_FORCE_SECURITY]
@force.setter
def force(self, value: bool) -> None:
"""Set force security is enabled/disabled."""
self._data[ATTR_FORCE_SECURITY] = value
@property
def pwned(self) -> bool:
"""Return if pwned is enabled/disabled."""
return self._data[ATTR_PWNED]
@pwned.setter
def pwned(self, value: bool) -> None:
"""Set pwned is enabled/disabled."""
self._data[ATTR_PWNED] = value
async def verify_content(self, signer: str, checksum: str) -> None:
"""Verify content on CAS."""
if not self.content_trust:
_LOGGER.warning("Disabled content-trust, skip validation")
return
try:
await cas_validate(signer, checksum)
except CodeNotaryUntrusted:
raise
except CodeNotaryError:
if self.force:
raise
return
async def verify_own_content(self, checksum: str) -> None:
"""Verify content from HA org."""
return await self.verify_content("notary@home-assistant.io", checksum)
async def verify_secret(self, pwned_hash: str) -> None:
"""Verify pwned state of a secret."""
if not self.pwned:
_LOGGER.warning("Disabled pwned, skip validation")
return
try:
await check_pwned_password(self.sys_websession, pwned_hash)
except PwnedError:
if self.force:
raise
return
@Job(
conditions=[JobCondition.INTERNET_SYSTEM],
on_condition=SecurityJobError,
limit=JobExecutionLimit.THROTTLE_WAIT,
throttle_period=timedelta(seconds=300),
)
async def integrity_check(self) -> IntegrityResult:
"""Run a full system integrity check of the platform.
We only allow to install trusted content.
This is a out of the band manual check.
"""
result: IntegrityResult = IntegrityResult()
if not self.content_trust:
_LOGGER.warning(
"Skipping integrity check, content_trust is globally disabled"
)
return result
# Supervisor
try:
await self.sys_supervisor.check_trust()
result.supervisor = ContentTrustResult.PASS
except CodeNotaryUntrusted:
result.supervisor = ContentTrustResult.ERROR
self.sys_resolution.create_issue(IssueType.TRUST, ContextType.SUPERVISOR)
except CodeNotaryError:
result.supervisor = ContentTrustResult.FAILED
# Core
try:
await self.sys_homeassistant.core.check_trust()
result.core = ContentTrustResult.PASS
except CodeNotaryUntrusted:
result.core = ContentTrustResult.ERROR
self.sys_resolution.create_issue(IssueType.TRUST, ContextType.CORE)
except CodeNotaryError:
result.core = ContentTrustResult.FAILED
# Plugins
for plugin in self.sys_plugins.all_plugins:
try:
await plugin.check_trust()
result.plugins[plugin.slug] = ContentTrustResult.PASS
except CodeNotaryUntrusted:
result.plugins[plugin.slug] = ContentTrustResult.ERROR
self.sys_resolution.create_issue(
IssueType.TRUST, ContextType.PLUGIN, reference=plugin.slug
)
except CodeNotaryError:
result.plugins[plugin.slug] = ContentTrustResult.FAILED
# Add-ons
for addon in self.sys_addons.installed:
if not addon.signed:
result.addons[addon.slug] = ContentTrustResult.UNTESTED
continue
try:
await addon.check_trust()
result.addons[addon.slug] = ContentTrustResult.PASS
except CodeNotaryUntrusted:
result.addons[addon.slug] = ContentTrustResult.ERROR
self.sys_resolution.create_issue(
IssueType.TRUST, ContextType.ADDON, reference=addon.slug
)
except CodeNotaryError:
result.addons[addon.slug] = ContentTrustResult.FAILED
return result

View File

@ -9,8 +9,6 @@ from typing import Optional
import aiohttp
from awesomeversion import AwesomeVersion
from supervisor.jobs.const import JobExecutionLimit
from .const import (
ATTR_AUDIO,
ATTR_CHANNEL,
@ -34,7 +32,7 @@ from .exceptions import (
UpdaterError,
UpdaterJobError,
)
from .jobs.decorator import Job, JobCondition
from .jobs.decorator import Job, JobCondition, JobExecutionLimit
from .utils.codenotary import calc_checksum
from .utils.common import FileConfiguration
from .validate import SCHEMA_UPDATER_CONFIG

View File

@ -33,3 +33,15 @@ async def test_api_security_options_pwned(api_client, coresys: CoreSys):
await api_client.post("/security/options", json={"pwned": False})
assert not coresys.security.pwned
@pytest.mark.asyncio
async def test_api_integrity_check(api_client, coresys: CoreSys):
"""Test security integrity check."""
coresys.security.content_trust = False
resp = await api_client.post("/security/integrity")
result = await resp.json()
assert result["data"]["core"] == "untested"
assert result["data"]["supervisor"] == "untested"

View File

@ -1,93 +0,0 @@
"""Test Check Core trust."""
# pylint: disable=import-error,protected-access
from unittest.mock import AsyncMock, patch
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted
from supervisor.resolution.checks.core_trust import CheckCoreTrust
from supervisor.resolution.const import IssueType, UnhealthyReason
async def test_base(coresys: CoreSys):
"""Test check basics."""
core_trust = CheckCoreTrust(coresys)
assert core_trust.slug == "core_trust"
assert core_trust.enabled
async def test_check(coresys: CoreSys):
"""Test check."""
core_trust = CheckCoreTrust(coresys)
coresys.core.state = CoreState.RUNNING
assert len(coresys.resolution.issues) == 0
coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryError)
await core_trust.run_check()
assert coresys.homeassistant.core.check_trust.called
coresys.homeassistant.core.check_trust = AsyncMock(return_value=None)
await core_trust.run_check()
assert coresys.homeassistant.core.check_trust.called
assert len(coresys.resolution.issues) == 0
coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
await core_trust.run_check()
assert coresys.homeassistant.core.check_trust.called
assert len(coresys.resolution.issues) == 1
assert coresys.resolution.issues[-1].type == IssueType.TRUST
assert UnhealthyReason.UNTRUSTED in coresys.resolution.unhealthy
async def test_approve(coresys: CoreSys):
"""Test check."""
core_trust = CheckCoreTrust(coresys)
coresys.core.state = CoreState.RUNNING
coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
assert await core_trust.approve_check()
coresys.homeassistant.core.check_trust = AsyncMock(return_value=None)
assert not await core_trust.approve_check()
async def test_with_global_disable(coresys: CoreSys, caplog):
"""Test when pwned is globally disabled."""
coresys.security.content_trust = False
core_trust = CheckCoreTrust(coresys)
coresys.core.state = CoreState.RUNNING
assert len(coresys.resolution.issues) == 0
coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted)
await core_trust.run_check()
assert not coresys.security.verify_own_content.called
assert "Skipping core_trust, content_trust is globally disabled" in caplog.text
async def test_did_run(coresys: CoreSys):
"""Test that the check ran as expected."""
core_trust = CheckCoreTrust(coresys)
should_run = core_trust.states
should_not_run = [state for state in CoreState if state not in should_run]
assert len(should_run) != 0
assert len(should_not_run) != 0
with patch(
"supervisor.resolution.checks.core_trust.CheckCoreTrust.run_check",
return_value=None,
) as check:
for state in should_run:
coresys.core.state = state
await core_trust()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
coresys.core.state = state
await core_trust()
check.assert_not_called()
check.reset_mock()

View File

@ -1,120 +0,0 @@
"""Test Check Plugin trust."""
# pylint: disable=import-error,protected-access
from unittest.mock import AsyncMock, patch
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted
from supervisor.resolution.checks.plugin_trust import CheckPluginTrust
from supervisor.resolution.const import IssueType, UnhealthyReason
async def test_base(coresys: CoreSys):
"""Test check basics."""
plugin_trust = CheckPluginTrust(coresys)
assert plugin_trust.slug == "plugin_trust"
assert plugin_trust.enabled
async def test_check(coresys: CoreSys):
"""Test check."""
plugin_trust = CheckPluginTrust(coresys)
coresys.core.state = CoreState.RUNNING
assert len(coresys.resolution.issues) == 0
coresys.plugins.audio.check_trust = AsyncMock(side_effect=CodeNotaryError)
coresys.plugins.dns.check_trust = AsyncMock(side_effect=CodeNotaryError)
coresys.plugins.cli.check_trust = AsyncMock(side_effect=CodeNotaryError)
coresys.plugins.multicast.check_trust = AsyncMock(side_effect=CodeNotaryError)
coresys.plugins.observer.check_trust = AsyncMock(side_effect=CodeNotaryError)
await plugin_trust.run_check()
assert coresys.plugins.audio.check_trust.called
assert coresys.plugins.dns.check_trust.called
assert coresys.plugins.cli.check_trust.called
assert coresys.plugins.multicast.check_trust.called
assert coresys.plugins.observer.check_trust.called
coresys.plugins.audio.check_trust = AsyncMock(return_value=None)
coresys.plugins.dns.check_trust = AsyncMock(return_value=None)
coresys.plugins.cli.check_trust = AsyncMock(return_value=None)
coresys.plugins.multicast.check_trust = AsyncMock(return_value=None)
coresys.plugins.observer.check_trust = AsyncMock(return_value=None)
await plugin_trust.run_check()
assert coresys.plugins.audio.check_trust.called
assert coresys.plugins.dns.check_trust.called
assert coresys.plugins.cli.check_trust.called
assert coresys.plugins.multicast.check_trust.called
assert coresys.plugins.observer.check_trust.called
assert len(coresys.resolution.issues) == 0
coresys.plugins.audio.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
coresys.plugins.dns.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
coresys.plugins.cli.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
coresys.plugins.multicast.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
coresys.plugins.observer.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
await plugin_trust.run_check()
assert coresys.plugins.audio.check_trust.called
assert coresys.plugins.dns.check_trust.called
assert coresys.plugins.cli.check_trust.called
assert coresys.plugins.multicast.check_trust.called
assert coresys.plugins.observer.check_trust.called
assert len(coresys.resolution.issues) == 5
assert coresys.resolution.issues[-1].type == IssueType.TRUST
assert UnhealthyReason.UNTRUSTED in coresys.resolution.unhealthy
async def test_approve(coresys: CoreSys):
"""Test check."""
plugin_trust = CheckPluginTrust(coresys)
coresys.core.state = CoreState.RUNNING
coresys.plugins.audio.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
assert await plugin_trust.approve_check(reference="audio")
coresys.plugins.audio.check_trust = AsyncMock(return_value=None)
assert not await plugin_trust.approve_check(reference="audio")
async def test_with_global_disable(coresys: CoreSys, caplog):
"""Test when pwned is globally disabled."""
coresys.security.content_trust = False
plugin_trust = CheckPluginTrust(coresys)
coresys.core.state = CoreState.RUNNING
assert len(coresys.resolution.issues) == 0
coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted)
await plugin_trust.run_check()
assert not coresys.security.verify_own_content.called
assert "Skipping plugin_trust, content_trust is globally disabled" in caplog.text
async def test_did_run(coresys: CoreSys):
"""Test that the check ran as expected."""
plugin_trust = CheckPluginTrust(coresys)
should_run = plugin_trust.states
should_not_run = [state for state in CoreState if state not in should_run]
assert len(should_run) != 0
assert len(should_not_run) != 0
with patch(
"supervisor.resolution.checks.plugin_trust.CheckPluginTrust.run_check",
return_value=None,
) as check:
for state in should_run:
coresys.core.state = state
await plugin_trust()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
coresys.core.state = state
await plugin_trust()
check.assert_not_called()
check.reset_mock()

View File

@ -0,0 +1 @@
"""Test for Security."""

View File

@ -0,0 +1,124 @@
"""Testing handling with Security."""
from unittest.mock import AsyncMock, patch
import pytest
from supervisor.coresys import CoreSys
from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted
from supervisor.security.const import ContentTrustResult
async def test_content_trust(coresys: CoreSys):
"""Test Content-Trust."""
with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate:
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
assert cas_validate.called
cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff")
with patch(
"supervisor.security.module.cas_validate", AsyncMock()
) as cas_validate:
await coresys.security.verify_own_content("ffffffffffffff")
assert cas_validate.called
cas_validate.assert_called_once_with(
"notary@home-assistant.io", "ffffffffffffff"
)
async def test_disabled_content_trust(coresys: CoreSys):
"""Test Content-Trust."""
coresys.security.content_trust = False
with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate:
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
assert not cas_validate.called
with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate:
await coresys.security.verify_own_content("ffffffffffffff")
assert not cas_validate.called
async def test_force_content_trust(coresys: CoreSys):
"""Force Content-Trust tests."""
with patch(
"supervisor.security.module.cas_validate",
AsyncMock(side_effect=CodeNotaryError),
) as cas_validate:
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
assert cas_validate.called
cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff")
coresys.security.force = True
with patch(
"supervisor.security.module.cas_validate",
AsyncMock(side_effect=CodeNotaryError),
) as cas_validate:
with pytest.raises(CodeNotaryError):
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
async def test_integrity_check_disabled(coresys: CoreSys):
"""Test integrity check with disabled content trust."""
coresys.security.content_trust = False
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
assert result.core == ContentTrustResult.UNTESTED
assert result.supervisor == ContentTrustResult.UNTESTED
async def test_integrity_check(coresys: CoreSys, install_addon_ssh):
"""Test integrity check with content trust."""
coresys.homeassistant.core.check_trust = AsyncMock()
coresys.supervisor.check_trust = AsyncMock()
install_addon_ssh.check_trust = AsyncMock()
install_addon_ssh.data["codenotary"] = "test@example.com"
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
assert result.core == ContentTrustResult.PASS
assert result.supervisor == ContentTrustResult.PASS
assert result.addons[install_addon_ssh.slug] == ContentTrustResult.PASS
async def test_integrity_check_error(coresys: CoreSys, install_addon_ssh):
"""Test integrity check with content trust issues."""
coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
install_addon_ssh.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
install_addon_ssh.data["codenotary"] = "test@example.com"
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
assert result.core == ContentTrustResult.ERROR
assert result.supervisor == ContentTrustResult.ERROR
assert result.addons[install_addon_ssh.slug] == ContentTrustResult.ERROR
async def test_integrity_check_failed(coresys: CoreSys, install_addon_ssh):
"""Test integrity check with content trust failed."""
coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryError)
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryError)
install_addon_ssh.check_trust = AsyncMock(side_effect=CodeNotaryError)
install_addon_ssh.data["codenotary"] = "test@example.com"
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
assert result.core == ContentTrustResult.FAILED
assert result.supervisor == ContentTrustResult.FAILED
assert result.addons[install_addon_ssh.slug] == ContentTrustResult.FAILED
async def test_integrity_check_addon(coresys: CoreSys, install_addon_ssh):
"""Test integrity check with content trust but no signed add-ons."""
coresys.homeassistant.core.check_trust = AsyncMock()
coresys.supervisor.check_trust = AsyncMock()
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
assert result.core == ContentTrustResult.PASS
assert result.supervisor == ContentTrustResult.PASS
assert result.addons[install_addon_ssh.slug] == ContentTrustResult.UNTESTED

View File

@ -1,55 +0,0 @@
"""Testing handling with Security."""
from unittest.mock import AsyncMock, patch
import pytest
from supervisor.coresys import CoreSys
from supervisor.exceptions import CodeNotaryError
async def test_content_trust(coresys: CoreSys):
"""Test Content-Trust."""
with patch("supervisor.security.cas_validate", AsyncMock()) as cas_validate:
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
assert cas_validate.called
cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff")
with patch("supervisor.security.cas_validate", AsyncMock()) as cas_validate:
await coresys.security.verify_own_content("ffffffffffffff")
assert cas_validate.called
cas_validate.assert_called_once_with(
"notary@home-assistant.io", "ffffffffffffff"
)
async def test_disabled_content_trust(coresys: CoreSys):
"""Test Content-Trust."""
coresys.security.content_trust = False
with patch("supervisor.security.cas_validate", AsyncMock()) as cas_validate:
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
assert not cas_validate.called
with patch("supervisor.security.cas_validate", AsyncMock()) as cas_validate:
await coresys.security.verify_own_content("ffffffffffffff")
assert not cas_validate.called
async def test_force_content_trust(coresys: CoreSys):
"""Force Content-Trust tests."""
with patch(
"supervisor.security.cas_validate", AsyncMock(side_effect=CodeNotaryError)
) as cas_validate:
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
assert cas_validate.called
cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff")
coresys.security.force = True
with patch(
"supervisor.security.cas_validate", AsyncMock(side_effect=CodeNotaryError)
) as cas_validate:
with pytest.raises(CodeNotaryError):
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")