streamlink/tests/test_api_websocket.py

231 lines
8.7 KiB
Python

from unittest.mock import ANY, Mock, call, patch
import pytest
from websocket import ABNF, STATUS_NORMAL # type: ignore[import]
from streamlink.logger import DEBUG, TRACE
from streamlink.plugin.api.useragents import FIREFOX
from streamlink.plugin.api.websocket import WebsocketClient
from streamlink.session import Streamlink
from tests.testutils.handshake import Handshake
@pytest.mark.parametrize(("name", "value"), [
("OPCODE_CONT", ABNF.OPCODE_CONT),
("OPCODE_TEXT", ABNF.OPCODE_TEXT),
("OPCODE_BINARY", ABNF.OPCODE_BINARY),
("OPCODE_CLOSE", ABNF.OPCODE_CLOSE),
("OPCODE_PING", ABNF.OPCODE_PING),
("OPCODE_PONG", ABNF.OPCODE_PONG),
])
def test_opcode_export(name, value):
assert getattr(WebsocketClient, name) == value
class FakeWebsocketClient(WebsocketClient):
ws: Mock
class TestWebsocketClient:
@pytest.fixture()
def websocketapp(self):
with patch("streamlink.plugin.api.websocket.WebSocketApp") as mock_websocketapp:
yield mock_websocketapp
@pytest.fixture()
def client(self, request: pytest.FixtureRequest, session: Streamlink, websocketapp: Mock):
with patch("streamlink.plugin.api.websocket.certify_where", side_effect=Mock(return_value="/path/to/cacert.pem")):
yield WebsocketClient(session, "wss://localhost:0", **getattr(request, "param", {}))
@pytest.mark.parametrize(("level", "expected"), [
pytest.param(DEBUG, [], id="debug"),
pytest.param(TRACE, [call(True, handler=ANY)], id="trace"),
])
def test_log(
self,
monkeypatch: pytest.MonkeyPatch,
caplog: pytest.LogCaptureFixture,
session: Streamlink,
level: int,
expected: bool,
):
caplog.set_level(level, "streamlink")
mock_enable_trace = Mock()
monkeypatch.setattr("streamlink.plugin.api.websocket.enableTrace", mock_enable_trace)
WebsocketClient(session, "wss://localhost:0")
assert mock_enable_trace.call_args_list == expected
@pytest.mark.parametrize(("client", "expected"), [
pytest.param({}, FIREFOX, id="default"),
pytest.param({"header": ["User-Agent: foo"]}, "foo", id="header list"),
pytest.param({"header": {"User-Agent": "bar"}}, "bar", id="header dict"),
], indirect=["client"])
def test_user_agent(self, client: FakeWebsocketClient, websocketapp: Mock, expected: str):
assert [arg[1].get("header", []) for arg in websocketapp.call_args_list] == [[f"User-Agent: {expected}"]]
@pytest.mark.parametrize(("session", "client"), [
(
{
"http-proxy": "https://username:password@hostname:1234",
},
{
"subprotocols": ["sub1", "sub2"],
"cookie": "cookie",
"sockopt": ("sockopt1", "sockopt2"),
"sslopt": {"ssloptkey": "ssloptval"},
"host": "customhost",
"origin": "customorigin",
"suppress_origin": True,
"ping_interval": 30,
"ping_timeout": 4,
"ping_payload": "ping",
},
),
], indirect=["session", "client"])
def test_args_and_proxy(self, session: Streamlink, client: FakeWebsocketClient, websocketapp: Mock):
assert websocketapp.call_args_list == [
call(
url="wss://localhost:0",
subprotocols=["sub1", "sub2"],
cookie="cookie",
header=ANY,
on_open=ANY,
on_error=ANY,
on_close=ANY,
on_ping=ANY,
on_pong=ANY,
on_message=ANY,
on_cont_message=ANY,
on_data=ANY,
),
]
with patch.object(client.ws, "run_forever") as mock_ws_run_forever:
client.start()
client.join(1)
assert not client.is_alive()
assert mock_ws_run_forever.call_args_list == [
call(
sockopt=("sockopt1", "sockopt2"),
sslopt={
"ssloptkey": "ssloptval",
"ca_certs": "/path/to/cacert.pem",
},
host="customhost",
origin="customorigin",
suppress_origin=True,
ping_interval=30,
ping_timeout=4,
ping_payload="ping",
proxy_type="https",
http_proxy_host="hostname",
http_proxy_port=1234,
http_proxy_auth=("username", "password"),
),
]
def test_handlers(self, session: Streamlink):
client = WebsocketClient(session, "wss://localhost:0")
assert client.ws.on_open == client.on_open
assert client.ws.on_error == client.on_error
assert client.ws.on_close == client.on_close
assert client.ws.on_ping == client.on_ping
assert client.ws.on_pong == client.on_pong
assert client.ws.on_message == client.on_message
assert client.ws.on_cont_message == client.on_cont_message
assert client.ws.on_data == client.on_data
def test_send(self, client: FakeWebsocketClient):
with patch.object(client, "ws") as mock_ws:
client.send("foo")
client.send(b"foo", ABNF.OPCODE_BINARY)
client.send_json({"foo": "bar", "baz": "qux"})
assert mock_ws.send.call_args_list == [
call("foo", ABNF.OPCODE_TEXT),
call(b"foo", ABNF.OPCODE_BINARY),
call("{\"foo\":\"bar\",\"baz\":\"qux\"}", ABNF.OPCODE_TEXT),
]
def test_close(self, session: Streamlink):
handshake = Handshake()
class WebsocketClientSubclass(FakeWebsocketClient):
def run(self):
with handshake():
pass
client = WebsocketClientSubclass(session, "wss://localhost:0")
with patch.object(client.ws, "close") as mock_ws_close:
mock_ws_close.side_effect = lambda *_, **__: handshake.go()
client.start()
client.close(reason="foo")
assert handshake.wait_done(1)
assert not client.is_alive()
assert mock_ws_close.call_args_list == [call(status=STATUS_NORMAL, reason=b"foo", timeout=3)]
def test_close_self(self, session: Streamlink):
handshake = Handshake()
class WebsocketClientSubclass(FakeWebsocketClient):
def run(self):
with handshake(Exception):
self.close(reason=b"bar")
client = WebsocketClientSubclass(session, "wss://localhost:0")
client.start()
assert handshake.step(1)
client.join(timeout=4)
assert not client.is_alive()
assert handshake._context.error is None, "Doesn't join current thread"
def test_reconnect_disconnected(self, client: FakeWebsocketClient, websocketapp: Mock):
handshake = Handshake()
# noinspection PyUnusedLocal
def mock_run_forever(**data):
client.ws.keep_running = False
with handshake():
pass
client.ws.keep_running = True
client.ws.run_forever.side_effect = mock_run_forever
client.start()
assert handshake.step(1), "Enters run_forever loop on ws client thread"
assert websocketapp.call_count == 1
client.reconnect()
assert websocketapp.call_count == 1, "Doesn't reconnect if disconnected"
client.join(timeout=4)
assert not client.is_alive()
def test_reconnect_once(self, client: FakeWebsocketClient, websocketapp: Mock):
handshake = Handshake()
# noinspection PyUnusedLocal
def mock_run_forever(**data):
with handshake():
pass
client.ws.keep_running = True
client.ws.run_forever.side_effect = mock_run_forever
client.start()
assert client.ws.close.call_count == 0
assert websocketapp.call_count == 1, "Creates initial connection"
assert not client._reconnect, "Has not set the _reconnect state"
assert handshake.wait_ready(1), "Enters run_forever loop on client thread"
client.reconnect()
assert client.ws.close.call_count == 1
assert websocketapp.call_count == 2, "Creates new connection"
assert client._reconnect, "Has set the _reconnect state"
assert handshake.step(1)
assert handshake.wait_ready(1), "Enters run_forever loop on client thread again"
assert not client._reconnect, "Has reset the _reconnect state"
assert handshake.step(1)
assert not handshake.wait_ready(0), "Doesn't enter run_forever loop on client thread again"
client.join(1)
assert not client.is_alive()
assert websocketapp.call_count == 2, "Connection has ended regularly"