Clean up secret loading (#47034)

This commit is contained in:
Paulus Schoutsen 2021-03-02 12:58:53 -08:00 committed by GitHub
parent 17444e2f2f
commit 2df644c6cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 172 additions and 134 deletions

View File

@ -28,7 +28,6 @@ from homeassistant.setup import (
from homeassistant.util.async_ import gather_with_concurrency
from homeassistant.util.logging import async_activate_log_queue_handler
from homeassistant.util.package import async_get_user_site, is_virtual_env
from homeassistant.util.yaml import clear_secret_cache
if TYPE_CHECKING:
from .runner import RuntimeConfig
@ -122,8 +121,6 @@ async def async_setup_hass(
basic_setup_success = (
await async_from_config_dict(config_dict, hass) is not None
)
finally:
clear_secret_cache()
if config_dict is None:
safe_mode = True

View File

@ -2,6 +2,7 @@
from abc import ABC, abstractmethod
import logging
import os
from pathlib import Path
import time
from typing import Optional, cast
@ -12,7 +13,7 @@ from homeassistant.const import CONF_FILENAME
from homeassistant.core import callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import collection, storage
from homeassistant.util.yaml import load_yaml
from homeassistant.util.yaml import Secrets, load_yaml
from .const import (
CONF_ICON,
@ -201,7 +202,7 @@ class LovelaceYAML(LovelaceConfig):
is_updated = self._cache is not None
try:
config = load_yaml(self.path)
config = load_yaml(self.path, Secrets(Path(self.hass.config.config_dir)))
except FileNotFoundError:
raise ConfigNotFound from None

View File

@ -2,6 +2,7 @@
from collections import OrderedDict
import logging
import os
from pathlib import Path
import re
import shutil
from types import ModuleType
@ -59,7 +60,7 @@ from homeassistant.requirements import (
)
from homeassistant.util.package import is_docker_env
from homeassistant.util.unit_system import IMPERIAL_SYSTEM, METRIC_SYSTEM
from homeassistant.util.yaml import SECRET_YAML, load_yaml
from homeassistant.util.yaml import SECRET_YAML, Secrets, load_yaml
_LOGGER = logging.getLogger(__name__)
@ -318,23 +319,33 @@ async def async_hass_config_yaml(hass: HomeAssistant) -> Dict:
This function allow a component inside the asyncio loop to reload its
configuration by itself. Include package merge.
"""
if hass.config.config_dir is None:
secrets = None
else:
secrets = Secrets(Path(hass.config.config_dir))
# Not using async_add_executor_job because this is an internal method.
config = await hass.loop.run_in_executor(
None, load_yaml_config_file, hass.config.path(YAML_CONFIG_FILE)
None,
load_yaml_config_file,
hass.config.path(YAML_CONFIG_FILE),
secrets,
)
core_config = config.get(CONF_CORE, {})
await merge_packages_config(hass, config, core_config.get(CONF_PACKAGES, {}))
return config
def load_yaml_config_file(config_path: str) -> Dict[Any, Any]:
def load_yaml_config_file(
config_path: str, secrets: Optional[Secrets] = None
) -> Dict[Any, Any]:
"""Parse a YAML configuration file.
Raises FileNotFoundError or HomeAssistantError.
This method needs to run in an executor.
"""
conf_dict = load_yaml(config_path)
conf_dict = load_yaml(config_path, secrets)
if not isinstance(conf_dict, dict):
msg = (

View File

@ -4,6 +4,7 @@ from __future__ import annotations
from collections import OrderedDict
import logging
import os
from pathlib import Path
from typing import List, NamedTuple, Optional
import voluptuous as vol
@ -87,13 +88,18 @@ async def async_check_ha_config_file(hass: HomeAssistant) -> HomeAssistantConfig
try:
if not await hass.async_add_executor_job(os.path.isfile, config_path):
return result.add_error("File configuration.yaml not found.")
config = await hass.async_add_executor_job(load_yaml_config_file, config_path)
assert hass.config.config_dir is not None
config = await hass.async_add_executor_job(
load_yaml_config_file,
config_path,
yaml_loader.Secrets(Path(hass.config.config_dir)),
)
except FileNotFoundError:
return result.add_error(f"File not found: {config_path}")
except HomeAssistantError as err:
return result.add_error(f"Error loading {config_path}: {err}")
finally:
yaml_loader.clear_secret_cache()
# Extract and validate core [homeassistant] config
try:

View File

@ -9,10 +9,11 @@ import os
from typing import Any, Callable, Dict, List, Tuple
from unittest.mock import patch
from homeassistant import bootstrap, core
from homeassistant import core
from homeassistant.config import get_default_config_dir
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.check_config import async_check_ha_config_file
from homeassistant.util.yaml import Secrets
import homeassistant.util.yaml.loader as yaml_loader
# mypy: allow-untyped-calls, allow-untyped-defs
@ -26,7 +27,6 @@ MOCKS: Dict[str, Tuple[str, Callable]] = {
"load*": ("homeassistant.config.load_yaml", yaml_loader.load_yaml),
"secrets": ("homeassistant.util.yaml.loader.secret_yaml", yaml_loader.secret_yaml),
}
SILENCE = ("homeassistant.scripts.check_config.yaml_loader.clear_secret_cache",)
PATCHES: Dict[str, Any] = {}
@ -154,14 +154,14 @@ def check(config_dir, secrets=False):
"secrets": OrderedDict(), # secret cache and secrets loaded
"except": OrderedDict(), # exceptions raised (with config)
#'components' is a HomeAssistantConfig # noqa: E265
"secret_cache": None,
"secret_cache": {},
}
# pylint: disable=possibly-unused-variable
def mock_load(filename):
def mock_load(filename, secrets=None):
"""Mock hass.util.load_yaml to save config file names."""
res["yaml_files"][filename] = True
return MOCKS["load"][1](filename)
return MOCKS["load"][1](filename, secrets)
# pylint: disable=possibly-unused-variable
def mock_secrets(ldr, node):
@ -173,10 +173,6 @@ def check(config_dir, secrets=False):
res["secrets"][node.value] = val
return val
# Patches to skip functions
for sil in SILENCE:
PATCHES[sil] = patch(sil)
# Patches with local mock functions
for key, val in MOCKS.items():
if not secrets and key == "secrets":
@ -192,11 +188,19 @@ def check(config_dir, secrets=False):
if secrets:
# Ensure !secrets point to the patched function
yaml_loader.yaml.SafeLoader.add_constructor("!secret", yaml_loader.secret_yaml)
yaml_loader.SafeLineLoader.add_constructor("!secret", yaml_loader.secret_yaml)
def secrets_proxy(*args):
secrets = Secrets(*args)
res["secret_cache"] = secrets._cache
return secrets
try:
res["components"] = asyncio.run(async_check_config(config_dir))
res["secret_cache"] = OrderedDict(yaml_loader.__SECRET_CACHE)
with patch.object(yaml_loader, "Secrets", secrets_proxy):
res["components"] = asyncio.run(async_check_config(config_dir))
res["secret_cache"] = {
str(key): val for key, val in res["secret_cache"].items()
}
for err in res["components"].errors:
domain = err.domain or ERROR_STR
res["except"].setdefault(domain, []).append(err.message)
@ -212,10 +216,9 @@ def check(config_dir, secrets=False):
pat.stop()
if secrets:
# Ensure !secrets point to the original function
yaml_loader.yaml.SafeLoader.add_constructor(
yaml_loader.SafeLineLoader.add_constructor(
"!secret", yaml_loader.secret_yaml
)
bootstrap.clear_secret_cache()
return res

View File

@ -2,7 +2,7 @@
from .const import SECRET_YAML
from .dumper import dump, save_yaml
from .input import UndefinedSubstitution, extract_inputs, substitute
from .loader import clear_secret_cache, load_yaml, parse_yaml, secret_yaml
from .loader import Secrets, load_yaml, parse_yaml, secret_yaml
from .objects import Input
__all__ = [
@ -10,7 +10,7 @@ __all__ = [
"Input",
"dump",
"save_yaml",
"clear_secret_cache",
"Secrets",
"load_yaml",
"secret_yaml",
"parse_yaml",

View File

@ -3,8 +3,8 @@ from collections import OrderedDict
import fnmatch
import logging
import os
import sys
from typing import Dict, Iterator, List, TextIO, TypeVar, Union, overload
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, TextIO, TypeVar, Union, overload
import yaml
@ -19,20 +19,82 @@ JSON_TYPE = Union[List, Dict, str] # pylint: disable=invalid-name
DICT_T = TypeVar("DICT_T", bound=Dict) # pylint: disable=invalid-name
_LOGGER = logging.getLogger(__name__)
__SECRET_CACHE: Dict[str, JSON_TYPE] = {}
def clear_secret_cache() -> None:
"""Clear the secret cache.
class Secrets:
"""Store secrets while loading YAML."""
Async friendly.
"""
__SECRET_CACHE.clear()
def __init__(self, config_dir: Path):
"""Initialize secrets."""
self.config_dir = config_dir
self._cache: Dict[Path, Dict[str, str]] = {}
def get(self, requester_path: str, secret: str) -> str:
"""Return the value of a secret."""
current_path = Path(requester_path)
secret_dir = current_path
while True:
secret_dir = secret_dir.parent
try:
secret_dir.relative_to(self.config_dir)
except ValueError:
# We went above the config dir
break
secrets = self._load_secret_yaml(secret_dir)
if secret in secrets:
_LOGGER.debug(
"Secret %s retrieved from secrets.yaml in folder %s",
secret,
secret_dir,
)
return secrets[secret]
raise HomeAssistantError(f"Secret {secret} not defined")
def _load_secret_yaml(self, secret_dir: Path) -> Dict[str, str]:
"""Load the secrets yaml from path."""
secret_path = secret_dir / SECRET_YAML
if secret_path in self._cache:
return self._cache[secret_path]
_LOGGER.debug("Loading %s", secret_path)
try:
secrets = load_yaml(str(secret_path))
if not isinstance(secrets, dict):
raise HomeAssistantError("Secrets is not a dictionary")
if "logger" in secrets:
logger = str(secrets["logger"]).lower()
if logger == "debug":
_LOGGER.setLevel(logging.DEBUG)
else:
_LOGGER.error(
"secrets.yaml: 'logger: debug' expected, but 'logger: %s' found",
logger,
)
del secrets["logger"]
except FileNotFoundError:
secrets = {}
self._cache[secret_path] = secrets
return secrets
class SafeLineLoader(yaml.SafeLoader):
"""Loader class that keeps track of line numbers."""
def __init__(self, stream: Any, secrets: Optional[Secrets] = None) -> None:
"""Initialize a safe line loader."""
super().__init__(stream)
self.secrets = secrets
def compose_node(self, parent: yaml.nodes.Node, index: int) -> yaml.nodes.Node:
"""Annotate a node with the first line it was seen."""
last_line: int = self.line
@ -41,22 +103,27 @@ class SafeLineLoader(yaml.SafeLoader):
return node
def load_yaml(fname: str) -> JSON_TYPE:
def load_yaml(fname: str, secrets: Optional[Secrets] = None) -> JSON_TYPE:
"""Load a YAML file."""
try:
with open(fname, encoding="utf-8") as conf_file:
return parse_yaml(conf_file)
return parse_yaml(conf_file, secrets)
except UnicodeDecodeError as exc:
_LOGGER.error("Unable to read file %s: %s", fname, exc)
raise HomeAssistantError(exc) from exc
def parse_yaml(content: Union[str, TextIO]) -> JSON_TYPE:
def parse_yaml(
content: Union[str, TextIO], secrets: Optional[Secrets] = None
) -> JSON_TYPE:
"""Load a YAML file."""
try:
# If configuration file is empty YAML returns None
# We convert that to an empty dict
return yaml.load(content, Loader=SafeLineLoader) or OrderedDict()
return (
yaml.load(content, Loader=lambda stream: SafeLineLoader(stream, secrets))
or OrderedDict()
)
except yaml.YAMLError as exc:
_LOGGER.error(str(exc))
raise HomeAssistantError(exc) from exc
@ -64,21 +131,21 @@ def parse_yaml(content: Union[str, TextIO]) -> JSON_TYPE:
@overload
def _add_reference(
obj: Union[list, NodeListClass], loader: yaml.SafeLoader, node: yaml.nodes.Node
obj: Union[list, NodeListClass], loader: SafeLineLoader, node: yaml.nodes.Node
) -> NodeListClass:
...
@overload
def _add_reference(
obj: Union[str, NodeStrClass], loader: yaml.SafeLoader, node: yaml.nodes.Node
obj: Union[str, NodeStrClass], loader: SafeLineLoader, node: yaml.nodes.Node
) -> NodeStrClass:
...
@overload
def _add_reference(
obj: DICT_T, loader: yaml.SafeLoader, node: yaml.nodes.Node
obj: DICT_T, loader: SafeLineLoader, node: yaml.nodes.Node
) -> DICT_T:
...
@ -103,7 +170,7 @@ def _include_yaml(loader: SafeLineLoader, node: yaml.nodes.Node) -> JSON_TYPE:
"""
fname = os.path.join(os.path.dirname(loader.name), node.value)
try:
return _add_reference(load_yaml(fname), loader, node)
return _add_reference(load_yaml(fname, loader.secrets), loader, node)
except FileNotFoundError as exc:
raise HomeAssistantError(
f"{node.start_mark}: Unable to read file {fname}."
@ -135,7 +202,7 @@ def _include_dir_named_yaml(
filename = os.path.splitext(os.path.basename(fname))[0]
if os.path.basename(fname) == SECRET_YAML:
continue
mapping[filename] = load_yaml(fname)
mapping[filename] = load_yaml(fname, loader.secrets)
return _add_reference(mapping, loader, node)
@ -148,7 +215,7 @@ def _include_dir_merge_named_yaml(
for fname in _find_files(loc, "*.yaml"):
if os.path.basename(fname) == SECRET_YAML:
continue
loaded_yaml = load_yaml(fname)
loaded_yaml = load_yaml(fname, loader.secrets)
if isinstance(loaded_yaml, dict):
mapping.update(loaded_yaml)
return _add_reference(mapping, loader, node)
@ -175,7 +242,7 @@ def _include_dir_merge_list_yaml(
for fname in _find_files(loc, "*.yaml"):
if os.path.basename(fname) == SECRET_YAML:
continue
loaded_yaml = load_yaml(fname)
loaded_yaml = load_yaml(fname, loader.secrets)
if isinstance(loaded_yaml, list):
merged_list.extend(loaded_yaml)
return _add_reference(merged_list, loader, node)
@ -232,75 +299,27 @@ def _env_var_yaml(loader: SafeLineLoader, node: yaml.nodes.Node) -> str:
raise HomeAssistantError(node.value)
def _load_secret_yaml(secret_path: str) -> JSON_TYPE:
"""Load the secrets yaml from path."""
secret_path = os.path.join(secret_path, SECRET_YAML)
if secret_path in __SECRET_CACHE:
return __SECRET_CACHE[secret_path]
_LOGGER.debug("Loading %s", secret_path)
try:
secrets = load_yaml(secret_path)
if not isinstance(secrets, dict):
raise HomeAssistantError("Secrets is not a dictionary")
if "logger" in secrets:
logger = str(secrets["logger"]).lower()
if logger == "debug":
_LOGGER.setLevel(logging.DEBUG)
else:
_LOGGER.error(
"secrets.yaml: 'logger: debug' expected, but 'logger: %s' found",
logger,
)
del secrets["logger"]
except FileNotFoundError:
secrets = {}
__SECRET_CACHE[secret_path] = secrets
return secrets
def secret_yaml(loader: SafeLineLoader, node: yaml.nodes.Node) -> JSON_TYPE:
"""Load secrets and embed it into the configuration YAML."""
if os.path.basename(loader.name) == SECRET_YAML:
_LOGGER.error("secrets.yaml: attempt to load secret from within secrets file")
raise HomeAssistantError(
"secrets.yaml: attempt to load secret from within secrets file"
)
secret_path = os.path.dirname(loader.name)
while True:
secrets = _load_secret_yaml(secret_path)
if loader.secrets is None:
raise HomeAssistantError("Secrets not supported in this YAML file")
if node.value in secrets:
_LOGGER.debug(
"Secret %s retrieved from secrets.yaml in folder %s",
node.value,
secret_path,
)
return secrets[node.value]
if secret_path == os.path.dirname(sys.path[0]):
break # sys.path[0] set to config/deps folder by bootstrap
secret_path = os.path.dirname(secret_path)
if not os.path.exists(secret_path) or len(secret_path) < 5:
break # Somehow we got past the .homeassistant config folder
raise HomeAssistantError(f"Secret {node.value} not defined")
return loader.secrets.get(loader.name, node.value)
yaml.SafeLoader.add_constructor("!include", _include_yaml)
yaml.SafeLoader.add_constructor(
SafeLineLoader.add_constructor("!include", _include_yaml)
SafeLineLoader.add_constructor(
yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, _ordered_dict
)
yaml.SafeLoader.add_constructor(
SafeLineLoader.add_constructor(
yaml.resolver.BaseResolver.DEFAULT_SEQUENCE_TAG, _construct_seq
)
yaml.SafeLoader.add_constructor("!env_var", _env_var_yaml)
yaml.SafeLoader.add_constructor("!secret", secret_yaml)
yaml.SafeLoader.add_constructor("!include_dir_list", _include_dir_list_yaml)
yaml.SafeLoader.add_constructor("!include_dir_merge_list", _include_dir_merge_list_yaml)
yaml.SafeLoader.add_constructor("!include_dir_named", _include_dir_named_yaml)
yaml.SafeLoader.add_constructor(
SafeLineLoader.add_constructor("!env_var", _env_var_yaml)
SafeLineLoader.add_constructor("!secret", secret_yaml)
SafeLineLoader.add_constructor("!include_dir_list", _include_dir_list_yaml)
SafeLineLoader.add_constructor("!include_dir_merge_list", _include_dir_merge_list_yaml)
SafeLineLoader.add_constructor("!include_dir_named", _include_dir_named_yaml)
SafeLineLoader.add_constructor(
"!include_dir_merge_named", _include_dir_merge_named_yaml
)
yaml.SafeLoader.add_constructor("!input", Input.from_node)
SafeLineLoader.add_constructor("!input", Input.from_node)

View File

@ -18,7 +18,7 @@ def test_simple_list():
"""Test simple list."""
conf = "config:\n - simple\n - list"
with io.StringIO(conf) as file:
doc = yaml_loader.yaml.safe_load(file)
doc = yaml_loader.yaml.load(file, Loader=yaml_loader.SafeLineLoader)
assert doc["config"] == ["simple", "list"]
@ -26,7 +26,7 @@ def test_simple_dict():
"""Test simple dict."""
conf = "key: value"
with io.StringIO(conf) as file:
doc = yaml_loader.yaml.safe_load(file)
doc = yaml_loader.yaml.load(file, Loader=yaml_loader.SafeLineLoader)
assert doc["key"] == "value"
@ -49,7 +49,7 @@ def test_environment_variable():
os.environ["PASSWORD"] = "secret_password"
conf = "password: !env_var PASSWORD"
with io.StringIO(conf) as file:
doc = yaml_loader.yaml.safe_load(file)
doc = yaml_loader.yaml.load(file, Loader=yaml_loader.SafeLineLoader)
assert doc["password"] == "secret_password"
del os.environ["PASSWORD"]
@ -58,7 +58,7 @@ def test_environment_variable_default():
"""Test config file with default value for environment variable."""
conf = "password: !env_var PASSWORD secret_password"
with io.StringIO(conf) as file:
doc = yaml_loader.yaml.safe_load(file)
doc = yaml_loader.yaml.load(file, Loader=yaml_loader.SafeLineLoader)
assert doc["password"] == "secret_password"
@ -67,7 +67,7 @@ def test_invalid_environment_variable():
conf = "password: !env_var PASSWORD"
with pytest.raises(HomeAssistantError):
with io.StringIO(conf) as file:
yaml_loader.yaml.safe_load(file)
yaml_loader.yaml.load(file, Loader=yaml_loader.SafeLineLoader)
def test_include_yaml():
@ -75,13 +75,13 @@ def test_include_yaml():
with patch_yaml_files({"test.yaml": "value"}):
conf = "key: !include test.yaml"
with io.StringIO(conf) as file:
doc = yaml_loader.yaml.safe_load(file)
doc = yaml_loader.yaml.load(file, Loader=yaml_loader.SafeLineLoader)
assert doc["key"] == "value"
with patch_yaml_files({"test.yaml": None}):
conf = "key: !include test.yaml"
with io.StringIO(conf) as file:
doc = yaml_loader.yaml.safe_load(file)
doc = yaml_loader.yaml.load(file, Loader=yaml_loader.SafeLineLoader)
assert doc["key"] == {}
@ -93,7 +93,7 @@ def test_include_dir_list(mock_walk):
with patch_yaml_files({"/test/one.yaml": "one", "/test/two.yaml": "two"}):
conf = "key: !include_dir_list /test"
with io.StringIO(conf) as file:
doc = yaml_loader.yaml.safe_load(file)
doc = yaml_loader.yaml.load(file, Loader=yaml_loader.SafeLineLoader)
assert doc["key"] == sorted(["one", "two"])
@ -118,7 +118,7 @@ def test_include_dir_list_recursive(mock_walk):
assert (
".ignore" in mock_walk.return_value[0][1]
), "Expecting .ignore in here"
doc = yaml_loader.yaml.safe_load(file)
doc = yaml_loader.yaml.load(file, Loader=yaml_loader.SafeLineLoader)
assert "tmp2" in mock_walk.return_value[0][1]
assert ".ignore" not in mock_walk.return_value[0][1]
assert sorted(doc["key"]) == sorted(["zero", "one", "two"])
@ -135,7 +135,7 @@ def test_include_dir_named(mock_walk):
conf = "key: !include_dir_named /test"
correct = {"first": "one", "second": "two"}
with io.StringIO(conf) as file:
doc = yaml_loader.yaml.safe_load(file)
doc = yaml_loader.yaml.load(file, Loader=yaml_loader.SafeLineLoader)
assert doc["key"] == correct
@ -161,7 +161,7 @@ def test_include_dir_named_recursive(mock_walk):
assert (
".ignore" in mock_walk.return_value[0][1]
), "Expecting .ignore in here"
doc = yaml_loader.yaml.safe_load(file)
doc = yaml_loader.yaml.load(file, Loader=yaml_loader.SafeLineLoader)
assert "tmp2" in mock_walk.return_value[0][1]
assert ".ignore" not in mock_walk.return_value[0][1]
assert doc["key"] == correct
@ -177,7 +177,7 @@ def test_include_dir_merge_list(mock_walk):
):
conf = "key: !include_dir_merge_list /test"
with io.StringIO(conf) as file:
doc = yaml_loader.yaml.safe_load(file)
doc = yaml_loader.yaml.load(file, Loader=yaml_loader.SafeLineLoader)
assert sorted(doc["key"]) == sorted(["one", "two", "three"])
@ -202,7 +202,7 @@ def test_include_dir_merge_list_recursive(mock_walk):
assert (
".ignore" in mock_walk.return_value[0][1]
), "Expecting .ignore in here"
doc = yaml_loader.yaml.safe_load(file)
doc = yaml_loader.yaml.load(file, Loader=yaml_loader.SafeLineLoader)
assert "tmp2" in mock_walk.return_value[0][1]
assert ".ignore" not in mock_walk.return_value[0][1]
assert sorted(doc["key"]) == sorted(["one", "two", "three", "four"])
@ -221,7 +221,7 @@ def test_include_dir_merge_named(mock_walk):
with patch_yaml_files(files):
conf = "key: !include_dir_merge_named /test"
with io.StringIO(conf) as file:
doc = yaml_loader.yaml.safe_load(file)
doc = yaml_loader.yaml.load(file, Loader=yaml_loader.SafeLineLoader)
assert doc["key"] == {"key1": "one", "key2": "two", "key3": "three"}
@ -246,7 +246,7 @@ def test_include_dir_merge_named_recursive(mock_walk):
assert (
".ignore" in mock_walk.return_value[0][1]
), "Expecting .ignore in here"
doc = yaml_loader.yaml.safe_load(file)
doc = yaml_loader.yaml.load(file, Loader=yaml_loader.SafeLineLoader)
assert "tmp2" in mock_walk.return_value[0][1]
assert ".ignore" not in mock_walk.return_value[0][1]
assert doc["key"] == {
@ -278,11 +278,11 @@ def test_dump_unicode():
FILES = {}
def load_yaml(fname, string):
def load_yaml(fname, string, secrets=None):
"""Write a string to file and return the parsed yaml."""
FILES[fname] = string
with patch_yaml_files(FILES):
return load_yaml_config_file(fname)
return load_yaml_config_file(fname, secrets)
class TestSecrets(unittest.TestCase):
@ -293,7 +293,6 @@ class TestSecrets(unittest.TestCase):
def setUp(self):
"""Create & load secrets file."""
config_dir = get_test_config_dir()
yaml.clear_secret_cache()
self._yaml_path = os.path.join(config_dir, YAML_CONFIG_FILE)
self._secret_path = os.path.join(config_dir, yaml.SECRET_YAML)
self._sub_folder_path = os.path.join(config_dir, "subFolder")
@ -315,11 +314,11 @@ class TestSecrets(unittest.TestCase):
" username: !secret comp1_un\n"
" password: !secret comp1_pw\n"
"",
yaml_loader.Secrets(config_dir),
)
def tearDown(self):
"""Clean up secrets."""
yaml.clear_secret_cache()
FILES.clear()
def test_secrets_from_yaml(self):
@ -341,6 +340,7 @@ class TestSecrets(unittest.TestCase):
" username: !secret comp1_un\n"
" password: !secret comp1_pw\n"
"",
yaml_loader.Secrets(get_test_config_dir()),
)
assert expected == self._yaml["http"]
@ -359,6 +359,7 @@ class TestSecrets(unittest.TestCase):
" username: !secret comp1_un\n"
" password: !secret comp1_pw\n"
"",
yaml_loader.Secrets(get_test_config_dir()),
)
assert expected == self._yaml["http"]
@ -380,9 +381,12 @@ class TestSecrets(unittest.TestCase):
@patch("homeassistant.util.yaml.loader._LOGGER.error")
def test_bad_logger_value(self, mock_error):
"""Ensure logger: debug was removed."""
yaml.clear_secret_cache()
load_yaml(self._secret_path, "logger: info\npw: abc")
load_yaml(self._yaml_path, "api_password: !secret pw")
load_yaml(
self._yaml_path,
"api_password: !secret pw",
yaml_loader.Secrets(get_test_config_dir()),
)
assert mock_error.call_count == 1, "Expected an error about logger: value"
def test_secrets_are_not_dict(self):
@ -390,7 +394,6 @@ class TestSecrets(unittest.TestCase):
FILES[
self._secret_path
] = "- http_pw: pwhttp\n comp1_un: un1\n comp1_pw: pw1\n"
yaml.clear_secret_cache()
with pytest.raises(HomeAssistantError):
load_yaml(
self._yaml_path,
@ -424,10 +427,8 @@ def test_no_recursive_secrets(caplog):
files = {YAML_CONFIG_FILE: "key: !secret a", yaml.SECRET_YAML: "a: 1\nb: !secret a"}
with patch_yaml_files(files), pytest.raises(HomeAssistantError) as e:
load_yaml_config_file(YAML_CONFIG_FILE)
assert e.value.args == (
"secrets.yaml: attempt to load secret from within secrets file",
)
assert "attempt to load secret from within secrets file" in caplog.text
assert e.value.args == ("Secrets not supported in this YAML file",)
def test_input_class():