Add schema auto repairs for states tables (#90083)

This commit is contained in:
J. Nick Koston 2023-03-22 10:05:23 -10:00 committed by GitHub
parent 5948347b6b
commit 4ebce9746d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 731 additions and 485 deletions

View File

@ -0,0 +1,218 @@
"""Schema repairs."""
from __future__ import annotations
from collections.abc import Iterable, Mapping
import logging
from typing import TYPE_CHECKING
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm.attributes import InstrumentedAttribute
from ..const import SupportedDialect
from ..db_schema import DOUBLE_PRECISION_TYPE_SQL, DOUBLE_TYPE
from ..util import session_scope
if TYPE_CHECKING:
from .. import Recorder
_LOGGER = logging.getLogger(__name__)

# MySQL/MariaDB error code 1366 ("Incorrect string value"): raised when a
# statement contains characters the column's character set cannot store,
# e.g. 4-byte UTF-8 in a utf8mb3 column.
MYSQL_ERR_INCORRECT_STRING_VALUE = 1366
# This name can't be represented unless 4-byte UTF-8 unicode is supported
UTF8_NAME = "𓆚𓃗"
# This number can't be accurately represented as a 32-bit float
PRECISE_NUMBER = 1.000000000000001
def _get_precision_column_types(
table_object: type[DeclarativeBase],
) -> list[str]:
"""Get the column names for the columns that need to be checked for precision."""
return [
column.key
for column in table_object.__table__.columns
if column.type is DOUBLE_TYPE
]
def validate_table_schema_supports_utf8(
    instance: Recorder,
    table_object: type[DeclarativeBase],
    columns: tuple[InstrumentedAttribute, ...],
) -> set[str]:
    """Do some basic checks for common schema errors caused by manual migration."""
    # Lack of full utf8 support is only an issue for MySQL / MariaDB
    if instance.dialect_name != SupportedDialect.MYSQL:
        return set()
    errors: set[str] = set()
    try:
        errors = _validate_table_schema_supports_utf8(
            instance, table_object, columns
        )
    except Exception as exc:  # pylint: disable=broad-except
        # Validation is best-effort; never let it break recorder startup.
        _LOGGER.exception("Error when validating DB schema: %s", exc)
    _log_schema_errors(table_object, errors)
    return errors
def _validate_table_schema_supports_utf8(
    instance: Recorder,
    table_object: type[DeclarativeBase],
    columns: tuple[InstrumentedAttribute, ...],
) -> set[str]:
    """Verify the given table can store 4-byte UTF-8 data.

    Inserts a row with UTF8_NAME (which requires utf8mb4 on MySQL/MariaDB)
    in each of the given columns and rolls it back. Returns a set containing
    "<table>.4-byte UTF-8" when the insert fails with the MySQL
    "Incorrect string value" error, an empty set otherwise.
    """
    schema_errors: set[str] = set()
    # Mark the session as read_only to ensure that the test data is not committed
    # to the database and we always rollback when the scope is exited
    with session_scope(session=instance.get_session(), read_only=True) as session:
        db_object = table_object(**{column.key: UTF8_NAME for column in columns})
        table = table_object.__tablename__
        # Try inserting some data which needs utf8mb4 support
        session.add(db_object)
        try:
            session.flush()
        except OperationalError as err:
            if err.orig and err.orig.args[0] == MYSQL_ERR_INCORRECT_STRING_VALUE:
                # Fix: the message previously hard-coded "statistics_meta"
                # even though this helper validates multiple tables.
                _LOGGER.debug(
                    "Database table %s does not support 4-byte UTF-8",
                    table,
                )
                schema_errors.add(f"{table}.4-byte UTF-8")
                return schema_errors
            raise
        finally:
            session.rollback()
    return schema_errors
def validate_db_schema_precision(
    instance: Recorder,
    table_object: type[DeclarativeBase],
) -> set[str]:
    """Do some basic checks for common schema errors caused by manual migration."""
    # Wrong precision is only an issue for MySQL / MariaDB / PostgreSQL
    if instance.dialect_name not in (
        SupportedDialect.MYSQL,
        SupportedDialect.POSTGRESQL,
    ):
        return set()
    errors: set[str] = set()
    try:
        errors = _validate_db_schema_precision(instance, table_object)
    except Exception as exc:  # pylint: disable=broad-except
        # Validation is best-effort; never let it break recorder startup.
        _LOGGER.exception("Error when validating DB schema: %s", exc)
    _log_schema_errors(table_object, errors)
    return errors
def _validate_db_schema_precision(
    instance: Recorder,
    table_object: type[DeclarativeBase],
) -> set[str]:
    """Do some basic checks for common schema errors caused by manual migration."""
    errors: set[str] = set()
    precision_columns = _get_precision_column_types(table_object)
    # The same mapping serves as constructor kwargs and expected values.
    expected_values = {name: PRECISE_NUMBER for name in precision_columns}
    # Mark the session as read_only to ensure that the test data is not committed
    # to the database and we always rollback when the scope is exited
    with session_scope(session=instance.get_session(), read_only=True) as session:
        test_row = table_object(**expected_values)
        table_name = table_object.__tablename__
        try:
            session.add(test_row)
            # Round-trip the row through the database and read it back to
            # see whether the stored values kept full double precision.
            session.flush()
            session.refresh(test_row)
            _check_columns(
                schema_errors=errors,
                stored={name: getattr(test_row, name) for name in precision_columns},
                expected=expected_values,
                columns=precision_columns,
                table_name=table_name,
                supports="double precision",
            )
        finally:
            session.rollback()
    return errors
def _log_schema_errors(
table_object: type[DeclarativeBase], schema_errors: set[str]
) -> None:
"""Log schema errors."""
if not schema_errors:
return
_LOGGER.debug(
"Detected %s schema errors: %s",
table_object.__tablename__,
", ".join(sorted(schema_errors)),
)
def _check_columns(
schema_errors: set[str],
stored: Mapping,
expected: Mapping,
columns: Iterable[str],
table_name: str,
supports: str,
) -> None:
"""Check that the columns in the table support the given feature.
Errors are logged and added to the schema_errors set.
"""
for column in columns:
if stored[column] == expected[column]:
continue
schema_errors.add(f"{table_name}.{supports}")
_LOGGER.error(
"Column %s in database table %s does not support %s (stored=%s != expected=%s)",
column,
table_name,
supports,
stored[column],
expected[column],
)
def correct_db_schema_utf8(
    instance: Recorder, table_object: type[DeclarativeBase], schema_errors: set[str]
) -> None:
    """Correct utf8 issues detected by validate_db_schema."""
    table_name = table_object.__tablename__
    if f"{table_name}.4-byte UTF-8" not in schema_errors:
        return
    # Imported lazily to avoid a circular import with the migration module.
    from ..migration import (  # pylint: disable=import-outside-toplevel
        _correct_table_character_set_and_collation,
    )

    _correct_table_character_set_and_collation(table_name, instance.get_session)
def correct_db_schema_precision(
    instance: Recorder,
    table_object: type[DeclarativeBase],
    schema_errors: set[str],
) -> None:
    """Correct precision issues detected by validate_db_schema.

    When "<table>.double precision" is present in schema_errors, alters
    every float column of the table to DOUBLE PRECISION.
    """
    table_name = table_object.__tablename__
    if f"{table_name}.double precision" in schema_errors:
        # Imported lazily to avoid a circular import with the migration module.
        from ..migration import (  # pylint: disable=import-outside-toplevel
            _modify_columns,
        )

        precision_columns = _get_precision_column_types(table_object)
        # Attempt to convert the float columns to double precision
        session_maker = instance.get_session
        engine = instance.engine
        assert engine is not None, "Engine should be set"
        _modify_columns(
            session_maker,
            engine,
            table_name,
            [f"{column} {DOUBLE_PRECISION_TYPE_SQL}" for column in precision_columns],
        )

View File

@ -0,0 +1,39 @@
"""States schema repairs."""
from __future__ import annotations
from typing import TYPE_CHECKING
from ...db_schema import StateAttributes, States
from ..schema import (
correct_db_schema_precision,
correct_db_schema_utf8,
validate_db_schema_precision,
validate_table_schema_supports_utf8,
)
if TYPE_CHECKING:
from ... import Recorder
# Text columns that can receive arbitrary user data (including 4-byte
# UTF-8), per table; each is validated for utf8mb4 support on MySQL/MariaDB.
TABLE_UTF8_COLUMNS = {
    States: (States.state,),
    StateAttributes: (StateAttributes.shared_attrs,),
}
def validate_db_schema(instance: Recorder) -> set[str]:
    """Do some basic checks for common schema errors caused by manual migration."""
    # Check 4-byte UTF-8 support for every user-writable text column
    schema_errors: set[str] = {
        error
        for table, columns in TABLE_UTF8_COLUMNS.items()
        for error in validate_table_schema_supports_utf8(instance, table, columns)
    }
    # Only the states table has float columns needing double precision
    schema_errors |= validate_db_schema_precision(instance, States)
    return schema_errors
def correct_db_schema(
    instance: Recorder,
    schema_errors: set[str],
) -> None:
    """Correct issues detected by validate_db_schema."""
    # Repair character set/collation for each table with text columns
    correct_db_schema_utf8(instance, States, schema_errors)
    correct_db_schema_utf8(instance, StateAttributes, schema_errors)
    # Only the states table stores floats that may need double precision
    correct_db_schema_precision(instance, States, schema_errors)

View File

@ -1,28 +1,16 @@
"""Statistics schema repairs."""
from __future__ import annotations
from collections.abc import Callable, Mapping
import contextlib
from datetime import datetime
import logging
from typing import TYPE_CHECKING
from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import OperationalError, SQLAlchemyError
from sqlalchemy.orm.session import Session
from homeassistant.core import HomeAssistant
from homeassistant.util import dt as dt_util
from ...const import DOMAIN, SupportedDialect
from ...db_schema import Statistics, StatisticsShortTerm
from ...models import StatisticData, StatisticMetaData, datetime_to_timestamp_or_none
from ...statistics import (
_import_statistics_with_session,
_statistics_during_period_with_session,
from ...db_schema import Statistics, StatisticsMeta, StatisticsShortTerm
from ..schema import (
correct_db_schema_precision,
correct_db_schema_utf8,
validate_db_schema_precision,
validate_table_schema_supports_utf8,
)
from ...util import session_scope
if TYPE_CHECKING:
from ... import Recorder
@ -30,200 +18,14 @@ if TYPE_CHECKING:
_LOGGER = logging.getLogger(__name__)
def _validate_db_schema_utf8(
instance: Recorder, session_maker: Callable[[], Session]
) -> set[str]:
def validate_db_schema(instance: Recorder) -> set[str]:
"""Do some basic checks for common schema errors caused by manual migration."""
schema_errors: set[str] = set()
# Lack of full utf8 support is only an issue for MySQL / MariaDB
if instance.dialect_name != SupportedDialect.MYSQL:
return schema_errors
# This name can't be represented unless 4-byte UTF-8 unicode is supported
utf8_name = "𓆚𓃗"
statistic_id = f"{DOMAIN}.db_test"
metadata: StatisticMetaData = {
"has_mean": True,
"has_sum": True,
"name": utf8_name,
"source": DOMAIN,
"statistic_id": statistic_id,
"unit_of_measurement": None,
}
statistics_meta_manager = instance.statistics_meta_manager
# Try inserting some metadata which needs utf8mb4 support
try:
# Mark the session as read_only to ensure that the test data is not committed
# to the database and we always rollback when the scope is exited
with session_scope(session=session_maker(), read_only=True) as session:
old_metadata_dict = statistics_meta_manager.get_many(
session, statistic_ids={statistic_id}
)
try:
statistics_meta_manager.update_or_add(
session, metadata, old_metadata_dict
)
statistics_meta_manager.delete(session, statistic_ids=[statistic_id])
except OperationalError as err:
if err.orig and err.orig.args[0] == 1366:
_LOGGER.debug(
"Database table statistics_meta does not support 4-byte UTF-8"
)
schema_errors.add("statistics_meta.4-byte UTF-8")
session.rollback()
else:
raise
except Exception as exc: # pylint: disable=broad-except
_LOGGER.exception("Error when validating DB schema: %s", exc)
return schema_errors
def _get_future_year() -> int:
"""Get a year in the future."""
return datetime.now().year + 1
def _validate_db_schema(
hass: HomeAssistant, instance: Recorder, session_maker: Callable[[], Session]
) -> set[str]:
"""Do some basic checks for common schema errors caused by manual migration."""
schema_errors: set[str] = set()
statistics_meta_manager = instance.statistics_meta_manager
# Wrong precision is only an issue for MySQL / MariaDB / PostgreSQL
if instance.dialect_name not in (
SupportedDialect.MYSQL,
SupportedDialect.POSTGRESQL,
):
return schema_errors
# This number can't be accurately represented as a 32-bit float
precise_number = 1.000000000000001
# This time can't be accurately represented unless datetimes have µs precision
#
# We want to insert statistics for a time in the future, in case they
# have conflicting metadata_id's with existing statistics that were
# never cleaned up. By inserting in the future, we can be sure that
# that by selecting the last inserted row, we will get the one we
# just inserted.
#
future_year = _get_future_year()
precise_time = datetime(future_year, 10, 6, microsecond=1, tzinfo=dt_util.UTC)
start_time = datetime(future_year, 10, 6, tzinfo=dt_util.UTC)
statistic_id = f"{DOMAIN}.db_test"
metadata: StatisticMetaData = {
"has_mean": True,
"has_sum": True,
"name": None,
"source": DOMAIN,
"statistic_id": statistic_id,
"unit_of_measurement": None,
}
statistics: StatisticData = {
"last_reset": precise_time,
"max": precise_number,
"mean": precise_number,
"min": precise_number,
"start": precise_time,
"state": precise_number,
"sum": precise_number,
}
def check_columns(
schema_errors: set[str],
stored: Mapping,
expected: Mapping,
columns: tuple[str, ...],
table_name: str,
supports: str,
) -> None:
for column in columns:
if stored[column] != expected[column]:
schema_errors.add(f"{table_name}.{supports}")
_LOGGER.error(
"Column %s in database table %s does not support %s (stored=%s != expected=%s)",
column,
table_name,
supports,
stored[column],
expected[column],
)
# Insert / adjust a test statistics row in each of the tables
tables: tuple[type[Statistics | StatisticsShortTerm], ...] = (
Statistics,
StatisticsShortTerm,
schema_errors |= validate_table_schema_supports_utf8(
instance, StatisticsMeta, (StatisticsMeta.statistic_id,)
)
try:
# Mark the session as read_only to ensure that the test data is not committed
# to the database and we always rollback when the scope is exited
with session_scope(session=session_maker(), read_only=True) as session:
for table in tables:
_import_statistics_with_session(
instance, session, metadata, (statistics,), table
)
stored_statistics = _statistics_during_period_with_session(
hass,
session,
start_time,
None,
{statistic_id},
"hour" if table == Statistics else "5minute",
None,
{"last_reset", "max", "mean", "min", "state", "sum"},
)
if not (stored_statistic := stored_statistics.get(statistic_id)):
_LOGGER.warning(
"Schema validation failed for table: %s", table.__tablename__
)
continue
# We want to look at the last inserted row to make sure there
# is not previous garbage data in the table that would cause
# the test to produce an incorrect result. To achieve this,
# we inserted a row in the future, and now we select the last
# inserted row back.
last_stored_statistic = stored_statistic[-1]
check_columns(
schema_errors,
last_stored_statistic,
statistics,
("max", "mean", "min", "state", "sum"),
table.__tablename__,
"double precision",
)
assert statistics["last_reset"]
check_columns(
schema_errors,
last_stored_statistic,
{
"last_reset": datetime_to_timestamp_or_none(
statistics["last_reset"]
),
"start": datetime_to_timestamp_or_none(statistics["start"]),
},
("start", "last_reset"),
table.__tablename__,
"µs precision",
)
statistics_meta_manager.delete(session, statistic_ids=[statistic_id])
except Exception as exc: # pylint: disable=broad-except
_LOGGER.exception("Error when validating DB schema: %s", exc)
return schema_errors
def validate_db_schema(
hass: HomeAssistant, instance: Recorder, session_maker: Callable[[], Session]
) -> set[str]:
"""Do some basic checks for common schema errors caused by manual migration."""
schema_errors: set[str] = set()
schema_errors |= _validate_db_schema_utf8(instance, session_maker)
schema_errors |= _validate_db_schema(hass, instance, session_maker)
for table in (Statistics, StatisticsShortTerm):
schema_errors |= validate_db_schema_precision(instance, table)
if schema_errors:
_LOGGER.debug(
"Detected statistics schema errors: %s", ", ".join(sorted(schema_errors))
@ -233,63 +35,9 @@ def validate_db_schema(
def correct_db_schema(
instance: Recorder,
engine: Engine,
session_maker: Callable[[], Session],
schema_errors: set[str],
) -> None:
"""Correct issues detected by validate_db_schema."""
from ...migration import _modify_columns # pylint: disable=import-outside-toplevel
if "statistics_meta.4-byte UTF-8" in schema_errors:
# Attempt to convert the table to utf8mb4
_LOGGER.warning(
(
"Updating character set and collation of table %s to utf8mb4. "
"Note: this can take several minutes on large databases and slow "
"computers. Please be patient!"
),
"statistics_meta",
)
with contextlib.suppress(SQLAlchemyError), session_scope(
session=session_maker()
) as session:
connection = session.connection()
connection.execute(
# Using LOCK=EXCLUSIVE to prevent the database from corrupting
# https://github.com/home-assistant/core/issues/56104
text(
"ALTER TABLE statistics_meta CONVERT TO CHARACTER SET utf8mb4"
" COLLATE utf8mb4_unicode_ci, LOCK=EXCLUSIVE"
)
)
tables: tuple[type[Statistics | StatisticsShortTerm], ...] = (
Statistics,
StatisticsShortTerm,
)
for table in tables:
if f"{table.__tablename__}.double precision" in schema_errors:
# Attempt to convert float columns to double precision
_modify_columns(
session_maker,
engine,
table.__tablename__,
[
"mean DOUBLE PRECISION",
"min DOUBLE PRECISION",
"max DOUBLE PRECISION",
"state DOUBLE PRECISION",
"sum DOUBLE PRECISION",
],
)
if f"{table.__tablename__}.µs precision" in schema_errors:
# Attempt to convert timestamp columns to µs precision
_modify_columns(
session_maker,
engine,
table.__tablename__,
[
"last_reset_ts DOUBLE PRECISION",
"start_ts DOUBLE PRECISION",
],
)
correct_db_schema_utf8(instance, StatisticsMeta, schema_errors)
for table in (Statistics, StatisticsShortTerm):
correct_db_schema_precision(instance, table, schema_errors)

View File

@ -119,13 +119,17 @@ STATES_CONTEXT_ID_BIN_INDEX = "ix_states_context_id_bin"
LEGACY_STATES_EVENT_ID_INDEX = "ix_states_event_id"
CONTEXT_ID_BIN_MAX_LENGTH = 16
MYSQL_COLLATE = "utf8mb4_unicode_ci"
MYSQL_DEFAULT_CHARSET = "utf8mb4"
MYSQL_ENGINE = "InnoDB"
_DEFAULT_TABLE_ARGS = {
"mysql_default_charset": "utf8mb4",
"mysql_collate": "utf8mb4_unicode_ci",
"mysql_engine": "InnoDB",
"mariadb_default_charset": "utf8mb4",
"mariadb_collate": "utf8mb4_unicode_ci",
"mariadb_engine": "InnoDB",
"mysql_default_charset": MYSQL_DEFAULT_CHARSET,
"mysql_collate": MYSQL_COLLATE,
"mysql_engine": MYSQL_ENGINE,
"mariadb_default_charset": MYSQL_DEFAULT_CHARSET,
"mariadb_collate": MYSQL_COLLATE,
"mariadb_engine": MYSQL_ENGINE,
}
@ -154,6 +158,7 @@ DOUBLE_TYPE = (
.with_variant(oracle.DOUBLE_PRECISION(), "oracle")
.with_variant(postgresql.DOUBLE_PRECISION(), "postgresql")
)
DOUBLE_PRECISION_TYPE_SQL = "DOUBLE PRECISION"
TIMESTAMP_TYPE = DOUBLE_TYPE

View File

@ -28,6 +28,10 @@ from homeassistant.core import HomeAssistant
from homeassistant.util.enum import try_parse_enum
from homeassistant.util.ulid import ulid_to_bytes
from .auto_repairs.states.schema import (
correct_db_schema as states_correct_db_schema,
validate_db_schema as states_validate_db_schema,
)
from .auto_repairs.statistics.duplicates import (
delete_statistics_duplicates,
delete_statistics_meta_duplicates,
@ -39,7 +43,10 @@ from .auto_repairs.statistics.schema import (
from .const import SupportedDialect
from .db_schema import (
CONTEXT_ID_BIN_MAX_LENGTH,
DOUBLE_PRECISION_TYPE_SQL,
LEGACY_STATES_EVENT_ID_INDEX,
MYSQL_COLLATE,
MYSQL_DEFAULT_CHARSET,
SCHEMA_VERSION,
STATISTICS_TABLES,
TABLE_STATES,
@ -96,13 +103,13 @@ class _ColumnTypesForDialect:
_MYSQL_COLUMN_TYPES = _ColumnTypesForDialect(
big_int_type="INTEGER(20)",
timestamp_type="DOUBLE PRECISION",
timestamp_type=DOUBLE_PRECISION_TYPE_SQL,
context_bin_type=f"BLOB({CONTEXT_ID_BIN_MAX_LENGTH})",
)
_POSTGRESQL_COLUMN_TYPES = _ColumnTypesForDialect(
big_int_type="INTEGER",
timestamp_type="DOUBLE PRECISION",
timestamp_type=DOUBLE_PRECISION_TYPE_SQL,
context_bin_type="BYTEA",
)
@ -151,7 +158,7 @@ class SchemaValidationStatus:
"""Store schema validation status."""
current_version: int
statistics_schema_errors: set[str]
schema_errors: set[str]
valid: bool
@ -178,13 +185,23 @@ def validate_db_schema(
if is_current := _schema_is_current(current_version):
# We can only check for further errors if the schema is current, because
# columns may otherwise not exist etc.
schema_errors |= statistics_validate_db_schema(hass, instance, session_maker)
schema_errors = _find_schema_errors(hass, instance, session_maker)
valid = is_current and not schema_errors
return SchemaValidationStatus(current_version, schema_errors, valid)
def _find_schema_errors(
hass: HomeAssistant, instance: Recorder, session_maker: Callable[[], Session]
) -> set[str]:
"""Find schema errors."""
schema_errors: set[str] = set()
schema_errors |= statistics_validate_db_schema(instance)
schema_errors |= states_validate_db_schema(instance)
return schema_errors
def live_migration(schema_status: SchemaValidationStatus) -> bool:
"""Check if live migration is possible."""
return schema_status.current_version >= LIVE_MIGRATION_MIN_SCHEMA_VERSION
@ -226,12 +243,13 @@ def migrate_schema(
# so its clear that the upgrade is done
_LOGGER.warning("Upgrade to version %s done", new_version)
if schema_errors := schema_status.statistics_schema_errors:
if schema_errors := schema_status.schema_errors:
_LOGGER.warning(
"Database is about to correct DB schema errors: %s",
", ".join(sorted(schema_errors)),
)
statistics_correct_db_schema(instance, engine, session_maker, schema_errors)
statistics_correct_db_schema(instance, schema_errors)
states_correct_db_schema(instance, schema_errors)
if current_version != SCHEMA_VERSION:
instance.queue_task(PostSchemaMigrationTask(current_version, SCHEMA_VERSION))
@ -732,38 +750,15 @@ def _apply_update( # noqa: C901
engine,
"statistics",
[
"mean DOUBLE PRECISION",
"min DOUBLE PRECISION",
"max DOUBLE PRECISION",
"state DOUBLE PRECISION",
"sum DOUBLE PRECISION",
f"{column} {DOUBLE_PRECISION_TYPE_SQL}"
for column in ("max", "mean", "min", "state", "sum")
],
)
elif new_version == 21:
# Try to change the character set of the statistic_meta table
if engine.dialect.name == SupportedDialect.MYSQL:
for table in ("events", "states", "statistics_meta"):
_LOGGER.warning(
(
"Updating character set and collation of table %s to utf8mb4."
" Note: this can take several minutes on large databases and"
" slow computers. Please be patient!"
),
table,
)
with contextlib.suppress(SQLAlchemyError), session_scope(
session=session_maker()
) as session:
connection = session.connection()
connection.execute(
# Using LOCK=EXCLUSIVE to prevent
# the database from corrupting
# https://github.com/home-assistant/core/issues/56104
text(
f"ALTER TABLE {table} CONVERT TO CHARACTER SET utf8mb4"
" COLLATE utf8mb4_unicode_ci, LOCK=EXCLUSIVE"
)
)
_correct_table_character_set_and_collation(table, session_maker)
elif new_version == 22:
# Recreate the all statistics tables for Oracle DB with Identity columns
#
@ -1090,6 +1085,33 @@ def _apply_update( # noqa: C901
raise ValueError(f"No schema migration defined for version {new_version}")
def _correct_table_character_set_and_collation(
    table: str,
    session_maker: Callable[[], Session],
) -> None:
    """Correct issues detected by validate_db_schema."""
    # Attempt to convert the table to utf8mb4
    _LOGGER.warning(
        "Updating character set and collation of table %s to utf8mb4. "
        "Note: this can take several minutes on large databases and slow "
        "computers. Please be patient!",
        table,
    )
    # Best effort: suppress SQLAlchemy errors so a failed conversion does
    # not abort the migration.
    with contextlib.suppress(SQLAlchemyError):
        with session_scope(session=session_maker()) as session:
            connection = session.connection()
            statement = text(
                f"ALTER TABLE {table} CONVERT TO CHARACTER SET "
                f"{MYSQL_DEFAULT_CHARSET} "
                f"COLLATE {MYSQL_COLLATE}, LOCK=EXCLUSIVE"
            )
            # Using LOCK=EXCLUSIVE to prevent the database from corrupting
            # https://github.com/home-assistant/core/issues/56104
            connection.execute(statement)
def post_schema_migration(
instance: Recorder,
old_version: int,

View File

@ -102,7 +102,7 @@ _TEST_FIXTURES: dict[str, list[str] | str] = {
"enable_custom_integrations": "None",
"enable_nightly_purge": "bool",
"enable_statistics": "bool",
"enable_statistics_table_validation": "bool",
"enable_schema_validation": "bool",
"entity_registry": "EntityRegistry",
"freezer": "FrozenDateTimeFactory",
"hass_access_token": "str",

View File

@ -0,0 +1,5 @@
"""Tests for Recorder component."""
import pytest
pytest.register_assert_rewrite("tests.components.recorder.common")

View File

@ -0,0 +1,106 @@
"""The test repairing states schema."""
# pylint: disable=invalid-name
from unittest.mock import ANY, patch
import pytest
from homeassistant.core import HomeAssistant
from ...common import async_wait_recording_done
from tests.typing import RecorderInstanceGenerator
@pytest.mark.parametrize("enable_schema_validation", [True])
@pytest.mark.parametrize("db_engine", ("mysql", "postgresql"))
async def test_validate_db_schema_fix_float_issue(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
db_engine,
) -> None:
"""Test validating DB schema with postgresql and mysql.
Note: The test uses SQLite, the purpose is only to exercise the code.
"""
with patch(
"homeassistant.components.recorder.core.Recorder.dialect_name", db_engine
), patch(
"homeassistant.components.recorder.auto_repairs.schema._validate_db_schema_precision",
return_value={"states.double precision"},
), patch(
"homeassistant.components.recorder.migration._modify_columns"
) as modify_columns_mock:
await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
assert "Schema validation failed" not in caplog.text
assert (
"Database is about to correct DB schema errors: states.double precision"
in caplog.text
)
modification = [
"last_changed_ts DOUBLE PRECISION",
"last_updated_ts DOUBLE PRECISION",
]
modify_columns_mock.assert_called_once_with(ANY, ANY, "states", modification)
@pytest.mark.parametrize("enable_schema_validation", [True])
async def test_validate_db_schema_fix_utf8_issue_states(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test validating DB schema with MySQL.
Note: The test uses SQLite, the purpose is only to exercise the code.
"""
with patch(
"homeassistant.components.recorder.core.Recorder.dialect_name", "mysql"
), patch(
"homeassistant.components.recorder.auto_repairs.schema._validate_table_schema_supports_utf8",
return_value={"states.4-byte UTF-8"},
):
await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
assert "Schema validation failed" not in caplog.text
assert (
"Database is about to correct DB schema errors: states.4-byte UTF-8"
in caplog.text
)
assert (
"Updating character set and collation of table states to utf8mb4" in caplog.text
)
@pytest.mark.parametrize("enable_schema_validation", [True])
async def test_validate_db_schema_fix_utf8_issue_state_attributes(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test validating DB schema with MySQL.
Note: The test uses SQLite, the purpose is only to exercise the code.
"""
with patch(
"homeassistant.components.recorder.core.Recorder.dialect_name", "mysql"
), patch(
"homeassistant.components.recorder.auto_repairs.schema._validate_table_schema_supports_utf8",
return_value={"state_attributes.4-byte UTF-8"},
):
await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
assert "Schema validation failed" not in caplog.text
assert (
"Database is about to correct DB schema errors: state_attributes.4-byte UTF-8"
in caplog.text
)
assert (
"Updating character set and collation of table state_attributes to utf8mb4"
in caplog.text
)

View File

@ -1,52 +1,18 @@
"""The test repairing statistics schema."""
# pylint: disable=invalid-name
from datetime import datetime
from unittest.mock import ANY, DEFAULT, MagicMock, patch
from unittest.mock import ANY, patch
import pytest
from sqlalchemy.exc import OperationalError
from homeassistant.components.recorder.auto_repairs.statistics.schema import (
_get_future_year,
)
from homeassistant.components.recorder.statistics import (
_statistics_during_period_with_session,
)
from homeassistant.components.recorder.table_managers.statistics_meta import (
StatisticsMetaManager,
)
from homeassistant.core import HomeAssistant
import homeassistant.util.dt as dt_util
from ...common import async_wait_recording_done
from tests.typing import RecorderInstanceGenerator
@pytest.mark.parametrize("enable_statistics_table_validation", [True])
@pytest.mark.parametrize("db_engine", ("mysql", "postgresql"))
async def test_validate_db_schema(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
db_engine,
) -> None:
"""Test validating DB schema with MySQL and PostgreSQL.
Note: The test uses SQLite, the purpose is only to exercise the code.
"""
with patch(
"homeassistant.components.recorder.core.Recorder.dialect_name", db_engine
):
await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
assert "Schema validation failed" not in caplog.text
assert "Detected statistics schema errors" not in caplog.text
assert "Database is about to correct DB schema errors" not in caplog.text
@pytest.mark.parametrize("enable_statistics_table_validation", [True])
@pytest.mark.parametrize("enable_schema_validation", [True])
async def test_validate_db_schema_fix_utf8_issue(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
@ -56,15 +22,11 @@ async def test_validate_db_schema_fix_utf8_issue(
Note: The test uses SQLite, the purpose is only to exercise the code.
"""
orig_error = MagicMock()
orig_error.args = [1366]
utf8_error = OperationalError("", "", orig=orig_error)
with patch(
"homeassistant.components.recorder.core.Recorder.dialect_name", "mysql"
), patch(
"homeassistant.components.recorder.table_managers.statistics_meta.StatisticsMetaManager.update_or_add",
wraps=StatisticsMetaManager.update_or_add,
side_effect=[utf8_error, DEFAULT, DEFAULT],
"homeassistant.components.recorder.auto_repairs.schema._validate_table_schema_supports_utf8",
return_value={"statistics_meta.4-byte UTF-8"},
):
await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
@ -80,60 +42,25 @@ async def test_validate_db_schema_fix_utf8_issue(
)
@pytest.mark.parametrize("enable_statistics_table_validation", [True])
@pytest.mark.parametrize("enable_schema_validation", [True])
@pytest.mark.parametrize("table", ("statistics_short_term", "statistics"))
@pytest.mark.parametrize("db_engine", ("mysql", "postgresql"))
@pytest.mark.parametrize(
("table", "replace_index"), (("statistics", 0), ("statistics_short_term", 1))
)
@pytest.mark.parametrize(
("column", "value"),
(("max", 1.0), ("mean", 1.0), ("min", 1.0), ("state", 1.0), ("sum", 1.0)),
)
async def test_validate_db_schema_fix_float_issue(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
db_engine,
table,
replace_index,
column,
value,
table: str,
db_engine: str,
) -> None:
"""Test validating DB schema with MySQL.
"""Test validating DB schema with postgresql and mysql.
Note: The test uses SQLite, the purpose is only to exercise the code.
"""
orig_error = MagicMock()
orig_error.args = [1366]
precise_number = 1.000000000000001
fixed_future_year = _get_future_year()
precise_time = datetime(fixed_future_year, 10, 6, microsecond=1, tzinfo=dt_util.UTC)
statistics = {
"recorder.db_test": [
{
"last_reset": precise_time.timestamp(),
"max": precise_number,
"mean": precise_number,
"min": precise_number,
"start": precise_time.timestamp(),
"state": precise_number,
"sum": precise_number,
}
]
}
statistics["recorder.db_test"][0][column] = value
fake_statistics = [DEFAULT, DEFAULT]
fake_statistics[replace_index] = statistics
with patch(
"homeassistant.components.recorder.core.Recorder.dialect_name", db_engine
), patch(
"homeassistant.components.recorder.auto_repairs.statistics.schema._get_future_year",
return_value=fixed_future_year,
), patch(
"homeassistant.components.recorder.auto_repairs.statistics.schema._statistics_during_period_with_session",
side_effect=fake_statistics,
wraps=_statistics_during_period_with_session,
"homeassistant.components.recorder.auto_repairs.schema._validate_db_schema_precision",
return_value={f"{table}.double precision"},
), patch(
"homeassistant.components.recorder.migration._modify_columns"
) as modify_columns_mock:
@ -146,90 +73,13 @@ async def test_validate_db_schema_fix_float_issue(
in caplog.text
)
modification = [
"created_ts DOUBLE PRECISION",
"start_ts DOUBLE PRECISION",
"mean DOUBLE PRECISION",
"min DOUBLE PRECISION",
"max DOUBLE PRECISION",
"last_reset_ts DOUBLE PRECISION",
"state DOUBLE PRECISION",
"sum DOUBLE PRECISION",
]
modify_columns_mock.assert_called_once_with(ANY, ANY, table, modification)
@pytest.mark.parametrize("enable_schema_validation", [True])
@pytest.mark.parametrize(
    ("db_engine", "modification"),
    (
        ("mysql", ["last_reset_ts DOUBLE PRECISION", "start_ts DOUBLE PRECISION"]),
        (
            "postgresql",
            [
                "last_reset_ts DOUBLE PRECISION",
                "start_ts DOUBLE PRECISION",
            ],
        ),
    ),
)
@pytest.mark.parametrize(
    ("table", "replace_index"), (("statistics", 0), ("statistics_short_term", 1))
)
@pytest.mark.parametrize(
    ("column", "value"),
    (
        ("last_reset", "2020-10-06T00:00:00+00:00"),
        ("start", "2020-10-06T00:00:00+00:00"),
    ),
)
async def test_validate_db_schema_fix_statistics_datetime_issue(
    async_setup_recorder_instance: RecorderInstanceGenerator,
    hass: HomeAssistant,
    caplog: pytest.LogCaptureFixture,
    db_engine,
    modification,
    table,
    replace_index,
    column,
    value,
) -> None:
    """Test repairing a statistics datetime column that lost µs precision.

    Note: The test uses SQLite, the purpose is only to exercise the code.
    """
    # MySQL error code 1366 ("incorrect value") is what the repair logic
    # keys on when a column cannot store the probe value.
    orig_error = MagicMock()
    orig_error.args = [1366]
    precise_number = 1.000000000000001  # not representable as a 32-bit float
    precise_time = datetime(2020, 10, 6, microsecond=1, tzinfo=dt_util.UTC)
    statistics = {
        "recorder.db_test": [
            {
                "last_reset": precise_time,
                "max": precise_number,
                "mean": precise_number,
                "min": precise_number,
                "start": precise_time,
                "state": precise_number,
                "sum": precise_number,
            }
        ]
    }
    # Replace the column under test with a string value, simulating what a
    # schema without µs precision hands back for a datetime column.
    statistics["recorder.db_test"][0][column] = value
    # DEFAULT lets the wrapped real function run for the table that is NOT
    # under test; only the table at replace_index returns the fake payload.
    fake_statistics = [DEFAULT, DEFAULT]
    fake_statistics[replace_index] = statistics
    with patch(
        "homeassistant.components.recorder.core.Recorder.dialect_name", db_engine
    ), patch(
        "homeassistant.components.recorder.auto_repairs.statistics.schema._statistics_during_period_with_session",
        side_effect=fake_statistics,
        wraps=_statistics_during_period_with_session,
    ), patch(
        "homeassistant.components.recorder.migration._modify_columns"
    ) as modify_columns_mock:
        await async_setup_recorder_instance(hass)
        await async_wait_recording_done(hass)
        assert "Schema validation failed" not in caplog.text
        assert (
            f"Database is about to correct DB schema errors: {table}.µs precision"
            in caplog.text
        )
        modify_columns_mock.assert_called_once_with(ANY, ANY, table, modification)

View File

@ -0,0 +1,253 @@
"""The test validating and repairing schema."""
# pylint: disable=invalid-name
from unittest.mock import patch
import pytest
from sqlalchemy import text
from homeassistant.components.recorder.auto_repairs.schema import (
correct_db_schema_precision,
correct_db_schema_utf8,
validate_db_schema_precision,
validate_table_schema_supports_utf8,
)
from homeassistant.components.recorder.db_schema import States
from homeassistant.components.recorder.migration import _modify_columns
from homeassistant.components.recorder.util import get_instance, session_scope
from homeassistant.core import HomeAssistant
from ..common import async_wait_recording_done
from tests.typing import RecorderInstanceGenerator
@pytest.mark.parametrize("enable_schema_validation", [True])
@pytest.mark.parametrize("db_engine", ("mysql", "postgresql"))
async def test_validate_db_schema(
    async_setup_recorder_instance: RecorderInstanceGenerator,
    hass: HomeAssistant,
    caplog: pytest.LogCaptureFixture,
    db_engine,
) -> None:
    """Test validating DB schema with MySQL and PostgreSQL.

    Note: The test uses SQLite, the purpose is only to exercise the code.
    """
    with patch(
        "homeassistant.components.recorder.core.Recorder.dialect_name", db_engine
    ):
        await async_setup_recorder_instance(hass)
        await async_wait_recording_done(hass)
        # A pristine schema must not trigger any validation or repair logging.
        for unexpected_message in (
            "Schema validation failed",
            "Detected statistics schema errors",
            "Database is about to correct DB schema errors",
        ):
            assert unexpected_message not in caplog.text
async def test_validate_db_schema_fix_utf8_issue_good_schema(
    async_setup_recorder_instance: RecorderInstanceGenerator,
    hass: HomeAssistant,
    recorder_db_url: str,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Test validating DB schema with MySQL when the schema is correct."""
    if not recorder_db_url.startswith("mysql://"):
        # Only MySQL can exhibit the limited-charset problem.
        return
    await async_setup_recorder_instance(hass)
    await async_wait_recording_done(hass)
    recorder = get_instance(hass)
    errors = await recorder.async_add_executor_job(
        validate_table_schema_supports_utf8, recorder, States, (States.state,)
    )
    # A correct schema must produce no errors.
    assert errors == set()
async def test_validate_db_schema_fix_utf8_issue_with_broken_schema(
    async_setup_recorder_instance: RecorderInstanceGenerator,
    hass: HomeAssistant,
    recorder_db_url: str,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Test validating DB schema with MySQL when the schema is broken and repairing it."""
    if not recorder_db_url.startswith("mysql://"):
        # This problem only happens on MySQL
        return
    await async_setup_recorder_instance(hass)
    await async_wait_recording_done(hass)
    instance = get_instance(hass)
    session_maker = instance.get_session

    def _break_states_schema():
        # Downgrade states.state to ascii so 4-byte UTF-8 inserts fail,
        # mimicking a manually migrated (broken) schema.
        with session_scope(session=session_maker()) as session:
            session.execute(
                text(
                    "ALTER TABLE states MODIFY state VARCHAR(255) "
                    "CHARACTER SET ascii COLLATE ascii_general_ci, "
                    "LOCK=EXCLUSIVE;"
                )
            )

    await instance.async_add_executor_job(_break_states_schema)
    schema_errors = await instance.async_add_executor_job(
        validate_table_schema_supports_utf8, instance, States, (States.state,)
    )
    assert schema_errors == {"states.4-byte UTF-8"}

    # Now repair the schema
    await instance.async_add_executor_job(
        correct_db_schema_utf8, instance, States, schema_errors
    )

    # Now validate the schema again.  Pass the column attribute (States.state)
    # rather than the bare string "state": the validator's columns parameter
    # is typed tuple[InstrumentedAttribute, ...], and a plain string would
    # make validation fail internally and return an empty error set, letting
    # this assertion pass without actually re-checking the column.
    schema_errors = await instance.async_add_executor_job(
        validate_table_schema_supports_utf8, instance, States, (States.state,)
    )
    assert schema_errors == set()
async def test_validate_db_schema_fix_utf8_issue_with_broken_schema_unrepairable(
    async_setup_recorder_instance: RecorderInstanceGenerator,
    hass: HomeAssistant,
    recorder_db_url: str,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Test validating DB schema with MySQL when the schema is broken and cannot be repaired."""
    if not recorder_db_url.startswith("mysql://"):
        # This problem only happens on MySQL
        return
    await async_setup_recorder_instance(hass)
    await async_wait_recording_done(hass)
    instance = get_instance(hass)
    session_maker = instance.get_session

    def _break_states_schema():
        # Downgrade states.state to ascii ...
        with session_scope(session=session_maker()) as session:
            session.execute(
                text(
                    "ALTER TABLE states MODIFY state VARCHAR(255) "
                    "CHARACTER SET ascii COLLATE ascii_general_ci, "
                    "LOCK=EXCLUSIVE;"
                )
            )
        # ... and additionally force entity_id NOT NULL.
        # NOTE(review): presumably this makes the validator's probe write
        # fail with an unexpected error so validation aborts — confirm
        # against _validate_table_schema_supports_utf8.
        _modify_columns(
            session_maker,
            instance.engine,
            "states",
            [
                "entity_id VARCHAR(255) NOT NULL",
            ],
        )

    await instance.async_add_executor_job(_break_states_schema)
    # Pass the column attribute (States.state), not the bare string "state":
    # the validator's columns parameter is typed
    # tuple[InstrumentedAttribute, ...], and a plain string would abort
    # validation for the wrong reason, not the broken schema under test.
    schema_errors = await instance.async_add_executor_job(
        validate_table_schema_supports_utf8, instance, States, (States.state,)
    )
    # Validation must fail gracefully: no errors reported, failure logged.
    assert schema_errors == set()
    assert "Error when validating DB schema" in caplog.text
async def test_validate_db_schema_precision_good_schema(
    async_setup_recorder_instance: RecorderInstanceGenerator,
    hass: HomeAssistant,
    recorder_db_url: str,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Test validating DB schema when the schema is correct."""
    if not recorder_db_url.startswith(("mysql://", "postgresql://")):
        # Precision problems only occur on MySQL and PostgreSQL.
        return
    await async_setup_recorder_instance(hass)
    await async_wait_recording_done(hass)
    recorder = get_instance(hass)
    errors = await recorder.async_add_executor_job(
        validate_db_schema_precision, recorder, States
    )
    # A correct schema must produce no errors.
    assert errors == set()
async def test_validate_db_schema_precision_with_broken_schema(
    async_setup_recorder_instance: RecorderInstanceGenerator,
    hass: HomeAssistant,
    recorder_db_url: str,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Test validating DB schema when the schema is broken, then repair it."""
    if not recorder_db_url.startswith(("mysql://", "postgresql://")):
        # Precision problems only occur on MySQL and PostgreSQL.
        return
    await async_setup_recorder_instance(hass)
    await async_wait_recording_done(hass)
    recorder = get_instance(hass)
    make_session = recorder.get_session

    def _downgrade_timestamp_columns():
        # FLOAT(4) cannot hold a full µs-precision epoch timestamp.
        _modify_columns(
            make_session,
            recorder.engine,
            "states",
            [
                "last_updated_ts FLOAT(4)",
                "last_changed_ts FLOAT(4)",
            ],
        )

    await recorder.async_add_executor_job(_downgrade_timestamp_columns)
    errors = await recorder.async_add_executor_job(
        validate_db_schema_precision, recorder, States
    )
    assert errors == {"states.double precision"}
    # Repair the schema, then validate again: the error set must be empty.
    await recorder.async_add_executor_job(
        correct_db_schema_precision, recorder, States, errors
    )
    errors = await recorder.async_add_executor_job(
        validate_db_schema_precision, recorder, States
    )
    assert errors == set()
async def test_validate_db_schema_precision_with_unrepairable_broken_schema(
    async_setup_recorder_instance: RecorderInstanceGenerator,
    hass: HomeAssistant,
    recorder_db_url: str,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Test validating DB schema when the schema is broken and cannot be repaired."""
    if not recorder_db_url.startswith("mysql://"):
        # This scenario is only reproducible on MySQL.
        return
    await async_setup_recorder_instance(hass)
    await async_wait_recording_done(hass)
    recorder = get_instance(hass)
    make_session = recorder.get_session

    def _corrupt_states_table():
        # Downgrade the timestamp columns and additionally force state
        # NOT NULL.  NOTE(review): presumably the NOT NULL column makes the
        # validator's probe fail so validation aborts — confirm against
        # _validate_db_schema_precision.
        _modify_columns(
            make_session,
            recorder.engine,
            "states",
            [
                "state VARCHAR(255) NOT NULL",
                "last_updated_ts FLOAT(4)",
                "last_changed_ts FLOAT(4)",
            ],
        )

    await recorder.async_add_executor_job(_corrupt_states_table)
    errors = await recorder.async_add_executor_job(
        validate_db_schema_precision, recorder, States
    )
    # Validation must fail gracefully: no errors reported, failure logged.
    assert "Error when validating DB schema" in caplog.text
    assert errors == set()

View File

@ -1161,11 +1161,11 @@ def enable_statistics() -> bool:
@pytest.fixture
def enable_statistics_table_validation() -> bool:
def enable_schema_validation() -> bool:
"""Fixture to control enabling of recorder's statistics table validation.
To enable statistics table validation, tests can be marked with:
@pytest.mark.parametrize("enable_statistics_table_validation", [True])
@pytest.mark.parametrize("enable_schema_validation", [True])
"""
return False
@ -1272,7 +1272,7 @@ def hass_recorder(
recorder_db_url: str,
enable_nightly_purge: bool,
enable_statistics: bool,
enable_statistics_table_validation: bool,
enable_schema_validation: bool,
enable_migrate_context_ids: bool,
enable_migrate_event_type_ids: bool,
enable_migrate_entity_ids: bool,
@ -1283,16 +1283,16 @@ def hass_recorder(
from homeassistant.components import recorder
# pylint: disable-next=import-outside-toplevel
from homeassistant.components.recorder.auto_repairs.statistics import schema
from homeassistant.components.recorder import migration
original_tz = dt_util.DEFAULT_TIME_ZONE
hass = get_test_home_assistant()
nightly = recorder.Recorder.async_nightly_tasks if enable_nightly_purge else None
stats = recorder.Recorder.async_periodic_statistics if enable_statistics else None
stats_validate = (
schema.validate_db_schema
if enable_statistics_table_validation
schema_validate = (
migration._find_schema_errors
if enable_schema_validation
else itertools.repeat(set())
)
migrate_states_context_ids = (
@ -1322,8 +1322,8 @@ def hass_recorder(
side_effect=stats,
autospec=True,
), patch(
"homeassistant.components.recorder.migration.statistics_validate_db_schema",
side_effect=stats_validate,
"homeassistant.components.recorder.migration._find_schema_errors",
side_effect=schema_validate,
autospec=True,
), patch(
"homeassistant.components.recorder.Recorder._migrate_events_context_ids",
@ -1391,7 +1391,7 @@ async def async_setup_recorder_instance(
recorder_db_url: str,
enable_nightly_purge: bool,
enable_statistics: bool,
enable_statistics_table_validation: bool,
enable_schema_validation: bool,
enable_migrate_context_ids: bool,
enable_migrate_event_type_ids: bool,
enable_migrate_entity_ids: bool,
@ -1401,16 +1401,16 @@ async def async_setup_recorder_instance(
from homeassistant.components import recorder
# pylint: disable-next=import-outside-toplevel
from homeassistant.components.recorder.auto_repairs.statistics import schema
from homeassistant.components.recorder import migration
# pylint: disable-next=import-outside-toplevel
from .components.recorder.common import async_recorder_block_till_done
nightly = recorder.Recorder.async_nightly_tasks if enable_nightly_purge else None
stats = recorder.Recorder.async_periodic_statistics if enable_statistics else None
stats_validate = (
schema.validate_db_schema
if enable_statistics_table_validation
schema_validate = (
migration._find_schema_errors
if enable_schema_validation
else itertools.repeat(set())
)
migrate_states_context_ids = (
@ -1440,8 +1440,8 @@ async def async_setup_recorder_instance(
side_effect=stats,
autospec=True,
), patch(
"homeassistant.components.recorder.migration.statistics_validate_db_schema",
side_effect=stats_validate,
"homeassistant.components.recorder.migration._find_schema_errors",
side_effect=schema_validate,
autospec=True,
), patch(
"homeassistant.components.recorder.Recorder._migrate_events_context_ids",