1
mirror of https://github.com/home-assistant/core synced 2024-07-12 07:21:24 +02:00

Improve type hint in cast media_player entity (#77025)

* Improve type hint in cast media_player entity

* Update docstring
This commit is contained in:
epenet 2022-08-22 09:13:14 +02:00 committed by GitHub
parent e03eb238e2
commit ed60611b07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -7,6 +7,7 @@ from contextlib import suppress
from datetime import datetime
import json
import logging
from typing import Any
import pychromecast
from pychromecast.controllers.homeassistant import HomeAssistantController
@ -52,7 +53,8 @@ from homeassistant.const import (
STATE_PAUSED,
STATE_PLAYING,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity import DeviceInfo
@ -232,9 +234,9 @@ class CastDevice:
self._status_listener = CastStatusListener(
self, chromecast, self.mz_mgr, self._mz_only
)
self._chromecast.start()
chromecast.start()
async def _async_disconnect(self):
async def _async_disconnect(self) -> None:
"""Disconnect Chromecast object if it is set."""
if self._chromecast is not None:
_LOGGER.debug(
@ -246,7 +248,7 @@ class CastDevice:
self._invalidate()
def _invalidate(self):
def _invalidate(self) -> None:
"""Invalidate some attributes."""
self._chromecast = None
self.mz_mgr = None
@ -254,7 +256,7 @@ class CastDevice:
self._status_listener.invalidate()
self._status_listener = None
async def _async_cast_discovered(self, discover: ChromecastInfo):
async def _async_cast_discovered(self, discover: ChromecastInfo) -> None:
"""Handle discovery of new Chromecast."""
if self._cast_info.uuid != discover.uuid:
# Discovered is not our device.
@ -263,13 +265,19 @@ class CastDevice:
_LOGGER.debug("Discovered chromecast with same UUID: %s", discover)
self._cast_info = discover
async def _async_cast_removed(self, discover: ChromecastInfo):
async def _async_cast_removed(self, discover: ChromecastInfo) -> None:
"""Handle removal of Chromecast."""
async def _async_stop(self, event):
async def _async_stop(self, event: Event) -> None:
"""Disconnect socket on Home Assistant stop."""
await self._async_disconnect()
def _get_chromecast(self) -> pychromecast.Chromecast:
"""Ensure chromecast is available, to facilitate type checking."""
if self._chromecast is None:
raise HomeAssistantError("Chromecast is not available.")
return self._chromecast
class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity):
"""Representation of a Cast device on the network."""
@ -292,7 +300,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity):
self._attr_available = False
self._hass_cast_controller: HomeAssistantController | None = None
self._cast_view_remove_handler = None
self._cast_view_remove_handler: CALLBACK_TYPE | None = None
self._attr_unique_id = str(cast_info.uuid)
self._attr_device_info = DeviceInfo(
identifiers={(CAST_DOMAIN, str(cast_info.uuid).replace("-", ""))},
@ -307,7 +315,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity):
]:
self._attr_device_class = MediaPlayerDeviceClass.SPEAKER
async def async_added_to_hass(self):
async def async_added_to_hass(self) -> None:
"""Create chromecast object when added to hass."""
self._async_setup(self.entity_id)
@ -491,62 +499,63 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity):
return media_controller
def turn_on(self):
def turn_on(self) -> None:
"""Turn on the cast device."""
if not self._chromecast.is_idle:
chromecast = self._get_chromecast()
if not chromecast.is_idle:
# Already turned on
return
if self._chromecast.app_id is not None:
if chromecast.app_id is not None:
# Quit the previous app before starting splash screen or media player
self._chromecast.quit_app()
chromecast.quit_app()
# The only way we can turn the Chromecast is on is by launching an app
if self._chromecast.cast_type == pychromecast.const.CAST_TYPE_CHROMECAST:
if chromecast.cast_type == pychromecast.const.CAST_TYPE_CHROMECAST:
app_data = {"media_id": CAST_SPLASH, "media_type": "image/png"}
quick_play(self._chromecast, "default_media_receiver", app_data)
quick_play(chromecast, "default_media_receiver", app_data)
else:
self._chromecast.start_app(pychromecast.config.APP_MEDIA_RECEIVER)
chromecast.start_app(pychromecast.config.APP_MEDIA_RECEIVER)
def turn_off(self):
def turn_off(self) -> None:
"""Turn off the cast device."""
self._chromecast.quit_app()
self._get_chromecast().quit_app()
def mute_volume(self, mute):
def mute_volume(self, mute: bool) -> None:
"""Mute the volume."""
self._chromecast.set_volume_muted(mute)
self._get_chromecast().set_volume_muted(mute)
def set_volume_level(self, volume):
def set_volume_level(self, volume: float) -> None:
"""Set volume level, range 0..1."""
self._chromecast.set_volume(volume)
self._get_chromecast().set_volume(volume)
def media_play(self):
def media_play(self) -> None:
"""Send play command."""
media_controller = self._media_controller()
media_controller.play()
def media_pause(self):
def media_pause(self) -> None:
"""Send pause command."""
media_controller = self._media_controller()
media_controller.pause()
def media_stop(self):
def media_stop(self) -> None:
"""Send stop command."""
media_controller = self._media_controller()
media_controller.stop()
def media_previous_track(self):
def media_previous_track(self) -> None:
"""Send previous track command."""
media_controller = self._media_controller()
media_controller.queue_prev()
def media_next_track(self):
def media_next_track(self) -> None:
"""Send next track command."""
media_controller = self._media_controller()
media_controller.queue_next()
def media_seek(self, position):
def media_seek(self, position: float) -> None:
"""Seek the media to a specific location."""
media_controller = self._media_controller()
media_controller.seek(position)
@ -589,11 +598,14 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity):
children=sorted(children, key=lambda c: c.title),
)
async def async_browse_media(self, media_content_type=None, media_content_id=None):
async def async_browse_media(
self, media_content_type: str | None = None, media_content_id: str | None = None
) -> BrowseMedia:
"""Implement the websocket media browsing helper."""
content_filter = None
if self._chromecast.cast_type in (
chromecast = self._get_chromecast()
if chromecast.cast_type in (
pychromecast.const.CAST_TYPE_AUDIO,
pychromecast.const.CAST_TYPE_GROUP,
):
@ -612,7 +624,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity):
self.hass,
media_content_type,
media_content_id,
self._chromecast.cast_type,
chromecast.cast_type,
)
if browse_media:
return browse_media
@ -621,8 +633,11 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity):
self.hass, media_content_id, content_filter=content_filter
)
async def async_play_media(self, media_type, media_id, **kwargs):
async def async_play_media(
self, media_type: str, media_id: str, **kwargs: Any
) -> None:
"""Play a piece of media."""
chromecast = self._get_chromecast()
# Handle media_source
if media_source.is_media_source_id(media_id):
sourced_media = await media_source.async_resolve_media(
@ -648,9 +663,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity):
if "app_id" in app_data:
app_id = app_data.pop("app_id")
_LOGGER.info("Starting Cast app by ID %s", app_id)
await self.hass.async_add_executor_job(
self._chromecast.start_app, app_id
)
await self.hass.async_add_executor_job(chromecast.start_app, app_id)
if app_data:
_LOGGER.warning(
"Extra keys %s were ignored. Please use app_name to cast media",
@ -661,7 +674,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity):
app_name = app_data.pop("app_name")
try:
await self.hass.async_add_executor_job(
quick_play, self._chromecast, app_name, app_data
quick_play, chromecast, app_name, app_data
)
except NotImplementedError:
_LOGGER.error("App %s not supported", app_name)
@ -670,7 +683,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity):
# Try the cast platforms
for platform in self.hass.data[CAST_DOMAIN]["cast_platform"].values():
result = await platform.async_play_media(
self.hass, self.entity_id, self._chromecast, media_type, media_id
self.hass, self.entity_id, chromecast, media_type, media_id
)
if result:
return
@ -735,7 +748,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity):
app_data,
)
await self.hass.async_add_executor_job(
quick_play, self._chromecast, "default_media_receiver", app_data
quick_play, chromecast, "default_media_receiver", app_data
)
def _media_status(self):
@ -761,7 +774,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity):
return (media_status, media_status_received)
@property
def state(self):
def state(self) -> str | None:
"""Return the state of the player."""
# The lovelace app loops media to prevent timing out, don't show that
if self.app_id == CAST_APP_ID_HOMEASSISTANT_LOVELACE:
@ -785,7 +798,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity):
return None
@property
def media_content_id(self):
def media_content_id(self) -> str | None:
"""Content ID of current playing media."""
# The lovelace app loops media to prevent timing out, don't show that
if self.app_id == CAST_APP_ID_HOMEASSISTANT_LOVELACE:
@ -794,7 +807,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity):
return media_status.content_id if media_status else None
@property
def media_content_type(self):
def media_content_type(self) -> str | None:
"""Content type of current playing media."""
# The lovelace app loops media to prevent timing out, don't show that
if self.app_id == CAST_APP_ID_HOMEASSISTANT_LOVELACE: