1
mirror of https://github.com/streamlink/streamlink synced 2024-11-01 01:19:33 +01:00

stream: add FilteredStream and abstract filtering

This commit is contained in:
bastimeyer 2022-11-17 00:03:15 +01:00 committed by Forrest
parent 485a5b54b4
commit 28fd94df49
3 changed files with 77 additions and 46 deletions

View File

@ -0,0 +1,47 @@
from threading import Event
from streamlink.buffers import Buffer
from streamlink.stream.stream import StreamIO
class FilteredStream(StreamIO):
"""StreamIO mixin for being able to pause read calls while filtering content"""
buffer: Buffer
def __init__(self, *args, **kwargs):
self._event_filter = Event()
self._event_filter.set()
super().__init__(*args, **kwargs)
def read(self, *args, **kwargs) -> bytes:
read = super().read
while True:
try:
return read(*args, **kwargs)
except OSError:
# wait indefinitely until filtering ends
self._event_filter.wait()
if self.buffer.closed:
return b""
# if data is available, try reading again
if self.buffer.length > 0:
continue
# raise if not filtering and no data available
raise
def close(self) -> None:
super().close()
self._event_filter.set()
def is_paused(self) -> bool:
return not self._event_filter.is_set()
def pause(self) -> None:
self._event_filter.clear()
def resume(self) -> None:
self._event_filter.set()
def filter_wait(self, timeout=None):
return self._event_filter.wait(timeout)

View File

@ -2,7 +2,6 @@ import logging
import re
import struct
from concurrent.futures import Future
from threading import Event
from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Union
from urllib.parse import urlparse
@ -13,8 +12,10 @@ from Crypto.Util.Padding import unpad
from requests import Response
from requests.exceptions import ChunkedEncodingError, ConnectionError, ContentDecodingError
from streamlink.buffers import RingBuffer
from streamlink.exceptions import StreamError
from streamlink.stream.ffmpegmux import FFMPEGMuxer, MuxedStream
from streamlink.stream.filtered import FilteredStream
from streamlink.stream.hls_playlist import ByteRange, Key, M3U8, Map, Media, Segment, load as load_hls_playlist
from streamlink.stream.http import HTTPStream
from streamlink.stream.segmented import SegmentedStreamReader, SegmentedStreamWorker, SegmentedStreamWriter
@ -208,9 +209,9 @@ class HLSStreamWriter(SegmentedStreamWriter):
return self._write(sequence, result, *data)
finally:
# unblock reader thread after writing data to the buffer
if not self.reader.filter_event.is_set():
if self.reader.is_paused():
log.info("Resuming stream output")
self.reader.filter_event.set()
self.reader.resume()
else:
log.debug(f"Discarding segment {sequence.num}")
@ -220,9 +221,9 @@ class HLSStreamWriter(SegmentedStreamWriter):
result.raw.drain_conn()
# block reader thread if filtering out segments
if self.reader.filter_event.is_set():
if not self.reader.is_paused():
log.info("Filtering out segments and pausing stream output")
self.reader.filter_event.clear()
self.reader.pause()
def _write(self, sequence: Sequence, result: Response, is_map: bool):
if sequence.segment.key and sequence.segment.key.method != "NONE":
@ -438,13 +439,14 @@ class HLSStreamWorker(SegmentedStreamWorker):
log.warning(f"Failed to reload playlist: {err}")
class HLSStreamReader(SegmentedStreamReader):
class HLSStreamReader(FilteredStream, SegmentedStreamReader):
__worker__ = HLSStreamWorker
__writer__ = HLSStreamWriter
worker: "HLSStreamWorker"
writer: "HLSStreamWriter"
stream: "HLSStream"
buffer: RingBuffer
def __init__(self, stream: "HLSStream"):
self.request_params = dict(stream.args)
@ -454,30 +456,8 @@ class HLSStreamReader(SegmentedStreamReader):
self.request_params.pop("timeout", None)
self.request_params.pop("url", None)
self.filter_event = Event()
self.filter_event.set()
super().__init__(stream)
def read(self, size):
while True:
try:
return super().read(size)
except OSError:
# wait indefinitely until filtering ends
self.filter_event.wait()
if self.buffer.closed:
return b""
# if data is available, try reading again
if self.buffer.length > 0:
continue
# raise if not filtering and no data available
raise
def close(self):
super().close()
self.filter_event.set()
class MuxedHLSStream(MuxedStream):
"""

View File

@ -53,6 +53,7 @@ class TestFilteredHLSStream(TestMixinStreamHLS, unittest.TestCase):
self.await_write(2)
data = self.await_read()
self.assertEqual(data, self.content(segments), "Does not filter by default")
self.assertTrue(reader.filter_wait(timeout=0))
@patch("streamlink.stream.hls.HLSStreamWriter.should_filter_sequence", new=filter_sequence)
@patch("streamlink.stream.hls.log")
@ -65,18 +66,18 @@ class TestFilteredHLSStream(TestMixinStreamHLS, unittest.TestCase):
])
data = b""
self.assertTrue(reader.filter_event.is_set(), "Doesn't let the reader wait if not filtering")
self.assertFalse(reader.is_paused(), "Doesn't let the reader wait if not filtering")
for i in range(2):
self.await_write(2)
self.assertEqual(len(mock_log.info.mock_calls), i * 2 + 1)
self.assertEqual(mock_log.info.mock_calls[i * 2 + 0], call("Filtering out segments and pausing stream output"))
self.assertFalse(reader.filter_event.is_set(), "Lets the reader wait if filtering")
self.assertTrue(reader.is_paused(), "Lets the reader wait if filtering")
self.await_write(2)
self.assertEqual(len(mock_log.info.mock_calls), i * 2 + 2)
self.assertEqual(mock_log.info.mock_calls[i * 2 + 1], call("Resuming stream output"))
self.assertTrue(reader.filter_event.is_set(), "Doesn't let the reader wait if not filtering")
self.assertFalse(reader.is_paused(), "Doesn't let the reader wait if not filtering")
data += self.await_read()
@ -99,7 +100,7 @@ class TestFilteredHLSStream(TestMixinStreamHLS, unittest.TestCase):
# simulate a timeout by having an empty buffer
# timeout value is set to 0
with self.assertRaises(IOError) as cm:
with self.assertRaises(OSError) as cm:
self.await_read()
self.assertEqual(str(cm.exception), "Read timeout", "Raises a timeout error when no data is available to read")
@ -110,16 +111,19 @@ class TestFilteredHLSStream(TestMixinStreamHLS, unittest.TestCase):
Playlist(2, [Segment(2), Segment(3)], end=True)
])
self.assertTrue(reader.filter_event.is_set(), "Doesn't let the reader wait if not filtering")
self.assertFalse(reader.is_paused(), "Doesn't let the reader wait if not filtering")
self.await_write(2)
self.assertFalse(reader.filter_event.is_set(), "Lets the reader wait if filtering")
self.assertTrue(reader.is_paused(), "Lets the reader wait if filtering")
# test the reader's filter_wait() method
self.assertFalse(reader.filter_wait(timeout=0), "Is filtering")
# make reader read (no data available yet)
thread.read_wait.set()
# once data becomes available, the reader continues reading
self.await_write()
self.assertTrue(reader.filter_event.is_set(), "Reader is not waiting anymore")
self.assertFalse(reader.is_paused(), "Reader is not waiting anymore")
thread.read_done.wait()
thread.read_done.clear()
@ -136,27 +140,27 @@ class TestFilteredHLSStream(TestMixinStreamHLS, unittest.TestCase):
Playlist(0, [SegmentFiltered(0), SegmentFiltered(1)], end=True)
])
# mock the reader thread's filter_event.wait method, so that the main thread can wait on its call
filter_event_wait_called = Event()
orig_wait = reader.filter_event.wait
# mock the reader thread's _event_filter.wait method, so that the main thread can wait on its call
event_filter_wait_called = Event()
orig_wait = reader._event_filter.wait
def mocked_wait(*args, **kwargs):
filter_event_wait_called.set()
event_filter_wait_called.set()
return orig_wait(*args, **kwargs)
with patch.object(reader.filter_event, "wait", side_effect=mocked_wait):
with patch.object(reader._event_filter, "wait", side_effect=mocked_wait):
self.start()
# write first filtered segment and trigger the filter_event's lock
self.assertTrue(reader.filter_event.is_set(), "Doesn't let the reader wait if not filtering")
# write first filtered segment and trigger the event_filter's lock
self.assertFalse(reader.is_paused(), "Doesn't let the reader wait if not filtering")
self.await_write()
self.assertFalse(reader.filter_event.is_set(), "Lets the reader wait if filtering")
self.assertTrue(reader.is_paused(), "Lets the reader wait if filtering")
# make reader read (no data available yet)
thread.read_wait.set()
# before calling reader.close(), wait until reader thread's filter_event.wait was called
if not filter_event_wait_called.wait(timeout=5): # pragma: no cover
raise RuntimeError("Missing filter_event.wait() call")
# before calling reader.close(), wait until reader thread's event_filter.wait was called
if not event_filter_wait_called.wait(timeout=5): # pragma: no cover
raise RuntimeError("Missing event_filter.wait() call")
# close stream while reader is waiting for filtering to end
thread.reader.close()