Improve handling of timeouts in tests (#2890)

* Improve handling of timeouts in tests

Make timeout handling in tests more transparent. Added a custom shell
driver that allows defining a global timeout for commands in the config
file, and replaced for/sleep constructs with infinite loops that will
eventually be terminated by the pytest-timeout plugin. Current timeouts
are taken from recent runs on GitHub CI with some extra headroom.

* test_supervisor_is_updated shouldn't be skipped if no update was needed

* Allow more time for system startup

* Allow even more time for system startup
This commit is contained in:
Jan Čermák 2023-10-31 18:16:49 +01:00 committed by GitHub
parent c33fc03fd6
commit 2888ccf28e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 53 additions and 32 deletions

View File

@@ -44,7 +44,7 @@ def shell_json(target, strategy) -> callable:
strategy.transition("shell")
shell = target.get_driver("ShellDriver")
def get_json_response(command, *, timeout=60) -> dict:
def get_json_response(command, *, timeout=None) -> dict:
return json.loads("\n".join(shell.run_check(command, timeout=timeout)))
return get_json_response

2
tests/pytest.ini Normal file
View File

@@ -0,0 +1,2 @@
[pytest]
# pytest-timeout setting: deliver timeouts via SIGALRM so tests blocked in
# long-running shell commands are interrupted (presumably POSIX-only — the
# default thread-based method cannot interrupt a blocked test; confirm
# against pytest-timeout docs).
timeout_method = signal

View File

@@ -12,11 +12,12 @@ targets:
nic: user,model=virtio-net-pci
disk: disk-image
bios: bios
- ShellDriver:
- CustomTimeoutShellDriver:
login_prompt: 'homeassistant login: '
username: 'root'
prompt: '# '
login_timeout: 300
command_timeout: 300
- QEMUShellStrategy: {}
tools:

View File

@@ -4,6 +4,7 @@ import os
import attr
from labgrid import target_factory, step
from labgrid.driver import ShellDriver
from labgrid.strategy import Strategy, StrategyError
@@ -13,6 +14,19 @@ class Status(enum.Enum):
shell = 2
@target_factory.reg_driver
@attr.s(eq=False)
class CustomTimeoutShellDriver(ShellDriver):
    """ShellDriver with a config-customizable timeout for run and run_check.

    ``command_timeout`` (seconds) comes from the target config and is used
    whenever a caller does not pass an explicit ``timeout``.
    """

    # Fallback timeout, applied only when a call leaves timeout=None.
    command_timeout = attr.ib(default=30, validator=attr.validators.instance_of(int))

    def run(self, cmd: str, *, timeout=None, codec="utf-8", decodeerrors="strict"):
        """Run *cmd*, defaulting the timeout to ``command_timeout``."""
        # Use an explicit None check instead of `timeout or ...`: with `or`,
        # a deliberate timeout=0 would be silently replaced by the default.
        effective = self.command_timeout if timeout is None else timeout
        return super().run(cmd, timeout=effective, codec=codec, decodeerrors=decodeerrors)

    def run_check(self, cmd: str, *, timeout=None, codec="utf-8", decodeerrors="strict"):
        """Delegate to ShellDriver.run_check, defaulting the timeout to ``command_timeout``."""
        effective = self.command_timeout if timeout is None else timeout
        return super().run_check(cmd, timeout=effective, codec=codec, decodeerrors=decodeerrors)
@target_factory.reg_driver
@attr.s(eq=False)
class QEMUShellStrategy(Strategy):
@@ -20,7 +34,7 @@ class QEMUShellStrategy(Strategy):
bindings = {
"qemu": "QEMUDriver",
"shell": "ShellDriver",
"shell": "CustomTimeoutShellDriver",
}
status = attr.ib(default=Status.unknown)

View File

@@ -1,3 +1,4 @@
labgrid==23.0.3
pytest==7.2.2
pytest-dependency==0.5.1
pytest-timeout==2.2.0

View File

@@ -1,10 +1,14 @@
import logging
from time import sleep
import pytest
_LOGGER = logging.getLogger(__name__)
@pytest.mark.dependency()
@pytest.mark.timeout(600)
def test_init(shell):
def check_container_running(container_name):
out = shell.run_check(
@@ -13,35 +17,37 @@ def test_init(shell):
return "running" in out
# wait for important containers first
for _ in range(20):
while True:
if check_container_running("homeassistant") and check_container_running("hassio_supervisor"):
break
sleep(5)
sleep(1)
# wait for system ready
for _ in range(20):
while True:
output = "\n".join(shell.run_check("ha os info || true"))
if "System is not ready" not in output:
break
sleep(5)
sleep(1)
output = shell.run_check("ha os info")
_LOGGER.info("%s", "\n".join(output))
@pytest.mark.dependency(depends=["test_init"])
def test_dmesg(shell):
output = shell.run_check("dmesg")
_LOGGER.info("%s", "\n".join(output))
@pytest.mark.dependency(depends=["test_init"])
def test_supervisor_logs(shell):
output = shell.run_check("ha su logs")
_LOGGER.info("%s", "\n".join(output))
@pytest.mark.dependency(depends=["test_init"])
def test_systemctl_status(shell):
output = shell.run_check(
"systemctl --no-pager -l status -a || true", timeout=90
)
output = shell.run_check("systemctl --no-pager -l status -a || true")
_LOGGER.info("%s", "\n".join(output))

View File

@@ -16,30 +16,30 @@ def stash() -> dict:
@pytest.mark.dependency()
@pytest.mark.timeout(600)
def test_start_supervisor(shell, shell_json):
def check_container_running(container_name):
out = shell.run_check(f"docker container inspect -f '{{{{.State.Status}}}}' {container_name} || true")
return "running" in out
for _ in range(20):
while True:
if check_container_running("homeassistant") and check_container_running("hassio_supervisor"):
break
sleep(5)
sleep(1)
supervisor_ip = "\n".join(
shell.run_check("docker inspect --format='{{.NetworkSettings.IPAddress}}' hassio_supervisor")
)
for _ in range(20):
while True:
try:
if shell_json(f"curl -sSL http://{supervisor_ip}/supervisor/ping").get("result") == "ok":
break
except ExecutionError:
pass # avoid failure when the container is restarting
sleep(5)
else:
raise AssertionError("Supervisor did not start in time")
sleep(1)
@pytest.mark.dependency(depends=["test_start_supervisor"])
@@ -55,6 +55,7 @@ def test_check_supervisor(shell_json):
@pytest.mark.dependency(depends=["test_check_supervisor"])
@pytest.mark.timeout(300)
def test_update_supervisor(shell_json):
supervisor_info = shell_json("ha supervisor info --no-progress --raw-json")
supervisor_version = supervisor_info.get("data").get("version")
@@ -68,9 +69,9 @@ def test_update_supervisor(shell_json):
else:
assert result.get("result") == "ok", f"Supervisor update failed: {result}"
for _ in range(40):
while True:
try:
supervisor_info = shell_json("ha supervisor info --no-progress --raw-json", timeout=90)
supervisor_info = shell_json("ha supervisor info --no-progress --raw-json")
data = supervisor_info.get("data")
if data and data.get("version") == data.get("version_latest"):
logger.info(
@@ -82,14 +83,13 @@ def test_update_supervisor(shell_json):
break
except ExecutionError:
pass # avoid failure when the container is restarting
sleep(5)
else:
raise AssertionError("Supervisor did not update in time")
sleep(1)
@pytest.mark.dependency(depends=["test_update_supervisor"])
@pytest.mark.dependency(depends=["test_check_supervisor"])
def test_supervisor_is_updated(shell_json):
supervisor_info = shell_json("ha supervisor info --no-progress --raw-json", timeout=90)
supervisor_info = shell_json("ha supervisor info --no-progress --raw-json")
data = supervisor_info.get("data")
assert data and data.get("version") == data.get("version_latest")
@@ -98,7 +98,7 @@ def test_supervisor_is_updated(shell_json):
def test_addon_install(shell_json):
# install Core SSH add-on
assert (
shell_json("ha addons install core_ssh --no-progress --raw-json", timeout=300).get("result") == "ok"
shell_json("ha addons install core_ssh --no-progress --raw-json").get("result") == "ok"
), "Core SSH add-on install failed"
# check Core SSH add-on is installed
assert (
@@ -153,6 +153,7 @@ def test_addon_uninstall(shell_json):
@pytest.mark.dependency(depends=["test_supervisor_is_updated"])
@pytest.mark.timeout(450)
def test_restart_supervisor(shell, shell_json):
result = shell_json("ha supervisor restart --no-progress --raw-json")
assert result.get("result") == "ok", f"Supervisor restart failed: {result}"
@@ -161,24 +162,20 @@ def test_restart_supervisor(shell, shell_json):
shell.run_check("docker inspect --format='{{.NetworkSettings.IPAddress}}' hassio_supervisor")
)
for _ in range(100):
while True:
try:
if shell_json(f"curl -sSL http://{supervisor_ip}/supervisor/ping").get("result") == "ok":
if shell_json("ha os info --no-progress --raw-json").get("result") == "ok":
break
except ExecutionError:
pass # avoid failure when the container is restarting
sleep(5)
else:
raise AssertionError("Supervisor did not start in time")
sleep(1)
@pytest.mark.dependency(depends=["test_create_backup"])
def test_restore_backup(shell_json, stash):
result = shell_json(
f"ha backups restore {stash.get('slug')} --addons core_ssh --no-progress --raw-json",
timeout=300,
)
result = shell_json(f"ha backups restore {stash.get('slug')} --addons core_ssh --no-progress --raw-json")
assert result.get("result") == "ok", f"Backup restore failed: {result}"
logger.info("Backup restore result: %s", result)