1
mirror of https://github.com/home-assistant/core synced 2024-09-25 00:41:32 +02:00

Extract Spotify media browsing into a module (#66175)

This commit is contained in:
Franck Nijhof 2022-02-09 22:03:15 +01:00 committed by GitHub
parent 3bce870c6d
commit 8a09303c98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 493 additions and 440 deletions

View File

@ -1078,8 +1078,10 @@ omit =
homeassistant/components/spider/*
homeassistant/components/splunk/*
homeassistant/components/spotify/__init__.py
homeassistant/components/spotify/browse_media.py
homeassistant/components/spotify/media_player.py
homeassistant/components/spotify/system_health.py
homeassistant/components/spotify/util.py
homeassistant/components/squeezebox/__init__.py
homeassistant/components/squeezebox/browse_media.py
homeassistant/components/squeezebox/media_player.py

View File

@ -4,7 +4,6 @@ import aiohttp
from spotipy import Spotify, SpotifyException
import voluptuous as vol
from homeassistant.components.media_player import BrowseError, BrowseMedia
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
ATTR_CREDENTIALS,
@ -22,15 +21,15 @@ from homeassistant.helpers.config_entry_oauth2_flow import (
from homeassistant.helpers.typing import ConfigType
from . import config_flow
from .browse_media import async_browse_media
from .const import (
DATA_SPOTIFY_CLIENT,
DATA_SPOTIFY_ME,
DATA_SPOTIFY_SESSION,
DOMAIN,
MEDIA_PLAYER_PREFIX,
SPOTIFY_SCOPES,
)
from .media_player import async_browse_media_internal
from .util import is_spotify_media_type, resolve_spotify_media_type
CONFIG_SCHEMA = vol.Schema(
{
@ -47,35 +46,12 @@ CONFIG_SCHEMA = vol.Schema(
PLATFORMS = [Platform.MEDIA_PLAYER]
def is_spotify_media_type(media_content_type: str) -> bool:
"""Return whether the media_content_type is a valid Spotify media_id."""
return media_content_type.startswith(MEDIA_PLAYER_PREFIX)
def resolve_spotify_media_type(media_content_type: str) -> str:
"""Return actual spotify media_content_type."""
return media_content_type[len(MEDIA_PLAYER_PREFIX) :]
async def async_browse_media(
hass: HomeAssistant,
media_content_type: str,
media_content_id: str,
*,
can_play_artist: bool = True,
) -> BrowseMedia:
"""Browse Spotify media."""
if not (info := next(iter(hass.data[DOMAIN].values()), None)):
raise BrowseError("No Spotify accounts available")
return await async_browse_media_internal(
hass,
info[DATA_SPOTIFY_CLIENT],
info[DATA_SPOTIFY_SESSION],
info[DATA_SPOTIFY_ME],
media_content_type,
media_content_id,
can_play_artist=can_play_artist,
)
__all__ = [
"async_browse_media",
"DOMAIN",
"is_spotify_media_type",
"resolve_spotify_media_type",
]
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:

View File

@ -0,0 +1,439 @@
"""Support for Spotify media browsing."""
from __future__ import annotations
from functools import partial
import logging
from typing import Any
from spotipy import Spotify
from homeassistant.backports.enum import StrEnum
from homeassistant.components.media_player import BrowseError, BrowseMedia
from homeassistant.components.media_player.const import (
MEDIA_CLASS_ALBUM,
MEDIA_CLASS_ARTIST,
MEDIA_CLASS_DIRECTORY,
MEDIA_CLASS_EPISODE,
MEDIA_CLASS_GENRE,
MEDIA_CLASS_PLAYLIST,
MEDIA_CLASS_PODCAST,
MEDIA_CLASS_TRACK,
MEDIA_TYPE_ALBUM,
MEDIA_TYPE_ARTIST,
MEDIA_TYPE_EPISODE,
MEDIA_TYPE_PLAYLIST,
MEDIA_TYPE_TRACK,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.config_entry_oauth2_flow import OAuth2Session
from .const import (
DATA_SPOTIFY_CLIENT,
DATA_SPOTIFY_ME,
DATA_SPOTIFY_SESSION,
DOMAIN,
MEDIA_PLAYER_PREFIX,
MEDIA_TYPE_SHOW,
PLAYABLE_MEDIA_TYPES,
)
from .util import fetch_image_url
BROWSE_LIMIT = 48
_LOGGER = logging.getLogger(__name__)
class BrowsableMedia(StrEnum):
"""Enum of browsable media."""
CURRENT_USER_PLAYLISTS = "current_user_playlists"
CURRENT_USER_FOLLOWED_ARTISTS = "current_user_followed_artists"
CURRENT_USER_SAVED_ALBUMS = "current_user_saved_albums"
CURRENT_USER_SAVED_TRACKS = "current_user_saved_tracks"
CURRENT_USER_SAVED_SHOWS = "current_user_saved_shows"
CURRENT_USER_RECENTLY_PLAYED = "current_user_recently_played"
CURRENT_USER_TOP_ARTISTS = "current_user_top_artists"
CURRENT_USER_TOP_TRACKS = "current_user_top_tracks"
CATEGORIES = "categories"
FEATURED_PLAYLISTS = "featured_playlists"
NEW_RELEASES = "new_releases"
LIBRARY_MAP = {
BrowsableMedia.CURRENT_USER_PLAYLISTS.value: "Playlists",
BrowsableMedia.CURRENT_USER_FOLLOWED_ARTISTS.value: "Artists",
BrowsableMedia.CURRENT_USER_SAVED_ALBUMS.value: "Albums",
BrowsableMedia.CURRENT_USER_SAVED_TRACKS.value: "Tracks",
BrowsableMedia.CURRENT_USER_SAVED_SHOWS.value: "Podcasts",
BrowsableMedia.CURRENT_USER_RECENTLY_PLAYED.value: "Recently played",
BrowsableMedia.CURRENT_USER_TOP_ARTISTS.value: "Top Artists",
BrowsableMedia.CURRENT_USER_TOP_TRACKS.value: "Top Tracks",
BrowsableMedia.CATEGORIES.value: "Categories",
BrowsableMedia.FEATURED_PLAYLISTS.value: "Featured Playlists",
BrowsableMedia.NEW_RELEASES.value: "New Releases",
}
CONTENT_TYPE_MEDIA_CLASS: dict[str, Any] = {
BrowsableMedia.CURRENT_USER_PLAYLISTS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_PLAYLIST,
},
BrowsableMedia.CURRENT_USER_FOLLOWED_ARTISTS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_ARTIST,
},
BrowsableMedia.CURRENT_USER_SAVED_ALBUMS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_ALBUM,
},
BrowsableMedia.CURRENT_USER_SAVED_TRACKS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_TRACK,
},
BrowsableMedia.CURRENT_USER_SAVED_SHOWS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_PODCAST,
},
BrowsableMedia.CURRENT_USER_RECENTLY_PLAYED.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_TRACK,
},
BrowsableMedia.CURRENT_USER_TOP_ARTISTS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_ARTIST,
},
BrowsableMedia.CURRENT_USER_TOP_TRACKS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_TRACK,
},
BrowsableMedia.FEATURED_PLAYLISTS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_PLAYLIST,
},
BrowsableMedia.CATEGORIES.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_GENRE,
},
"category_playlists": {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_PLAYLIST,
},
BrowsableMedia.NEW_RELEASES.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_ALBUM,
},
MEDIA_TYPE_PLAYLIST: {
"parent": MEDIA_CLASS_PLAYLIST,
"children": MEDIA_CLASS_TRACK,
},
MEDIA_TYPE_ALBUM: {"parent": MEDIA_CLASS_ALBUM, "children": MEDIA_CLASS_TRACK},
MEDIA_TYPE_ARTIST: {"parent": MEDIA_CLASS_ARTIST, "children": MEDIA_CLASS_ALBUM},
MEDIA_TYPE_EPISODE: {"parent": MEDIA_CLASS_EPISODE, "children": None},
MEDIA_TYPE_SHOW: {"parent": MEDIA_CLASS_PODCAST, "children": MEDIA_CLASS_EPISODE},
MEDIA_TYPE_TRACK: {"parent": MEDIA_CLASS_TRACK, "children": None},
}
class MissingMediaInformation(BrowseError):
"""Missing media required information."""
class UnknownMediaType(BrowseError):
"""Unknown media type."""
async def async_browse_media(
hass: HomeAssistant,
media_content_type: str,
media_content_id: str,
*,
can_play_artist: bool = True,
) -> BrowseMedia:
"""Browse Spotify media."""
if not (info := next(iter(hass.data[DOMAIN].values()), None)):
raise BrowseError("No Spotify accounts available")
return await async_browse_media_internal(
hass,
info[DATA_SPOTIFY_CLIENT],
info[DATA_SPOTIFY_SESSION],
info[DATA_SPOTIFY_ME],
media_content_type,
media_content_id,
can_play_artist=can_play_artist,
)
async def async_browse_media_internal(
hass: HomeAssistant,
spotify: Spotify,
session: OAuth2Session,
current_user: dict[str, Any],
media_content_type: str | None,
media_content_id: str | None,
*,
can_play_artist: bool = True,
) -> BrowseMedia:
"""Browse spotify media."""
if media_content_type in (None, f"{MEDIA_PLAYER_PREFIX}library"):
return await hass.async_add_executor_job(
partial(library_payload, can_play_artist=can_play_artist)
)
await session.async_ensure_token_valid()
# Strip prefix
if media_content_type:
media_content_type = media_content_type[len(MEDIA_PLAYER_PREFIX) :]
payload = {
"media_content_type": media_content_type,
"media_content_id": media_content_id,
}
response = await hass.async_add_executor_job(
partial(
build_item_response,
spotify,
current_user,
payload,
can_play_artist=can_play_artist,
)
)
if response is None:
raise BrowseError(f"Media not found: {media_content_type} / {media_content_id}")
return response
def build_item_response( # noqa: C901
spotify: Spotify,
user: dict[str, Any],
payload: dict[str, str | None],
*,
can_play_artist: bool,
) -> BrowseMedia | None:
"""Create response payload for the provided media query."""
media_content_type = payload["media_content_type"]
media_content_id = payload["media_content_id"]
if media_content_type is None or media_content_id is None:
return None
title = None
image = None
media: dict[str, Any] | None = None
items = []
if media_content_type == BrowsableMedia.CURRENT_USER_PLAYLISTS:
if media := spotify.current_user_playlists(limit=BROWSE_LIMIT):
items = media.get("items", [])
elif media_content_type == BrowsableMedia.CURRENT_USER_FOLLOWED_ARTISTS:
if media := spotify.current_user_followed_artists(limit=BROWSE_LIMIT):
items = media.get("artists", {}).get("items", [])
elif media_content_type == BrowsableMedia.CURRENT_USER_SAVED_ALBUMS:
if media := spotify.current_user_saved_albums(limit=BROWSE_LIMIT):
items = [item["album"] for item in media.get("items", [])]
elif media_content_type == BrowsableMedia.CURRENT_USER_SAVED_TRACKS:
if media := spotify.current_user_saved_tracks(limit=BROWSE_LIMIT):
items = [item["track"] for item in media.get("items", [])]
elif media_content_type == BrowsableMedia.CURRENT_USER_SAVED_SHOWS:
if media := spotify.current_user_saved_shows(limit=BROWSE_LIMIT):
items = [item["show"] for item in media.get("items", [])]
elif media_content_type == BrowsableMedia.CURRENT_USER_RECENTLY_PLAYED:
if media := spotify.current_user_recently_played(limit=BROWSE_LIMIT):
items = [item["track"] for item in media.get("items", [])]
elif media_content_type == BrowsableMedia.CURRENT_USER_TOP_ARTISTS:
if media := spotify.current_user_top_artists(limit=BROWSE_LIMIT):
items = media.get("items", [])
elif media_content_type == BrowsableMedia.CURRENT_USER_TOP_TRACKS:
if media := spotify.current_user_top_tracks(limit=BROWSE_LIMIT):
items = media.get("items", [])
elif media_content_type == BrowsableMedia.FEATURED_PLAYLISTS:
if media := spotify.featured_playlists(
country=user["country"], limit=BROWSE_LIMIT
):
items = media.get("playlists", {}).get("items", [])
elif media_content_type == BrowsableMedia.CATEGORIES:
if media := spotify.categories(country=user["country"], limit=BROWSE_LIMIT):
items = media.get("categories", {}).get("items", [])
elif media_content_type == "category_playlists":
if (
media := spotify.category_playlists(
category_id=media_content_id,
country=user["country"],
limit=BROWSE_LIMIT,
)
) and (category := spotify.category(media_content_id, country=user["country"])):
title = category.get("name")
image = fetch_image_url(category, key="icons")
items = media.get("playlists", {}).get("items", [])
elif media_content_type == BrowsableMedia.NEW_RELEASES:
if media := spotify.new_releases(country=user["country"], limit=BROWSE_LIMIT):
items = media.get("albums", {}).get("items", [])
elif media_content_type == MEDIA_TYPE_PLAYLIST:
if media := spotify.playlist(media_content_id):
items = [item["track"] for item in media.get("tracks", {}).get("items", [])]
elif media_content_type == MEDIA_TYPE_ALBUM:
if media := spotify.album(media_content_id):
items = media.get("tracks", {}).get("items", [])
elif media_content_type == MEDIA_TYPE_ARTIST:
if (media := spotify.artist_albums(media_content_id, limit=BROWSE_LIMIT)) and (
artist := spotify.artist(media_content_id)
):
title = artist.get("name")
image = fetch_image_url(artist)
items = media.get("items", [])
elif media_content_type == MEDIA_TYPE_SHOW:
if (media := spotify.show_episodes(media_content_id, limit=BROWSE_LIMIT)) and (
show := spotify.show(media_content_id)
):
title = show.get("name")
image = fetch_image_url(show)
items = media.get("items", [])
if media is None:
return None
try:
media_class = CONTENT_TYPE_MEDIA_CLASS[media_content_type]
except KeyError:
_LOGGER.debug("Unknown media type received: %s", media_content_type)
return None
if media_content_type == BrowsableMedia.CATEGORIES:
media_item = BrowseMedia(
can_expand=True,
can_play=False,
children_media_class=media_class["children"],
media_class=media_class["parent"],
media_content_id=media_content_id,
media_content_type=f"{MEDIA_PLAYER_PREFIX}{media_content_type}",
title=LIBRARY_MAP.get(media_content_id, "Unknown"),
)
media_item.children = []
for item in items:
try:
item_id = item["id"]
except KeyError:
_LOGGER.debug("Missing ID for media item: %s", item)
continue
media_item.children.append(
BrowseMedia(
can_expand=True,
can_play=False,
children_media_class=MEDIA_CLASS_TRACK,
media_class=MEDIA_CLASS_PLAYLIST,
media_content_id=item_id,
media_content_type=f"{MEDIA_PLAYER_PREFIX}category_playlists",
thumbnail=fetch_image_url(item, key="icons"),
title=item.get("name"),
)
)
return media_item
if title is None:
title = LIBRARY_MAP.get(media_content_id, "Unknown")
if "name" in media:
title = media["name"]
can_play = media_content_type in PLAYABLE_MEDIA_TYPES and (
media_content_type != MEDIA_TYPE_ARTIST or can_play_artist
)
browse_media = BrowseMedia(
can_expand=True,
can_play=can_play,
children_media_class=media_class["children"],
media_class=media_class["parent"],
media_content_id=media_content_id,
media_content_type=f"{MEDIA_PLAYER_PREFIX}{media_content_type}",
thumbnail=image,
title=title,
)
browse_media.children = []
for item in items:
try:
browse_media.children.append(
item_payload(item, can_play_artist=can_play_artist)
)
except (MissingMediaInformation, UnknownMediaType):
continue
if "images" in media:
browse_media.thumbnail = fetch_image_url(media)
return browse_media
def item_payload(item: dict[str, Any], *, can_play_artist: bool) -> BrowseMedia:
"""
Create response payload for a single media item.
Used by async_browse_media.
"""
try:
media_type = item["type"]
media_id = item["uri"]
except KeyError as err:
_LOGGER.debug("Missing type or URI for media item: %s", item)
raise MissingMediaInformation from err
try:
media_class = CONTENT_TYPE_MEDIA_CLASS[media_type]
except KeyError as err:
_LOGGER.debug("Unknown media type received: %s", media_type)
raise UnknownMediaType from err
can_expand = media_type not in [
MEDIA_TYPE_TRACK,
MEDIA_TYPE_EPISODE,
]
can_play = media_type in PLAYABLE_MEDIA_TYPES and (
media_type != MEDIA_TYPE_ARTIST or can_play_artist
)
browse_media = BrowseMedia(
can_expand=can_expand,
can_play=can_play,
children_media_class=media_class["children"],
media_class=media_class["parent"],
media_content_id=media_id,
media_content_type=f"{MEDIA_PLAYER_PREFIX}{media_type}",
title=item.get("name", "Unknown"),
)
if "images" in item:
browse_media.thumbnail = fetch_image_url(item)
elif MEDIA_TYPE_ALBUM in item:
browse_media.thumbnail = fetch_image_url(item[MEDIA_TYPE_ALBUM])
return browse_media
def library_payload(*, can_play_artist: bool) -> BrowseMedia:
"""
Create response payload to describe contents of a specific library.
Used by async_browse_media.
"""
browse_media = BrowseMedia(
can_expand=True,
can_play=False,
children_media_class=MEDIA_CLASS_DIRECTORY,
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id="library",
media_content_type=f"{MEDIA_PLAYER_PREFIX}library",
title="Media Library",
)
browse_media.children = []
for item in [{"name": n, "type": t} for t, n in LIBRARY_MAP.items()]:
browse_media.children.append(
item_payload(
{"name": item["name"], "type": item["type"], "uri": item["type"]},
can_play_artist=can_play_artist,
)
)
return browse_media

View File

@ -1,4 +1,11 @@
"""Define constants for the Spotify integration."""
from homeassistant.components.media_player.const import (
MEDIA_TYPE_ALBUM,
MEDIA_TYPE_ARTIST,
MEDIA_TYPE_EPISODE,
MEDIA_TYPE_PLAYLIST,
MEDIA_TYPE_TRACK,
)
DOMAIN = "spotify"
@ -24,3 +31,13 @@ SPOTIFY_SCOPES = [
]
MEDIA_PLAYER_PREFIX = "spotify://"
MEDIA_TYPE_SHOW = "show"
PLAYABLE_MEDIA_TYPES = [
MEDIA_TYPE_PLAYLIST,
MEDIA_TYPE_ALBUM,
MEDIA_TYPE_ARTIST,
MEDIA_TYPE_EPISODE,
MEDIA_TYPE_SHOW,
MEDIA_TYPE_TRACK,
]

View File

@ -4,7 +4,6 @@ from __future__ import annotations
from asyncio import run_coroutine_threadsafe
import datetime as dt
from datetime import timedelta
from functools import partial
import logging
from typing import Any
@ -12,19 +11,8 @@ import requests
from spotipy import Spotify, SpotifyException
from yarl import URL
from homeassistant.backports.enum import StrEnum
from homeassistant.components.media_player import BrowseMedia, MediaPlayerEntity
from homeassistant.components.media_player.const import (
MEDIA_CLASS_ALBUM,
MEDIA_CLASS_ARTIST,
MEDIA_CLASS_DIRECTORY,
MEDIA_CLASS_EPISODE,
MEDIA_CLASS_GENRE,
MEDIA_CLASS_PLAYLIST,
MEDIA_CLASS_PODCAST,
MEDIA_CLASS_TRACK,
MEDIA_TYPE_ALBUM,
MEDIA_TYPE_ARTIST,
MEDIA_TYPE_EPISODE,
MEDIA_TYPE_MUSIC,
MEDIA_TYPE_PLAYLIST,
@ -44,7 +32,6 @@ from homeassistant.components.media_player.const import (
SUPPORT_SHUFFLE_SET,
SUPPORT_VOLUME_SET,
)
from homeassistant.components.media_player.errors import BrowseError
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_ID,
@ -61,14 +48,17 @@ from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.util.dt import utc_from_timestamp
from .browse_media import async_browse_media_internal
from .const import (
DATA_SPOTIFY_CLIENT,
DATA_SPOTIFY_ME,
DATA_SPOTIFY_SESSION,
DOMAIN,
MEDIA_PLAYER_PREFIX,
PLAYABLE_MEDIA_TYPES,
SPOTIFY_SCOPES,
)
from .util import fetch_image_url
_LOGGER = logging.getLogger(__name__)
@ -98,118 +88,6 @@ REPEAT_MODE_MAPPING_TO_SPOTIFY = {
value: key for key, value in REPEAT_MODE_MAPPING_TO_HA.items()
}
BROWSE_LIMIT = 48
MEDIA_TYPE_SHOW = "show"
PLAYABLE_MEDIA_TYPES = [
MEDIA_TYPE_PLAYLIST,
MEDIA_TYPE_ALBUM,
MEDIA_TYPE_ARTIST,
MEDIA_TYPE_EPISODE,
MEDIA_TYPE_SHOW,
MEDIA_TYPE_TRACK,
]
class BrowsableMedia(StrEnum):
"""Enum of browsable media."""
CURRENT_USER_PLAYLISTS = "current_user_playlists"
CURRENT_USER_FOLLOWED_ARTISTS = "current_user_followed_artists"
CURRENT_USER_SAVED_ALBUMS = "current_user_saved_albums"
CURRENT_USER_SAVED_TRACKS = "current_user_saved_tracks"
CURRENT_USER_SAVED_SHOWS = "current_user_saved_shows"
CURRENT_USER_RECENTLY_PLAYED = "current_user_recently_played"
CURRENT_USER_TOP_ARTISTS = "current_user_top_artists"
CURRENT_USER_TOP_TRACKS = "current_user_top_tracks"
CATEGORIES = "categories"
FEATURED_PLAYLISTS = "featured_playlists"
NEW_RELEASES = "new_releases"
LIBRARY_MAP = {
BrowsableMedia.CURRENT_USER_PLAYLISTS.value: "Playlists",
BrowsableMedia.CURRENT_USER_FOLLOWED_ARTISTS.value: "Artists",
BrowsableMedia.CURRENT_USER_SAVED_ALBUMS.value: "Albums",
BrowsableMedia.CURRENT_USER_SAVED_TRACKS.value: "Tracks",
BrowsableMedia.CURRENT_USER_SAVED_SHOWS.value: "Podcasts",
BrowsableMedia.CURRENT_USER_RECENTLY_PLAYED.value: "Recently played",
BrowsableMedia.CURRENT_USER_TOP_ARTISTS.value: "Top Artists",
BrowsableMedia.CURRENT_USER_TOP_TRACKS.value: "Top Tracks",
BrowsableMedia.CATEGORIES.value: "Categories",
BrowsableMedia.FEATURED_PLAYLISTS.value: "Featured Playlists",
BrowsableMedia.NEW_RELEASES.value: "New Releases",
}
CONTENT_TYPE_MEDIA_CLASS: dict[str, Any] = {
BrowsableMedia.CURRENT_USER_PLAYLISTS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_PLAYLIST,
},
BrowsableMedia.CURRENT_USER_FOLLOWED_ARTISTS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_ARTIST,
},
BrowsableMedia.CURRENT_USER_SAVED_ALBUMS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_ALBUM,
},
BrowsableMedia.CURRENT_USER_SAVED_TRACKS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_TRACK,
},
BrowsableMedia.CURRENT_USER_SAVED_SHOWS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_PODCAST,
},
BrowsableMedia.CURRENT_USER_RECENTLY_PLAYED.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_TRACK,
},
BrowsableMedia.CURRENT_USER_TOP_ARTISTS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_ARTIST,
},
BrowsableMedia.CURRENT_USER_TOP_TRACKS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_TRACK,
},
BrowsableMedia.FEATURED_PLAYLISTS.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_PLAYLIST,
},
BrowsableMedia.CATEGORIES.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_GENRE,
},
"category_playlists": {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_PLAYLIST,
},
BrowsableMedia.NEW_RELEASES.value: {
"parent": MEDIA_CLASS_DIRECTORY,
"children": MEDIA_CLASS_ALBUM,
},
MEDIA_TYPE_PLAYLIST: {
"parent": MEDIA_CLASS_PLAYLIST,
"children": MEDIA_CLASS_TRACK,
},
MEDIA_TYPE_ALBUM: {"parent": MEDIA_CLASS_ALBUM, "children": MEDIA_CLASS_TRACK},
MEDIA_TYPE_ARTIST: {"parent": MEDIA_CLASS_ARTIST, "children": MEDIA_CLASS_ALBUM},
MEDIA_TYPE_EPISODE: {"parent": MEDIA_CLASS_EPISODE, "children": None},
MEDIA_TYPE_SHOW: {"parent": MEDIA_CLASS_PODCAST, "children": MEDIA_CLASS_EPISODE},
MEDIA_TYPE_TRACK: {"parent": MEDIA_CLASS_TRACK, "children": None},
}
class MissingMediaInformation(BrowseError):
"""Missing media required information."""
class UnknownMediaType(BrowseError):
"""Unknown media type."""
async def async_setup_entry(
hass: HomeAssistant,
@ -572,286 +450,3 @@ class SpotifyMediaPlayer(MediaPlayerEntity):
media_content_type,
media_content_id,
)
async def async_browse_media_internal(
hass: HomeAssistant,
spotify: Spotify,
session: OAuth2Session,
current_user: dict[str, Any],
media_content_type: str | None,
media_content_id: str | None,
*,
can_play_artist: bool = True,
) -> BrowseMedia:
"""Browse spotify media."""
if media_content_type in (None, f"{MEDIA_PLAYER_PREFIX}library"):
return await hass.async_add_executor_job(
partial(library_payload, can_play_artist=can_play_artist)
)
await session.async_ensure_token_valid()
# Strip prefix
if media_content_type:
media_content_type = media_content_type[len(MEDIA_PLAYER_PREFIX) :]
payload = {
"media_content_type": media_content_type,
"media_content_id": media_content_id,
}
response = await hass.async_add_executor_job(
partial(
build_item_response,
spotify,
current_user,
payload,
can_play_artist=can_play_artist,
)
)
if response is None:
raise BrowseError(f"Media not found: {media_content_type} / {media_content_id}")
return response
def build_item_response( # noqa: C901
spotify: Spotify,
user: dict[str, Any],
payload: dict[str, str | None],
*,
can_play_artist: bool,
) -> BrowseMedia | None:
"""Create response payload for the provided media query."""
media_content_type = payload["media_content_type"]
media_content_id = payload["media_content_id"]
if media_content_type is None or media_content_id is None:
return None
title = None
image = None
media: dict[str, Any] | None = None
items = []
if media_content_type == BrowsableMedia.CURRENT_USER_PLAYLISTS:
if media := spotify.current_user_playlists(limit=BROWSE_LIMIT):
items = media.get("items", [])
elif media_content_type == BrowsableMedia.CURRENT_USER_FOLLOWED_ARTISTS:
if media := spotify.current_user_followed_artists(limit=BROWSE_LIMIT):
items = media.get("artists", {}).get("items", [])
elif media_content_type == BrowsableMedia.CURRENT_USER_SAVED_ALBUMS:
if media := spotify.current_user_saved_albums(limit=BROWSE_LIMIT):
items = [item["album"] for item in media.get("items", [])]
elif media_content_type == BrowsableMedia.CURRENT_USER_SAVED_TRACKS:
if media := spotify.current_user_saved_tracks(limit=BROWSE_LIMIT):
items = [item["track"] for item in media.get("items", [])]
elif media_content_type == BrowsableMedia.CURRENT_USER_SAVED_SHOWS:
if media := spotify.current_user_saved_shows(limit=BROWSE_LIMIT):
items = [item["show"] for item in media.get("items", [])]
elif media_content_type == BrowsableMedia.CURRENT_USER_RECENTLY_PLAYED:
if media := spotify.current_user_recently_played(limit=BROWSE_LIMIT):
items = [item["track"] for item in media.get("items", [])]
elif media_content_type == BrowsableMedia.CURRENT_USER_TOP_ARTISTS:
if media := spotify.current_user_top_artists(limit=BROWSE_LIMIT):
items = media.get("items", [])
elif media_content_type == BrowsableMedia.CURRENT_USER_TOP_TRACKS:
if media := spotify.current_user_top_tracks(limit=BROWSE_LIMIT):
items = media.get("items", [])
elif media_content_type == BrowsableMedia.FEATURED_PLAYLISTS:
if media := spotify.featured_playlists(
country=user["country"], limit=BROWSE_LIMIT
):
items = media.get("playlists", {}).get("items", [])
elif media_content_type == BrowsableMedia.CATEGORIES:
if media := spotify.categories(country=user["country"], limit=BROWSE_LIMIT):
items = media.get("categories", {}).get("items", [])
elif media_content_type == "category_playlists":
if (
media := spotify.category_playlists(
category_id=media_content_id,
country=user["country"],
limit=BROWSE_LIMIT,
)
) and (category := spotify.category(media_content_id, country=user["country"])):
title = category.get("name")
image = fetch_image_url(category, key="icons")
items = media.get("playlists", {}).get("items", [])
elif media_content_type == BrowsableMedia.NEW_RELEASES:
if media := spotify.new_releases(country=user["country"], limit=BROWSE_LIMIT):
items = media.get("albums", {}).get("items", [])
elif media_content_type == MEDIA_TYPE_PLAYLIST:
if media := spotify.playlist(media_content_id):
items = [item["track"] for item in media.get("tracks", {}).get("items", [])]
elif media_content_type == MEDIA_TYPE_ALBUM:
if media := spotify.album(media_content_id):
items = media.get("tracks", {}).get("items", [])
elif media_content_type == MEDIA_TYPE_ARTIST:
if (media := spotify.artist_albums(media_content_id, limit=BROWSE_LIMIT)) and (
artist := spotify.artist(media_content_id)
):
title = artist.get("name")
image = fetch_image_url(artist)
items = media.get("items", [])
elif media_content_type == MEDIA_TYPE_SHOW:
if (media := spotify.show_episodes(media_content_id, limit=BROWSE_LIMIT)) and (
show := spotify.show(media_content_id)
):
title = show.get("name")
image = fetch_image_url(show)
items = media.get("items", [])
if media is None:
return None
try:
media_class = CONTENT_TYPE_MEDIA_CLASS[media_content_type]
except KeyError:
_LOGGER.debug("Unknown media type received: %s", media_content_type)
return None
if media_content_type == BrowsableMedia.CATEGORIES:
media_item = BrowseMedia(
can_expand=True,
can_play=False,
children_media_class=media_class["children"],
media_class=media_class["parent"],
media_content_id=media_content_id,
media_content_type=f"{MEDIA_PLAYER_PREFIX}{media_content_type}",
title=LIBRARY_MAP.get(media_content_id, "Unknown"),
)
media_item.children = []
for item in items:
try:
item_id = item["id"]
except KeyError:
_LOGGER.debug("Missing ID for media item: %s", item)
continue
media_item.children.append(
BrowseMedia(
can_expand=True,
can_play=False,
children_media_class=MEDIA_CLASS_TRACK,
media_class=MEDIA_CLASS_PLAYLIST,
media_content_id=item_id,
media_content_type=f"{MEDIA_PLAYER_PREFIX}category_playlists",
thumbnail=fetch_image_url(item, key="icons"),
title=item.get("name"),
)
)
return media_item
if title is None:
title = LIBRARY_MAP.get(media_content_id, "Unknown")
if "name" in media:
title = media["name"]
can_play = media_content_type in PLAYABLE_MEDIA_TYPES and (
media_content_type != MEDIA_TYPE_ARTIST or can_play_artist
)
browse_media = BrowseMedia(
can_expand=True,
can_play=can_play,
children_media_class=media_class["children"],
media_class=media_class["parent"],
media_content_id=media_content_id,
media_content_type=f"{MEDIA_PLAYER_PREFIX}{media_content_type}",
thumbnail=image,
title=title,
)
browse_media.children = []
for item in items:
try:
browse_media.children.append(
item_payload(item, can_play_artist=can_play_artist)
)
except (MissingMediaInformation, UnknownMediaType):
continue
if "images" in media:
browse_media.thumbnail = fetch_image_url(media)
return browse_media
def item_payload(item: dict[str, Any], *, can_play_artist: bool) -> BrowseMedia:
"""
Create response payload for a single media item.
Used by async_browse_media.
"""
try:
media_type = item["type"]
media_id = item["uri"]
except KeyError as err:
_LOGGER.debug("Missing type or URI for media item: %s", item)
raise MissingMediaInformation from err
try:
media_class = CONTENT_TYPE_MEDIA_CLASS[media_type]
except KeyError as err:
_LOGGER.debug("Unknown media type received: %s", media_type)
raise UnknownMediaType from err
can_expand = media_type not in [
MEDIA_TYPE_TRACK,
MEDIA_TYPE_EPISODE,
]
can_play = media_type in PLAYABLE_MEDIA_TYPES and (
media_type != MEDIA_TYPE_ARTIST or can_play_artist
)
browse_media = BrowseMedia(
can_expand=can_expand,
can_play=can_play,
children_media_class=media_class["children"],
media_class=media_class["parent"],
media_content_id=media_id,
media_content_type=f"{MEDIA_PLAYER_PREFIX}{media_type}",
title=item.get("name", "Unknown"),
)
if "images" in item:
browse_media.thumbnail = fetch_image_url(item)
elif MEDIA_TYPE_ALBUM in item:
browse_media.thumbnail = fetch_image_url(item[MEDIA_TYPE_ALBUM])
return browse_media
def library_payload(*, can_play_artist: bool) -> BrowseMedia:
"""
Create response payload to describe contents of a specific library.
Used by async_browse_media.
"""
browse_media = BrowseMedia(
can_expand=True,
can_play=False,
children_media_class=MEDIA_CLASS_DIRECTORY,
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id="library",
media_content_type=f"{MEDIA_PLAYER_PREFIX}library",
title="Media Library",
)
browse_media.children = []
for item in [{"name": n, "type": t} for t, n in LIBRARY_MAP.items()]:
browse_media.children.append(
item_payload(
{"name": item["name"], "type": item["type"], "uri": item["type"]},
can_play_artist=can_play_artist,
)
)
return browse_media
def fetch_image_url(item: dict[str, Any], key="images") -> str | None:
"""Fetch image url."""
try:
return item.get(key, [])[0].get("url")
except IndexError:
return None

View File

@ -0,0 +1,24 @@
"""Utils for Spotify."""
from __future__ import annotations
from typing import Any
from .const import MEDIA_PLAYER_PREFIX
def is_spotify_media_type(media_content_type: str) -> bool:
"""Return whether the media_content_type is a valid Spotify media_id."""
return media_content_type.startswith(MEDIA_PLAYER_PREFIX)
def resolve_spotify_media_type(media_content_type: str) -> str:
"""Return actual spotify media_content_type."""
return media_content_type[len(MEDIA_PLAYER_PREFIX) :]
def fetch_image_url(item: dict[str, Any], key="images") -> str | None:
"""Fetch image url."""
try:
return item.get(key, [])[0].get("url")
except IndexError:
return None