Add support for integrations v2 (#78801)

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
Erik Montnemery 2022-09-28 17:31:48 +02:00 committed by GitHub
parent 7f08dd851e
commit b173ae7f44
10 changed files with 5623 additions and 12 deletions

homeassistant/brands/google.json Normal file

@@ -0,0 +1,5 @@
{
  "domain": "google",
  "name": "Google",
  "integrations": ["google", "google_sheets"]
}
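Besides listing member integrations, as google.json does here, a brand file can instead point at supported IoT standards. A hypothetical example of that alternative shape, written as a Python dict and matching the BRAND_SCHEMA added in script/hassfest/brand.py further down:

# Hypothetical brand definition using iot_standards instead of integrations.
# Per the schema below, the allowed standards are "homekit", "zigbee" and "zwave".
acme_brand = {
    "domain": "acme",
    "name": "ACME",
    "iot_standards": ["zigbee", "zwave"],
}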

homeassistant/components/websocket_api/commands.py

@@ -35,6 +35,7 @@ from homeassistant.loader import (
    Integration,
    IntegrationNotFound,
    async_get_integration,
    async_get_integration_descriptions,
    async_get_integrations,
)
from homeassistant.setup import DATA_SETUP_TIME, async_get_loaded_integrations
@@ -75,6 +76,7 @@ def async_register_commands(
    async_reg(hass, handle_subscribe_entities)
    async_reg(hass, handle_supported_brands)
    async_reg(hass, handle_supported_features)
    async_reg(hass, handle_integration_descriptions)


def pong_message(iden: int) -> dict[str, Any]:
@@ -741,3 +743,13 @@ def handle_supported_features(
    """Handle setting supported features."""
    connection.supported_features = msg["features"]
    connection.send_result(msg["id"])


@decorators.require_admin
@decorators.websocket_command({"type": "integration/descriptions"})
@decorators.async_response
async def handle_integration_descriptions(
    hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
) -> None:
    """Get metadata for all brands and integrations."""
    connection.send_result(msg["id"], await async_get_integration_descriptions(hass))

homeassistant/generated/integrations.json: file diff suppressed because it is too large
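The suppressed diff is the newly generated integration index. Judging from _generate_integrations() in script/hassfest/config_flow.py below, its rough shape is as follows (written here as a Python dict, with hypothetical domains and values rather than entries from the real generated file). The new integration/descriptions WebSocket command returns this data under a "core" key, alongside a "custom" key for custom integrations:

# Illustrative only: approximate shape of the generated index, hypothetical entries.
GENERATED_INDEX = {
    "integration": {
        # A brand entry: a name plus the integrations grouped under the brand.
        "acme": {
            "name": "ACME",
            "integrations": {
                "acme_light": {
                    "config_flow": True,
                    "iot_class": "local_push",
                    "name": "ACME Light",
                },
            },
        },
        # A stand-alone integration entry.
        "examplehub": {
            "config_flow": True,
            "iot_class": "cloud_polling",
            "name": "Example Hub",
        },
    },
    "hardware": {},
    "helper": {},
    # Domains whose display name comes from translations instead of the manifest.
    "translated_name": ["examplesensor"],
}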

homeassistant/loader.py

@@ -23,6 +23,7 @@ from awesomeversion import (
    AwesomeVersionStrategy,
)

from . import generated
from .generated.application_credentials import APPLICATION_CREDENTIALS
from .generated.bluetooth import BLUETOOTH
from .generated.dhcp import DHCP
@@ -250,6 +251,44 @@ async def async_get_config_flows(
    return flows


async def async_get_integration_descriptions(
    hass: HomeAssistant,
) -> dict[str, Any]:
    """Return cached list of integrations."""
    base = generated.__path__[0]
    config_flow_path = pathlib.Path(base) / "integrations.json"

    flow = await hass.async_add_executor_job(config_flow_path.read_text)
    core_flows: dict[str, Any] = json_loads(flow)

    custom_integrations = await async_get_custom_components(hass)
    custom_flows: dict[str, Any] = {
        "integration": {},
        "hardware": {},
        "helper": {},
    }

    for integration in custom_integrations.values():
        # Remove core integration with same domain as the custom integration
        if integration.integration_type in ("entity", "system"):
            continue

        for integration_type in ("integration", "hardware", "helper"):
            if integration.domain not in core_flows[integration_type]:
                continue
            del core_flows[integration_type][integration.domain]

        if integration.domain in core_flows["translated_name"]:
            core_flows["translated_name"].remove(integration.domain)

        metadata = {
            "config_flow": integration.config_flow,
            "iot_class": integration.iot_class,
            "name": integration.name,
        }
        custom_flows[integration.integration_type][integration.domain] = metadata

    return {"core": core_flows, "custom": custom_flows}


async def async_get_application_credentials(hass: HomeAssistant) -> list[str]:
    """Return cached list of application credentials."""
    integrations = await async_get_custom_components(hass)
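A hedged usage sketch for the new loader helper, assuming an async context with a running HomeAssistant instance (the helper and the "core"/"custom" keys are from the code above; the wrapper function is illustrative):

# Illustrative usage of async_get_integration_descriptions(); the wrapper is hypothetical.
from homeassistant.core import HomeAssistant
from homeassistant.loader import async_get_integration_descriptions


async def count_described_integrations(hass: HomeAssistant) -> None:
    """Print how many core and custom integrations are described."""
    descriptions = await async_get_integration_descriptions(hass)
    core = descriptions["core"]["integration"]
    custom = descriptions["custom"]["integration"]
    print(f"{len(core)} core / {len(custom)} custom integrations described")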

script/hassfest/__init__.py

@@ -31,7 +31,6 @@ INTEGRATION_PLUGINS = [
    application_credentials,
    bluetooth,
    codeowners,
    config_flow,
    dependencies,
    dhcp,
    json,
@@ -44,6 +43,7 @@ INTEGRATION_PLUGINS = [
    translations,
    usb,
    zeroconf,
    config_flow,
]
HASS_PLUGINS = [
    coverage,
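The reordering above is deliberate: config_flow is moved to the end of INTEGRATION_PLUGINS so that it runs after translations, because the translations plugin sets Integration.translated_name (see translations.py below) and the new integration index generated by config_flow reads that flag. A minimal sketch of the idea, assuming a hypothetical runner loop (the real hassfest runner is not part of this diff):

# Hypothetical sketch, not the real hassfest runner: plugins run in list order,
# so translations (which sets Integration.translated_name while validating
# strings.json) must run before config_flow (which reads that flag when
# generating integrations.json).
def run_plugins(plugins, integrations, config):
    """Run hassfest validation plugins in the order they are listed."""
    for plugin in plugins:
        plugin.validate(integrations, config)
# e.g. run_plugins([..., translations, usb, zeroconf, config_flow], integrations, config)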

script/hassfest/brand.py Normal file

@@ -0,0 +1,71 @@
"""Brand validation."""
from __future__ import annotations

import voluptuous as vol
from voluptuous.humanize import humanize_error

from .model import Brand, Config, Integration

BRAND_SCHEMA = vol.Schema(
    {
        vol.Required("domain"): str,
        vol.Required("name"): str,
        vol.Optional("integrations"): [str],
        vol.Optional("iot_standards"): [vol.Any("homekit", "zigbee", "zwave")],
    }
)


def _validate_brand(
    brand: Brand, integrations: dict[str, Integration], config: Config
) -> None:
    """Validate brand file."""
    try:
        BRAND_SCHEMA(brand.brand)
    except vol.Invalid as err:
        config.add_error(
            "brand",
            f"Invalid brand file {brand.path.name}: {humanize_error(brand.brand, err)}",
        )
        return

    if brand.domain != brand.path.stem:
        config.add_error(
            "brand",
            f"Domain '{brand.domain}' does not match file name {brand.path.name}",
        )

    if not brand.integrations and not brand.iot_standards:
        config.add_error(
            "brand",
            f"Invalid brand file {brand.path.name}: At least one of integrations or "
            "iot_standards must be non-empty",
        )

    if brand.integrations:
        for sub_integration in brand.integrations:
            if sub_integration not in integrations:
                config.add_error(
                    "brand",
                    f"Invalid brand file {brand.path.name}: Can't add non core domain "
                    f"'{sub_integration}' to 'integrations'",
                )

    if brand.domain in integrations and (
        not brand.integrations or brand.domain not in brand.integrations
    ):
        config.add_error(
            "brand",
            f"Invalid brand file {brand.path.name}: Brand '{brand.brand['domain']}' "
            "is an integration but is missing in the brand's 'integrations' list",
        )


def validate(
    brands: dict[str, Brand], integrations: dict[str, Integration], config: Config
) -> None:
    """Handle all integrations' brands."""
    for brand in brands.values():
        _validate_brand(brand, integrations, config)
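As a quick, self-contained illustration of what BRAND_SCHEMA accepts and rejects (the schema is restated so the snippet runs on its own; the failing data is hypothetical):

# Illustrative only: exercising the brand schema with sample data.
import voluptuous as vol

BRAND_SCHEMA = vol.Schema(
    {
        vol.Required("domain"): str,
        vol.Required("name"): str,
        vol.Optional("integrations"): [str],
        vol.Optional("iot_standards"): [vol.Any("homekit", "zigbee", "zwave")],
    }
)

# Matches the google.json brand file added by this commit.
BRAND_SCHEMA(
    {"domain": "google", "name": "Google", "integrations": ["google", "google_sheets"]}
)

# Rejected: "matter" is not in this schema's allowed iot_standards list.
try:
    BRAND_SCHEMA({"domain": "acme", "name": "ACME", "iot_standards": ["matter"]})
except vol.Invalid as err:
    print(f"invalid brand file: {err}")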

script/hassfest/config_flow.py

@@ -1,9 +1,13 @@
"""Generate config flow file."""
from __future__ import annotations

import json
import pathlib

import black

from .model import Config, Integration
from .brand import validate as validate_brands
from .model import Brand, Config, Integration
from .serializer import to_string

BASE = """
@@ -87,14 +91,107 @@ def _generate_and_validate(integrations: dict[str, Integration], config: Config)
    return black.format_str(BASE.format(to_string(domains)), mode=black.Mode())


def _populate_brand_integrations(
    integration_data: dict,
    integrations: dict[str, Integration],
    brand_metadata: dict,
    sub_integrations: list[str],
) -> None:
    """Add referenced integrations to a brand's metadata."""
    brand_metadata.setdefault("integrations", {})
    for domain in sub_integrations:
        integration = integrations.get(domain)
        if not integration or integration.integration_type in ("entity", "system"):
            continue
        metadata = {}
        metadata["config_flow"] = integration.config_flow
        metadata["iot_class"] = integration.iot_class
        if integration.translated_name:
            integration_data["translated_name"].add(domain)
        else:
            metadata["name"] = integration.name
        brand_metadata["integrations"][domain] = metadata


def _generate_integrations(
    brands: dict[str, Brand], integrations: dict[str, Integration], config: Config
):
    """Generate integrations data."""
    result = {
        "integration": {},
        "hardware": {},
        "helper": {},
        "translated_name": set(),
    }

    # Not all integrations will have an item in the brands collection.
    # The config flow data index will be the union of the integrations without a
    # brands item and the brand domain names from the brands collection.

    # Compile a set of integrations which are referenced from at least one brand's
    # integrations list. These integrations will not be present in the root level
    # of the generated config flow index.
    brand_integration_domains = {
        brand_integration_domain
        for brand in brands.values()
        for brand_integration_domain in brand.integrations or []
    }

    # Compile a set of integrations which are not referenced from any brand's
    # integrations list.
    primary_domains = {
        domain
        for domain, integration in integrations.items()
        if integration.manifest and domain not in brand_integration_domains
    }
    # Add all brands to the set
    primary_domains |= set(brands)

    # Generate the config flow index
    for domain in sorted(primary_domains):
        metadata = {}
        if brand := brands.get(domain):
            metadata["name"] = brand.name
            if brand.integrations:
                # Add the integrations which are referenced from the brand's
                # integrations list
                _populate_brand_integrations(
                    result, integrations, metadata, brand.integrations
                )
            if brand.iot_standards:
                metadata["iot_standards"] = brand.iot_standards
            result["integration"][domain] = metadata
        else:  # integration
            integration = integrations[domain]
            if integration.integration_type in ("entity", "system"):
                continue
            metadata["config_flow"] = integration.config_flow
            metadata["iot_class"] = integration.iot_class
            if integration.translated_name:
                result["translated_name"].add(domain)
            else:
                metadata["name"] = integration.name
            result[integration.integration_type][domain] = metadata

    return json.dumps(
        result | {"translated_name": sorted(result["translated_name"])}, indent=2
    )


def validate(integrations: dict[str, Integration], config: Config):
    """Validate config flow file."""
    config_flow_path = config.root / "homeassistant/generated/config_flows.py"
    integrations_path = config.root / "homeassistant/generated/integrations.json"
    config.cache["config_flow"] = content = _generate_and_validate(integrations, config)

    if config.specific_integrations:
        return

    brands = Brand.load_dir(pathlib.Path(config.root / "homeassistant/brands"), config)
    validate_brands(brands, integrations, config)

    with open(str(config_flow_path)) as fp:
        if fp.read() != content:
            config.add_error(
@@ -103,11 +200,25 @@ def validate(integrations: dict[str, Integration], config: Config):
                "Run python3 -m script.hassfest",
                fixable=True,
            )
            return

    config.cache["integrations"] = content = _generate_integrations(
        brands, integrations, config
    )

    with open(str(integrations_path)) as fp:
        if fp.read() != content + "\n":
            config.add_error(
                "config_flow",
                "File integrations.json is not up to date. "
                "Run python3 -m script.hassfest",
                fixable=True,
            )


def generate(integrations: dict[str, Integration], config: Config):
    """Generate config flow file."""
    config_flow_path = config.root / "homeassistant/generated/config_flows.py"
    integrations_path = config.root / "homeassistant/generated/integrations.json"
    with open(str(config_flow_path), "w") as fp:
        fp.write(f"{config.cache['config_flow']}")
    with open(str(integrations_path), "w") as fp:
        fp.write(f"{config.cache['integrations']}\n")

script/hassfest/model.py

@@ -39,6 +39,62 @@ class Config:
        self.errors.append(Error(*args, **kwargs))


@attr.s
class Brand:
    """Represent a brand in our validator."""

    @classmethod
    def load_dir(cls, path: pathlib.Path, config: Config):
        """Load all brands in a directory."""
        assert path.is_dir()
        brands = {}
        for fil in path.iterdir():
            brand = cls(fil)
            brand.load_brand(config)
            brands[brand.domain] = brand

        return brands

    path: pathlib.Path = attr.ib()
    brand: dict[str, Any] | None = attr.ib(default=None)

    @property
    def domain(self) -> str:
        """Integration domain."""
        return self.path.stem

    @property
    def name(self) -> str | None:
        """Return name of the integration."""
        return self.brand.get("name")

    @property
    def integrations(self) -> list[str]:
        """Return the sub integrations of this brand."""
        return self.brand.get("integrations")

    @property
    def iot_standards(self) -> list[str]:
        """Return list of supported IoT standards."""
        return self.brand.get("iot_standards", [])

    def load_brand(self, config: Config) -> None:
        """Load brand file."""
        if not self.path.is_file():
            config.add_error("model", f"Brand file {self.path} not found")
            return

        try:
            brand = json.loads(self.path.read_text())
        except ValueError as err:
            config.add_error(
                "model", f"Brand file {self.path.name} contains invalid JSON: {err}"
            )
            return

        self.brand = brand


@attr.s
class Integration:
    """Represent an integration in our validator."""
@@ -71,6 +127,7 @@ class Integration:
    manifest: dict[str, Any] | None = attr.ib(default=None)
    errors: list[Error] = attr.ib(factory=list)
    warnings: list[Error] = attr.ib(factory=list)
    translated_name: bool = attr.ib(default=False)

    @property
    def domain(self) -> str:
@@ -122,6 +179,11 @@ class Integration:
        """Get integration_type."""
        return self.manifest.get("integration_type", "integration")

    @property
    def iot_class(self) -> str | None:
        """Return the integration IoT Class."""
        return self.manifest.get("iot_class")

    def add_error(self, *args: Any, **kwargs: Any) -> None:
        """Add an error."""
        self.errors.append(Error(*args, **kwargs))

script/hassfest/translations.py

@@ -312,7 +312,9 @@ def gen_platform_strings_schema(config: Config, integration: Integration):
ONBOARDING_SCHEMA = vol.Schema({vol.Required("area"): {str: cv.string_with_no_html}})


def validate_translation_file(config: Config, integration: Integration, all_strings):
def validate_translation_file(  # noqa: C901
    config: Config, integration: Integration, all_strings
):
    """Validate translation files for integration."""
    if config.specific_integrations:
        check_translations_directory_name(integration)
@@ -363,14 +365,16 @@ def validate_translation_file(config: Config, integration: Integration, all_stri
        if strings_file.name == "strings.json":
            find_references(strings, name, references)

            if strings.get(
                "title"
            ) == integration.name and not allow_name_translation(integration):
                integration.add_error(
                    "translations",
                    "Don't specify title in translation strings if it's a brand name "
                    "or add exception to ALLOW_NAME_TRANSLATION",
                )
            if (title := strings.get("title")) is not None:
                integration.translated_name = True
                if title == integration.name and not allow_name_translation(
                    integration
                ):
                    integration.add_error(
                        "translations",
                        "Don't specify title in translation strings if it's a brand "
                        "name or add exception to ALLOW_NAME_TRANSLATION",
                    )

    platform_string_schema = gen_platform_strings_schema(config, integration)
    platform_strings = [integration.path.glob("strings.*.json")]

tests/components/websocket_api/test_commands.py

@@ -2014,3 +2014,20 @@ async def test_client_message_coalescing(hass, websocket_client, hass_admin_user
    hass.states.async_set("light.permitted", "on", {"color": "blue"})

    await websocket_client.close()
    await hass.async_block_till_done()


async def test_integration_descriptions(hass, hass_ws_client):
    """Test we can get integration descriptions."""
    assert await async_setup_component(hass, "config", {})

    ws_client = await hass_ws_client(hass)

    await ws_client.send_json(
        {
            "id": 1,
            "type": "integration/descriptions",
        }
    )
    response = await ws_client.receive_json()

    assert response["success"]
    assert response["result"]