1
mirror of https://github.com/home-assistant/core synced 2024-08-06 09:34:49 +02:00

Move key sequences to bridge in SamsungTV (#67762)

Co-authored-by: epenet <epenet@users.noreply.github.com>
This commit is contained in:
epenet 2022-03-08 07:46:34 +01:00 committed by GitHub
parent d302b0d14e
commit 1793c29fac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 56 additions and 54 deletions

View File

@ -45,6 +45,8 @@ from .const import (
WEBSOCKET_PORTS,
)
KEY_PRESS_TIMEOUT = 1.2
def mac_from_device_info(info: dict[str, Any]) -> str | None:
"""Extract the mac address from the device info."""
@ -129,8 +131,8 @@ class SamsungTVBridge(ABC):
"""Tells if the TV is on."""
@abstractmethod
async def async_send_key(self, key: str) -> None:
"""Send a key to the tv and handles exceptions."""
async def async_send_keys(self, keys: list[str]) -> None:
"""Send a list of keys to the tv."""
@abstractmethod
async def async_close_remote(self) -> None:
@ -238,12 +240,18 @@ class SamsungTVLegacyBridge(SamsungTVBridge):
pass
return self._remote
async def async_send_key(self, key: str) -> None:
"""Send the key using legacy protocol."""
await self.hass.async_add_executor_job(self._send_key, key)
async def async_send_keys(self, keys: list[str]) -> None:
"""Send a list of keys using legacy protocol."""
first_key = True
for key in keys:
if first_key:
first_key = False
else:
await asyncio.sleep(KEY_PRESS_TIMEOUT)
await self.hass.async_add_executor_job(self._send_key, key)
def _send_key(self, key: str) -> None:
"""Send the key using legacy protocol."""
"""Send a key using legacy protocol."""
try:
# recreate connection if connection was dead
retry_count = 1
@ -391,15 +399,18 @@ class SamsungTVWSBridge(SamsungTVBridge):
async def async_launch_app(self, app_id: str) -> None:
"""Send the launch_app command using websocket protocol."""
await self._async_send_command(ChannelEmitCommand.launch_app(app_id))
await self._async_send_commands([ChannelEmitCommand.launch_app(app_id)])
async def async_send_key(self, key: str) -> None:
"""Send the key using websocket protocol."""
if key == "KEY_POWEROFF":
key = "KEY_POWER"
await self._async_send_command(SendRemoteKey.click(key))
async def async_send_keys(self, keys: list[str]) -> None:
"""Send a list of keys using websocket protocol."""
commands: list[SamsungTVCommand] = []
for key in keys:
if key == "KEY_POWEROFF":
key = "KEY_POWER"
commands.append(SendRemoteKey.click(key))
await self._async_send_commands(commands)
async def _async_send_command(self, command: SamsungTVCommand) -> None:
async def _async_send_commands(self, commands: list[SamsungTVCommand]) -> None:
"""Send the commands using websocket protocol."""
try:
# recreate connection if connection was dead
@ -407,7 +418,8 @@ class SamsungTVWSBridge(SamsungTVBridge):
for _ in range(retry_count + 1):
try:
if remote := await self._async_get_remote():
await remote.send_command(command)
for command in commands:
await remote.send_command(command)
break
except (
BrokenPipeError,

View File

@ -1,7 +1,6 @@
"""Support for interface with an Samsung TV."""
from __future__ import annotations
import asyncio
from datetime import datetime, timedelta
from typing import Any
@ -47,7 +46,6 @@ from .const import (
LOGGER,
)
KEY_PRESS_TIMEOUT = 1.2
SOURCES = {"TV": "KEY_TV", "HDMI": "KEY_HDMI"}
SUPPORT_SAMSUNGTV = (
@ -181,12 +179,13 @@ class SamsungTVDevice(MediaPlayerEntity):
assert isinstance(self._bridge, SamsungTVWSBridge)
await self._bridge.async_launch_app(app_id)
async def _async_send_key(self, key: str) -> None:
async def _async_send_keys(self, keys: list[str]) -> None:
"""Send a key to the tv and handles exceptions."""
if self._power_off_in_progress() and key != "KEY_POWEROFF":
LOGGER.info("TV is powering off, not sending key: %s", key)
assert keys
if self._power_off_in_progress() and keys[0] != "KEY_POWEROFF":
LOGGER.info("TV is powering off, not sending keys: %s", keys)
return
await self._bridge.async_send_key(key)
await self._bridge.async_send_keys(keys)
def _power_off_in_progress(self) -> bool:
return (
@ -210,21 +209,21 @@ class SamsungTVDevice(MediaPlayerEntity):
"""Turn off media player."""
self._end_of_power_off = dt_util.utcnow() + SCAN_INTERVAL_PLUS_OFF_TIME
await self._async_send_key("KEY_POWEROFF")
await self._async_send_keys(["KEY_POWEROFF"])
# Force closing of remote session to provide instant UI feedback
await self._bridge.async_close_remote()
async def async_volume_up(self) -> None:
"""Volume up the media player."""
await self._async_send_key("KEY_VOLUP")
await self._async_send_keys(["KEY_VOLUP"])
async def async_volume_down(self) -> None:
"""Volume down media player."""
await self._async_send_key("KEY_VOLDOWN")
await self._async_send_keys(["KEY_VOLDOWN"])
async def async_mute_volume(self, mute: bool) -> None:
"""Send mute command."""
await self._async_send_key("KEY_MUTE")
await self._async_send_keys(["KEY_MUTE"])
async def async_media_play_pause(self) -> None:
"""Simulate play pause media player."""
@ -236,20 +235,20 @@ class SamsungTVDevice(MediaPlayerEntity):
async def async_media_play(self) -> None:
"""Send play command."""
self._playing = True
await self._async_send_key("KEY_PLAY")
await self._async_send_keys(["KEY_PLAY"])
async def async_media_pause(self) -> None:
"""Send media pause command to media player."""
self._playing = False
await self._async_send_key("KEY_PAUSE")
await self._async_send_keys(["KEY_PAUSE"])
async def async_media_next_track(self) -> None:
"""Send next track command."""
await self._async_send_key("KEY_CHUP")
await self._async_send_keys(["KEY_CHUP"])
async def async_media_previous_track(self) -> None:
"""Send the previous track command."""
await self._async_send_key("KEY_CHDOWN")
await self._async_send_keys(["KEY_CHDOWN"])
async def async_play_media(
self, media_type: str, media_id: str, **kwargs: Any
@ -270,10 +269,9 @@ class SamsungTVDevice(MediaPlayerEntity):
LOGGER.error("Media ID must be positive integer")
return
for digit in media_id:
await self._async_send_key(f"KEY_{digit}")
await asyncio.sleep(KEY_PRESS_TIMEOUT)
await self._async_send_key("KEY_ENTER")
await self._async_send_keys(
keys=[f"KEY_{digit}" for digit in media_id] + ["KEY_ENTER"]
)
def _wake_on_lan(self) -> None:
"""Wake the device via wake on lan."""
@ -296,7 +294,7 @@ class SamsungTVDevice(MediaPlayerEntity):
return
if source in SOURCES:
await self._async_send_key(SOURCES[source])
await self._async_send_keys([SOURCES[source]])
return
LOGGER.error("Unsupported source")

View File

@ -1,5 +1,4 @@
"""Tests for samsungtv component."""
import asyncio
from copy import deepcopy
from datetime import datetime, timedelta
import logging
@ -650,7 +649,7 @@ async def test_turn_off_websocket(
assert await hass.services.async_call(
DOMAIN, SERVICE_VOLUME_UP, {ATTR_ENTITY_ID: ENTITY_ID}, True
)
assert "TV is powering off, not sending key: KEY_VOLUP" in caplog.text
assert "TV is powering off, not sending keys: ['KEY_VOLUP']" in caplog.text
assert await hass.services.async_call(
DOMAIN,
SERVICE_SELECT_SOURCE,
@ -853,15 +852,8 @@ async def test_turn_on_without_turnon(hass: HomeAssistant, remote: Mock) -> None
async def test_play_media(hass: HomeAssistant, remote: Mock) -> None:
"""Test for play_media."""
asyncio_sleep = asyncio.sleep
sleeps = []
async def sleep(duration):
sleeps.append(duration)
await asyncio_sleep(0)
await setup_samsungtv(hass, MOCK_CONFIG)
with patch("asyncio.sleep", new=sleep):
with patch("homeassistant.components.samsungtv.bridge.asyncio.sleep") as sleep:
assert await hass.services.async_call(
DOMAIN,
SERVICE_PLAY_MEDIA,
@ -872,17 +864,17 @@ async def test_play_media(hass: HomeAssistant, remote: Mock) -> None:
},
True,
)
# keys and update called
assert remote.control.call_count == 4
assert remote.control.call_args_list == [
call("KEY_5"),
call("KEY_7"),
call("KEY_6"),
call("KEY_ENTER"),
]
assert remote.close.call_count == 1
assert remote.close.call_args_list == [call()]
assert len(sleeps) == 3
# keys and update called
assert remote.control.call_count == 4
assert remote.control.call_args_list == [
call("KEY_5"),
call("KEY_7"),
call("KEY_6"),
call("KEY_ENTER"),
]
assert remote.close.call_count == 1
assert remote.close.call_args_list == [call()]
assert sleep.call_count == 3
async def test_play_media_invalid_type(hass: HomeAssistant) -> None: