webbrowser.cdp: implement CDP client

This commit is contained in:
bastimeyer 2023-07-02 18:13:57 +02:00 committed by Sebastian Meyer
parent 20c8b6be54
commit 60459461fd
8 changed files with 1507 additions and 0 deletions

View File

@ -9,4 +9,5 @@ This is an incomplete reference of the relevant Streamlink APIs.
api/session
api/plugin
api/stream
api/webbrowser
api/exceptions

View File

@ -7,3 +7,5 @@ Exceptions
.. autoexception:: streamlink.exceptions.NoPluginError
.. autoexception:: streamlink.exceptions.NoStreamsError
.. autoexception:: streamlink.exceptions.StreamError
.. autoexception:: streamlink.webbrowser.exceptions.WebbrowserError
.. autoexception:: streamlink.webbrowser.cdp.exceptions.CDPError

16
docs/api/webbrowser.rst Normal file
View File

@ -0,0 +1,16 @@
Webbrowser
----------
.. warning::
The APIs of the ``streamlink.webbrowser`` package are considered unstable. Use at your own risk!
.. module:: streamlink.webbrowser
.. autoclass:: streamlink.webbrowser.cdp.client.CDPClient
.. autoclass:: streamlink.webbrowser.cdp.client.CDPClientSession
.. autoclass:: streamlink.webbrowser.cdp.client.CMRequestProxy
.. autoclass:: streamlink.webbrowser.cdp.connection.CDPBase
.. autoclass:: streamlink.webbrowser.cdp.connection.CDPConnection
.. autoclass:: streamlink.webbrowser.webbrowser.Webbrowser
.. autoclass:: streamlink.webbrowser.chromium.ChromiumWebbrowser

View File

@ -249,6 +249,13 @@ class Streamlink:
"ffmpeg-audio-transcode": None,
"ffmpeg-copyts": False,
"ffmpeg-start-at-zero": False,
"webbrowser": True,
"webbrowser-executable": None,
"webbrowser-timeout": 20.0,
"webbrowser-cdp-host": None,
"webbrowser-cdp-port": None,
"webbrowser-cdp-timeout": 2.0,
"webbrowser-headless": True,
})
if options:
self.options.update(options)
@ -488,6 +495,34 @@ class Streamlink:
- ``bool``
- ``False``
- When ``ffmpeg-copyts`` is ``True``, shift timestamps to zero
* - webbrowser
- ``bool``
- ``True``
- Enable or disable support for Streamlink's webbrowser API
* - webbrowser-executable
- ``str | None``
- ``None``
- Path to the web browser's executable
* - webbrowser-timeout
- ``float``
- ``20.0``
- The maximum amount of time which the webbrowser can take to launch and execute
* - webbrowser-cdp-host
- ``str | None``
- ``None``
- Custom host for the Chrome Devtools Protocol (CDP) interface
* - webbrowser-cdp-port
- ``int | None``
- ``None``
- Custom port for the Chrome Devtools Protocol (CDP) interface
* - webbrowser-cdp-timeout
- ``float``
- ``2.0``
- The maximum amount of time for waiting on a single CDP command response
* - webbrowser-headless
- ``bool``
- ``True``
- Whether to launch the webbrowser in headless mode or not
"""
self.options.set(key, value)

View File

@ -1,2 +1,3 @@
from streamlink.webbrowser.cdp.client import CDPClient, CDPClientSession
from streamlink.webbrowser.cdp.connection import CDPConnection, CDPSession
from streamlink.webbrowser.cdp.exceptions import CDPError

View File

@ -0,0 +1,424 @@
import base64
import re
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Awaitable, Callable, Coroutine, List, Mapping, Optional, Set
import trio
from streamlink.session import Streamlink
from streamlink.webbrowser.cdp.connection import CDPConnection, CDPSession
from streamlink.webbrowser.cdp.devtools import fetch, network, page, runtime, target
from streamlink.webbrowser.cdp.exceptions import CDPError
from streamlink.webbrowser.chromium import ChromiumWebbrowser
try:
from typing import Self, TypeAlias # type: ignore[attr-defined]
except ImportError: # pragma: no cover
from typing_extensions import Self, TypeAlias
TRequestHandlerCallable: TypeAlias = Callable[["CDPClientSession", fetch.RequestPaused], Awaitable]
_re_url_pattern_wildcard = re.compile(r"(.+?)?(\\+)?([*?])")
@dataclass
class RequestPausedHandler:
async_handler: TRequestHandlerCallable
url_pattern: str = "*"
on_request: bool = False
def __post_init__(self) -> None:
self._re_url: re.Pattern = self._url_pattern_to_regex_pattern(self.url_pattern)
def matches(self, request: fetch.RequestPaused) -> bool:
on_request: bool = request.response_status_code is None and request.response_error_reason is None
return on_request is self.on_request and self._re_url.match(request.request.url) is not None
@staticmethod
def _url_pattern_to_regex_pattern(url_pattern: str) -> re.Pattern:
pos = 0
regex = ""
for match in _re_url_pattern_wildcard.finditer(url_pattern):
regex += re.escape(match[1]) if match[1] else ""
if match[2]:
if len(match[2]) % 2:
regex += f"{re.escape(match[2][:-1])}\\{match[3]}"
else:
regex += re.escape(match[2])
regex += ".+" if match[3] == "*" else "."
else:
regex += ".+" if match[3] == "*" else "."
pos = match.end()
regex += re.escape(url_pattern[pos:])
return re.compile(f"^{regex}$")
@dataclass
class CMRequestProxy:
body: str
response_code: int
response_headers: Optional[Mapping[str, str]]
class CDPClient:
"""
The public interface around :class:`ChromiumWebbrowser <streamlink.webbrowser.chromium.ChromiumWebbrowser>`
and :class:`CDPConnection <streamlink.webbrowser.cdp.connection.CDPConnection>`.
It launches the Chromium-based web browser, establishes the remote debugging WebSocket connection using
the `Chrome Devtools Protocol <https://chromedevtools.github.io/devtools-protocol/>`_, and provides
the :meth:`session()` method for creating a new :class:`CDPClientSession` that is tied to an empty new browser tab.
:class:`CDPClientSession` provides a high-level API for navigating websites, intercepting network requests and responses,
as well as evaluating JavaScript expressions and retrieving async results.
Don't instantiate this class yourself, use the :meth:`CDPClient.launch()` async context manager classmethod.
For low-level Chrome Devtools Protocol interfaces, please see Streamlink's automatically generated
``streamlink.webbrowser.cdp.devtools`` package, but be aware that only a subset of the available domains is supported.
"""
def __init__(self, cdp_connection: CDPConnection, nursery: trio.Nursery):
self.cdp_connection = cdp_connection
self.nursery = nursery
@classmethod
def launch(
cls,
session: Streamlink,
runner: Callable[[Self], Coroutine],
executable: Optional[str] = None,
timeout: Optional[float] = None,
cdp_host: Optional[str] = None,
cdp_port: Optional[int] = None,
cdp_timeout: Optional[float] = None,
headless: Optional[bool] = None,
) -> Any:
"""
Start a new :mod:`trio` runloop and do the following things:
1. Launch the Chromium-based web browser using the provided parameters or respective session options
2. Initialize a new :class:`CDPConnection <streamlink.webbrowser.cdp.connection.CDPConnection>`
and connect to the browser's remote debugging interface
3. Create a new :class:`CDPClient` instance
4. Execute the async runner callback with the :class:`CDPClient` instance as only argument
If the ``webbrowser`` session option is set to ``False``, then a :exc:`CDPError` will be raised.
Example:
.. code-block:: python
async def fake_response(client_session: CDPClientSession, request: devtools.fetch.RequestPaused):
if request.response_status_code is not None and 300 <= request.response_status_code < 400:
await client_session.continue_request(request)
else:
async with client_session.alter_request(request) as cmproxy:
cmproxy.body = "<!doctype html><html><body>foo</body></html>"
async def my_app_logic(client: CDPClient):
async with client.session() as client_session:
client_session.add_request_handler(fake_response, "*")
async with client_session.navigate("https://google.com") as frame_id:
await client_session.loaded(frame_id)
return await client_session.evaluate("document.body.innerText")
assert CDPClient.launch(session, my_app_logic) == "foo"
:param session: The Streamlink session object
:param runner: An async client callback function which receives the :class:`CDPClient` instance as only parameter.
:param executable: Optional path to the Chromium-based web browser executable.
If unset, falls back to the ``webbrowser-executable`` session option.
Otherwise, it'll be looked up according to the rules of the :class:`ChromiumBrowser` implementation.
:param timeout: Optional global timeout value, including web browser launch time.
If unset, falls back to the ``webbrowser-timeout`` session option.
:param cdp_host: Optional remote debugging host.
If unset, falls back to the ``webbrowser-cdp-host`` session option.
Otherwise, ``127.0.0.1`` will be used.
:param cdp_port: Optional remote debugging port.
If unset, falls back to the ``webbrowser-cdp-port`` session option.
Otherwise, a random free port will be chosen.
:param cdp_timeout: Optional CDP command timeout value.
If unset, falls back to the ``webbrowser-cdp-timeout`` session option.
:param headless: Optional boolean flag whether to launch the web browser in headless mode or not.
If unset, falls back to the ``webbrowser-headless`` session option.
"""
if not session.get_option("webbrowser"):
raise CDPError("The webbrowser API has been disabled by the user")
async def run_wrapper() -> Any:
async with cls.run(
session=session,
executable=session.get_option("webbrowser-executable") if executable is None else executable,
timeout=session.get_option("webbrowser-timeout") if timeout is None else timeout,
cdp_host=session.get_option("webbrowser-cdp-host") if cdp_host is None else cdp_host,
cdp_port=session.get_option("webbrowser-cdp-port") if cdp_port is None else cdp_port,
cdp_timeout=session.get_option("webbrowser-cdp-timeout") if cdp_timeout is None else cdp_timeout,
headless=session.get_option("webbrowser-headless") if headless is None else headless,
) as cdp_client:
return await runner(cdp_client)
return trio.run(run_wrapper)
@classmethod
@asynccontextmanager
async def run(
cls,
session: Streamlink,
executable: Optional[str] = None,
timeout: Optional[float] = None,
cdp_host: Optional[str] = None,
cdp_port: Optional[int] = None,
cdp_timeout: Optional[float] = None,
headless: bool = True,
) -> AsyncGenerator[Self, None]:
webbrowser = ChromiumWebbrowser(executable=executable, host=cdp_host, port=cdp_port, headless=headless)
nursery: trio.Nursery
async with webbrowser.launch(timeout=timeout) as nursery:
websocket_url = webbrowser.get_websocket_url(session)
cdp_connection: CDPConnection
async with CDPConnection.create(websocket_url, timeout=cdp_timeout) as cdp_connection:
yield cls(cdp_connection, nursery)
@asynccontextmanager
async def session(self, fail_unhandled_requests: bool = False) -> AsyncGenerator["CDPClientSession", None]:
"""
Create a new CDP session on an empty target (browser tab).
:param fail_unhandled_requests: Whether network requests which are not matched by any request handlers should fail.
"""
cdp_session = await self.cdp_connection.new_target()
yield CDPClientSession(self, cdp_session, fail_unhandled_requests)
class CDPClientSession:
"""
High-level API for navigating websites, intercepting network requests/responses,
and for evaluating async JavaScript expressions.
Don't instantiate this class yourself, use the :meth:`CDPClient.session()` async contextmanager.
"""
def __init__(
self,
cdp_client: CDPClient,
cdp_session: CDPSession,
fail_unhandled_requests: bool = False,
):
self.cdp_client = cdp_client
self.cdp_session = cdp_session
self._fail_unhandled = fail_unhandled_requests
self._request_handlers: List[RequestPausedHandler] = []
self._requests_handled: Set[str] = set()
def add_request_handler(
self,
async_handler: TRequestHandlerCallable,
url_pattern: str = "*",
on_request: bool = False,
):
"""
:param async_handler: An async request handler which must call :meth:`continue_request()`, :meth:`fail_request()`,
:meth:`fulfill_request()` or :meth:`alter_request()`, or the next matching request handler
will be run. If no matching request handler was found or if no matching one called one of
the just mentioned methods, then the request will be continued if the session was initialized
with ``fail_unhandled_requests=False``, otherwise it will be blocked.
:param url_pattern: An optional URL wildcard string which defaults to ``"*"``. Only matching URLs will cause
``Fetch.requestPraused`` events to be emitted over the CDP connection.
The async request handler will be called on each matching URL unless another request handler
has already handled the request (see description above).
:param on_request: Whether to intercept the network request or the network response.
"""
self._request_handlers.append(
RequestPausedHandler(async_handler=async_handler, url_pattern=url_pattern, on_request=on_request),
)
@asynccontextmanager
async def navigate(self, url: str, referrer: Optional[str] = None) -> AsyncGenerator[page.FrameId, None]:
"""
Async context manager for opening the URL with an optional referrer and starting the optional interception
of network requests and responses.
If the target gets detached from the session, e.g. by closing the tab, then the whole CDP connection gets terminated,
including all other concurrent sessions.
Doesn't wait for the request to finish loading. See :meth:`loaded()`.
:param url: The URL.
:param referrer: An optional referrer.
:return: Yields the ``FrameID`` that can be passed to the :meth:`loaded()` call.
"""
request_patterns = [
fetch.RequestPattern(
url_pattern=url_pattern,
request_stage=fetch.RequestStage.REQUEST if on_request else fetch.RequestStage.RESPONSE,
)
for url_pattern, on_request in sorted(
{(request_handler.url_pattern, request_handler.on_request) for request_handler in self._request_handlers},
)
]
async with trio.open_nursery() as nursery:
nursery.start_soon(self._on_target_detached_from_target)
if request_patterns:
nursery.start_soon(self._on_fetch_request_paused)
await self.cdp_session.send(fetch.enable(request_patterns, True))
await self.cdp_session.send(page.enable())
try:
frame_id, loader_id, error = await self.cdp_session.send(page.navigate(url=url, referrer=referrer))
if error:
raise CDPError(f"Navigation error: {error}")
yield frame_id
finally:
await self.cdp_session.send(page.disable())
if request_patterns:
await self.cdp_session.send(fetch.disable())
nursery.cancel_scope.cancel()
async def loaded(self, frame_id: page.FrameId):
"""
Wait for the navigated page to finish loading.
"""
async for frame_stopped_loading in self.cdp_session.listen(page.FrameStoppedLoading): # pragma: no branch
if frame_stopped_loading.frame_id == frame_id:
return
async def evaluate(self, expression: str, await_promise: bool = True, timeout: Optional[float] = None) -> Any:
"""
Evaluate an optionally async JavaScript expression and return its result.
:param expression: The JavaScript expression.
:param await_promise: Whether to await a returned :js:class:`Promise` object.
:param timeout: Optional timeout override value. Uses the session's single CDP command timeout value by default,
which may be too short depending on the script execution time.
:raise CDPError: On evaluation error or if the result is a subtype of :js:class:`window.Error`.
:return: Only JS-primitive result values are supported, e.g. strings or numbers.
Other kinds of return values must be serialized, e.g. via :js:meth:`JSON.stringify()`.
"""
evaluate = runtime.evaluate(
expression=expression,
await_promise=await_promise,
)
remote_obj, error = await self.cdp_session.send(evaluate, timeout=timeout)
if error:
raise CDPError(error.exception and error.exception.description or error.text)
if remote_obj.type_ == "object" and remote_obj.subtype == "error":
raise CDPError(remote_obj.description)
return remote_obj.value
async def continue_request(
self,
request: fetch.RequestPaused,
url: Optional[str] = None,
method: Optional[str] = None,
post_data: Optional[str] = None,
headers: Optional[Mapping[str, str]] = None,
):
"""
Continue a request and optionally override the request method, URL, POST data or request headers.
"""
await self.cdp_session.send(fetch.continue_request(
request_id=request.request_id,
url=url,
method=method,
post_data=base64.b64encode(post_data.encode()).decode() if post_data is not None else None,
headers=self._headers_entries_from_mapping(headers),
))
self._requests_handled.add(request.request_id)
async def fail_request(
self,
request: fetch.RequestPaused,
error_reason: Optional[str] = None,
):
"""
Let a request fail, with an optional error reason which defaults to ``BlockedByClient``.
"""
await self.cdp_session.send(fetch.fail_request(
request_id=request.request_id,
error_reason=network.ErrorReason(error_reason or network.ErrorReason.BLOCKED_BY_CLIENT),
))
self._requests_handled.add(request.request_id)
async def fulfill_request(
self,
request: fetch.RequestPaused,
response_code: int = 200,
response_headers: Optional[Mapping[str, str]] = None,
body: Optional[str] = None,
) -> None:
"""
Fulfill a response and override its status code, headers and body.
"""
await self.cdp_session.send(fetch.fulfill_request(
request_id=request.request_id,
response_code=response_code,
response_headers=self._headers_entries_from_mapping(response_headers),
body=base64.b64encode(body.encode()).decode() if body is not None else None,
))
self._requests_handled.add(request.request_id)
@asynccontextmanager
async def alter_request(
self,
request: fetch.RequestPaused,
response_code: int = 200,
response_headers: Optional[Mapping[str, str]] = None,
) -> AsyncGenerator[CMRequestProxy, None]:
"""
Async context manager wrapper around :meth:`fulfill_request()` which retrieves the response body,
so it can be altered. The status code and headers can be altered in the method call directly,
or by setting the respective parameters on the context manager's proxy object.
"""
if request.response_status_code is None:
body = ""
else:
body, b64encoded = await self.cdp_session.send(fetch.get_response_body(request.request_id))
if b64encoded: # pragma: no branch
body = base64.b64decode(body).decode()
proxy = CMRequestProxy(body=body, response_code=response_code, response_headers=response_headers)
yield proxy
await self.fulfill_request(
request=request,
response_code=proxy.response_code,
response_headers=proxy.response_headers,
body=proxy.body,
)
@staticmethod
def _headers_entries_from_mapping(headers: Optional[Mapping[str, str]]):
return None if headers is None else [
fetch.HeaderEntry(name=name, value=value)
for name, value in headers.items()
]
async def _on_target_detached_from_target(self) -> None:
async for detached_from_target in self.cdp_client.cdp_connection.listen(target.DetachedFromTarget):
if detached_from_target.session_id == self.cdp_session.session_id:
raise CDPError("Target has been detached")
async def _on_fetch_request_paused(self) -> None:
async for request in self.cdp_session.listen(fetch.RequestPaused):
for handler in self._request_handlers:
if not handler.matches(request):
continue
await handler.async_handler(self, request)
if request.request_id in self._requests_handled:
break
else:
if self._fail_unhandled:
await self.fail_request(request)
else:
await self.continue_request(request)

View File

@ -1250,6 +1250,84 @@ def build_parser():
""",
)
webbrowser = parser.add_argument_group("Web browser options")
webbrowser.add_argument(
"--webbrowser",
type=boolean,
metavar="{yes,true,1,on,no,false,0,off}",
default=None,
help="""
Enable or disable support for Streamlink's webbrowser API.
Streamlink's webbrowser API allows plugins which implement it to launch a web browser and extract data from websites
which they otherwise couldn't do via the regular HTTP session in Python due to specific JavaScript restrictions.
The web browser is run isolated and in a clean environment without access to regular user data.
Streamlink currently only supports Chromium-based web browsers using the Chrome Devtools Protocol (CDP).
This includes Chromium itself, Google Chrome, Brave, Vivaldi, and others.
Default is true.
""",
)
webbrowser.add_argument(
"--webbrowser-executable",
metavar="PATH",
help="""
Path to the web browser's executable.
By default, it is looked up automatically according to the rules of the used webbrowser API implementation.
This usually involves a list of known executable names and fallback paths on all supported operating systems.
""",
)
webbrowser.add_argument(
"--webbrowser-timeout",
metavar="TIME",
type=num(float, gt=0),
help="""
The maximum amount of time which the web browser can take to launch and execute.
""",
)
webbrowser.add_argument(
"--webbrowser-cdp-host",
metavar="HOST",
help="""
Host for the web browser's inter-process communication interface (CDP specific).
Default is 127.0.0.1.
""",
)
webbrowser.add_argument(
"--webbrowser-cdp-port",
metavar="PORT",
type=num(int, ge=0, le=65535),
help="""
Port for the web browser's inter-process communication interface (CDP specific).
Tries to find a free port by default.
""",
)
webbrowser.add_argument(
"--webbrowser-cdp-timeout",
metavar="TIME",
type=num(float, gt=0),
help="""
The maximum amount of time for waiting on a single CDP command response.
""",
)
webbrowser.add_argument(
"--webbrowser-headless",
type=boolean,
metavar="{yes,true,1,on,no,false,0,off}",
default=None,
help="""
Whether to launch the web browser in headless mode or not.
When enabled, it stays completely hidden and doesn't require a desktop environment to run.
Default is true.
""",
)
return parser
@ -1311,6 +1389,15 @@ _ARGUMENT_TO_SESSIONOPTION: List[Tuple[str, str, Optional[Callable[[Any], Any]]]
("ffmpeg_audio_transcode", "ffmpeg-audio-transcode", None),
("ffmpeg_copyts", "ffmpeg-copyts", None),
("ffmpeg_start_at_zero", "ffmpeg-start-at-zero", None),
# web browser arguments
("webbrowser", "webbrowser", None),
("webbrowser_executable", "webbrowser-executable", None),
("webbrowser_timeout", "webbrowser-timeout", None),
("webbrowser_cdp_host", "webbrowser-cdp-host", None),
("webbrowser_cdp_port", "webbrowser-cdp-port", None),
("webbrowser_cdp_timeout", "webbrowser-cdp-timeout", None),
("webbrowser_headless", "webbrowser-headless", None),
]

View File

@ -0,0 +1,941 @@
from contextlib import nullcontext
from typing import Awaitable, Callable, Union, cast
from unittest.mock import ANY, AsyncMock, Mock, call
import pytest
import trio
from trio.testing import wait_all_tasks_blocked
from streamlink.session import Streamlink
from streamlink.webbrowser.cdp.client import CDPClient, CDPClientSession, RequestPausedHandler
from streamlink.webbrowser.cdp.connection import CDPConnection, CDPSession
from streamlink.webbrowser.cdp.devtools.fetch import RequestPaused
from streamlink.webbrowser.cdp.devtools.target import SessionID, TargetID
from streamlink.webbrowser.cdp.exceptions import CDPError
from tests.webbrowser.cdp import FakeWebsocketConnection
def async_handler(*args, **kwargs):
return cast(Union[AsyncMock, Callable[[CDPClientSession, RequestPaused], Awaitable]], AsyncMock(*args, **kwargs))
@pytest.fixture()
def chromium_webbrowser(monkeypatch: pytest.MonkeyPatch):
# noinspection PyUnusedLocal
def mock_launch(*args, **kwargs):
return trio.open_nursery()
mock_chromium_webbrowser = Mock(
launch=Mock(side_effect=mock_launch),
get_websocket_url=Mock(return_value="ws://localhost:1234/fake"),
)
mock_chromium_webbrowser_class = Mock(return_value=mock_chromium_webbrowser)
monkeypatch.setattr("streamlink.webbrowser.cdp.client.ChromiumWebbrowser", mock_chromium_webbrowser_class)
return mock_chromium_webbrowser
@pytest.fixture()
async def cdp_client(session: Streamlink, chromium_webbrowser: Mock, websocket_connection: FakeWebsocketConnection):
async with CDPClient.run(session) as cdp_client:
yield cdp_client
@pytest.fixture()
async def cdp_client_session(request: pytest.FixtureRequest, cdp_client: CDPClient):
target_id = TargetID("01234")
session_id = SessionID("56789")
session = cdp_client.cdp_connection.sessions[session_id] = CDPSession(
cdp_client.cdp_connection.websocket,
target_id=target_id,
session_id=session_id,
cmd_timeout=cdp_client.cdp_connection.cmd_timeout,
)
fail_unhandled_requests = getattr(request, "param", False)
return CDPClientSession(cdp_client, session, fail_unhandled_requests)
class TestLaunch:
@pytest.fixture()
def cdp_client(self):
return Mock()
@pytest.fixture(autouse=True)
def mock_run(self, monkeypatch: pytest.MonkeyPatch, cdp_client: Mock):
mock_run = Mock(return_value=Mock(
__aenter__=AsyncMock(return_value=cdp_client),
__aexit__=AsyncMock(),
))
monkeypatch.setattr(CDPClient, "run", mock_run)
return mock_run
@pytest.fixture(autouse=True)
def _mock_launch(self, request: pytest.FixtureRequest, session: Streamlink, mock_run, cdp_client: Mock):
result = object()
mock_runner = AsyncMock(return_value=result)
with getattr(request, "param", nullcontext()):
assert CDPClient.launch(session, mock_runner) is result
assert mock_runner.await_args_list == [call(cdp_client)]
@pytest.mark.parametrize(("session", "options"), [
pytest.param(
{},
dict(executable=None, timeout=20.0, cdp_host=None, cdp_port=None, cdp_timeout=2.0, headless=True),
id="Default options",
),
pytest.param(
{
"webbrowser-executable": "foo",
"webbrowser-timeout": 123.45,
"webbrowser-cdp-host": "::1",
"webbrowser-cdp-port": 1234,
"webbrowser-cdp-timeout": 12.34,
"webbrowser-headless": False,
},
dict(executable="foo", timeout=123.45, cdp_host="::1", cdp_port=1234, cdp_timeout=12.34, headless=False),
id="Custom options",
),
], indirect=["session"])
def test_options(self, session: Streamlink, mock_run: Mock, options: dict):
assert mock_run.call_args_list == [call(session=session, **options)]
# noinspection PyTestParametrized
@pytest.mark.usefixtures("_mock_launch")
@pytest.mark.parametrize(("session", "_mock_launch"), [
pytest.param(
{"webbrowser": False},
pytest.raises(CDPError, match="^The webbrowser API has been disabled by the user$"),
id="Raises CDPError",
),
], indirect=["session", "_mock_launch"])
def test_disabled(self, session: Streamlink, mock_run):
assert not mock_run.called
class TestRun:
@pytest.mark.trio()
async def test_no_session(
self,
session: Streamlink,
chromium_webbrowser: Mock,
cdp_client: CDPClient,
websocket_connection: FakeWebsocketConnection,
):
assert isinstance(cdp_client, CDPClient)
assert isinstance(cdp_client.cdp_connection, CDPConnection)
assert isinstance(cdp_client.nursery, trio.Nursery)
assert chromium_webbrowser.launch.called
assert chromium_webbrowser.get_websocket_url.call_args_list == [call(session)]
assert websocket_connection.sent == []
@pytest.mark.trio()
@pytest.mark.parametrize("fail_unhandled_requests", [False, True])
async def test_session(
self,
cdp_client: CDPClient,
websocket_connection: FakeWebsocketConnection,
fail_unhandled_requests,
):
client_session = None
async def new_session():
nonlocal client_session
async with cdp_client.session(fail_unhandled_requests=fail_unhandled_requests) as client_session:
pass
async with trio.open_nursery() as nursery:
nursery.start_soon(new_session)
await wait_all_tasks_blocked()
nursery.start_soon(websocket_connection.sender.send, """{"id":0,"result":{"targetId":"01234"}}""")
await wait_all_tasks_blocked()
nursery.start_soon(websocket_connection.sender.send, """{"id":1,"result":{"sessionId":"56789"}}""")
assert isinstance(client_session, CDPClientSession)
assert isinstance(client_session.cdp_session, CDPSession)
assert client_session._fail_unhandled == fail_unhandled_requests
assert websocket_connection.sent == [
"""{"id":0,"method":"Target.createTarget","params":{"url":""}}""",
"""{"id":1,"method":"Target.attachToTarget","params":{"flatten":true,"targetId":"01234"}}""",
]
class TestEvaluate:
@pytest.mark.trio()
async def test_success(self, cdp_client_session: CDPClientSession, websocket_connection: FakeWebsocketConnection):
result = None
async def evaluate():
nonlocal result
result = await cdp_client_session.evaluate("new Promise(r=>r('foo'))")
async with trio.open_nursery() as nursery:
nursery.start_soon(evaluate)
await wait_all_tasks_blocked()
await websocket_connection.sender.send(
"""{"id":0,"sessionId":"56789","result":{"result":{"type":"string","value":"foo"}}}""",
)
assert result == "foo"
@pytest.mark.trio()
async def test_exception(self, cdp_client_session: CDPClientSession, websocket_connection: FakeWebsocketConnection):
with pytest.raises(CDPError, match="^SyntaxError: Invalid regular expression: missing /$"): # noqa: PT012
async with trio.open_nursery() as nursery:
nursery.start_soon(cdp_client_session.evaluate, "/")
await wait_all_tasks_blocked()
# language=json
await websocket_connection.sender.send("""
{"id":0, "sessionId":"56789", "result": {
"result": {"type": "object", "subclass": "error"},
"exceptionDetails": {
"exceptionId": 1,
"text": "Uncaught",
"lineNumber": 0,
"columnNumber": 0,
"exception": {
"type": "object",
"subtype": "error",
"className": "SyntaxError",
"description": "SyntaxError: Invalid regular expression: missing /"
}
}
}}
""")
@pytest.mark.trio()
async def test_error(self, cdp_client_session: CDPClientSession, websocket_connection: FakeWebsocketConnection):
with pytest.raises(CDPError, match="^Error: foo\\n at <anonymous>:1:1$"): # noqa: PT012
async with trio.open_nursery() as nursery:
nursery.start_soon(cdp_client_session.evaluate, "new Error('foo')")
await wait_all_tasks_blocked()
# language=json
await websocket_connection.sender.send("""
{"id":0, "sessionId":"56789", "result": {
"result": {
"type": "object",
"subtype": "error",
"className": "Error",
"description": "Error: foo\\n at <anonymous>:1:1"
}
}}
""")
class TestRequestPausedHandler:
@pytest.mark.parametrize(("url_pattern", "regex_pattern"), [
pytest.param(
r"abc?def?xyz",
r"^abc.def.xyz$",
id="Question mark",
),
pytest.param(
r"abc*def*xyz",
r"^abc.+def.+xyz$",
id="Star",
),
pytest.param(
r"^(.[a-z])\d$",
r"^\^\(\.\[a\-z\]\)\\d\$$",
id="Special characters",
),
pytest.param(
r"abc\?def\*xyz",
r"^abc\?def\*xyz$",
id="Escaped question mark and star",
),
pytest.param(
r"abc\\?def\\*xyz",
r"^abc\\\\.def\\\\.+xyz$",
id="2 escape characters",
),
pytest.param(
r"abc\\\?def\\\*xyz",
r"^abc\\\\\?def\\\\\*xyz$",
id="3 escape characters",
),
pytest.param(
r"abc\\\\?def\\\\*xyz",
r"^abc\\\\\\\\.def\\\\\\\\.+xyz$",
id="4 escape characters",
),
pytest.param(
r"abc\\\\\?def\\\\\*xyz",
r"^abc\\\\\\\\\?def\\\\\\\\\*xyz$",
id="5 escape characters",
),
pytest.param(
r"http://*.name.tld/foo\?bar=baz",
r"^http://.+\.name\.tld/foo\?bar=baz$",
id="Typical URL pattern",
),
])
def test_url_pattern_to_regex_pattern(self, url_pattern: str, regex_pattern: str):
assert RequestPausedHandler._url_pattern_to_regex_pattern(url_pattern).pattern == regex_pattern
@pytest.mark.trio()
async def test_client_registration(self, cdp_client_session: CDPClientSession):
assert len(cdp_client_session._request_handlers) == 0
cdp_client_session.add_request_handler(async_handler())
cdp_client_session.add_request_handler(async_handler(), on_request=True)
cdp_client_session.add_request_handler(async_handler(), url_pattern="foo")
cdp_client_session.add_request_handler(async_handler(), url_pattern="foo", on_request=True)
assert len(cdp_client_session._request_handlers) == 4
assert all(request_handler.async_handler for request_handler in cdp_client_session._request_handlers)
assert all(request_handler.url_pattern == "*" for request_handler in cdp_client_session._request_handlers[:2])
assert all(request_handler.url_pattern == "foo" for request_handler in cdp_client_session._request_handlers[2:])
assert not cdp_client_session._request_handlers[0].on_request
assert not cdp_client_session._request_handlers[2].on_request
assert cdp_client_session._request_handlers[1].on_request
assert cdp_client_session._request_handlers[3].on_request
@pytest.mark.parametrize(("args", "matches"), [
pytest.param(
dict(async_handler=async_handler(), on_request=False),
False,
id="On response - Any URL",
),
pytest.param(
dict(async_handler=async_handler(), on_request=True),
True,
id="On request - Any URL",
),
pytest.param(
dict(async_handler=async_handler(), url_pattern="http://localhost/", on_request=True),
True,
id="Matching URL",
),
pytest.param(
dict(async_handler=async_handler(), url_pattern="http://l?c?l*/", on_request=True),
True,
id="Matching wildcard URL",
),
pytest.param(
dict(async_handler=async_handler(), url_pattern="http://other/", on_request=True),
False,
id="Non-matching URL",
),
])
def test_matches_request(self, args: dict, matches: bool):
request = RequestPaused.from_json({
"requestId": "request-1",
"frameId": "frame-1",
"request": {
"url": "http://localhost/",
"method": "GET",
"headers": {},
"initialPriority": "VeryHigh",
"referrerPolicy": "strict-origin-when-cross-origin",
},
"resourceType": "Document",
})
request_handler = RequestPausedHandler(**args)
assert request_handler.matches(request) is matches
@pytest.mark.parametrize(("args", "matches"), [
pytest.param(
dict(async_handler=async_handler(), on_request=False),
True,
id="On response - Any URL",
),
pytest.param(
dict(async_handler=async_handler(), on_request=True),
False,
id="On request - Any URL",
),
pytest.param(
dict(async_handler=async_handler(), url_pattern="http://localhost/", on_request=False),
True,
id="Matching URL",
),
pytest.param(
dict(async_handler=async_handler(), url_pattern="http://l?c?l*/", on_request=False),
True,
id="Matching wildcard URL",
),
pytest.param(
dict(async_handler=async_handler(), url_pattern="http://other/", on_request=False),
False,
id="Non-matching URL",
),
])
def test_matches_response(self, args: dict, matches: bool):
request = RequestPaused.from_json({
"requestId": "request-1",
"frameId": "frame-1",
"request": {
"url": "http://localhost/",
"method": "GET",
"headers": {},
"initialPriority": "VeryHigh",
"referrerPolicy": "strict-origin-when-cross-origin",
},
"resourceType": "Document",
"responseStatusCode": 200,
})
request_handler = RequestPausedHandler(**args)
assert request_handler.matches(request) is matches
class TestNavigate:
@pytest.mark.trio()
async def test_detach(self, cdp_client_session: CDPClientSession, websocket_connection: FakeWebsocketConnection):
async def navigate():
async with cdp_client_session.navigate("https://foo"):
pass # pragma: no cover
with pytest.raises(CDPError, match="^Target has been detached$"): # noqa: PT012
async with trio.open_nursery() as nursery:
nursery.start_soon(navigate)
await wait_all_tasks_blocked()
await websocket_connection.sender.send(
"""{"method":"Target.detachedFromTarget","params":{"sessionId":"unknown"}}""",
)
await wait_all_tasks_blocked()
await websocket_connection.sender.send(
"""{"method":"Target.detachedFromTarget","params":{"sessionId":"56789"}}""",
)
@pytest.mark.trio()
async def test_error(self, cdp_client_session: CDPClientSession, websocket_connection: FakeWebsocketConnection):
async def navigate():
async with cdp_client_session.navigate("https://foo"):
pass # pragma: no cover
with pytest.raises(CDPError, match="^Navigation error: failure$"): # noqa: PT012
async with trio.open_nursery() as nursery:
nursery.start_soon(navigate)
await wait_all_tasks_blocked()
assert websocket_connection.sent == [
"""{"id":0,"method":"Page.enable","sessionId":"56789"}""",
]
await websocket_connection.sender.send(
"""{"id":0,"result":{},"sessionId":"56789"}""",
)
await wait_all_tasks_blocked()
assert websocket_connection.sent == [
"""{"id":0,"method":"Page.enable","sessionId":"56789"}""",
"""{"id":1,"method":"Page.navigate","params":{"url":"https://foo"},"sessionId":"56789"}""",
]
await websocket_connection.sender.send(
"""{"id":1,"result":{"frameId":"frame-id-1","errorText":"failure"},"sessionId":"56789"}""",
)
await wait_all_tasks_blocked()
assert websocket_connection.sent == [
"""{"id":0,"method":"Page.enable","sessionId":"56789"}""",
"""{"id":1,"method":"Page.navigate","params":{"url":"https://foo"},"sessionId":"56789"}""",
"""{"id":2,"method":"Page.disable","sessionId":"56789"}""",
]
await websocket_connection.sender.send(
"""{"id":2,"result":{},"sessionId":"56789"}""",
)
@pytest.mark.trio()
async def test_loaded(
self,
cdp_client_session: CDPClientSession,
websocket_connection: FakeWebsocketConnection,
nursery: trio.Nursery,
):
loaded = False
async def navigate():
nonlocal loaded
async with cdp_client_session.navigate("https://foo") as frame_id:
assert frame_id == "frame-id-1"
await cdp_client_session.loaded(frame_id)
loaded = True
nursery.start_soon(navigate)
await wait_all_tasks_blocked()
assert websocket_connection.sent == [
"""{"id":0,"method":"Page.enable","sessionId":"56789"}""",
]
await websocket_connection.sender.send(
"""{"id":0,"result":{},"sessionId":"56789"}""",
)
await wait_all_tasks_blocked()
assert websocket_connection.sent == [
"""{"id":0,"method":"Page.enable","sessionId":"56789"}""",
"""{"id":1,"method":"Page.navigate","params":{"url":"https://foo"},"sessionId":"56789"}""",
]
await websocket_connection.sender.send(
"""{"id":1,"result":{"frameId":"frame-id-1"},"sessionId":"56789"}""",
)
await wait_all_tasks_blocked()
await websocket_connection.sender.send(
"""{"method":"Page.frameStoppedLoading","params":{"frameId":"frame-id-unknown"},"sessionId":"56789"}""",
)
await wait_all_tasks_blocked()
await websocket_connection.sender.send(
"""{"method":"Page.frameStoppedLoading","params":{"frameId":"frame-id-1"},"sessionId":"56789"}""",
)
await wait_all_tasks_blocked()
assert websocket_connection.sent == [
"""{"id":0,"method":"Page.enable","sessionId":"56789"}""",
"""{"id":1,"method":"Page.navigate","params":{"url":"https://foo"},"sessionId":"56789"}""",
"""{"id":2,"method":"Page.disable","sessionId":"56789"}""",
]
await websocket_connection.sender.send(
"""{"id":2,"result":{},"sessionId":"56789"}""",
)
assert loaded
@pytest.mark.trio()
@pytest.mark.parametrize(("on_request", "fetch_enable_params"), [
pytest.param(
(False,),
(
"""{"handleAuthRequests":true,"patterns":[{"requestStage":"Response","urlPattern":"*"},"""
+ """{"requestStage":"Response","urlPattern":"http://foo"}]}"""
),
id="Single request handler, on response",
),
pytest.param(
(True,),
(
"""{"handleAuthRequests":true,"patterns":[{"requestStage":"Request","urlPattern":"*"},"""
+ """{"requestStage":"Request","urlPattern":"http://foo"}]}"""
),
id="Single request handler, on request",
),
pytest.param(
(False, False),
(
"""{"handleAuthRequests":true,"patterns":[{"requestStage":"Response","urlPattern":"*"},"""
+ """{"requestStage":"Response","urlPattern":"http://foo"}]}"""
),
id="Multiple request handlers, on response",
),
pytest.param(
(True, True),
(
"""{"handleAuthRequests":true,"patterns":[{"requestStage":"Request","urlPattern":"*"},"""
+ """{"requestStage":"Request","urlPattern":"http://foo"}]}"""
),
id="Multiple request handlers, on request",
),
pytest.param(
(False, True),
(
"""{"handleAuthRequests":true,"patterns":[{"requestStage":"Response","urlPattern":"*"},"""
+ """{"requestStage":"Request","urlPattern":"*"},{"requestStage":"Response","urlPattern":"http://foo"},"""
+ """{"requestStage":"Request","urlPattern":"http://foo"}]}"""
),
id="Multiple request handlers, on response and on request",
),
pytest.param(
(True, False),
(
"""{"handleAuthRequests":true,"patterns":[{"requestStage":"Response","urlPattern":"*"},"""
+ """{"requestStage":"Request","urlPattern":"*"},{"requestStage":"Response","urlPattern":"http://foo"},"""
+ """{"requestStage":"Request","urlPattern":"http://foo"}]}"""
),
id="Multiple request handlers, on request and on response",
),
])
async def test_fetch_enable(
self,
monkeypatch: pytest.MonkeyPatch,
cdp_client_session: CDPClientSession,
websocket_connection: FakeWebsocketConnection,
nursery: trio.Nursery,
on_request: tuple,
fetch_enable_params: str,
):
mock_on_fetch_request_paused = AsyncMock()
monkeypatch.setattr(cdp_client_session, "_on_fetch_request_paused", mock_on_fetch_request_paused)
for _on_request in on_request:
cdp_client_session.add_request_handler(async_handler(), on_request=_on_request)
cdp_client_session.add_request_handler(async_handler(), on_request=_on_request)
cdp_client_session.add_request_handler(async_handler(), url_pattern="http://foo", on_request=_on_request)
async def navigate():
async with cdp_client_session.navigate("https://foo"):
pass # pragma: no cover
assert not mock_on_fetch_request_paused.called
nursery.start_soon(navigate)
await wait_all_tasks_blocked()
assert mock_on_fetch_request_paused.called
assert websocket_connection.sent == [
"""{"id":0,"method":"Fetch.enable","params":""" + fetch_enable_params + ""","sessionId":"56789"}""",
]
await websocket_connection.sender.send(
"""{"id":0,"result":{},"sessionId":"56789"}""",
)
await wait_all_tasks_blocked()
assert websocket_connection.sent == [
"""{"id":0,"method":"Fetch.enable","params":""" + fetch_enable_params + ""","sessionId":"56789"}""",
"""{"id":1,"method":"Page.enable","sessionId":"56789"}""",
]
await websocket_connection.sender.send(
"""{"id":1,"result":{},"sessionId":"56789"}""",
)
await wait_all_tasks_blocked()
assert websocket_connection.sent == [
"""{"id":0,"method":"Fetch.enable","params":""" + fetch_enable_params + ""","sessionId":"56789"}""",
"""{"id":1,"method":"Page.enable","sessionId":"56789"}""",
"""{"id":2,"method":"Page.navigate","params":{"url":"https://foo"},"sessionId":"56789"}""",
]
await websocket_connection.sender.send(
"""{"id":2,"result":{"frameId":"frame-id-1"},"sessionId":"56789"}""",
)
await wait_all_tasks_blocked()
assert websocket_connection.sent == [
"""{"id":0,"method":"Fetch.enable","params":""" + fetch_enable_params + ""","sessionId":"56789"}""",
"""{"id":1,"method":"Page.enable","sessionId":"56789"}""",
"""{"id":2,"method":"Page.navigate","params":{"url":"https://foo"},"sessionId":"56789"}""",
"""{"id":3,"method":"Page.disable","sessionId":"56789"}""",
]
await websocket_connection.sender.send(
"""{"id":3,"result":{},"sessionId":"56789"}""",
)
await wait_all_tasks_blocked()
assert websocket_connection.sent == [
"""{"id":0,"method":"Fetch.enable","params":""" + fetch_enable_params + ""","sessionId":"56789"}""",
"""{"id":1,"method":"Page.enable","sessionId":"56789"}""",
"""{"id":2,"method":"Page.navigate","params":{"url":"https://foo"},"sessionId":"56789"}""",
"""{"id":3,"method":"Page.disable","sessionId":"56789"}""",
"""{"id":4,"method":"Fetch.disable","sessionId":"56789"}""",
]
await websocket_connection.sender.send(
"""{"id":4,"result":{},"sessionId":"56789"}""",
)
class TestRequestMethods:
@pytest.fixture()
def req_paused(self):
return RequestPaused.from_json({
"requestId": "request-1",
"frameId": "frame-1",
"request": {
"url": "http://foo/",
"method": "GET",
"headers": {},
"initialPriority": "VeryHigh",
"referrerPolicy": "strict-origin-when-cross-origin",
},
"resourceType": "Document",
})
@pytest.mark.trio()
async def test_continue_request(
self,
cdp_client_session: CDPClientSession,
websocket_connection: FakeWebsocketConnection,
req_paused: RequestPaused,
nursery: trio.Nursery,
):
nursery.start_soon(cdp_client_session.continue_request, req_paused, "http://bar", "POST", "data", {"a": "b", "c": "d"})
await wait_all_tasks_blocked()
assert "request-1" not in cdp_client_session._requests_handled
assert websocket_connection.sent == [
(
"""{"id":0,"method":"Fetch.continueRequest","params":"""
+ """{"headers":[{"name":"a","value":"b"},{"name":"c","value":"d"}],"method":"POST","""
+ """"postData":"ZGF0YQ==","requestId":"request-1","url":"http://bar"},"sessionId":"56789"}"""
),
]
await websocket_connection.sender.send("""{"id":0,"result":{},"sessionId":"56789"}""")
await wait_all_tasks_blocked()
assert "request-1" in cdp_client_session._requests_handled
@pytest.mark.trio()
async def test_fail_request(
self,
cdp_client_session: CDPClientSession,
websocket_connection: FakeWebsocketConnection,
req_paused: RequestPaused,
nursery: trio.Nursery,
):
nursery.start_soon(cdp_client_session.fail_request, req_paused, "TimedOut")
await wait_all_tasks_blocked()
assert "request-1" not in cdp_client_session._requests_handled
assert websocket_connection.sent == [
(
"""{"id":0,"method":"Fetch.failRequest","params":"""
+ """{"errorReason":"TimedOut","requestId":"request-1"},"sessionId":"56789"}"""
),
]
await websocket_connection.sender.send("""{"id":0,"result":{},"sessionId":"56789"}""")
await wait_all_tasks_blocked()
assert "request-1" in cdp_client_session._requests_handled
@pytest.mark.trio()
async def test_fulfill_request(
self,
cdp_client_session: CDPClientSession,
websocket_connection: FakeWebsocketConnection,
req_paused: RequestPaused,
nursery: trio.Nursery,
):
nursery.start_soon(cdp_client_session.fulfill_request, req_paused, 404, {"a": "b", "c": "d"}, "data")
await wait_all_tasks_blocked()
assert "request-1" not in cdp_client_session._requests_handled
assert websocket_connection.sent == [
(
"""{"id":0,"method":"Fetch.fulfillRequest","params":"""
+ """{"body":"ZGF0YQ==","requestId":"request-1","responseCode":404,"""
+ """"responseHeaders":[{"name":"a","value":"b"},{"name":"c","value":"d"}]},"sessionId":"56789"}"""
),
]
await websocket_connection.sender.send("""{"id":0,"result":{},"sessionId":"56789"}""")
await wait_all_tasks_blocked()
assert "request-1" in cdp_client_session._requests_handled
@pytest.mark.trio()
async def test_alter_request(
self,
cdp_client_session: CDPClientSession,
websocket_connection: FakeWebsocketConnection,
req_paused: RequestPaused,
nursery: trio.Nursery,
):
async def alter_request():
async with cdp_client_session.alter_request(req_paused, 404, {"a": "b", "c": "d"}) as cmproxy:
assert cmproxy.body == ""
cmproxy.body = "foo"
nursery.start_soon(alter_request)
await wait_all_tasks_blocked()
assert "request-1" not in cdp_client_session._requests_handled
assert websocket_connection.sent == [
(
"""{"id":0,"method":"Fetch.fulfillRequest","params":"""
+ """{"body":"Zm9v","requestId":"request-1","responseCode":404,"""
+ """"responseHeaders":[{"name":"a","value":"b"},{"name":"c","value":"d"}]},"sessionId":"56789"}"""
),
]
await websocket_connection.sender.send("""{"id":0,"result":{},"sessionId":"56789"}""")
await wait_all_tasks_blocked()
assert "request-1" in cdp_client_session._requests_handled
@pytest.mark.trio()
async def test_alter_request_response(
self,
cdp_client_session: CDPClientSession,
websocket_connection: FakeWebsocketConnection,
req_paused: RequestPaused,
nursery: trio.Nursery,
):
# turn the request into a response
req_paused.response_status_code = 200
async def alter_request():
async with cdp_client_session.alter_request(req_paused, 404, {"a": "b", "c": "d"}) as cmproxy:
assert cmproxy.body == "foo"
assert cmproxy.response_code == 404
assert cmproxy.response_headers == {"a": "b", "c": "d"}
cmproxy.body = cmproxy.body.upper()
cmproxy.response_code -= 3
cmproxy.response_headers["c"] = "e"
nursery.start_soon(alter_request)
await wait_all_tasks_blocked()
assert "request-1" not in cdp_client_session._requests_handled
assert websocket_connection.sent == [
"""{"id":0,"method":"Fetch.getResponseBody","params":{"requestId":"request-1"},"sessionId":"56789"}""",
]
await websocket_connection.sender.send(
"""{"id":0,"result":{"body":"Zm9v","base64Encoded":true},"sessionId":"56789"}""",
)
await wait_all_tasks_blocked()
assert websocket_connection.sent == [
"""{"id":0,"method":"Fetch.getResponseBody","params":{"requestId":"request-1"},"sessionId":"56789"}""",
(
"""{"id":1,"method":"Fetch.fulfillRequest","params":"""
+ """{"body":"Rk9P","requestId":"request-1","responseCode":401,"""
+ """"responseHeaders":[{"name":"a","value":"b"},{"name":"c","value":"e"}]},"sessionId":"56789"}"""
),
]
await websocket_connection.sender.send("""{"id":1,"result":{},"sessionId":"56789"}""")
await wait_all_tasks_blocked()
assert "request-1" in cdp_client_session._requests_handled
class TestOnFetchRequestPaused:
@pytest.mark.trio()
async def test_unhandled_continue(
self,
monkeypatch: pytest.MonkeyPatch,
cdp_client_session: CDPClientSession,
websocket_connection: FakeWebsocketConnection,
nursery: trio.Nursery,
):
mock_fail_request = AsyncMock()
mock_continue_request = AsyncMock()
monkeypatch.setattr(cdp_client_session, "fail_request", mock_fail_request)
monkeypatch.setattr(cdp_client_session, "continue_request", mock_continue_request)
handler_foo = async_handler()
handler_bar = async_handler()
cdp_client_session.add_request_handler(handler_foo, url_pattern="http://foo/")
cdp_client_session.add_request_handler(handler_bar, url_pattern="http://bar/")
nursery.start_soon(cdp_client_session._on_fetch_request_paused)
await wait_all_tasks_blocked()
# language=json
await websocket_connection.sender.send("""
{
"method": "Fetch.requestPaused",
"params": {
"requestId": "request-1",
"frameId": "frame-1",
"request": {
"url": "http://bar/",
"method": "GET",
"headers": {},
"initialPriority": "VeryHigh",
"referrerPolicy": "strict-origin-when-cross-origin"
},
"resourceType": "Document",
"responseStatusCode": 200
},
"sessionId": "56789"
}
""")
await wait_all_tasks_blocked()
assert "request-1" not in cdp_client_session._requests_handled
assert handler_foo.call_args_list == []
assert handler_bar.call_args_list == [call(cdp_client_session, ANY)]
assert isinstance(handler_bar.call_args_list[0][0][1], RequestPaused)
assert mock_fail_request.call_args_list == []
assert mock_continue_request.call_args_list == [call(ANY)]
assert isinstance(mock_continue_request.call_args_list[0][0][0], RequestPaused)
@pytest.mark.trio()
async def test_unhandled_fail(
self,
monkeypatch: pytest.MonkeyPatch,
cdp_client_session: CDPClientSession,
websocket_connection: FakeWebsocketConnection,
nursery: trio.Nursery,
):
# make unhandled requests fail
cdp_client_session._fail_unhandled = True
mock_fail_request = AsyncMock()
mock_continue_request = AsyncMock()
monkeypatch.setattr(cdp_client_session, "fail_request", mock_fail_request)
monkeypatch.setattr(cdp_client_session, "continue_request", mock_continue_request)
handler_foo = async_handler()
handler_bar = async_handler()
cdp_client_session.add_request_handler(handler_foo, url_pattern="http://foo/")
cdp_client_session.add_request_handler(handler_bar, url_pattern="http://bar/")
nursery.start_soon(cdp_client_session._on_fetch_request_paused)
await wait_all_tasks_blocked()
# language=json
await websocket_connection.sender.send("""
{
"method": "Fetch.requestPaused",
"params": {
"requestId": "request-1",
"frameId": "frame-1",
"request": {
"url": "http://bar/",
"method": "GET",
"headers": {},
"initialPriority": "VeryHigh",
"referrerPolicy": "strict-origin-when-cross-origin"
},
"resourceType": "Document",
"responseStatusCode": 200
},
"sessionId": "56789"
}
""")
await wait_all_tasks_blocked()
assert "request-1" not in cdp_client_session._requests_handled
assert handler_foo.call_args_list == []
assert handler_bar.call_args_list == [call(cdp_client_session, ANY)]
assert isinstance(handler_bar.call_args_list[0][0][1], RequestPaused)
assert mock_fail_request.call_args_list == [call(ANY)]
assert mock_continue_request.call_args_list == []
assert isinstance(mock_fail_request.call_args_list[0][0][0], RequestPaused)
@pytest.mark.trio()
async def test_handled(
self,
monkeypatch: pytest.MonkeyPatch,
cdp_client_session: CDPClientSession,
websocket_connection: FakeWebsocketConnection,
nursery: trio.Nursery,
):
# make unhandled requests fail
cdp_client_session._fail_unhandled = True
mock_fail_request = AsyncMock()
mock_continue_request = AsyncMock()
monkeypatch.setattr(cdp_client_session, "fail_request", mock_fail_request)
monkeypatch.setattr(cdp_client_session, "continue_request", mock_continue_request)
def mock_handled(_cdp_client_session: CDPClientSession, request: RequestPaused):
# pretend that we've called any of the request methods which register that the request was handled appropriately
_cdp_client_session._requests_handled.add(request.request_id)
handler_foo = async_handler()
handler_bar = async_handler(side_effect=mock_handled)
cdp_client_session.add_request_handler(handler_foo, url_pattern="http://foo/")
cdp_client_session.add_request_handler(handler_bar, url_pattern="http://bar/")
nursery.start_soon(cdp_client_session._on_fetch_request_paused)
await wait_all_tasks_blocked()
# language=json
await websocket_connection.sender.send("""
{
"method": "Fetch.requestPaused",
"params": {
"requestId": "request-1",
"frameId": "frame-1",
"request": {
"url": "http://bar/",
"method": "GET",
"headers": {},
"initialPriority": "VeryHigh",
"referrerPolicy": "strict-origin-when-cross-origin"
},
"resourceType": "Document",
"responseStatusCode": 200
},
"sessionId": "56789"
}
""")
await wait_all_tasks_blocked()
assert "request-1" in cdp_client_session._requests_handled
assert handler_foo.call_args_list == []
assert handler_bar.call_args_list == [call(cdp_client_session, ANY)]
assert isinstance(handler_bar.call_args_list[0][0][1], RequestPaused)
assert mock_fail_request.call_args_list == []
assert mock_continue_request.call_args_list == []