Fix Sonos music library play problems (#113429)

This commit is contained in:
Pete Sage 2024-04-24 08:03:40 -04:00 committed by GitHub
parent 0e0b543dec
commit 24a1f0712f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 302 additions and 31 deletions

View File

@ -199,9 +199,15 @@ def build_item_response(
payload["search_type"] == MediaType.ALBUM
and media[0].item_class == "object.item.audioItem.musicTrack"
):
item = get_media(media_library, payload["idstring"], SONOS_ALBUM_ARTIST)
idstring = payload["idstring"]
if idstring.startswith("A:ALBUMARTIST/"):
search_type = SONOS_ALBUM_ARTIST
elif idstring.startswith("A:ALBUM/"):
search_type = SONOS_ALBUM
item = get_media(media_library, idstring, search_type)
title = getattr(item, "title", None)
thumbnail = get_thumbnail_url(SONOS_ALBUM_ARTIST, payload["idstring"])
thumbnail = get_thumbnail_url(search_type, payload["idstring"])
if not title:
try:
@ -493,8 +499,9 @@ def get_content_id(item: DidlObject) -> str:
def get_media(
media_library: MusicLibrary, item_id: str, search_type: str
) -> MusicServiceItem:
"""Fetch media/album."""
) -> MusicServiceItem | None:
"""Fetch a single media/album."""
_LOGGER.debug("get_media item_id [%s], search_type [%s]", item_id, search_type)
search_type = MEDIA_TYPES_TO_SONOS.get(search_type, search_type)
if search_type == "playlists":
@ -513,9 +520,38 @@ def get_media(
if not item_id.startswith("A:ALBUM") and search_type == SONOS_ALBUM:
item_id = "A:ALBUMARTIST/" + "/".join(item_id.split("/")[2:])
search_term = urllib.parse.unquote(item_id.split("/")[-1])
matches = media_library.get_music_library_information(
search_type, search_term=search_term, full_album_art_uri=True
if item_id.startswith("A:ALBUM/") or search_type == "tracks":
search_term = urllib.parse.unquote(item_id.split("/")[-1])
matches = media_library.get_music_library_information(
search_type, search_term=search_term, full_album_art_uri=True
)
else:
# When requesting media by album_artist, composer, genre use the browse interface
# to navigate the hierarchy. This occurs when invoked from media browser or service
# calls
# Example: A:ALBUMARTIST/Neil Young/Greatest Hits - get specific album
# Example: A:ALBUMARTIST/Neil Young - get all albums
# Others: composer, genre
# A:<topic>/<name>/<optional title>
splits = item_id.split("/")
title = urllib.parse.unquote(splits[2]) if len(splits) > 2 else None
browse_id_string = splits[0] + "/" + splits[1]
matches = media_library.browse_by_idstring(
search_type, browse_id_string, full_album_art_uri=True
)
if title:
result = next(
(item for item in matches if (title == item.title)),
None,
)
matches = [result]
_LOGGER.debug(
"get_media search_type [%s] item_id [%s] matches [%d]",
search_type,
item_id,
len(matches),
)
if len(matches) > 0:
return matches[0]
return None

View File

@ -7,7 +7,7 @@ from functools import partial
import logging
from typing import Any
from soco import alarms
from soco import SoCo, alarms
from soco.core import (
MUSIC_SRC_LINE_IN,
MUSIC_SRC_RADIO,
@ -15,6 +15,7 @@ from soco.core import (
PLAY_MODES,
)
from soco.data_structures import DidlFavorite
from soco.ms_data_structures import MusicServiceItem
from sonos_websocket.exception import SonosWebsocketError
import voluptuous as vol
@ -549,6 +550,7 @@ class SonosMediaPlayerEntity(SonosEntity, MediaPlayerEntity):
self, media_type: MediaType | str, media_id: str, is_radio: bool, **kwargs: Any
) -> None:
"""Wrap sync calls to async_play_media."""
_LOGGER.debug("_play_media media_type %s media_id %s", media_type, media_id)
enqueue = kwargs.get(ATTR_MEDIA_ENQUEUE, MediaPlayerEnqueue.REPLACE)
if media_type == "favorite_item_id":
@ -645,10 +647,35 @@ class SonosMediaPlayerEntity(SonosEntity, MediaPlayerEntity):
_LOGGER.error('Could not find "%s" in the library', media_id)
return
soco.play_uri(item.get_uri())
self._play_media_queue(soco, item, enqueue)
else:
_LOGGER.error('Sonos does not support a media type of "%s"', media_type)
def _play_media_queue(
self, soco: SoCo, item: MusicServiceItem, enqueue: MediaPlayerEnqueue
):
"""Manage adding, replacing, playing items onto the sonos queue."""
_LOGGER.debug(
"_play_media_queue item_id [%s] title [%s] enqueue [%s]",
item.item_id,
item.title,
enqueue,
)
if enqueue == MediaPlayerEnqueue.REPLACE:
soco.clear_queue()
if enqueue in (MediaPlayerEnqueue.ADD, MediaPlayerEnqueue.REPLACE):
soco.add_to_queue(item, timeout=LONG_SERVICE_TIMEOUT)
if enqueue == MediaPlayerEnqueue.REPLACE:
soco.play_from_queue(0)
else:
pos = (self.media.queue_position or 0) + 1
new_pos = soco.add_to_queue(
item, position=pos, timeout=LONG_SERVICE_TIMEOUT
)
if enqueue == MediaPlayerEnqueue.PLAY:
soco.play_from_queue(new_pos - 1)
@soco_error()
def set_sleep_timer(self, sleep_time: int) -> None:
"""Set the timer on the player."""

View File

@ -203,6 +203,7 @@ class SoCoMockFactory:
my_speaker_info["zone_name"] = name
my_speaker_info["uid"] = mock_soco.uid
mock_soco.get_speaker_info = Mock(return_value=my_speaker_info)
mock_soco.add_to_queue = Mock(return_value=10)
mock_soco.avTransport = SonosMockService("AVTransport", ip_address)
mock_soco.renderingControl = SonosMockService("RenderingControl", ip_address)
@ -303,11 +304,116 @@ def config_fixture():
return {DOMAIN: {MP_DOMAIN: {CONF_HOSTS: ["192.168.42.2"]}}}
class MockMusicServiceItem:
"""Mocks a Soco MusicServiceItem."""
def __init__(self, title: str, item_id: str, parent_id: str, item_class: str):
"""Initialize the mock item."""
self.title = title
self.item_id = item_id
self.item_class = item_class
self.parent_id = parent_id
def mock_browse_by_idstring(
search_type: str, idstring: str, start=0, max_items=100, full_album_art_uri=False
) -> list[MockMusicServiceItem]:
"""Mock the call to browse_by_id_string."""
if search_type == "album_artists" and idstring == "A:ALBUMARTIST/Beatles":
return [
MockMusicServiceItem(
"All",
idstring + "/",
idstring,
"object.container.playlistContainer.sameArtist",
),
MockMusicServiceItem(
"A Hard Day's Night",
"A:ALBUMARTIST/Beatles/A%20Hard%20Day's%20Night",
idstring,
"object.container.album.musicAlbum",
),
MockMusicServiceItem(
"Abbey Road",
"A:ALBUMARTIST/Beatles/Abbey%20Road",
idstring,
"object.container.album.musicAlbum",
),
]
# browse_by_id_string works with URL encoded or decoded strings
if search_type == "genres" and idstring in (
"A:GENRE/Classic%20Rock",
"A:GENRE/Classic Rock",
):
return [
MockMusicServiceItem(
"All",
"A:GENRE/Classic%20Rock/",
"A:GENRE/Classic%20Rock",
"object.container.albumlist",
),
MockMusicServiceItem(
"Bruce Springsteen",
"A:GENRE/Classic%20Rock/Bruce%20Springsteen",
"A:GENRE/Classic%20Rock",
"object.container.person.musicArtist",
),
MockMusicServiceItem(
"Cream",
"A:GENRE/Classic%20Rock/Cream",
"A:GENRE/Classic%20Rock",
"object.container.person.musicArtist",
),
]
if search_type == "composers" and idstring in (
"A:COMPOSER/Carlos%20Santana",
"A:COMPOSER/Carlos Santana",
):
return [
MockMusicServiceItem(
"All",
"A:COMPOSER/Carlos%20Santana/",
"A:COMPOSER/Carlos%20Santana",
"object.container.playlistContainer.sameArtist",
),
MockMusicServiceItem(
"Between Good And Evil",
"A:COMPOSER/Carlos%20Santana/Between%20Good%20And%20Evil",
"A:COMPOSER/Carlos%20Santana",
"object.container.album.musicAlbum",
),
MockMusicServiceItem(
"Sacred Fire",
"A:COMPOSER/Carlos%20Santana/Sacred%20Fire",
"A:COMPOSER/Carlos%20Santana",
"object.container.album.musicAlbum",
),
]
return []
def mock_get_music_library_information(
search_type: str, search_term: str, full_album_art_uri: bool = True
) -> list[MockMusicServiceItem]:
"""Mock the call to get music library information."""
if search_type == "albums" and search_term == "Abbey Road":
return [
MockMusicServiceItem(
"Abbey Road",
"A:ALBUM/Abbey%20Road",
"A:ALBUM",
"object.container.album.musicAlbum",
)
]
@pytest.fixture(name="music_library")
def music_library_fixture():
"""Create music_library fixture."""
music_library = MagicMock()
music_library.get_sonos_favorites.return_value.update_id = 1
music_library.browse_by_idstring = mock_browse_by_idstring
music_library.get_music_library_information = mock_get_music_library_information
return music_library

View File

@ -7,7 +7,10 @@ import pytest
from homeassistant.components.media_player import (
DOMAIN as MP_DOMAIN,
SERVICE_PLAY_MEDIA,
MediaPlayerEnqueue,
)
from homeassistant.components.media_player.const import ATTR_MEDIA_ENQUEUE
from homeassistant.components.sonos.media_player import LONG_SERVICE_TIMEOUT
from homeassistant.const import STATE_IDLE
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import (
@ -16,7 +19,7 @@ from homeassistant.helpers.device_registry import (
DeviceRegistry,
)
from .conftest import SoCoMockFactory
from .conftest import MockMusicServiceItem, SoCoMockFactory
async def test_device_registry(
@ -65,35 +68,134 @@ async def test_entity_basic(
assert attributes["volume_level"] == 0.19
class _MockMusicServiceItem:
"""Mocks a Soco MusicServiceItem."""
def __init__(
self,
title: str,
item_id: str,
parent_id: str,
item_class: str,
) -> None:
"""Initialize the mock item."""
self.title = title
self.item_id = item_id
self.item_class = item_class
self.parent_id = parent_id
def get_uri(self) -> str:
"""Return URI."""
return self.item_id.replace("S://", "x-file-cifs://")
@pytest.mark.parametrize(
("media_content_type", "media_content_id", "enqueue", "test_result"),
[
(
"artist",
"A:ALBUMARTIST/Beatles",
MediaPlayerEnqueue.REPLACE,
{
"title": "All",
"item_id": "A:ALBUMARTIST/Beatles/",
"clear_queue": 1,
"position": None,
"play": 1,
"play_pos": 0,
},
),
(
"genre",
"A:GENRE/Classic%20Rock",
MediaPlayerEnqueue.ADD,
{
"title": "All",
"item_id": "A:GENRE/Classic%20Rock/",
"clear_queue": 0,
"position": None,
"play": 0,
"play_pos": 0,
},
),
(
"album",
"A:ALBUM/Abbey%20Road",
MediaPlayerEnqueue.NEXT,
{
"title": "Abbey Road",
"item_id": "A:ALBUM/Abbey%20Road",
"clear_queue": 0,
"position": 1,
"play": 0,
"play_pos": 0,
},
),
(
"composer",
"A:COMPOSER/Carlos%20Santana",
MediaPlayerEnqueue.PLAY,
{
"title": "All",
"item_id": "A:COMPOSER/Carlos%20Santana/",
"clear_queue": 0,
"position": 1,
"play": 1,
"play_pos": 9,
},
),
(
"artist",
"A:ALBUMARTIST/Beatles/Abbey%20Road",
MediaPlayerEnqueue.REPLACE,
{
"title": "Abbey Road",
"item_id": "A:ALBUMARTIST/Beatles/Abbey%20Road",
"clear_queue": 1,
"position": None,
"play": 1,
"play_pos": 0,
},
),
],
)
async def test_play_media_library(
hass: HomeAssistant,
soco_factory: SoCoMockFactory,
async_autosetup_sonos,
media_content_type,
media_content_id,
enqueue,
test_result,
) -> None:
"""Test playing local library with a variety of options."""
sock_mock = soco_factory.mock_list.get("192.168.42.2")
await hass.services.async_call(
MP_DOMAIN,
SERVICE_PLAY_MEDIA,
{
"entity_id": "media_player.zone_a",
"media_content_type": media_content_type,
"media_content_id": media_content_id,
ATTR_MEDIA_ENQUEUE: enqueue,
},
blocking=True,
)
assert sock_mock.clear_queue.call_count == test_result["clear_queue"]
assert sock_mock.add_to_queue.call_count == 1
assert (
sock_mock.add_to_queue.call_args_list[0].args[0].title == test_result["title"]
)
assert (
sock_mock.add_to_queue.call_args_list[0].args[0].item_id
== test_result["item_id"]
)
if test_result["position"] is not None:
assert (
sock_mock.add_to_queue.call_args_list[0].kwargs["position"]
== test_result["position"]
)
else:
assert "position" not in sock_mock.add_to_queue.call_args_list[0].kwargs
assert (
sock_mock.add_to_queue.call_args_list[0].kwargs["timeout"]
== LONG_SERVICE_TIMEOUT
)
assert sock_mock.play_from_queue.call_count == test_result["play"]
if test_result["play"] != 0:
assert (
sock_mock.play_from_queue.call_args_list[0].args[0]
== test_result["play_pos"]
)
_mock_playlists = [
_MockMusicServiceItem(
MockMusicServiceItem(
"playlist1",
"S://192.168.1.68/music/iTunes/iTunes%20Music%20Library.xml#GUID_1",
"A:PLAYLISTS",
"object.container.playlistContainer",
),
_MockMusicServiceItem(
MockMusicServiceItem(
"playlist2",
"S://192.168.1.68/music/iTunes/iTunes%20Music%20Library.xml#GUID_2",
"A:PLAYLISTS",