Mirror of https://github.com/home-assistant/core
Significantly reduce executor contention during bootstrap (#107312)

* Significantly reduce executor contention during bootstrap

  At startup we have a thundering herd wanting to use the executor to load manifest.json. Since we know which integrations we are about to load in each resolver step, group the manifest loads into single executor jobs by calling async_get_integrations on the deps of the integrations after they are resolved. In practice this reduced the number of executor jobs by 80% during bootstrap.

* merge
* naming
* tweak
* tweak
* not enough contention to be worth it there
* refactor to avoid waiting
* refactor to avoid waiting
* tweaks
* tweaks
* tweak
* background is fine
* comment
This commit is contained in:
parent acf78664e2
commit 69307374f4
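The heart of the change is easiest to see outside the diff: instead of each integration triggering its own executor job to read dependency manifests, the known dependency domains are collected up front and loaded in one batched call. Below is a minimal sketch, not the committed code; the helper name `_collect_requirements` is hypothetical, while `loader.async_get_integrations` and the `Integration` attributes (`dependencies`, `after_dependencies`, `requirements`) are the same ones the diff below uses.

```python
# Sketch only: batch the manifest loads so each resolver step issues one
# executor job instead of one per dependency domain.
from homeassistant import loader
from homeassistant.core import HomeAssistant


async def _collect_requirements(
    hass: HomeAssistant, integrations: list[loader.Integration]
) -> set[str]:
    needed_requirements: set[str] = set()
    manifest_deps: set[str] = set()

    # Gather every dependency domain up front instead of resolving them one by one.
    for itg in integrations:
        manifest_deps.update(itg.dependencies)
        manifest_deps.update(itg.after_dependencies)
        needed_requirements.update(itg.requirements)

    if manifest_deps:
        # One batched call loads all dependency manifests in a single executor job.
        deps = await loader.async_get_integrations(hass, manifest_deps)
        for dep in deps.values():
            if isinstance(dep, loader.Integration):
                needed_requirements.update(dep.requirements)

    return needed_requirements
```

Compared with awaiting one `loader.async_get_integration` call per domain, the batched call lets the loader handle all missing manifest.json files together, which is where the roughly 80% drop in executor jobs reported in the commit message comes from.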
@@ -16,7 +16,7 @@ from typing import TYPE_CHECKING, Any
 import voluptuous as vol
 import yarl
 
-from . import config as conf_util, config_entries, core, loader
+from . import config as conf_util, config_entries, core, loader, requirements
 from .components import http
 from .const import (
     FORMAT_DATETIME,
@@ -229,7 +229,7 @@ def open_hass_ui(hass: core.HomeAssistant) -> None:
     )
 
 
-async def load_registries(hass: core.HomeAssistant) -> None:
+async def async_load_base_functionality(hass: core.HomeAssistant) -> None:
     """Load the registries and cache the result of platform.uname().processor."""
     if DATA_REGISTRIES_LOADED in hass.data:
         return
@@ -256,6 +256,7 @@ async def load_registries(hass: core.HomeAssistant) -> None:
         hass.async_add_executor_job(_cache_uname_processor),
         template.async_load_custom_templates(hass),
         restore_state.async_load(hass),
+        hass.config_entries.async_initialize(),
     )
 
 
@@ -270,8 +271,7 @@ async def async_from_config_dict(
     start = monotonic()
 
     hass.config_entries = config_entries.ConfigEntries(hass, config)
-    await hass.config_entries.async_initialize()
-    await load_registries(hass)
+    await async_load_base_functionality(hass)
 
     # Set up core.
     _LOGGER.debug("Setting up %s", CORE_INTEGRATIONS)
@@ -527,11 +527,13 @@ async def async_setup_multi_components(
     config: dict[str, Any],
 ) -> None:
     """Set up multiple domains. Log on failure."""
+    # Avoid creating tasks for domains that were setup in a previous stage
+    domains_not_yet_setup = domains - hass.config.components
     futures = {
         domain: hass.async_create_task(
             async_setup_component(hass, domain, config), f"setup component {domain}"
         )
-        for domain in domains
+        for domain in domains_not_yet_setup
     }
     results = await asyncio.gather(*futures.values(), return_exceptions=True)
     for idx, domain in enumerate(futures):
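The `async_setup_multi_components` hunk above relies on nothing more exotic than a set difference: `hass.config.components` tracks the domains that are already set up, so subtracting it keeps later bootstrap stages from spawning no-op setup tasks. A trivial illustration with made-up domain names:

```python
# hass.config.components holds domains that finished setup in an earlier stage;
# subtracting it means no task is created (and later awaited) for work already done.
already_set_up = {"http", "frontend"}  # stand-in for hass.config.components
requested = {"http", "frontend", "recorder", "mqtt"}

domains_not_yet_setup = requested - already_set_up
assert domains_not_yet_setup == {"recorder", "mqtt"}
```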
@@ -555,6 +557,8 @@ async def _async_set_up_integrations(
 
     domains_to_setup = _get_domains(hass, config)
 
+    needed_requirements: set[str] = set()
+
     # Resolve all dependencies so we know all integrations
     # that will have to be loaded and start rightaway
     integration_cache: dict[str, loader.Integration] = {}
@@ -570,6 +574,25 @@ async def _async_set_up_integrations(
         ).values()
         if isinstance(int_or_exc, loader.Integration)
     ]
+
+    manifest_deps: set[str] = set()
+    for itg in integrations_to_process:
+        manifest_deps.update(itg.dependencies)
+        manifest_deps.update(itg.after_dependencies)
+        needed_requirements.update(itg.requirements)
+
+    if manifest_deps:
+        # If there are dependencies, try to preload all
+        # the integrations manifest at once and add them
+        # to the list of requirements we need to install
+        # so we can try to check if they are already installed
+        # in a single call below which avoids each integration
+        # having to wait for the lock to do it individually
+        deps = await loader.async_get_integrations(hass, manifest_deps)
+        for dependant_itg in deps.values():
+            if isinstance(dependant_itg, loader.Integration):
+                needed_requirements.update(dependant_itg.requirements)
+
     resolve_dependencies_tasks = [
         itg.resolve_dependencies()
         for itg in integrations_to_process
@@ -591,6 +614,14 @@ async def _async_set_up_integrations(
 
     _LOGGER.info("Domains to be set up: %s", domains_to_setup)
 
+    # Optimistically check if requirements are already installed
+    # ahead of setting up the integrations so we can prime the cache
+    # We do not wait for this since its an optimization only
+    hass.async_create_background_task(
+        requirements.async_load_installed_versions(hass, needed_requirements),
+        "check installed requirements",
+    )
+
     # Initialize recorder
     if "recorder" in domains_to_setup:
         recorder.async_initialize_recorder(hass)
@@ -63,6 +63,13 @@ async def async_process_requirements(
     await _async_get_manager(hass).async_process_requirements(name, requirements)
 
 
+async def async_load_installed_versions(
+    hass: HomeAssistant, requirements: set[str]
+) -> None:
+    """Load the installed version of requirements."""
+    await _async_get_manager(hass).async_load_installed_versions(requirements)
+
+
 @callback
 def _async_get_manager(hass: HomeAssistant) -> RequirementsManager:
     """Get the requirements manager."""
@@ -284,3 +291,15 @@ class RequirementsManager:
         self.install_failure_history |= failures
         if failures:
             raise RequirementsNotFound(name, list(failures))
+
+    async def async_load_installed_versions(
+        self,
+        requirements: set[str],
+    ) -> None:
+        """Load the installed version of requirements."""
+        if not (requirements_to_check := requirements - self.is_installed_cache):
+            return
+
+        self.is_installed_cache |= await self.hass.async_add_executor_job(
+            pkg_util.get_installed_versions, requirements_to_check
+        )
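Combined with the bootstrap hunk that schedules `requirements.async_load_installed_versions` as a background task, the pattern is: subtract what is already cached, check everything that remains in a single blocking pass on the executor, merge the result into `is_installed_cache`, and never block startup on it. The sketch below shows the same shape in plain asyncio with stand-in names; none of these identifiers are Home Assistant APIs.

```python
# Generic sketch of "prime a shared cache in one background executor job".
import asyncio

is_installed_cache: set[str] = set()  # stand-in for RequirementsManager.is_installed_cache


def check_installed(specifiers: set[str]) -> set[str]:
    # Stand-in for pkg_util.get_installed_versions: one blocking pass over all specifiers.
    return {spec for spec in specifiers if not spec.startswith("missing-")}


async def load_installed_versions(requirements: set[str]) -> None:
    # Skip anything already known, then do a single executor round trip for the rest.
    if not (to_check := requirements - is_installed_cache):
        return
    loop = asyncio.get_running_loop()
    is_installed_cache.update(await loop.run_in_executor(None, check_installed, to_check))


async def bootstrap() -> None:
    # Fire and forget: startup keeps going while the cache warms in the background.
    priming = asyncio.create_task(
        load_installed_versions({"aiohttp>=3.9", "missing-example==1.0"})
    )
    # ... the rest of bootstrap would continue here ...
    await priming  # awaited only so this demo can print the final cache state
    print(is_installed_cache)


asyncio.run(bootstrap())
```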
@@ -29,6 +29,11 @@ def is_docker_env() -> bool:
     return Path("/.dockerenv").exists()
 
 
+def get_installed_versions(specifiers: set[str]) -> set[str]:
+    """Return a set of installed packages and versions."""
+    return {specifier for specifier in specifiers if is_installed(specifier)}
+
+
 def is_installed(requirement_str: str) -> bool:
     """Check if a package is installed and will be loaded when we import it.
 
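For completeness, the new helper in the last hunk can be exercised on its own. The import path below is an assumption based on where `is_installed` and `is_docker_env` live; the commit page does not show file names.

```python
# Rough usage sketch; the module path is assumed rather than shown in the diff above.
from homeassistant.util.package import get_installed_versions

specifiers = {"PyYAML>=6.0", "surely-not-installed==0.0.1"}
# Returns only the specifiers that is_installed() reports as satisfied in the
# current environment, which is exactly what primes is_installed_cache.
print(get_installed_versions(specifiers))
```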