"""Script to manage users for the Home Assistant auth provider.""" import argparse import asyncio import logging import os from homeassistant.auth import auth_manager_from_config from homeassistant.auth.providers import homeassistant as hass_auth from homeassistant.core import HomeAssistant from homeassistant.config import get_default_config_dir # mypy: allow-untyped-calls, allow-untyped-defs def run(args): """Handle Home Assistant auth provider script.""" parser = argparse.ArgumentParser(description="Manage Home Assistant users") parser.add_argument("--script", choices=["auth"]) parser.add_argument( "-c", "--config", default=get_default_config_dir(), help="Directory that contains the Home Assistant configuration", ) subparsers = parser.add_subparsers(dest="func") subparsers.required = True parser_list = subparsers.add_parser("list") parser_list.set_defaults(func=list_users) parser_add = subparsers.add_parser("add") parser_add.add_argument("username", type=str) parser_add.add_argument("password", type=str) parser_add.set_defaults(func=add_user) parser_validate_login = subparsers.add_parser("validate") parser_validate_login.add_argument("username", type=str) parser_validate_login.add_argument("password", type=str) parser_validate_login.set_defaults(func=validate_login) parser_change_pw = subparsers.add_parser("change_password") parser_change_pw.add_argument("username", type=str) parser_change_pw.add_argument("new_password", type=str) parser_change_pw.set_defaults(func=change_password) args = parser.parse_args(args) loop = asyncio.get_event_loop() hass = HomeAssistant(loop=loop) loop.run_until_complete(run_command(hass, args)) # Triggers save on used storage helpers with delay (core auth) logging.getLogger("homeassistant.core").setLevel(logging.WARNING) loop.run_until_complete(hass.async_stop()) async def run_command(hass, args): """Run the command.""" hass.config.config_dir = os.path.join(os.getcwd(), args.config) hass.auth = await auth_manager_from_config(hass, [{"type": "homeassistant"}], []) provider = hass.auth.auth_providers[0] await provider.async_initialize() await args.func(hass, provider, args) async def list_users(hass, provider, args): """List the users.""" count = 0 for user in provider.data.users: count += 1 print(user["username"]) print() print("Total users:", count) async def add_user(hass, provider, args): """Create a user.""" try: provider.data.add_auth(args.username, args.password) except hass_auth.InvalidUser: print("Username already exists!") return # Save username/password await provider.data.async_save() print("Auth created") async def validate_login(hass, provider, args): """Validate a login.""" try: provider.data.validate_login(args.username, args.password) print("Auth valid") except hass_auth.InvalidAuth: print("Auth invalid") async def change_password(hass, provider, args): """Change password.""" try: provider.data.change_password(args.username, args.new_password) await provider.data.async_save() print("Password changed") except hass_auth.InvalidUser: print("User not found")