
cli: replace read_stream() with StreamRunner

Replace `read_stream()` with a new `StreamRunner` class, refactor stream
reads and output writes, as well as the progress thread data feeding,
and move player polling into a separate thread which closes the stream
once the player process gets terminated/killed.

This fixes the player polling, or rather the detection of the player's
broken pipe while reading the stream: stream read calls can stall for
various reasons, e.g. segmented streams or the filtering of stream data,
which pauses the stream output and disables any timeouts.

- Implement `StreamRunner` and `PlayerPollThread` classes
  in dedicated `streamlink_cli.streamrunner` module
- Remove old `streamlink_cli.main.read_stream()` implementation
- Keep the same log messages
- Make `StreamRunner.run()` raise `OSError` on read/write error and
  catch `OSError`s in main module where `console.exit()` gets called
- Remove `Progress.iter()`, as it's not needed anymore
- Add extensive tests with full code coverage
bastimeyer 2022-12-12 16:46:40 +01:00 committed by back-to
parent d4e056b63c
commit b1c6d8bffa
4 changed files with 871 additions and 88 deletions
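
Before the diffs, a minimal, hedged sketch of the new call-site pattern described above. The `run_stream()` helper and the `sys.exit()` fallback are hypothetical stand-ins for the real `output_stream()` / `output_stream_http()` hunks in `streamlink_cli.main` below, where `console.exit()` is used instead:

import sys
from streamlink_cli.streamrunner import StreamRunner

def run_stream(stream_fd, output, prebuffer: bytes, force_progress: bool = False) -> None:
    # StreamRunner owns the read/write loop, the optional progress thread,
    # and (for PlayerOutput) the player polling thread.
    stream_runner = StreamRunner(stream_fd, output, force_progress)
    try:
        stream_runner.run(prebuffer)
    except OSError as err:
        # StreamRunner.run() raises OSError on read/write errors;
        # the CLI catches it and reports the message via console.exit().
        sys.exit(str(err))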

View File: streamlink_cli.main

@@ -1,5 +1,4 @@
import argparse
import errno
import logging
import os
import platform
@@ -7,16 +6,13 @@ import re
import signal
import sys
from contextlib import closing, suppress
from functools import partial
from gettext import gettext
from itertools import chain
from pathlib import Path
from time import sleep
from typing import Any, Dict, Iterator, List, Optional, Type, Union
from typing import Any, Dict, List, Optional, Type, Union
import streamlink.logger as logger
from streamlink import NoPluginError, PluginError, StreamError, Streamlink, __version__ as streamlink_version
from streamlink.compat import is_win32
from streamlink.exceptions import FatalPluginError
from streamlink.plugin import Plugin, PluginOptions
from streamlink.stream.stream import Stream, StreamIO
@@ -26,17 +22,11 @@ from streamlink_cli.compat import DeprecatedPath, importlib_metadata, stdout
from streamlink_cli.console import ConsoleOutput, ConsoleUserInputRequester
from streamlink_cli.constants import CONFIG_FILES, DEFAULT_STREAM_METADATA, LOG_DIR, PLUGIN_DIRS, STREAM_SYNONYMS
from streamlink_cli.output import FileOutput, PlayerOutput
from streamlink_cli.streamrunner import StreamRunner
from streamlink_cli.utils import Formatter, HTTPServer, datetime
from streamlink_cli.utils.progress import Progress
from streamlink_cli.utils.versioncheck import check_version
ACCEPTABLE_ERRNO = (errno.EPIPE, errno.EINVAL, errno.ECONNRESET)
try:
ACCEPTABLE_ERRNO += (errno.WSAECONNABORTED,) # type: ignore
except AttributeError:
pass # Not windows
QUIET_OPTIONS = ("json", "stream_url", "quiet")
@@ -262,7 +252,12 @@ def output_stream_http(
if stream_fd and prebuffer:
log.debug("Writing stream to player")
read_stream(stream_fd, server, prebuffer, formatter)
stream_runner = StreamRunner(stream_fd, server)
try:
stream_runner.run(prebuffer)
except OSError as err:
# TODO: refactor all console.exit() calls
console.exit(str(err))
if not continuous:
break
@@ -364,74 +359,21 @@ def output_stream(stream, formatter: Formatter):
console.exit(f"Failed to open output ({err}")
return
with closing(output):
log.debug("Writing stream to output")
read_stream(stream_fd, output, prebuffer, formatter)
try:
with closing(output):
log.debug("Writing stream to output")
# TODO: finally clean up the global variable mess and refactor the streamlink_cli package
# noinspection PyUnboundLocalVariable
stream_runner = StreamRunner(stream_fd, output, args.force_progress)
# noinspection PyUnboundLocalVariable
stream_runner.run(prebuffer)
except OSError as err:
# TODO: refactor all console.exit() calls
console.exit(str(err))
return True
def read_stream(stream, output, prebuffer, formatter: Formatter, chunk_size=8192):
"""Reads data from stream and then writes it to the output."""
is_player = isinstance(output, PlayerOutput)
is_http = isinstance(output, HTTPServer)
is_fifo = is_player and output.namedpipe
show_progress = (
isinstance(output, FileOutput)
and output.fd is not stdout
and (sys.stdout.isatty() or args.force_progress)
)
show_record_progress = (
hasattr(output, "record")
and isinstance(output.record, FileOutput)
and output.record.fd is not stdout
and (sys.stdout.isatty() or args.force_progress)
)
progress: Optional[Progress] = None
stream_iterator: Iterator = chain(
[prebuffer],
iter(partial(stream.read, chunk_size), b"")
)
if show_progress or show_record_progress:
progress = Progress(
sys.stderr,
output.filename or output.record.filename,
)
stream_iterator = progress.iter(stream_iterator)
try:
for data in stream_iterator:
# We need to check if the player process still exists when
# using named pipes on Windows since the named pipe is not
# automatically closed by the player.
if is_win32 and is_fifo:
output.player.poll()
if output.player.returncode is not None:
log.info("Player closed")
break
try:
output.write(data)
except OSError as err:
if is_player and err.errno in ACCEPTABLE_ERRNO:
log.info("Player closed")
elif is_http and err.errno in ACCEPTABLE_ERRNO:
log.info("HTTP connection closed")
else:
console.exit(f"Error when writing to output: {err}, exiting")
break
except OSError as err:
console.exit(f"Error when reading from stream: {err}, exiting")
finally:
if progress:
progress.close()
stream.close()
log.info("Stream ended")
def handle_stream(plugin: Plugin, streams: Dict[str, Stream], stream_name: str) -> None:
"""Decides what to do with the selected stream.

View File: streamlink_cli.streamrunner (new file)

@@ -0,0 +1,154 @@
import errno
import logging
import sys
from contextlib import suppress
from pathlib import Path
from threading import Event, Lock, Thread
from typing import Optional, Union
from streamlink.stream.stream import StreamIO
from streamlink_cli.output import FileOutput, PlayerOutput
from streamlink_cli.utils.http_server import HTTPServer
from streamlink_cli.utils.progress import Progress
# Use the main Streamlink CLI module as logger
log = logging.getLogger("streamlink.cli")
ACCEPTABLE_ERRNO = errno.EPIPE, errno.EINVAL, errno.ECONNRESET
with suppress(AttributeError):
ACCEPTABLE_ERRNO += (errno.WSAECONNABORTED,) # type: ignore[assignment,attr-defined]
class _ReadError(BaseException):
pass
class PlayerPollThread(Thread):
"""
Poll the player process in a separate thread, to isolate it from the stream's read-loop in the main thread.
Reading the stream can stall indefinitely when filtering content.
"""
POLLING_INTERVAL: float = 0.5
def __init__(self, stream: StreamIO, output: PlayerOutput):
super().__init__(daemon=True, name=self.__class__.__name__)
self._stream = stream
self._output = output
self._stop_polling = Event()
self._lock = Lock()
def close(self):
self._stop_polling.set()
def playerclosed(self):
# Ensure that "Player closed" only gets logged once: either when writing the read stream data failed,
# or when the player process was terminated/killed before writing.
with self._lock:
if self._stop_polling.is_set():
return
self.close()
log.info("Player closed")
def poll(self) -> bool:
return self._output.player.poll() is None
def run(self) -> None:
while not self._stop_polling.wait(self.POLLING_INTERVAL):
if self.poll():
continue
self.playerclosed()
# close stream as soon as the player was closed
self._stream.close()
break
class StreamRunner:
"""Read data from a stream and write it to the output."""
playerpoller: Optional[PlayerPollThread] = None
progress: Optional[Progress] = None
# TODO: refactor all output implementations
def __init__(
self,
stream: StreamIO,
output: Union[PlayerOutput, FileOutput, HTTPServer],
force_progress: bool = False,
):
self.stream = stream
self.output = output
self.is_http = isinstance(output, HTTPServer)
filename: Optional[Path] = None
if isinstance(output, PlayerOutput):
self.playerpoller = PlayerPollThread(stream, output)
if output.record:
filename = output.record.filename
elif isinstance(output, FileOutput):
if output.filename:
filename = output.filename
elif output.record:
filename = output.record.filename
if filename and (sys.stdout.isatty() or force_progress):
self.progress = Progress(sys.stderr, filename)
def run(
self,
prebuffer: bytes,
chunk_size: int = 8192,
) -> None:
read = self.stream.read
write = self.output.write
progress = self.progress.write if self.progress else lambda _: None
if self.playerpoller:
self.playerpoller.start()
if self.progress:
self.progress.start()
# TODO: Fix error messages (s/when/while/) and only log "Stream ended" when it ended on its own (data == b"").
# These are considered breaking changes of the CLI output, which is parsed by 3rd party tools.
try:
write(prebuffer)
progress(prebuffer)
del prebuffer
# Don't check for stream.closed, so the buffer's contents can be fully read after the stream ended or was closed
while True:
try:
data = read(chunk_size)
if data == b"":
break
except OSError as err:
raise _ReadError() from err
write(data)
progress(data)
except _ReadError as err:
raise OSError(f"Error when reading from stream: {err.__context__}, exiting") from err.__context__
except OSError as err:
if self.playerpoller and err.errno in ACCEPTABLE_ERRNO:
self.playerpoller.playerclosed()
elif self.is_http and err.errno in ACCEPTABLE_ERRNO:
log.info("HTTP connection closed")
else:
raise OSError(f"Error when writing to output: {err}, exiting") from err
finally:
if self.playerpoller:
self.playerpoller.close()
self.playerpoller.join()
if self.progress:
self.progress.close()
self.progress.join()
self.stream.close()
log.info("Stream ended")

View File: streamlink_cli.utils.progress

@@ -6,7 +6,7 @@ from shutil import get_terminal_size
from string import Formatter as StringFormatter
from threading import Event, RLock, Thread
from time import time
from typing import Callable, Deque, Dict, Iterable, Iterator, List, Optional, TextIO, Tuple, Union
from typing import Callable, Deque, Dict, Iterable, List, Optional, TextIO, Tuple, Union
from streamlink.compat import is_win32
@@ -248,19 +248,10 @@ class Progress(Thread):
self.overall += size
self.written += size
def iter(self, iterator: Iterator[bytes]) -> Iterator[bytes]:
self.start()
try:
for chunk in iterator:
self.write(chunk)
yield chunk
finally:
self.close()
def run(self):
self.started = time()
try:
while not self._wait.wait(self.interval):
while not self._wait.wait(self.interval): # pragma: no cover
self.update()
finally:
self.print_end()

View File: tests for streamlink_cli.streamrunner (new file)

@@ -0,0 +1,696 @@
import asyncio
import errno
import sys
from collections import deque
from pathlib import Path
from threading import Thread
from typing import Callable, Deque, List, Union
from unittest.mock import Mock, patch
import pytest
from streamlink.stream.stream import StreamIO
from streamlink_cli.output import FileOutput, PlayerOutput
from streamlink_cli.streamrunner import PlayerPollThread, StreamRunner, log as streamrunnerlogger
from streamlink_cli.utils.http_server import HTTPServer
from streamlink_cli.utils.progress import Progress
from tests.testutils.handshake import Handshake
TIMEOUT_AWAIT_HANDSHAKE = 1
TIMEOUT_AWAIT_THREADJOIN = 1
class EventedPlayerPollThread(PlayerPollThread):
POLLING_INTERVAL = 0
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.handshake = Handshake()
def poll(self):
with self.handshake():
return super().poll()
def close(self):
super().close()
# Let thread terminate on close()
self.handshake.go()
class FakeStream(StreamIO):
"""Fake stream implementation, for feeding sample data to the stream runner and simulating read pauses and read errors"""
def __init__(self) -> None:
super().__init__()
self.handshake = Handshake()
self.data: Deque[Union[bytes, Callable]] = deque()
# noinspection PyUnusedLocal
def read(self, *args):
with self.handshake():
if not self.data:
return b""
data = self.data.popleft()
return data() if callable(data) else data
class FakeOutput:
"""Common output/http-server/progress interface, for caching all write() calls and simulating write errors"""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.handshake = Handshake()
self.data: List[bytes] = []
def write(self, data):
with self.handshake():
return self._write(data)
def _write(self, data):
self.data.append(data)
class FakePlayerOutput(FakeOutput, PlayerOutput):
pass
class FakeFileOutput(FakeOutput, FileOutput):
pass
class FakeHTTPServer(FakeOutput, HTTPServer):
def __init__(self, *args, **kwargs):
with patch("streamlink_cli.utils.http_server.socket"):
super().__init__(*args, **kwargs)
class FakeProgress(FakeOutput, Progress):
# we're not interested in any application logic of the Progress class
update = print_end = lambda *_, **__: None
class FakeStreamRunner(StreamRunner):
# override and remove optional typing annotations
playerpoller: EventedPlayerPollThread
progress: FakeProgress
@pytest.fixture(autouse=True)
def _logging(caplog: pytest.LogCaptureFixture):
assert streamrunnerlogger.name == "streamlink.cli"
caplog.set_level(1, "streamlink")
@pytest.fixture(autouse=True)
def isatty(request: pytest.FixtureRequest):
with patch("sys.stdout.isatty", return_value=getattr(request, "param", False)):
yield
@pytest.fixture
def stream():
stream = FakeStream()
yield stream
assert stream.closed
# "stream_runner" fixture dependency declared in downstream scopes
@pytest.fixture
def runnerthread(request: pytest.FixtureRequest, stream_runner: StreamRunner):
class RunnerThread(Thread):
exception = None
def run(self):
try:
super().run()
except BaseException as err:
self.exception = err
thread = RunnerThread(
daemon=True,
name="Runner thread",
target=stream_runner.run,
args=(b"prebuffer",),
)
yield thread
assert_thread_termination(thread, "Runner thread has terminated")
exception = getattr(request, "param", {}).get("exception", None)
assert isinstance(thread.exception, type(exception))
assert str(thread.exception) == str(exception)
async def assert_handshake_steps(*items):
"""
Run handshake steps concurrently, to not be dependent too much on implementation details and the order of handshakes.
For example, concurrently await one read(), one write() and one progress() call.
"""
steps = asyncio.gather(
*(item.handshake.asyncstep(TIMEOUT_AWAIT_HANDSHAKE) for item in items),
return_exceptions=True,
)
assert await steps == [True for _ in items]
def assert_thread_termination(thread: Thread, assertion: str):
thread.join(TIMEOUT_AWAIT_THREADJOIN)
assert not thread.is_alive(), assertion
class TestPlayerOutput:
@pytest.fixture
def player_process(self):
player_process = Mock()
player_process.poll = Mock(return_value=None)
return player_process
@pytest.fixture
def output(self, player_process: Mock):
with patch("subprocess.Popen") as mock_popen, \
patch("streamlink_cli.output.sleep"):
mock_popen.return_value = player_process
output = FakePlayerOutput("mocked")
output.open()
yield output
@pytest.fixture
def stream_runner(self, stream: FakeStream, output: FakePlayerOutput):
with patch("streamlink_cli.streamrunner.PlayerPollThread", EventedPlayerPollThread):
stream_runner = StreamRunner(stream, output)
assert isinstance(stream_runner.playerpoller, EventedPlayerPollThread)
assert not stream_runner.playerpoller.is_alive()
assert not stream_runner.is_http
assert not stream_runner.progress
yield stream_runner
assert not stream_runner.playerpoller.is_alive()
@pytest.mark.asyncio
async def test_read_write(
self,
caplog: pytest.LogCaptureFixture,
runnerthread: Thread,
stream_runner: FakeStreamRunner,
stream: FakeStream,
output: FakePlayerOutput,
):
stream.data.extend((b"foo", b"bar"))
runnerthread.start()
assert output.data == []
# write prebuffer
await assert_handshake_steps(output)
assert output.data == [b"prebuffer"]
# read and write next chunk
await assert_handshake_steps(stream, output)
assert output.data == [b"prebuffer", b"foo"]
# poll player process
await assert_handshake_steps(stream_runner.playerpoller)
assert stream_runner.playerpoller.is_alive()
# read and write next chunk
await assert_handshake_steps(stream, output)
assert output.data == [b"prebuffer", b"foo", b"bar"]
assert not stream.closed, "Stream is not closed"
# read stream end
await assert_handshake_steps(stream)
assert output.data == [b"prebuffer", b"foo", b"bar"]
# wait for runner thread to terminate first before asserting log records
assert_thread_termination(runnerthread, "Runner thread has terminated")
assert [(record.module, record.levelname, record.message) for record in caplog.records] == [
("streamrunner", "info", "Stream ended"),
]
@pytest.mark.asyncio
async def test_paused(
self,
caplog: pytest.LogCaptureFixture,
runnerthread: Thread,
stream_runner: FakeStreamRunner,
stream: FakeStream,
output: FakePlayerOutput,
):
delayed = Handshake()
def item():
with delayed():
return b"delayed"
stream.data.append(item)
runnerthread.start()
assert output.data == []
# write prebuffer
await assert_handshake_steps(output)
assert output.data == [b"prebuffer"]
assert not delayed.wait_ready(0), "Delayed chunk has not been read yet"
# attempt reading delayed chunk
stream.handshake.go()
assert delayed.wait_ready(TIMEOUT_AWAIT_HANDSHAKE), "read() call of delayed chunk is paused"
assert output.data == [b"prebuffer"]
assert not stream.closed, "Stream is not closed"
# poll player process
await assert_handshake_steps(stream_runner.playerpoller)
assert stream_runner.playerpoller.is_alive()
# unpause delayed chunk
delayed.go()
assert stream.handshake.wait_done(TIMEOUT_AWAIT_HANDSHAKE), "Delayed chunk has successfully been read"
await assert_handshake_steps(output)
assert output.data == [b"prebuffer", b"delayed"]
assert not stream.closed, "Stream is not closed"
# read stream end
await assert_handshake_steps(stream)
assert output.data == [b"prebuffer", b"delayed"]
# wait for runner thread to terminate first before asserting log records
assert_thread_termination(runnerthread, "Runner thread has terminated")
assert [(record.module, record.levelname, record.message) for record in caplog.records] == [
("streamrunner", "info", "Stream ended"),
]
@pytest.mark.asyncio
@pytest.mark.parametrize(
"writeerror,runnerthread",
[
pytest.param(
OSError(errno.EPIPE, "Broken pipe"),
{},
id="Acceptable error: EPIPE",
),
pytest.param(
OSError(errno.EINVAL, "Invalid argument"),
{},
id="Acceptable error: EINVAL",
),
pytest.param(
OSError(errno.ECONNRESET, "Connection reset"),
{},
id="Acceptable error: ECONNRESET",
),
pytest.param(
OSError("Unknown error"),
{"exception": OSError("Error when writing to output: Unknown error, exiting")},
id="Non-acceptable error",
),
],
indirect=["runnerthread"],
)
async def test_player_close(
self,
caplog: pytest.LogCaptureFixture,
runnerthread: Thread,
stream_runner: FakeStreamRunner,
stream: FakeStream,
output: FakePlayerOutput,
player_process: Mock,
writeerror: Exception,
):
stream.data.extend((b"foo", b"bar"))
runnerthread.start()
assert output.data == []
# write prebuffer
await assert_handshake_steps(output)
assert output.data == [b"prebuffer"]
# poll player process
await assert_handshake_steps(stream_runner.playerpoller)
assert stream_runner.playerpoller.is_alive()
# read and write next chunk
await assert_handshake_steps(stream, output)
assert output.data == [b"prebuffer", b"foo"]
assert not stream.closed, "Stream is not closed yet"
# close player
with patch.object(output, "_write", side_effect=writeerror):
# let player process terminate with code 0 and poll process once
player_process.poll.return_value = 0
await assert_handshake_steps(stream_runner.playerpoller)
assert_thread_termination(stream_runner.playerpoller, "Polling has stopped after player process terminated")
assert stream.closed, "Stream got closed after the player was closed"
# read and write next chunk (write will now also raise)
await assert_handshake_steps(stream, output)
assert output.data == [b"prebuffer", b"foo"]
# wait for runner thread to terminate first before asserting log records
assert_thread_termination(runnerthread, "Runner thread has terminated")
assert [(record.module, record.levelname, record.message) for record in caplog.records] == [
("streamrunner", "info", "Player closed"),
("streamrunner", "info", "Stream ended"),
]
@pytest.mark.asyncio
async def test_player_close_paused(
self,
caplog: pytest.LogCaptureFixture,
runnerthread: Thread,
stream_runner: FakeStreamRunner,
stream: FakeStream,
output: FakePlayerOutput,
player_process: Mock,
):
delayed = Handshake()
def item():
with delayed():
return b""
stream.data.append(item)
runnerthread.start()
assert output.data == []
# write prebuffer
await assert_handshake_steps(output)
assert output.data == [b"prebuffer"]
assert not delayed.wait_ready(0), "Delayed chunk has not been read yet"
# poll player process
await assert_handshake_steps(stream_runner.playerpoller)
assert stream_runner.playerpoller.is_alive()
stream.handshake.go()
assert delayed.wait_ready(TIMEOUT_AWAIT_HANDSHAKE), "read() call of delayed chunk is paused"
assert output.data == [b"prebuffer"]
assert not stream.closed, "Stream is not closed yet"
# let player process terminate with code 0 and poll process once
player_process.poll.return_value = 0
await assert_handshake_steps(stream_runner.playerpoller)
assert_thread_termination(stream_runner.playerpoller, "Polling has stopped after player process terminated")
assert stream.closed, "Stream got closed after the player was closed, even if the stream was paused"
# unpause delayed chunk (stream end)
delayed.go()
assert stream.handshake.wait_done(TIMEOUT_AWAIT_HANDSHAKE), "Delayed chunk has successfully been read"
assert output.data == [b"prebuffer"]
# wait for runner thread to terminate first before asserting log records
assert_thread_termination(runnerthread, "Runner thread has terminated")
assert [(record.module, record.levelname, record.message) for record in caplog.records] == [
("streamrunner", "info", "Player closed"),
("streamrunner", "info", "Stream ended"),
]
@pytest.mark.asyncio
@pytest.mark.parametrize(
"runnerthread",
[{"exception": OSError("Error when reading from stream: Read timeout, exiting")}],
indirect=["runnerthread"],
)
async def test_readerror(
self,
caplog: pytest.LogCaptureFixture,
runnerthread: Thread,
stream_runner: FakeStreamRunner,
stream: FakeStream,
output: FakePlayerOutput,
):
# make next read() call raise a read-timeout error
stream.data.append(Mock(side_effect=OSError("Read timeout")))
runnerthread.start()
assert output.data == []
# write prebuffer
await assert_handshake_steps(output)
assert output.data == [b"prebuffer"]
# poll player process
await assert_handshake_steps(stream_runner.playerpoller)
assert stream_runner.playerpoller.is_alive()
# read stream (will raise a read timeout)
await assert_handshake_steps(stream)
# poll player process again
await assert_handshake_steps(stream_runner.playerpoller)
assert_thread_termination(stream_runner.playerpoller, "Polling has stopped on read error")
# wait for runner thread to terminate first before asserting log records
assert_thread_termination(runnerthread, "Runner thread has terminated")
assert [(record.module, record.levelname, record.message) for record in caplog.records] == [
("streamrunner", "info", "Stream ended"),
]
class TestHTTPServer:
@pytest.fixture
def output(self):
return FakeHTTPServer()
@pytest.fixture
def stream_runner(self, stream: FakeStream, output: FakeHTTPServer):
stream_runner = StreamRunner(stream, output)
assert not stream_runner.playerpoller
assert not stream_runner.progress
assert stream_runner.is_http
yield stream_runner
@pytest.mark.asyncio
async def test_read_write(
self,
caplog: pytest.LogCaptureFixture,
runnerthread: Thread,
stream_runner: FakeStreamRunner,
stream: FakeStream,
output: FakeHTTPServer,
):
stream.data.extend((b"foo", b"bar"))
runnerthread.start()
assert output.data == []
# write prebuffer
await assert_handshake_steps(output)
assert output.data == [b"prebuffer"]
# read and write next chunk
await assert_handshake_steps(stream, output)
assert output.data == [b"prebuffer", b"foo"]
# read and write next chunk
await assert_handshake_steps(stream, output)
assert output.data == [b"prebuffer", b"foo", b"bar"]
assert not stream.closed, "Stream is not closed"
# read stream end
await assert_handshake_steps(stream)
assert output.data == [b"prebuffer", b"foo", b"bar"]
# wait for runner thread to terminate first before asserting log records
assert_thread_termination(runnerthread, "Runner thread has terminated")
assert [(record.module, record.levelname, record.message) for record in caplog.records] == [
("streamrunner", "info", "Stream ended"),
]
@pytest.mark.parametrize(
"writeerror,logs,runnerthread",
[
pytest.param(
OSError(errno.EPIPE, "Broken pipe"),
True,
{},
id="Acceptable error: EPIPE",
),
pytest.param(
OSError(errno.EINVAL, "Invalid argument"),
True,
{},
id="Acceptable error: EINVAL",
),
pytest.param(
OSError(errno.ECONNRESET, "Connection reset"),
True,
{},
id="Acceptable error: ECONNRESET",
),
pytest.param(
OSError("Unknown error"),
False,
{"exception": OSError("Error when writing to output: Unknown error, exiting")},
id="Non-acceptable error",
),
],
indirect=["runnerthread"],
)
def test_writeerror(
self,
caplog: pytest.LogCaptureFixture,
runnerthread: Thread,
stream_runner: FakeStreamRunner,
stream: FakeStream,
output: FakePlayerOutput,
logs: bool,
writeerror: Exception,
):
runnerthread.start()
with patch.object(output, "_write", side_effect=writeerror):
assert output.handshake.step(TIMEOUT_AWAIT_HANDSHAKE)
assert output.data == []
# wait for runner thread to terminate first before asserting log records
assert_thread_termination(runnerthread, "Runner thread has terminated")
expectedlogs = (
([("streamrunner", "info", "HTTP connection closed")] if logs else [])
+ [("streamrunner", "info", "Stream ended")]
)
assert [(record.module, record.levelname, record.message) for record in caplog.records] == expectedlogs
@pytest.mark.parametrize(
"isatty,force_progress",
[
pytest.param(False, True, id="No TTY, force"),
pytest.param(True, False, id="TTY, no force"),
],
indirect=["isatty"],
)
class TestHasProgress:
@pytest.mark.parametrize(
"output",
[
pytest.param(
FakePlayerOutput("mocked"),
id="Player output without record",
),
pytest.param(
FakeFileOutput(fd=Mock()),
id="FileOutput with file descriptor",
),
pytest.param(
FakeHTTPServer(),
id="HTTPServer",
),
],
)
def test_no_progress(
self,
output: Union[FakePlayerOutput, FakeFileOutput, FakeHTTPServer],
isatty: bool,
force_progress: bool,
):
stream_runner = FakeStreamRunner(StreamIO(), output, force_progress)
assert not stream_runner.progress
@pytest.mark.parametrize(
"output,expected",
[
pytest.param(
FakePlayerOutput("mocked", record=FakeFileOutput(Path("record"))),
Path("record"),
id="PlayerOutput with record",
),
pytest.param(
FakeFileOutput(filename=Path("filename")),
Path("filename"),
id="FileOutput with file name",
),
pytest.param(
FakeFileOutput(record=FakeFileOutput(filename=Path("record"))),
Path("record"),
id="FileOutput with record",
),
pytest.param(
FakeFileOutput(filename=Path("filename"), record=FakeFileOutput(filename=Path("record"))),
Path("filename"),
id="FileOutput with file name and record",
),
],
)
def test_has_progress(
self,
output: Union[FakePlayerOutput, FakeFileOutput],
isatty: bool,
force_progress: bool,
expected: Path,
):
stream_runner = FakeStreamRunner(StreamIO(), output, force_progress)
assert stream_runner.progress
assert not stream_runner.progress.is_alive()
assert stream_runner.progress.stream is sys.stderr
assert stream_runner.progress.path == expected
class TestProgress:
@pytest.fixture
def output(self):
yield FakeFileOutput(Path("filename"))
@pytest.fixture
def stream_runner(self, stream: FakeStream, output: FakeFileOutput):
with patch("streamlink_cli.streamrunner.Progress", FakeProgress):
stream_runner = FakeStreamRunner(stream, output, True)
assert not stream_runner.playerpoller
assert not stream_runner.is_http
assert isinstance(stream_runner.progress, FakeProgress)
assert stream_runner.progress.path == Path("filename")
assert not stream_runner.progress.is_alive()
yield stream_runner
assert not stream_runner.progress.is_alive()
@pytest.mark.asyncio
async def test_read_write(
self,
caplog: pytest.LogCaptureFixture,
runnerthread: Thread,
stream_runner: FakeStreamRunner,
stream: FakeStream,
output: FakeFileOutput,
):
stream.data.extend((b"foo", b"bar"))
runnerthread.start()
assert output.data == []
# write prebuffer
await assert_handshake_steps(output, stream_runner.progress)
assert output.data == [b"prebuffer"]
assert stream_runner.progress.data == [b"prebuffer"]
# read and write next chunk
await assert_handshake_steps(stream, output, stream_runner.progress)
assert output.data == [b"prebuffer", b"foo"]
assert stream_runner.progress.data == [b"prebuffer", b"foo"]
# read and write next chunk
await assert_handshake_steps(stream, output, stream_runner.progress)
assert output.data == [b"prebuffer", b"foo", b"bar"]
assert stream_runner.progress.data == [b"prebuffer", b"foo", b"bar"]
assert not stream.closed, "Stream is not closed"
# read stream end
await assert_handshake_steps(stream)
assert output.data == [b"prebuffer", b"foo", b"bar"]
assert stream_runner.progress.data == [b"prebuffer", b"foo", b"bar"]
# wait for runner thread to terminate first before asserting log records
assert_thread_termination(runnerthread, "Runner thread has terminated")
assert [(record.module, record.levelname, record.message) for record in caplog.records] == [
("streamrunner", "info", "Stream ended"),
]
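
The tests above rely on `tests.testutils.handshake.Handshake`, which is not part of this diff. Purely as a hypothetical sketch inferred from its usage here (a blocking `with handshake():` guard on the producer side; `go()`, `step()`, `asyncstep()`, `wait_ready()` and `wait_done()` on the driving side), the primitive could look roughly like the following; the real implementation may differ:

import asyncio
from contextlib import contextmanager
from functools import partial
from threading import Event

class Handshake:
    """Hypothetical sketch of a step-by-step synchronization primitive."""

    def __init__(self) -> None:
        self._ready = Event()  # producer has reached the guarded block
        self._go = Event()     # driver allows the producer to proceed
        self._done = Event()   # producer has finished the guarded block

    @contextmanager
    def __call__(self):
        # Producer side: announce readiness, wait for go(), run the block, then signal completion
        self._ready.set()
        self._go.wait()
        self._go.clear()
        try:
            yield
        finally:
            self._ready.clear()
            self._done.set()

    def go(self) -> None:
        # Driver side: let exactly one guarded block proceed
        self._done.clear()
        self._go.set()

    def wait_ready(self, timeout=None) -> bool:
        return self._ready.wait(timeout)

    def wait_done(self, timeout=None) -> bool:
        return self._done.wait(timeout)

    def step(self, timeout=None) -> bool:
        # go() plus waiting for the guarded block to finish
        self.go()
        return self.wait_done(timeout)

    async def asyncstep(self, timeout=None) -> bool:
        # Awaitable variant of step(), so multiple handshakes can be driven concurrently
        return await asyncio.get_running_loop().run_in_executor(None, partial(self.step, timeout))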