Improve typing of Spotify (#66109)

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
This commit is contained in:
Franck Nijhof 2022-02-08 23:07:13 +01:00 committed by GitHub
parent b012b79167
commit b9f21d4e07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 253 additions and 189 deletions

View File

@ -4,7 +4,7 @@ import aiohttp
from spotipy import Spotify, SpotifyException
import voluptuous as vol
from homeassistant.components.media_player import BrowseError
from homeassistant.components.media_player import BrowseError, BrowseMedia
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
ATTR_CREDENTIALS,
@ -47,19 +47,23 @@ CONFIG_SCHEMA = vol.Schema(
PLATFORMS = [Platform.MEDIA_PLAYER]
def is_spotify_media_type(media_content_type):
def is_spotify_media_type(media_content_type: str) -> bool:
"""Return whether the media_content_type is a valid Spotify media_id."""
return media_content_type.startswith(MEDIA_PLAYER_PREFIX)
def resolve_spotify_media_type(media_content_type):
def resolve_spotify_media_type(media_content_type: str) -> str:
"""Return actual spotify media_content_type."""
return media_content_type[len(MEDIA_PLAYER_PREFIX) :]
async def async_browse_media(
hass, media_content_type, media_content_id, *, can_play_artist=True
):
hass: HomeAssistant,
media_content_type: str,
media_content_id: str,
*,
can_play_artist: bool = True,
) -> BrowseMedia:
"""Browse Spotify media."""
if not (info := next(iter(hass.data[DOMAIN].values()), None)):
raise BrowseError("No Spotify accounts available")
@ -128,12 +132,6 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload Spotify config entry."""
# Unload entities for this entry/device.
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
# Cleanup
del hass.data[DOMAIN][entry.entry_id]
if not hass.data[DOMAIN]:
del hass.data[DOMAIN]
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
del hass.data[DOMAIN][entry.entry_id]
return unload_ok

View File

@ -8,6 +8,7 @@ from spotipy import Spotify
import voluptuous as vol
from homeassistant.components import persistent_notification
from homeassistant.config_entries import ConfigEntry
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers import config_entry_oauth2_flow
@ -22,10 +23,7 @@ class SpotifyFlowHandler(
DOMAIN = DOMAIN
VERSION = 1
def __init__(self) -> None:
"""Instantiate config flow."""
super().__init__()
self.entry: dict[str, Any] | None = None
reauth_entry: ConfigEntry | None = None
@property
def logger(self) -> logging.Logger:
@ -48,7 +46,7 @@ class SpotifyFlowHandler(
name = data["id"] = current_user["id"]
if self.entry and self.entry["id"] != current_user["id"]:
if self.reauth_entry and self.reauth_entry.data["id"] != current_user["id"]:
return self.async_abort(reason="reauth_account_mismatch")
if current_user.get("display_name"):
@ -61,8 +59,9 @@ class SpotifyFlowHandler(
async def async_step_reauth(self, entry: dict[str, Any]) -> FlowResult:
"""Perform reauth upon migration of old entries."""
if entry:
self.entry = entry
self.reauth_entry = self.hass.config_entries.async_get_entry(
self.context["entry_id"]
)
persistent_notification.async_create(
self.hass,
@ -77,16 +76,18 @@ class SpotifyFlowHandler(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Confirm reauth dialog."""
if user_input is None:
if self.reauth_entry is None:
return self.async_abort(reason="reauth_account_mismatch")
if user_input is None and self.reauth_entry:
return self.async_show_form(
step_id="reauth_confirm",
description_placeholders={"account": self.entry["id"]},
description_placeholders={"account": self.reauth_entry.data["id"]},
data_schema=vol.Schema({}),
errors={},
)
persistent_notification.async_dismiss(self.hass, "spotify_reauth")
return await self.async_step_pick_implementation(
user_input={"implementation": self.entry["auth_implementation"]}
user_input={"implementation": self.reauth_entry.data["auth_implementation"]}
)

View File

@ -6,6 +6,7 @@ import datetime as dt
from datetime import timedelta
from functools import partial
import logging
from typing import Any
import requests
from spotipy import Spotify, SpotifyException
@ -128,57 +129,57 @@ class BrowsableMedia(StrEnum):
LIBRARY_MAP = {
BrowsableMedia.CURRENT_USER_PLAYLISTS: "Playlists",
BrowsableMedia.CURRENT_USER_FOLLOWED_ARTISTS: "Artists",
BrowsableMedia.CURRENT_USER_SAVED_ALBUMS: "Albums",
BrowsableMedia.CURRENT_USER_SAVED_TRACKS: "Tracks",
BrowsableMedia.CURRENT_USER_SAVED_SHOWS: "Podcasts",
BrowsableMedia.CURRENT_USER_RECENTLY_PLAYED: "Recently played",
BrowsableMedia.CURRENT_USER_TOP_ARTISTS: "Top Artists",
BrowsableMedia.CURRENT_USER_TOP_TRACKS: "Top Tracks",
BrowsableMedia.CATEGORIES: "Categories",
BrowsableMedia.FEATURED_PLAYLISTS: "Featured Playlists",
BrowsableMedia.NEW_RELEASES: "New Releases",
BrowsableMedia.CURRENT_USER_PLAYLISTS.value: "Playlists",
BrowsableMedia.CURRENT_USER_FOLLOWED_ARTISTS.value: "Artists",
BrowsableMedia.CURRENT_USER_SAVED_ALBUMS.value: "Albums",
BrowsableMedia.CURRENT_USER_SAVED_TRACKS.value: "Tracks",
BrowsableMedia.CURRENT_USER_SAVED_SHOWS.value: "Podcasts",
BrowsableMedia.CURRENT_USER_RECENTLY_PLAYED.value: "Recently played",
BrowsableMedia.CURRENT_USER_TOP_ARTISTS.value: "Top Artists",
BrowsableMedia.CURRENT_USER_TOP_TRACKS.value: "Top Tracks",
BrowsableMedia.CATEGORIES.value: "Categories",
BrowsableMedia.FEATURED_PLAYLISTS.value: "Featured Playlists",
BrowsableMedia.NEW_RELEASES.value: "New Releases",
}
CONTENT_TYPE_MEDIA_CLASS = {
BrowsableMedia.CURRENT_USER_PLAYLISTS: {
CONTENT_TYPE_MEDIA_CLASS: dict[str, Any] = {
BrowsableMedia.CURRENT_USER_PLAYLISTS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_PLAYLIST,
},
BrowsableMedia.CURRENT_USER_FOLLOWED_ARTISTS: {
BrowsableMedia.CURRENT_USER_FOLLOWED_ARTISTS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_ARTIST,
},
BrowsableMedia.CURRENT_USER_SAVED_ALBUMS: {
BrowsableMedia.CURRENT_USER_SAVED_ALBUMS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_ALBUM,
},
BrowsableMedia.CURRENT_USER_SAVED_TRACKS: {
BrowsableMedia.CURRENT_USER_SAVED_TRACKS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_TRACK,
},
BrowsableMedia.CURRENT_USER_SAVED_SHOWS: {
BrowsableMedia.CURRENT_USER_SAVED_SHOWS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_PODCAST,
},
BrowsableMedia.CURRENT_USER_RECENTLY_PLAYED: {
BrowsableMedia.CURRENT_USER_RECENTLY_PLAYED.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_TRACK,
},
BrowsableMedia.CURRENT_USER_TOP_ARTISTS: {
BrowsableMedia.CURRENT_USER_TOP_ARTISTS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_ARTIST,
},
BrowsableMedia.CURRENT_USER_TOP_TRACKS: {
BrowsableMedia.CURRENT_USER_TOP_TRACKS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_TRACK,
},
BrowsableMedia.FEATURED_PLAYLISTS: {
BrowsableMedia.FEATURED_PLAYLISTS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_PLAYLIST,
},
BrowsableMedia.CATEGORIES: {
BrowsableMedia.CATEGORIES.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_GENRE,
},
@ -186,7 +187,7 @@ CONTENT_TYPE_MEDIA_CLASS = {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_PLAYLIST,
},
BrowsableMedia.NEW_RELEASES: {
BrowsableMedia.NEW_RELEASES.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_ALBUM,
},
@ -276,7 +277,7 @@ class SpotifyMediaPlayer(MediaPlayerEntity):
self._attr_unique_id = user_id
@property
def _me(self) -> dict:
def _me(self) -> dict[str, Any]:
"""Return spotify user info."""
return self._spotify_data[DATA_SPOTIFY_ME]
@ -319,23 +320,30 @@ class SpotifyMediaPlayer(MediaPlayerEntity):
@property
def volume_level(self) -> float | None:
"""Return the device volume."""
if not self._currently_playing:
return None
return self._currently_playing.get("device", {}).get("volume_percent", 0) / 100
@property
def media_content_id(self) -> str | None:
"""Return the media URL."""
if not self._currently_playing:
return None
item = self._currently_playing.get("item") or {}
return item.get("uri")
@property
def media_duration(self) -> int | None:
"""Duration of current playing media in seconds."""
if self._currently_playing.get("item") is None:
if (
self._currently_playing is None
or self._currently_playing.get("item") is None
):
return None
return self._currently_playing["item"]["duration_ms"] / 1000
@property
def media_position(self) -> str | None:
def media_position(self) -> int | None:
"""Position of current playing media in seconds."""
if not self._currently_playing:
return None
@ -352,7 +360,8 @@ class SpotifyMediaPlayer(MediaPlayerEntity):
def media_image_url(self) -> str | None:
"""Return the media image URL."""
if (
self._currently_playing.get("item") is None
not self._currently_playing
or self._currently_playing.get("item") is None
or not self._currently_playing["item"]["album"]["images"]
):
return None
@ -361,13 +370,15 @@ class SpotifyMediaPlayer(MediaPlayerEntity):
@property
def media_title(self) -> str | None:
"""Return the media title."""
if not self._currently_playing:
return None
item = self._currently_playing.get("item") or {}
return item.get("name")
@property
def media_artist(self) -> str | None:
"""Return the media artist."""
if self._currently_playing.get("item") is None:
if not self._currently_playing or self._currently_playing.get("item") is None:
return None
return ", ".join(
artist["name"] for artist in self._currently_playing["item"]["artists"]
@ -376,13 +387,15 @@ class SpotifyMediaPlayer(MediaPlayerEntity):
@property
def media_album_name(self) -> str | None:
"""Return the media album."""
if self._currently_playing.get("item") is None:
if not self._currently_playing or self._currently_playing.get("item") is None:
return None
return self._currently_playing["item"]["album"]["name"]
@property
def media_track(self) -> int | None:
"""Track number of current playing media, music track only."""
if not self._currently_playing:
return None
item = self._currently_playing.get("item") or {}
return item.get("track_number")
@ -396,6 +409,8 @@ class SpotifyMediaPlayer(MediaPlayerEntity):
@property
def source(self) -> str | None:
"""Return the current playback device."""
if not self._currently_playing:
return None
return self._currently_playing.get("device", {}).get("name")
@property
@ -406,14 +421,20 @@ class SpotifyMediaPlayer(MediaPlayerEntity):
return [device["name"] for device in self._devices]
@property
def shuffle(self) -> bool:
def shuffle(self) -> bool | None:
"""Shuffling state."""
return bool(self._currently_playing.get("shuffle_state"))
if not self._currently_playing:
return None
return self._currently_playing.get("shuffle_state")
@property
def repeat(self) -> str | None:
"""Return current repeat mode."""
repeat_state = self._currently_playing.get("repeat_state")
if (
not self._currently_playing
or (repeat_state := self._currently_playing.get("repeat_state")) is None
):
return None
return REPEAT_MODE_MAPPING_TO_HA.get(repeat_state)
@property
@ -473,7 +494,11 @@ class SpotifyMediaPlayer(MediaPlayerEntity):
_LOGGER.error("Media type %s is not supported", media_type)
return
if not self._currently_playing.get("device") and self._devices:
if (
self._currently_playing
and not self._currently_playing.get("device")
and self._devices
):
kwargs["device_id"] = self._devices[0].get("id")
self._spotify.start_playback(**kwargs)
@ -481,6 +506,9 @@ class SpotifyMediaPlayer(MediaPlayerEntity):
@spotify_exception_handler
def select_source(self, source: str) -> None:
"""Select playback device."""
if not self._devices:
return
for device in self._devices:
if device["name"] == source:
self._spotify.transfer_playback(
@ -525,7 +553,9 @@ class SpotifyMediaPlayer(MediaPlayerEntity):
devices = self._spotify.devices() or {}
self._devices = devices.get("devices", [])
async def async_browse_media(self, media_content_type=None, media_content_id=None):
async def async_browse_media(
self, media_content_type: str | None = None, media_content_id: str | None = None
) -> BrowseMedia:
"""Implement the websocket media browsing helper."""
if not self._scope_ok:
@ -545,15 +575,15 @@ class SpotifyMediaPlayer(MediaPlayerEntity):
async def async_browse_media_internal(
hass,
spotify,
session,
current_user,
media_content_type,
media_content_id,
hass: HomeAssistant,
spotify: Spotify,
session: OAuth2Session,
current_user: dict[str, Any],
media_content_type: str | None,
media_content_id: str | None,
*,
can_play_artist=True,
):
can_play_artist: bool = True,
) -> BrowseMedia:
"""Browse spotify media."""
if media_content_type in (None, f"{MEDIA_PLAYER_PREFIX}library"):
return await hass.async_add_executor_job(
@ -563,7 +593,8 @@ async def async_browse_media_internal(
await session.async_ensure_token_valid()
# Strip prefix
media_content_type = media_content_type[len(MEDIA_PLAYER_PREFIX) :]
if media_content_type:
media_content_type = media_content_type[len(MEDIA_PLAYER_PREFIX) :]
payload = {
"media_content_type": media_content_type,
@ -583,76 +614,91 @@ async def async_browse_media_internal(
return response
def build_item_response(spotify, user, payload, *, can_play_artist): # noqa: C901
def build_item_response( # noqa: C901
spotify: Spotify,
user: dict[str, Any],
payload: dict[str, str | None],
*,
can_play_artist: bool,
) -> BrowseMedia | None:
"""Create response payload for the provided media query."""
media_content_type = payload["media_content_type"]
media_content_id = payload["media_content_id"]
if media_content_type is None or media_content_id is None:
return None
title = None
image = None
media: dict[str, Any] | None = None
items = []
if media_content_type == BrowsableMedia.CURRENT_USER_PLAYLISTS:
media = spotify.current_user_playlists(limit=BROWSE_LIMIT)
items = media.get("items", [])
if media := spotify.current_user_playlists(limit=BROWSE_LIMIT):
items = media.get("items", [])
elif media_content_type == BrowsableMedia.CURRENT_USER_FOLLOWED_ARTISTS:
media = spotify.current_user_followed_artists(limit=BROWSE_LIMIT)
items = media.get("artists", {}).get("items", [])
if media := spotify.current_user_followed_artists(limit=BROWSE_LIMIT):
items = media.get("artists", {}).get("items", [])
elif media_content_type == BrowsableMedia.CURRENT_USER_SAVED_ALBUMS:
media = spotify.current_user_saved_albums(limit=BROWSE_LIMIT)
items = [item["album"] for item in media.get("items", [])]
if media := spotify.current_user_saved_albums(limit=BROWSE_LIMIT):
items = [item["album"] for item in media.get("items", [])]
elif media_content_type == BrowsableMedia.CURRENT_USER_SAVED_TRACKS:
media = spotify.current_user_saved_tracks(limit=BROWSE_LIMIT)
items = [item["track"] for item in media.get("items", [])]
if media := spotify.current_user_saved_tracks(limit=BROWSE_LIMIT):
items = [item["track"] for item in media.get("items", [])]
elif media_content_type == BrowsableMedia.CURRENT_USER_SAVED_SHOWS:
media = spotify.current_user_saved_shows(limit=BROWSE_LIMIT)
items = [item["show"] for item in media.get("items", [])]
if media := spotify.current_user_saved_shows(limit=BROWSE_LIMIT):
items = [item["show"] for item in media.get("items", [])]
elif media_content_type == BrowsableMedia.CURRENT_USER_RECENTLY_PLAYED:
media = spotify.current_user_recently_played(limit=BROWSE_LIMIT)
items = [item["track"] for item in media.get("items", [])]
if media := spotify.current_user_recently_played(limit=BROWSE_LIMIT):
items = [item["track"] for item in media.get("items", [])]
elif media_content_type == BrowsableMedia.CURRENT_USER_TOP_ARTISTS:
media = spotify.current_user_top_artists(limit=BROWSE_LIMIT)
items = media.get("items", [])
if media := spotify.current_user_top_artists(limit=BROWSE_LIMIT):
items = media.get("items", [])
elif media_content_type == BrowsableMedia.CURRENT_USER_TOP_TRACKS:
media = spotify.current_user_top_tracks(limit=BROWSE_LIMIT)
items = media.get("items", [])
if media := spotify.current_user_top_tracks(limit=BROWSE_LIMIT):
items = media.get("items", [])
elif media_content_type == BrowsableMedia.FEATURED_PLAYLISTS:
media = spotify.featured_playlists(country=user["country"], limit=BROWSE_LIMIT)
items = media.get("playlists", {}).get("items", [])
if media := spotify.featured_playlists(
country=user["country"], limit=BROWSE_LIMIT
):
items = media.get("playlists", {}).get("items", [])
elif media_content_type == BrowsableMedia.CATEGORIES:
media = spotify.categories(country=user["country"], limit=BROWSE_LIMIT)
items = media.get("categories", {}).get("items", [])
if media := spotify.categories(country=user["country"], limit=BROWSE_LIMIT):
items = media.get("categories", {}).get("items", [])
elif media_content_type == "category_playlists":
media = spotify.category_playlists(
category_id=media_content_id,
country=user["country"],
limit=BROWSE_LIMIT,
)
category = spotify.category(media_content_id, country=user["country"])
title = category.get("name")
image = fetch_image_url(category, key="icons")
items = media.get("playlists", {}).get("items", [])
if (
media := spotify.category_playlists(
category_id=media_content_id,
country=user["country"],
limit=BROWSE_LIMIT,
)
) and (category := spotify.category(media_content_id, country=user["country"])):
title = category.get("name")
image = fetch_image_url(category, key="icons")
items = media.get("playlists", {}).get("items", [])
elif media_content_type == BrowsableMedia.NEW_RELEASES:
media = spotify.new_releases(country=user["country"], limit=BROWSE_LIMIT)
items = media.get("albums", {}).get("items", [])
if media := spotify.new_releases(country=user["country"], limit=BROWSE_LIMIT):
items = media.get("albums", {}).get("items", [])
elif media_content_type == MEDIA_TYPE_PLAYLIST:
media = spotify.playlist(media_content_id)
items = [item["track"] for item in media.get("tracks", {}).get("items", [])]
if media := spotify.playlist(media_content_id):
items = [item["track"] for item in media.get("tracks", {}).get("items", [])]
elif media_content_type == MEDIA_TYPE_ALBUM:
media = spotify.album(media_content_id)
items = media.get("tracks", {}).get("items", [])
if media := spotify.album(media_content_id):
items = media.get("tracks", {}).get("items", [])
elif media_content_type == MEDIA_TYPE_ARTIST:
media = spotify.artist_albums(media_content_id, limit=BROWSE_LIMIT)
artist = spotify.artist(media_content_id)
title = artist.get("name")
image = fetch_image_url(artist)
items = media.get("items", [])
if (media := spotify.artist_albums(media_content_id, limit=BROWSE_LIMIT)) and (
artist := spotify.artist(media_content_id)
):
title = artist.get("name")
image = fetch_image_url(artist)
items = media.get("items", [])
elif media_content_type == MEDIA_TYPE_SHOW:
media = spotify.show_episodes(media_content_id, limit=BROWSE_LIMIT)
show = spotify.show(media_content_id)
title = show.get("name")
image = fetch_image_url(show)
items = media.get("items", [])
else:
media = None
items = []
if (media := spotify.show_episodes(media_content_id, limit=BROWSE_LIMIT)) and (
show := spotify.show(media_content_id)
):
title = show.get("name")
image = fetch_image_url(show)
items = media.get("items", [])
if media is None:
return None
@ -665,15 +711,16 @@ def build_item_response(spotify, user, payload, *, can_play_artist): # noqa: C9
if media_content_type == BrowsableMedia.CATEGORIES:
media_item = BrowseMedia(
title=LIBRARY_MAP.get(media_content_id),
media_class=media_class["parent"],
children_media_class=media_class["children"],
media_content_id=media_content_id,
media_content_type=MEDIA_PLAYER_PREFIX + media_content_type,
can_play=False,
can_expand=True,
children=[],
can_play=False,
children_media_class=media_class["children"],
media_class=media_class["parent"],
media_content_id=media_content_id,
media_content_type=f"{MEDIA_PLAYER_PREFIX}{media_content_type}",
title=LIBRARY_MAP.get(media_content_id, "Unknown"),
)
media_item.children = []
for item in items:
try:
item_id = item["id"]
@ -682,52 +729,54 @@ def build_item_response(spotify, user, payload, *, can_play_artist): # noqa: C9
continue
media_item.children.append(
BrowseMedia(
title=item.get("name"),
media_class=MEDIA_CLASS_PLAYLIST,
children_media_class=MEDIA_CLASS_TRACK,
media_content_id=item_id,
media_content_type=MEDIA_PLAYER_PREFIX + "category_playlists",
thumbnail=fetch_image_url(item, key="icons"),
can_play=False,
can_expand=True,
can_play=False,
children_media_class=MEDIA_CLASS_TRACK,
media_class=MEDIA_CLASS_PLAYLIST,
media_content_id=item_id,
media_content_type=f"{MEDIA_PLAYER_PREFIX}category_playlists",
thumbnail=fetch_image_url(item, key="icons"),
title=item.get("name"),
)
)
return media_item
if title is None:
title = LIBRARY_MAP.get(media_content_id, "Unknown")
if "name" in media:
title = media.get("name")
else:
title = LIBRARY_MAP.get(payload["media_content_id"])
title = media["name"]
params = {
"title": title,
"media_class": media_class["parent"],
"children_media_class": media_class["children"],
"media_content_id": media_content_id,
"media_content_type": MEDIA_PLAYER_PREFIX + media_content_type,
"can_play": media_content_type in PLAYABLE_MEDIA_TYPES
and (media_content_type != MEDIA_TYPE_ARTIST or can_play_artist),
"children": [],
"can_expand": True,
}
can_play = media_content_type in PLAYABLE_MEDIA_TYPES and (
media_content_type != MEDIA_TYPE_ARTIST or can_play_artist
)
browse_media = BrowseMedia(
can_expand=True,
can_play=can_play,
children_media_class=media_class["children"],
media_class=media_class["parent"],
media_content_id=media_content_id,
media_content_type=f"{MEDIA_PLAYER_PREFIX}{media_content_type}",
thumbnail=image,
title=title,
)
browse_media.children = []
for item in items:
try:
params["children"].append(
browse_media.children.append(
item_payload(item, can_play_artist=can_play_artist)
)
except (MissingMediaInformation, UnknownMediaType):
continue
if "images" in media:
params["thumbnail"] = fetch_image_url(media)
elif image:
params["thumbnail"] = image
browse_media.thumbnail = fetch_image_url(media)
return BrowseMedia(**params)
return browse_media
def item_payload(item, *, can_play_artist):
def item_payload(item: dict[str, Any], *, can_play_artist: bool) -> BrowseMedia:
"""
Create response payload for a single media item.
@ -751,54 +800,56 @@ def item_payload(item, *, can_play_artist):
MEDIA_TYPE_EPISODE,
]
payload = {
"title": item.get("name"),
"media_class": media_class["parent"],
"children_media_class": media_class["children"],
"media_content_id": media_id,
"media_content_type": MEDIA_PLAYER_PREFIX + media_type,
"can_play": media_type in PLAYABLE_MEDIA_TYPES
and (media_type != MEDIA_TYPE_ARTIST or can_play_artist),
"can_expand": can_expand,
}
can_play = media_type in PLAYABLE_MEDIA_TYPES and (
media_type != MEDIA_TYPE_ARTIST or can_play_artist
)
browse_media = BrowseMedia(
can_expand=can_expand,
can_play=can_play,
children_media_class=media_class["children"],
media_class=media_class["parent"],
media_content_id=media_id,
media_content_type=f"{MEDIA_PLAYER_PREFIX}{media_type}",
title=item.get("name", "Unknown"),
)
if "images" in item:
payload["thumbnail"] = fetch_image_url(item)
browse_media.thumbnail = fetch_image_url(item)
elif MEDIA_TYPE_ALBUM in item:
payload["thumbnail"] = fetch_image_url(item[MEDIA_TYPE_ALBUM])
browse_media.thumbnail = fetch_image_url(item[MEDIA_TYPE_ALBUM])
return BrowseMedia(**payload)
return browse_media
def library_payload(*, can_play_artist):
def library_payload(*, can_play_artist: bool) -> BrowseMedia:
"""
Create response payload to describe contents of a specific library.
Used by async_browse_media.
"""
library_info = {
"title": "Media Library",
"media_class": MEDIA_CLASS_DIRECTORY,
"media_content_id": "library",
"media_content_type": MEDIA_PLAYER_PREFIX + "library",
"can_play": False,
"can_expand": True,
"children": [],
}
browse_media = BrowseMedia(
can_expand=True,
can_play=False,
children_media_class=MEDIA_CLASS_DIRECTORY,
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id="library",
media_content_type=f"{MEDIA_PLAYER_PREFIX}library",
title="Media Library",
)
browse_media.children = []
for item in [{"name": n, "type": t} for t, n in LIBRARY_MAP.items()]:
library_info["children"].append(
browse_media.children.append(
item_payload(
{"name": item["name"], "type": item["type"], "uri": item["type"]},
can_play_artist=can_play_artist,
)
)
response = BrowseMedia(**library_info)
response.children_media_class = MEDIA_CLASS_DIRECTORY
return response
return browse_media
def fetch_image_url(item, key="images"):
def fetch_image_url(item: dict[str, Any], key="images") -> str | None:
"""Fetch image url."""
try:
return item.get(key, [])[0].get("url")

View File

@ -11,8 +11,8 @@
},
"abort": {
"authorize_url_timeout": "Timeout generating authorize URL.",
"no_url_available": "[%key:common::config_flow::abort::oauth2_no_url_available%]",
"missing_configuration": "The Spotify integration is not configured. Please follow the documentation.",
"no_url_available": "[%key:common::config_flow::abort::oauth2_no_url_available%]",
"reauth_account_mismatch": "The Spotify account authenticated with, does not match the account needed re-authentication."
},
"create_entry": { "default": "Successfully authenticated with Spotify." }

View File

@ -2646,12 +2646,6 @@ ignore_errors = true
[mypy-homeassistant.components.sonos.statistics]
ignore_errors = true
[mypy-homeassistant.components.spotify.config_flow]
ignore_errors = true
[mypy-homeassistant.components.spotify.media_player]
ignore_errors = true
[mypy-homeassistant.components.system_health]
ignore_errors = true

View File

@ -188,8 +188,6 @@ IGNORED_MODULES: Final[list[str]] = [
"homeassistant.components.sonos.sensor",
"homeassistant.components.sonos.speaker",
"homeassistant.components.sonos.statistics",
"homeassistant.components.spotify.config_flow",
"homeassistant.components.spotify.media_player",
"homeassistant.components.system_health",
"homeassistant.components.telegram_bot.polling",
"homeassistant.components.template",

View File

@ -194,7 +194,13 @@ async def test_reauthentication(
old_entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_REAUTH}, data=old_entry.data
DOMAIN,
context={
"source": SOURCE_REAUTH,
"unique_id": old_entry.unique_id,
"entry_id": old_entry.entry_id,
},
data=old_entry.data,
)
flows = hass.config_entries.flow.async_progress()
@ -261,7 +267,13 @@ async def test_reauth_account_mismatch(
old_entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_REAUTH}, data=old_entry.data
DOMAIN,
context={
"source": SOURCE_REAUTH,
"unique_id": old_entry.unique_id,
"entry_id": old_entry.entry_id,
},
data=old_entry.data,
)
flows = hass.config_entries.flow.async_progress()
@ -294,3 +306,13 @@ async def test_reauth_account_mismatch(
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "reauth_account_mismatch"
async def test_abort_if_no_reauth_entry(hass):
"""Check flow aborts when no entry is known when entring reauth confirmation."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": "reauth_confirm"}
)
assert result.get("type") == data_entry_flow.RESULT_TYPE_ABORT
assert result.get("reason") == "reauth_account_mismatch"