Add test suite for Supervisor tests (#2880)
* Add test suite for Supervisor tests * test_supervisor_is_updated should depend on test_update_supervisor Co-authored-by: Stefan Agner <stefan@agner.ch> --------- Co-authored-by: Stefan Agner <stefan@agner.ch>
This commit is contained in:
parent
e1be4f3efb
commit
39778e882a
|
@ -68,7 +68,7 @@ jobs:
|
|||
with:
|
||||
name: logs
|
||||
path: |
|
||||
tests/lg_logs/*
|
||||
tests/lg_logs/**
|
||||
|
||||
- name: Archive JUnit reports
|
||||
uses: actions/upload-artifact@v3
|
||||
|
|
|
@ -1,8 +1,23 @@
|
|||
import json
|
||||
import logging
|
||||
import os
|
||||
|
||||
from labgrid.driver import ShellDriver
|
||||
import pytest
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True, scope="module")
|
||||
def restart_qemu(strategy):
|
||||
"""Use fresh QEMU instance for each module."""
|
||||
if strategy.status.name == "shell":
|
||||
logger.info("Restarting QEMU before %s module tests.", strategy.target.name)
|
||||
strategy.transition("off")
|
||||
strategy.transition("shell")
|
||||
|
||||
|
||||
@pytest.hookimpl
|
||||
def pytest_runtest_setup(item):
|
||||
log_dir = item.config.option.lg_log
|
||||
|
@ -11,11 +26,25 @@ def pytest_runtest_setup(item):
|
|||
return
|
||||
|
||||
logging_plugin = item.config.pluginmanager.get_plugin("logging-plugin")
|
||||
logging_plugin.set_log_path(os.path.join(log_dir, f"{item.name}.log"))
|
||||
log_name = item.nodeid.replace(".py::", "/")
|
||||
logging_plugin.set_log_path(os.path.join(log_dir, f"{log_name}.log"))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def shell_command(target, strategy):
|
||||
def shell(target, strategy) -> ShellDriver:
|
||||
"""Fixture for accessing shell."""
|
||||
strategy.transition("shell")
|
||||
shell = target.get_driver("ShellDriver")
|
||||
return shell
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def shell_json(target, strategy) -> callable:
|
||||
"""Fixture for running CLI commands returning JSON string as output."""
|
||||
strategy.transition("shell")
|
||||
shell = target.get_driver("ShellDriver")
|
||||
|
||||
def get_json_response(command, *, timeout=60) -> dict:
|
||||
return json.loads("\n".join(shell.run_check(command, timeout=timeout)))
|
||||
|
||||
return get_json_response
|
||||
|
|
|
@ -44,6 +44,7 @@ class QEMUShellStrategy(Strategy):
|
|||
elif status == Status.off:
|
||||
self.target.activate(self.qemu)
|
||||
self.qemu.off()
|
||||
self.target.deactivate(self.shell)
|
||||
elif status == Status.shell:
|
||||
self.target.activate(self.qemu)
|
||||
self.qemu.on()
|
||||
|
|
|
@ -1 +1,3 @@
|
|||
labgrid==23.0.3
|
||||
pytest==7.2.2
|
||||
pytest-dependency==0.5.1
|
||||
|
|
|
@ -13,4 +13,4 @@ if [ -z "$GITHUB_ACTIONS" ] && [ -z "$VIRTUAL_ENV" ]; then
|
|||
pip3 install -r requirements.txt
|
||||
fi
|
||||
|
||||
pytest --lg-env qemu-strategy.yaml --lg-log=lg_logs --junitxml=junit_reports/smoke_test.xml smoke_test
|
||||
pytest --lg-env qemu-strategy.yaml --lg-log=lg_logs --junitxml=junit_reports/tests.xml "$@"
|
||||
|
|
|
@ -5,9 +5,9 @@ from time import sleep
|
|||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def test_init(shell_command):
|
||||
def test_init(shell):
|
||||
def check_container_running(container_name):
|
||||
out = shell_command.run_check(
|
||||
out = shell.run_check(
|
||||
f"docker container inspect -f '{{{{.State.Status}}}}' {container_name} || true"
|
||||
)
|
||||
return "running" in out
|
||||
|
@ -21,28 +21,27 @@ def test_init(shell_command):
|
|||
|
||||
# wait for system ready
|
||||
for _ in range(20):
|
||||
output = "\n".join(shell_command.run_check("ha os info || true"))
|
||||
output = "\n".join(shell.run_check("ha os info || true"))
|
||||
if "System is not ready" not in output:
|
||||
break
|
||||
|
||||
sleep(5)
|
||||
|
||||
output = shell_command.run_check("ha os info")
|
||||
output = shell.run_check("ha os info")
|
||||
_LOGGER.info("%s", "\n".join(output))
|
||||
|
||||
def test_dmesg(shell):
|
||||
output = shell.run_check("dmesg")
|
||||
_LOGGER.info("%s", "\n".join(output))
|
||||
|
||||
|
||||
def test_dmesg(shell_command):
|
||||
output = shell_command.run_check("dmesg")
|
||||
def test_supervisor_logs(shell):
|
||||
output = shell.run_check("ha su logs")
|
||||
_LOGGER.info("%s", "\n".join(output))
|
||||
|
||||
|
||||
def test_supervisor_logs(shell_command):
|
||||
output = shell_command.run_check("ha su logs")
|
||||
_LOGGER.info("%s", "\n".join(output))
|
||||
|
||||
|
||||
def test_systemctl_status(shell_command):
|
||||
output = shell_command.run_check(
|
||||
def test_systemctl_status(shell):
|
||||
output = shell.run_check(
|
||||
"systemctl --no-pager -l status -a || true", timeout=90
|
||||
)
|
||||
_LOGGER.info("%s", "\n".join(output))
|
||||
|
|
|
@ -0,0 +1,195 @@
|
|||
import logging
|
||||
from time import sleep
|
||||
|
||||
import pytest
|
||||
from labgrid.driver import ExecutionError
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def stash() -> dict:
|
||||
"""Simple stash for sharing data between tests in this module."""
|
||||
stash = {}
|
||||
return stash
|
||||
|
||||
|
||||
@pytest.mark.dependency()
|
||||
def test_start_supervisor(shell, shell_json):
|
||||
def check_container_running(container_name):
|
||||
out = shell.run_check(f"docker container inspect -f '{{{{.State.Status}}}}' {container_name} || true")
|
||||
return "running" in out
|
||||
|
||||
for _ in range(20):
|
||||
if check_container_running("homeassistant") and check_container_running("hassio_supervisor"):
|
||||
break
|
||||
|
||||
sleep(5)
|
||||
|
||||
supervisor_ip = "\n".join(
|
||||
shell.run_check("docker inspect --format='{{.NetworkSettings.IPAddress}}' hassio_supervisor")
|
||||
)
|
||||
|
||||
for _ in range(20):
|
||||
try:
|
||||
if shell_json(f"curl -sSL http://{supervisor_ip}/supervisor/ping").get("result") == "ok":
|
||||
break
|
||||
except ExecutionError:
|
||||
pass # avoid failure when the container is restarting
|
||||
sleep(5)
|
||||
else:
|
||||
raise AssertionError("Supervisor did not start in time")
|
||||
|
||||
|
||||
@pytest.mark.dependency(depends=["test_start_supervisor"])
|
||||
def test_check_supervisor(shell_json):
|
||||
# check supervisor info
|
||||
supervisor_info = shell_json("ha supervisor info --no-progress --raw-json")
|
||||
assert supervisor_info.get("result") == "ok", "supervisor info failed"
|
||||
logger.info("Supervisor info: %s", supervisor_info)
|
||||
# check network info
|
||||
network_info = shell_json("ha network info --no-progress --raw-json")
|
||||
assert network_info.get("result") == "ok", "network info failed"
|
||||
logger.info("Network info: %s", network_info)
|
||||
|
||||
|
||||
@pytest.mark.dependency(depends=["test_check_supervisor"])
|
||||
def test_update_supervisor(shell_json):
|
||||
supervisor_info = shell_json("ha supervisor info --no-progress --raw-json")
|
||||
supervisor_version = supervisor_info.get("data").get("version")
|
||||
if supervisor_version == supervisor_info.get("data").get("version_latest"):
|
||||
logger.info("Supervisor is already up to date")
|
||||
pytest.skip("Supervisor is already up to date")
|
||||
else:
|
||||
result = shell_json("ha supervisor update --no-progress --raw-json")
|
||||
if result.get("result") == "error" and "Another job is running" in result.get("message"):
|
||||
pass
|
||||
else:
|
||||
assert result.get("result") == "ok", f"Supervisor update failed: {result}"
|
||||
|
||||
for _ in range(40):
|
||||
try:
|
||||
supervisor_info = shell_json("ha supervisor info --no-progress --raw-json", timeout=90)
|
||||
data = supervisor_info.get("data")
|
||||
if data and data.get("version") == data.get("version_latest"):
|
||||
logger.info(
|
||||
"Supervisor updated from %s to %s: %s",
|
||||
supervisor_version,
|
||||
data.get("version"),
|
||||
supervisor_info,
|
||||
)
|
||||
break
|
||||
except ExecutionError:
|
||||
pass # avoid failure when the container is restarting
|
||||
sleep(5)
|
||||
else:
|
||||
raise AssertionError("Supervisor did not update in time")
|
||||
|
||||
|
||||
@pytest.mark.dependency(depends=["test_update_supervisor"])
|
||||
def test_supervisor_is_updated(shell_json):
|
||||
supervisor_info = shell_json("ha supervisor info --no-progress --raw-json", timeout=90)
|
||||
data = supervisor_info.get("data")
|
||||
assert data and data.get("version") == data.get("version_latest")
|
||||
|
||||
|
||||
@pytest.mark.dependency(depends=["test_supervisor_is_updated"])
|
||||
def test_addon_install(shell_json):
|
||||
# install Core SSH add-on
|
||||
assert (
|
||||
shell_json("ha addons install core_ssh --no-progress --raw-json", timeout=300).get("result") == "ok"
|
||||
), "Core SSH add-on install failed"
|
||||
# check Core SSH add-on is installed
|
||||
assert (
|
||||
shell_json("ha addons info core_ssh --no-progress --raw-json").get("data", {}).get("version") is not None
|
||||
), "Core SSH add-on not installed"
|
||||
# start Core SSH add-on
|
||||
assert (
|
||||
shell_json("ha addons start core_ssh --no-progress --raw-json").get("result") == "ok"
|
||||
), "Core SSH add-on start failed"
|
||||
# check Core SSH add-on is running
|
||||
ssh_info = shell_json("ha addons info core_ssh --no-progress --raw-json")
|
||||
assert ssh_info.get("data", {}).get("state") == "started", "Core SSH add-on not running"
|
||||
logger.info("Core SSH add-on info: %s", ssh_info)
|
||||
|
||||
|
||||
@pytest.mark.dependency(depends=["test_supervisor_is_updated"])
|
||||
def test_code_sign(shell_json):
|
||||
# enable Content-Trust
|
||||
assert (
|
||||
shell_json("ha security options --content-trust=true --no-progress --raw-json").get("result") == "ok"
|
||||
), "Content-Trust enable failed"
|
||||
# run Supervisor health check
|
||||
health_check = shell_json("ha resolution healthcheck --no-progress --raw-json")
|
||||
assert health_check.get("result") == "ok", "Supervisor health check failed"
|
||||
logger.info("Supervisor health check result: %s", health_check)
|
||||
# get resolution center info
|
||||
resolution_info = shell_json("ha resolution info --no-progress --raw-json")
|
||||
logger.info("Resolution center info: %s", resolution_info)
|
||||
# check supervisor is healthy
|
||||
unhealthy = resolution_info.get("data").get("unhealthy")
|
||||
assert len(unhealthy) == 0, "Supervisor is unhealthy"
|
||||
# check for unsupported entries
|
||||
unsupported = resolution_info.get("data").get("unsupported")
|
||||
assert len(unsupported) == 0, "Unsupported entries found"
|
||||
|
||||
|
||||
@pytest.mark.dependency(depends=["test_supervisor_is_updated"])
|
||||
def test_create_backup(shell_json, stash):
|
||||
result = shell_json("ha backups new --no-progress --raw-json")
|
||||
assert result.get("result") == "ok", f"Backup creation failed: {result}"
|
||||
slug = result.get("data", {}).get("slug")
|
||||
assert slug is not None
|
||||
stash.update(slug=slug)
|
||||
logger.info("Backup creation result: %s", result)
|
||||
|
||||
|
||||
@pytest.mark.dependency(depends=["test_addon_install"])
|
||||
def test_addon_uninstall(shell_json):
|
||||
result = shell_json("ha addons uninstall core_ssh --no-progress --raw-json")
|
||||
assert result.get("result") == "ok", f"Core SSH add-on uninstall failed: {result}"
|
||||
logger.info("Core SSH add-on uninstall result: %s", result)
|
||||
|
||||
|
||||
@pytest.mark.dependency(depends=["test_supervisor_is_updated"])
|
||||
def test_restart_supervisor(shell, shell_json):
|
||||
result = shell_json("ha supervisor restart --no-progress --raw-json")
|
||||
assert result.get("result") == "ok", f"Supervisor restart failed: {result}"
|
||||
|
||||
supervisor_ip = "\n".join(
|
||||
shell.run_check("docker inspect --format='{{.NetworkSettings.IPAddress}}' hassio_supervisor")
|
||||
)
|
||||
|
||||
for _ in range(100):
|
||||
try:
|
||||
if shell_json(f"curl -sSL http://{supervisor_ip}/supervisor/ping").get("result") == "ok":
|
||||
if shell_json("ha os info --no-progress --raw-json").get("result") == "ok":
|
||||
break
|
||||
except ExecutionError:
|
||||
pass # avoid failure when the container is restarting
|
||||
sleep(5)
|
||||
else:
|
||||
raise AssertionError("Supervisor did not start in time")
|
||||
|
||||
|
||||
@pytest.mark.dependency(depends=["test_create_backup"])
|
||||
def test_restore_backup(shell_json, stash):
|
||||
result = shell_json(
|
||||
f"ha backups restore {stash.get('slug')} --addons core_ssh --no-progress --raw-json",
|
||||
timeout=300,
|
||||
)
|
||||
assert result.get("result") == "ok", f"Backup restore failed: {result}"
|
||||
logger.info("Backup restore result: %s", result)
|
||||
|
||||
addon_info = shell_json("ha addons info core_ssh --no-progress --raw-json")
|
||||
assert addon_info.get("data", {}).get("version") is not None, "Core SSH add-on not installed"
|
||||
assert addon_info.get("data", {}).get("state") == "started", "Core SSH add-on not running"
|
||||
logger.info("Core SSH add-on info: %s", addon_info)
|
||||
|
||||
|
||||
@pytest.mark.dependency(depends=["test_create_backup"])
|
||||
def test_restore_ssl_directory(shell_json, stash):
|
||||
result = shell_json(f"ha backups restore {stash.get('slug')} --folders ssl --raw-json")
|
||||
assert result.get("result") == "ok", f"Backup restore failed: {result}"
|
||||
logger.info("Backup restore result: %s", result)
|
Loading…
Reference in New Issue