1
mirror of https://github.com/home-assistant/core synced 2024-09-12 15:16:21 +02:00

Refactor chromecast into media_player platform

This commit is contained in:
Paulus Schoutsen 2015-03-03 23:50:54 -08:00
parent a90dcabe01
commit 84844c242b
9 changed files with 513 additions and 375 deletions

View File

@ -1,339 +0,0 @@
"""
homeassistant.components.chromecast
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Provides functionality to interact with Chromecasts.
"""
import logging
try:
import pychromecast
except ImportError:
# Ignore, we will raise appropriate error later
pass
from homeassistant.loader import get_component
import homeassistant.util as util
from homeassistant.helpers import extract_entity_ids
from homeassistant.const import (
ATTR_ENTITY_ID, ATTR_FRIENDLY_NAME, SERVICE_TURN_OFF, SERVICE_VOLUME_UP,
SERVICE_VOLUME_DOWN, SERVICE_MEDIA_PLAY_PAUSE, SERVICE_MEDIA_PLAY,
SERVICE_MEDIA_PAUSE, SERVICE_MEDIA_NEXT_TRACK, SERVICE_MEDIA_PREV_TRACK)
DOMAIN = 'chromecast'
DEPENDENCIES = []
SERVICE_YOUTUBE_VIDEO = 'play_youtube_video'
ENTITY_ID_FORMAT = DOMAIN + '.{}'
STATE_NO_APP = 'idle'
ATTR_STATE = 'state'
ATTR_OPTIONS = 'options'
ATTR_MEDIA_STATE = 'media_state'
ATTR_MEDIA_CONTENT_ID = 'media_content_id'
ATTR_MEDIA_TITLE = 'media_title'
ATTR_MEDIA_ARTIST = 'media_artist'
ATTR_MEDIA_ALBUM = 'media_album'
ATTR_MEDIA_IMAGE_URL = 'media_image_url'
ATTR_MEDIA_VOLUME = 'media_volume'
ATTR_MEDIA_DURATION = 'media_duration'
MEDIA_STATE_UNKNOWN = 'unknown'
MEDIA_STATE_PLAYING = 'playing'
MEDIA_STATE_STOPPED = 'stopped'
def is_on(hass, entity_id=None):
""" Returns true if specified ChromeCast entity_id is on.
Will check all chromecasts if no entity_id specified. """
entity_ids = [entity_id] if entity_id else hass.states.entity_ids(DOMAIN)
return any(not hass.states.is_state(entity_id, STATE_NO_APP)
for entity_id in entity_ids)
def turn_off(hass, entity_id=None):
""" Will turn off specified Chromecast or all. """
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_TURN_OFF, data)
def volume_up(hass, entity_id=None):
""" Send the chromecast the command for volume up. """
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_VOLUME_UP, data)
def volume_down(hass, entity_id=None):
""" Send the chromecast the command for volume down. """
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_VOLUME_DOWN, data)
def media_play_pause(hass, entity_id=None):
""" Send the chromecast the command for play/pause. """
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_MEDIA_PLAY_PAUSE, data)
def media_play(hass, entity_id=None):
""" Send the chromecast the command for play/pause. """
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_MEDIA_PLAY, data)
def media_pause(hass, entity_id=None):
""" Send the chromecast the command for play/pause. """
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_MEDIA_PAUSE, data)
def media_next_track(hass, entity_id=None):
""" Send the chromecast the command for next track. """
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_MEDIA_NEXT_TRACK, data)
def media_prev_track(hass, entity_id=None):
""" Send the chromecast the command for prev track. """
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_MEDIA_PREV_TRACK, data)
def setup_chromecast(casts, host):
""" Tries to convert host to Chromecast object and set it up. """
# Check if already setup
if any(cast.host == host for cast in casts.values()):
return
try:
cast = pychromecast.PyChromecast(host)
entity_id = util.ensure_unique_string(
ENTITY_ID_FORMAT.format(
util.slugify(cast.device.friendly_name)),
casts.keys())
casts[entity_id] = cast
except pychromecast.ChromecastConnectionError:
pass
def setup(hass, config):
# pylint: disable=unused-argument,too-many-locals
""" Listen for chromecast events. """
logger = logging.getLogger(__name__)
discovery = get_component('discovery')
try:
# pylint: disable=redefined-outer-name
import pychromecast
except ImportError:
logger.exception(("Failed to import pychromecast. "
"Did you maybe not install the 'pychromecast' "
"dependency?"))
return False
casts = {}
# If discovery component not loaded, scan ourselves
if discovery.DOMAIN not in hass.components:
logger.info("Scanning for Chromecasts")
hosts = pychromecast.discover_chromecasts()
for host in hosts:
setup_chromecast(casts, host)
def chromecast_discovered(service, info):
""" Called when a Chromecast has been discovered. """
logger.info("New Chromecast discovered: %s", info[0])
setup_chromecast(casts, info[0])
discovery.listen(
hass, discovery.services.GOOGLE_CAST, chromecast_discovered)
def update_chromecast_state(entity_id, chromecast):
""" Retrieve state of Chromecast and update statemachine. """
chromecast.refresh()
status = chromecast.app
state_attr = {ATTR_FRIENDLY_NAME:
chromecast.device.friendly_name}
if status and status.app_id != pychromecast.APP_ID['HOME']:
state = status.app_id
ramp = chromecast.get_protocol(pychromecast.PROTOCOL_RAMP)
if ramp and ramp.state != pychromecast.RAMP_STATE_UNKNOWN:
if ramp.state == pychromecast.RAMP_STATE_PLAYING:
state_attr[ATTR_MEDIA_STATE] = MEDIA_STATE_PLAYING
else:
state_attr[ATTR_MEDIA_STATE] = MEDIA_STATE_STOPPED
if ramp.content_id:
state_attr[ATTR_MEDIA_CONTENT_ID] = ramp.content_id
if ramp.title:
state_attr[ATTR_MEDIA_TITLE] = ramp.title
if ramp.artist:
state_attr[ATTR_MEDIA_ARTIST] = ramp.artist
if ramp.album:
state_attr[ATTR_MEDIA_ALBUM] = ramp.album
if ramp.image_url:
state_attr[ATTR_MEDIA_IMAGE_URL] = ramp.image_url
if ramp.duration:
state_attr[ATTR_MEDIA_DURATION] = ramp.duration
state_attr[ATTR_MEDIA_VOLUME] = ramp.volume
else:
state = STATE_NO_APP
hass.states.set(entity_id, state, state_attr)
def update_chromecast_states(time):
""" Updates all chromecast states. """
if casts:
logger.info("Updating Chromecast status")
for entity_id, cast in casts.items():
update_chromecast_state(entity_id, cast)
def _service_to_entities(service):
""" Helper method to get entities from service. """
entity_ids = extract_entity_ids(hass, service)
if entity_ids:
for entity_id in entity_ids:
cast = casts.get(entity_id)
if cast:
yield entity_id, cast
else:
yield from casts.items()
def turn_off_service(service):
""" Service to exit any running app on the specified ChromeCast and
shows idle screen. Will quit all ChromeCasts if nothing specified.
"""
for entity_id, cast in _service_to_entities(service):
cast.quit_app()
update_chromecast_state(entity_id, cast)
def volume_up_service(service):
""" Service to send the chromecast the command for volume up. """
for _, cast in _service_to_entities(service):
ramp = cast.get_protocol(pychromecast.PROTOCOL_RAMP)
if ramp:
ramp.volume_up()
def volume_down_service(service):
""" Service to send the chromecast the command for volume down. """
for _, cast in _service_to_entities(service):
ramp = cast.get_protocol(pychromecast.PROTOCOL_RAMP)
if ramp:
ramp.volume_down()
def media_play_pause_service(service):
""" Service to send the chromecast the command for play/pause. """
for _, cast in _service_to_entities(service):
ramp = cast.get_protocol(pychromecast.PROTOCOL_RAMP)
if ramp:
ramp.playpause()
def media_play_service(service):
""" Service to send the chromecast the command for play/pause. """
for _, cast in _service_to_entities(service):
ramp = cast.get_protocol(pychromecast.PROTOCOL_RAMP)
if ramp and ramp.state == pychromecast.RAMP_STATE_STOPPED:
ramp.playpause()
def media_pause_service(service):
""" Service to send the chromecast the command for play/pause. """
for _, cast in _service_to_entities(service):
ramp = cast.get_protocol(pychromecast.PROTOCOL_RAMP)
if ramp and ramp.state == pychromecast.RAMP_STATE_PLAYING:
ramp.playpause()
def media_next_track_service(service):
""" Service to send the chromecast the command for next track. """
for entity_id, cast in _service_to_entities(service):
ramp = cast.get_protocol(pychromecast.PROTOCOL_RAMP)
if ramp:
next(ramp)
update_chromecast_state(entity_id, cast)
def play_youtube_video_service(service, video_id):
""" Plays specified video_id on the Chromecast's YouTube channel. """
if video_id: # if service.data.get('video') returned None
for entity_id, cast in _service_to_entities(service):
pychromecast.play_youtube_video(video_id, cast.host)
update_chromecast_state(entity_id, cast)
hass.track_time_change(update_chromecast_states, second=range(0, 60, 15))
hass.services.register(DOMAIN, SERVICE_TURN_OFF,
turn_off_service)
hass.services.register(DOMAIN, SERVICE_VOLUME_UP,
volume_up_service)
hass.services.register(DOMAIN, SERVICE_VOLUME_DOWN,
volume_down_service)
hass.services.register(DOMAIN, SERVICE_MEDIA_PLAY_PAUSE,
media_play_pause_service)
hass.services.register(DOMAIN, SERVICE_MEDIA_PLAY,
media_play_service)
hass.services.register(DOMAIN, SERVICE_MEDIA_PAUSE,
media_pause_service)
hass.services.register(DOMAIN, SERVICE_MEDIA_NEXT_TRACK,
media_next_track_service)
hass.services.register(DOMAIN, "start_fireplace",
lambda service:
play_youtube_video_service(service, "eyU3bRy2x44"))
hass.services.register(DOMAIN, "start_epic_sax",
lambda service:
play_youtube_video_service(service, "kxopViU98Xo"))
hass.services.register(DOMAIN, SERVICE_YOUTUBE_VIDEO,
lambda service:
play_youtube_video_service(service,
service.data.get(
'video')))
update_chromecast_states(None)
return True

View File

@ -17,7 +17,8 @@ DOMAIN = "demo"
DEPENDENCIES = []
COMPONENTS_WITH_DEMO_PLATFORM = ['switch', 'light', 'thermostat', 'sensor']
COMPONENTS_WITH_DEMO_PLATFORM = [
'switch', 'light', 'thermostat', 'sensor', 'media_player']
def setup(hass, config):
@ -71,12 +72,6 @@ def setup(hass, config):
]
})
# Setup chromecast
hass.states.set("chromecast.Living_Rm", "Plex",
{'friendly_name': 'Living Room',
ATTR_ENTITY_PICTURE:
'http://graph.facebook.com/KillBillMovie/picture'})
# Setup configurator
configurator_ids = []

View File

@ -25,7 +25,7 @@ SCAN_INTERVAL = 300 # seconds
SERVICE_HANDLERS = {
services.BELKIN_WEMO: "switch",
services.GOOGLE_CAST: "chromecast",
services.GOOGLE_CAST: "media_player",
services.PHILIPS_HUE: "light",
}

View File

@ -29,7 +29,7 @@
case "switch":
return "image:flash-on";
case "chromecast":
case "media_player":
var icon = "hardware:cast";
if (state !== "idle") {

View File

@ -0,0 +1,207 @@
"""
homeassistant.components.media_player
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Component to interface with various media players
"""
import logging
from homeassistant.components import discovery
from homeassistant.helpers import Device
from homeassistant.helpers.device_component import DeviceComponent
from homeassistant.const import (
ATTR_ENTITY_ID, SERVICE_TURN_OFF, SERVICE_VOLUME_UP,
SERVICE_VOLUME_DOWN, SERVICE_MEDIA_PLAY_PAUSE, SERVICE_MEDIA_PLAY,
SERVICE_MEDIA_PAUSE, SERVICE_MEDIA_NEXT_TRACK, SERVICE_MEDIA_PREV_TRACK)
DOMAIN = 'media_player'
DEPENDENCIES = []
SCAN_INTERVAL = 30
ENTITY_ID_FORMAT = DOMAIN + '.{}'
DISCOVERY_PLATFORMS = {
discovery.services.GOOGLE_CAST: 'cast',
}
SERVICE_YOUTUBE_VIDEO = 'play_youtube_video'
STATE_NO_APP = 'idle'
ATTR_STATE = 'state'
ATTR_OPTIONS = 'options'
ATTR_MEDIA_STATE = 'media_state'
ATTR_MEDIA_CONTENT_ID = 'media_content_id'
ATTR_MEDIA_TITLE = 'media_title'
ATTR_MEDIA_ARTIST = 'media_artist'
ATTR_MEDIA_ALBUM = 'media_album'
ATTR_MEDIA_IMAGE_URL = 'media_image_url'
ATTR_MEDIA_VOLUME = 'media_volume'
ATTR_MEDIA_DURATION = 'media_duration'
MEDIA_STATE_UNKNOWN = 'unknown'
MEDIA_STATE_PLAYING = 'playing'
MEDIA_STATE_STOPPED = 'stopped'
YOUTUBE_COVER_URL_FORMAT = 'http://img.youtube.com/vi/{}/1.jpg'
def is_on(hass, entity_id=None):
""" Returns true if specified media player entity_id is on.
Will check all media player if no entity_id specified. """
entity_ids = [entity_id] if entity_id else hass.states.entity_ids(DOMAIN)
return any(not hass.states.is_state(entity_id, STATE_NO_APP)
for entity_id in entity_ids)
def turn_off(hass, entity_id=None):
""" Will turn off specified media player or all. """
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_TURN_OFF, data)
def volume_up(hass, entity_id=None):
""" Send the media player the command for volume up. """
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_VOLUME_UP, data)
def volume_down(hass, entity_id=None):
""" Send the media player the command for volume down. """
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_VOLUME_DOWN, data)
def media_play_pause(hass, entity_id=None):
""" Send the media player the command for play/pause. """
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_MEDIA_PLAY_PAUSE, data)
def media_play(hass, entity_id=None):
""" Send the media player the command for play/pause. """
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_MEDIA_PLAY, data)
def media_pause(hass, entity_id=None):
""" Send the media player the command for play/pause. """
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_MEDIA_PAUSE, data)
def media_next_track(hass, entity_id=None):
""" Send the media player the command for next track. """
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_MEDIA_NEXT_TRACK, data)
def media_prev_track(hass, entity_id=None):
""" Send the media player the command for prev track. """
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_MEDIA_PREV_TRACK, data)
SERVICE_TO_METHOD = {
SERVICE_TURN_OFF: 'turn_off',
SERVICE_VOLUME_UP: 'volume_up',
SERVICE_VOLUME_DOWN: 'volume_down',
SERVICE_MEDIA_PLAY_PAUSE: 'media_play_pause',
SERVICE_MEDIA_PLAY: 'media_play',
SERVICE_MEDIA_PAUSE: 'media_pause',
SERVICE_MEDIA_NEXT_TRACK: 'media_next_track',
}
def setup(hass, config):
""" Track states and offer events for media_players. """
component = DeviceComponent(
logging.getLogger(__name__), DOMAIN, hass, SCAN_INTERVAL,
DISCOVERY_PLATFORMS)
component.setup(config)
def media_player_service_handler(service):
""" Maps services to methods on MediaPlayerDevice. """
target_players = component.extract_from_service(service)
method = SERVICE_TO_METHOD[service.service]
for player in target_players:
getattr(player, method)()
if player.should_poll:
player.update_ha_state(True)
for service in SERVICE_TO_METHOD:
hass.services.register(DOMAIN, service, media_player_service_handler)
def play_youtube_video_service(service, media_id):
""" Plays specified media_id on the media player. """
target_players = component.extract_from_service(service)
if media_id:
for player in target_players:
player.play_youtube(media_id)
hass.services.register(DOMAIN, "start_fireplace",
lambda service:
play_youtube_video_service(service, "eyU3bRy2x44"))
hass.services.register(DOMAIN, "start_epic_sax",
lambda service:
play_youtube_video_service(service, "kxopViU98Xo"))
hass.services.register(DOMAIN, SERVICE_YOUTUBE_VIDEO,
lambda service:
play_youtube_video_service(
service, service.data.get('video')))
return True
class MediaPlayerDevice(Device):
""" ABC for media player devices. """
def turn_off(self):
""" turn_off media player. """
pass
def volume_up(self):
""" volume_up media player. """
pass
def volume_down(self):
""" volume_down media player. """
pass
def media_play_pause(self):
""" media_play_pause media player. """
pass
def media_play(self):
""" media_play media player. """
pass
def media_pause(self):
""" media_pause media player. """
pass
def media_next_track(self):
""" media_next_track media player. """
pass
def play_youtube(self, media_id):
""" Plays a YouTube media. """
pass

View File

@ -0,0 +1,162 @@
"""
homeassistant.components.media_player.chromecast
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Provides functionality to interact with Cast devices on the network.
WARNING: This platform is currently not working due to a changed Cast API
"""
import logging
try:
import pychromecast
except ImportError:
# We will throw error later
pass
from homeassistant.components.media_player import (
MediaPlayerDevice, STATE_NO_APP, ATTR_MEDIA_STATE,
ATTR_MEDIA_CONTENT_ID, ATTR_MEDIA_TITLE, ATTR_MEDIA_ARTIST,
ATTR_MEDIA_ALBUM, ATTR_MEDIA_IMAGE_URL, ATTR_MEDIA_DURATION,
ATTR_MEDIA_VOLUME, MEDIA_STATE_PLAYING, MEDIA_STATE_STOPPED)
# pylint: disable=unused-argument
def setup_platform(hass, config, add_devices, discovery_info=None):
""" Sets up the cast platform. """
logger = logging.getLogger(__name__)
try:
# pylint: disable=redefined-outer-name
import pychromecast
except ImportError:
logger.exception(("Failed to import pychromecast. "
"Did you maybe not install the 'pychromecast' "
"dependency?"))
return
if discovery_info:
hosts = [discovery_info[0]]
else:
hosts = pychromecast.discover_chromecasts()
casts = []
for host in hosts:
try:
casts.append(CastDevice(host))
except pychromecast.ChromecastConnectionError:
pass
add_devices(casts)
class CastDevice(MediaPlayerDevice):
""" Represents a Cast device on the network. """
def __init__(self, host):
self.cast = pychromecast.PyChromecast(host)
@property
def name(self):
""" Returns the name of the device. """
return self.cast.device.friendly_name
@property
def state(self):
""" Returns the state of the device. """
status = self.cast.app
if status is None or status.app_id == pychromecast.APP_ID['HOME']:
return STATE_NO_APP
else:
return status.description
@property
def state_attributes(self):
""" Returns the state attributes. """
ramp = self.cast.get_protocol(pychromecast.PROTOCOL_RAMP)
if ramp and ramp.state != pychromecast.RAMP_STATE_UNKNOWN:
state_attr = {}
if ramp.state == pychromecast.RAMP_STATE_PLAYING:
state_attr[ATTR_MEDIA_STATE] = MEDIA_STATE_PLAYING
else:
state_attr[ATTR_MEDIA_STATE] = MEDIA_STATE_STOPPED
if ramp.content_id:
state_attr[ATTR_MEDIA_CONTENT_ID] = ramp.content_id
if ramp.title:
state_attr[ATTR_MEDIA_TITLE] = ramp.title
if ramp.artist:
state_attr[ATTR_MEDIA_ARTIST] = ramp.artist
if ramp.album:
state_attr[ATTR_MEDIA_ALBUM] = ramp.album
if ramp.image_url:
state_attr[ATTR_MEDIA_IMAGE_URL] = ramp.image_url
if ramp.duration:
state_attr[ATTR_MEDIA_DURATION] = ramp.duration
state_attr[ATTR_MEDIA_VOLUME] = ramp.volume
return state_attr
def turn_off(self):
""" Service to exit any running app on the specimedia player ChromeCast and
shows idle screen. Will quit all ChromeCasts if nothing specified.
"""
self.cast.quit_app()
def volume_up(self):
""" Service to send the chromecast the command for volume up. """
ramp = self.cast.get_protocol(pychromecast.PROTOCOL_RAMP)
if ramp:
ramp.volume_up()
def volume_down(self):
""" Service to send the chromecast the command for volume down. """
ramp = self.cast.get_protocol(pychromecast.PROTOCOL_RAMP)
if ramp:
ramp.volume_down()
def media_play_pause(self):
""" Service to send the chromecast the command for play/pause. """
ramp = self.cast.get_protocol(pychromecast.PROTOCOL_RAMP)
if ramp:
ramp.playpause()
def media_play(self):
""" Service to send the chromecast the command for play/pause. """
ramp = self.cast.get_protocol(pychromecast.PROTOCOL_RAMP)
if ramp and ramp.state == pychromecast.RAMP_STATE_STOPPED:
ramp.playpause()
def media_pause(self):
""" Service to send the chromecast the command for play/pause. """
ramp = self.cast.get_protocol(pychromecast.PROTOCOL_RAMP)
if ramp and ramp.state == pychromecast.RAMP_STATE_PLAYING:
ramp.playpause()
def media_next_track(self):
""" Service to send the chromecast the command for next track. """
ramp = self.cast.get_protocol(pychromecast.PROTOCOL_RAMP)
if ramp:
ramp.next()
def play_youtube_video(self, video_id):
""" Plays specified video_id on the Chromecast's YouTube channel. """
pychromecast.play_youtube_video(video_id, self.cast.host)

View File

@ -0,0 +1,100 @@
"""
homeassistant.components.media_player.chromecast
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Demo implementation of the media player.
"""
from homeassistant.components.media_player import (
MediaPlayerDevice, STATE_NO_APP, ATTR_MEDIA_STATE,
ATTR_MEDIA_CONTENT_ID, ATTR_MEDIA_TITLE, ATTR_MEDIA_DURATION,
ATTR_MEDIA_VOLUME, MEDIA_STATE_PLAYING, MEDIA_STATE_STOPPED,
YOUTUBE_COVER_URL_FORMAT)
from homeassistant.const import ATTR_ENTITY_PICTURE
# pylint: disable=unused-argument
def setup_platform(hass, config, add_devices, discovery_info=None):
""" Sets up the cast platform. """
add_devices([
DemoMediaPlayer(
'Living Room', 'eyU3bRy2x44',
'♥♥ The Best Fireplace Video (3 hours)'),
DemoMediaPlayer('Bedroom', 'kxopViU98Xo', 'Epic sax guy 10 hours')
])
class DemoMediaPlayer(MediaPlayerDevice):
""" A Demo media player that only supports YouTube. """
def __init__(self, name, youtube_id=None, media_title=None):
self._name = name
self.is_playing = youtube_id is not None
self.youtube_id = youtube_id
self.media_title = media_title
self.volume = 1.0
@property
def name(self):
""" Returns the name of the device. """
return self._name
@property
def state(self):
""" Returns the state of the device. """
return STATE_NO_APP if self.youtube_id is None else "YouTube"
@property
def state_attributes(self):
""" Returns the state attributes. """
if self.youtube_id is None:
return
state_attr = {
ATTR_MEDIA_CONTENT_ID: self.youtube_id,
ATTR_MEDIA_TITLE: self.media_title,
ATTR_MEDIA_DURATION: 100,
ATTR_MEDIA_VOLUME: self.volume,
ATTR_ENTITY_PICTURE:
YOUTUBE_COVER_URL_FORMAT.format(self.youtube_id)
}
if self.is_playing:
state_attr[ATTR_MEDIA_STATE] = MEDIA_STATE_PLAYING
else:
state_attr[ATTR_MEDIA_STATE] = MEDIA_STATE_STOPPED
return state_attr
def turn_off(self):
""" turn_off media player. """
self.youtube_id = None
self.is_playing = False
def volume_up(self):
""" volume_up media player. """
if self.volume < 1:
self.volume += 0.1
def volume_down(self):
""" volume_down media player. """
if self.volume > 0:
self.volume -= 0.1
def media_play_pause(self):
""" media_play_pause media player. """
self.is_playing = not self.is_playing
def media_play(self):
""" media_play media player. """
self.is_playing = True
def media_pause(self):
""" media_pause media player. """
self.is_playing = False
def play_youtube(self, media_id):
""" Plays a YouTube media. """
self.youtube_id = media_id
self.media_title = 'Demo media title'
self.is_playing = True

View File

@ -2,13 +2,15 @@
Provides helpers for components that handle devices.
"""
from homeassistant.loader import get_component
from homeassistant.helpers import generate_entity_id, config_per_platform
from homeassistant.helpers import (
generate_entity_id, config_per_platform, extract_entity_ids)
from homeassistant.components import group, discovery
from homeassistant.const import ATTR_ENTITY_ID
class DeviceComponent(object):
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-arguments,too-few-public-methods
# pylint: disable=too-many-arguments
"""
Helper class that will help a device component manage its devices.
"""
@ -52,6 +54,18 @@ class DeviceComponent(object):
discovery.listen(self.hass, self.discovery_platforms.keys(),
self._device_discovered)
def extract_from_service(self, service):
"""
Takes a service and extracts all known devices.
Will return all if no entity IDs given in service.
"""
if ATTR_ENTITY_ID not in service.data:
return self.devices.values()
else:
return [self.devices[entity_id] for entity_id
in extract_entity_ids(self.hass, service)
if entity_id in self.devices]
def _update_device_states(self, now):
""" Update the states of all the lights. """
self.logger.info("Updating %s states", self.domain)

View File

@ -1,8 +1,8 @@
"""
tests.test_component_chromecast
tests.test_component_media_player
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Tests Chromecast component.
Tests media_player component.
"""
# pylint: disable=too-many-public-methods,protected-access
import logging
@ -12,28 +12,27 @@ import homeassistant as ha
from homeassistant.const import (
SERVICE_TURN_OFF, SERVICE_VOLUME_UP, SERVICE_VOLUME_DOWN,
SERVICE_MEDIA_PLAY_PAUSE, SERVICE_MEDIA_PLAY, SERVICE_MEDIA_PAUSE,
SERVICE_MEDIA_NEXT_TRACK, SERVICE_MEDIA_PREV_TRACK, ATTR_ENTITY_ID,
CONF_HOSTS)
import homeassistant.components.chromecast as chromecast
SERVICE_MEDIA_NEXT_TRACK, SERVICE_MEDIA_PREV_TRACK, ATTR_ENTITY_ID)
import homeassistant.components.media_player as media_player
from helpers import mock_service
def setUpModule(): # pylint: disable=invalid-name
""" Setup to ignore chromecast errors. """
""" Setup to ignore media_player errors. """
logging.disable(logging.CRITICAL)
class TestChromecast(unittest.TestCase):
""" Test the chromecast module. """
class TestMediaPlayer(unittest.TestCase):
""" Test the media_player module. """
def setUp(self): # pylint: disable=invalid-name
self.hass = ha.HomeAssistant()
self.test_entity = chromecast.ENTITY_ID_FORMAT.format('living_room')
self.hass.states.set(self.test_entity, chromecast.STATE_NO_APP)
self.test_entity = media_player.ENTITY_ID_FORMAT.format('living_room')
self.hass.states.set(self.test_entity, media_player.STATE_NO_APP)
self.test_entity2 = chromecast.ENTITY_ID_FORMAT.format('bedroom')
self.hass.states.set(self.test_entity2, "Youtube")
self.test_entity2 = media_player.ENTITY_ID_FORMAT.format('bedroom')
self.hass.states.set(self.test_entity2, "YouTube")
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
@ -41,33 +40,33 @@ class TestChromecast(unittest.TestCase):
def test_is_on(self):
""" Test is_on method. """
self.assertFalse(chromecast.is_on(self.hass, self.test_entity))
self.assertTrue(chromecast.is_on(self.hass, self.test_entity2))
self.assertFalse(media_player.is_on(self.hass, self.test_entity))
self.assertTrue(media_player.is_on(self.hass, self.test_entity2))
def test_services(self):
"""
Test if the call service methods conver to correct service calls.
"""
services = {
SERVICE_TURN_OFF: chromecast.turn_off,
SERVICE_VOLUME_UP: chromecast.volume_up,
SERVICE_VOLUME_DOWN: chromecast.volume_down,
SERVICE_MEDIA_PLAY_PAUSE: chromecast.media_play_pause,
SERVICE_MEDIA_PLAY: chromecast.media_play,
SERVICE_MEDIA_PAUSE: chromecast.media_pause,
SERVICE_MEDIA_NEXT_TRACK: chromecast.media_next_track,
SERVICE_MEDIA_PREV_TRACK: chromecast.media_prev_track
SERVICE_TURN_OFF: media_player.turn_off,
SERVICE_VOLUME_UP: media_player.volume_up,
SERVICE_VOLUME_DOWN: media_player.volume_down,
SERVICE_MEDIA_PLAY_PAUSE: media_player.media_play_pause,
SERVICE_MEDIA_PLAY: media_player.media_play,
SERVICE_MEDIA_PAUSE: media_player.media_pause,
SERVICE_MEDIA_NEXT_TRACK: media_player.media_next_track,
SERVICE_MEDIA_PREV_TRACK: media_player.media_prev_track
}
for service_name, service_method in services.items():
calls = mock_service(self.hass, chromecast.DOMAIN, service_name)
calls = mock_service(self.hass, media_player.DOMAIN, service_name)
service_method(self.hass)
self.hass.pool.block_till_done()
self.assertEqual(1, len(calls))
call = calls[-1]
self.assertEqual(chromecast.DOMAIN, call.domain)
self.assertEqual(media_player.DOMAIN, call.domain)
self.assertEqual(service_name, call.service)
service_method(self.hass, self.test_entity)
@ -75,7 +74,7 @@ class TestChromecast(unittest.TestCase):
self.assertEqual(2, len(calls))
call = calls[-1]
self.assertEqual(chromecast.DOMAIN, call.domain)
self.assertEqual(media_player.DOMAIN, call.domain)
self.assertEqual(service_name, call.service)
self.assertEqual(self.test_entity,
call.data.get(ATTR_ENTITY_ID))