mirror of https://github.com/streamlink/streamlink
Ensure retries with HLS Streams (#522)
* stream.hls: ensure that other http requests in the hls stream class make retries. eg. playlist_reload, aes key requests as well as segments * stream.hls: fix unaccounted for errors when streaming HLS data
This commit is contained in:
parent
2c34781be4
commit
f5d862e281
|
@ -1,10 +1,13 @@
|
|||
import time
|
||||
from requests import Session, __build__ as requests_version
|
||||
from requests.adapters import HTTPAdapter
|
||||
from requests.exceptions import RequestException
|
||||
|
||||
from streamlink.packages.requests_file import FileAdapter
|
||||
|
||||
try:
|
||||
from requests.packages.urllib3.util import Timeout
|
||||
|
||||
TIMEOUT_ADAPTER_NEEDED = requests_version < 0x020300
|
||||
except ImportError:
|
||||
TIMEOUT_ADAPTER_NEEDED = False
|
||||
|
@ -142,25 +145,39 @@ class HTTPSession(Session):
|
|||
schema = kwargs.pop("schema", None)
|
||||
session = kwargs.pop("session", None)
|
||||
timeout = kwargs.pop("timeout", self.timeout)
|
||||
total_retries = kwargs.pop("retries", 0)
|
||||
retry_backoff = kwargs.pop("retry_backoff", 0.3)
|
||||
retry_max_backoff = kwargs.pop("retry_max_backoff", 10.0)
|
||||
retries = 0
|
||||
|
||||
if session:
|
||||
headers.update(session.headers)
|
||||
params.update(session.params)
|
||||
|
||||
try:
|
||||
res = Session.request(self, method, url,
|
||||
headers=headers,
|
||||
params=params,
|
||||
timeout=timeout,
|
||||
proxies=proxies,
|
||||
*args, **kwargs)
|
||||
if raise_for_status and res.status_code not in acceptable_status:
|
||||
res.raise_for_status()
|
||||
except (RequestException, IOError) as rerr:
|
||||
err = exception("Unable to open URL: {url} ({err})".format(url=url,
|
||||
err=rerr))
|
||||
err.err = rerr
|
||||
raise err
|
||||
while True:
|
||||
try:
|
||||
res = Session.request(self, method, url,
|
||||
headers=headers,
|
||||
params=params,
|
||||
timeout=timeout,
|
||||
proxies=proxies,
|
||||
*args, **kwargs)
|
||||
if raise_for_status and res.status_code not in acceptable_status:
|
||||
res.raise_for_status()
|
||||
break
|
||||
except KeyboardInterrupt:
|
||||
raise
|
||||
except Exception as rerr:
|
||||
if retries >= total_retries:
|
||||
err = exception("Unable to open URL: {url} ({err})".format(url=url,
|
||||
err=rerr))
|
||||
err.err = rerr
|
||||
raise err
|
||||
retries += 1
|
||||
# back off retrying, but only to a maximum sleep time
|
||||
delay = min(retry_max_backoff,
|
||||
retry_backoff * (2 ** (retries - 1)))
|
||||
time.sleep(delay)
|
||||
|
||||
if schema:
|
||||
res = schema.validate(res.text, name="response text", exception=PluginError)
|
||||
|
|
|
@ -53,6 +53,7 @@ class Streamlink(object):
|
|||
"hls-segment-threads": 1,
|
||||
"hls-segment-timeout": 10.0,
|
||||
"hls-timeout": 60.0,
|
||||
"hls-playlist-reload-attempts": 3,
|
||||
"http-stream-timeout": 60.0,
|
||||
"ringbuffer-size": 1024 * 1024 * 16, # 16 MB
|
||||
"rtmp-timeout": 60.0,
|
||||
|
|
|
@ -1,31 +1,23 @@
|
|||
import struct
|
||||
from collections import defaultdict, namedtuple
|
||||
|
||||
from Crypto.Cipher import AES
|
||||
|
||||
from streamlink.stream import hls_playlist
|
||||
from streamlink.stream.ffmpegmux import FFMPEGMuxer, MuxedStream
|
||||
from streamlink.utils.l10n import Localization
|
||||
|
||||
try:
|
||||
from Crypto.Cipher import AES
|
||||
import struct
|
||||
|
||||
|
||||
def num_to_iv(n):
|
||||
return struct.pack(">8xq", n)
|
||||
|
||||
|
||||
CAN_DECRYPT = True
|
||||
except ImportError:
|
||||
CAN_DECRYPT = False
|
||||
|
||||
from . import hls_playlist
|
||||
from .http import HTTPStream
|
||||
from .segmented import (SegmentedStreamReader,
|
||||
SegmentedStreamWriter,
|
||||
SegmentedStreamWorker)
|
||||
from streamlink.stream.http import HTTPStream
|
||||
from streamlink.stream.segmented import (SegmentedStreamReader,
|
||||
SegmentedStreamWriter,
|
||||
SegmentedStreamWorker)
|
||||
from ..exceptions import StreamError
|
||||
|
||||
Sequence = namedtuple("Sequence", "num segment")
|
||||
|
||||
|
||||
def num_to_iv(n):
|
||||
return struct.pack(">8xq", n)
|
||||
|
||||
|
||||
class HLSStreamWriter(SegmentedStreamWriter):
|
||||
def __init__(self, reader, *args, **kwargs):
|
||||
options = reader.stream.session.options
|
||||
|
@ -83,48 +75,37 @@ class HLSStreamWriter(SegmentedStreamWriter):
|
|||
try:
|
||||
request_params = self.create_request_params(sequence)
|
||||
return self.session.http.get(sequence.segment.uri,
|
||||
stream=True,
|
||||
timeout=self.timeout,
|
||||
exception=StreamError,
|
||||
retries=self.retries,
|
||||
**request_params)
|
||||
except StreamError as err:
|
||||
self.logger.error("Failed to open segment {0}: {1}", sequence.num, err)
|
||||
return self.fetch(sequence, retries - 1)
|
||||
|
||||
def write(self, sequence, res, chunk_size=8192, retries=None):
|
||||
retries = retries or self.retries
|
||||
if retries == 0:
|
||||
self.logger.error("Failed to open segment {0}", sequence.num)
|
||||
return
|
||||
try:
|
||||
if sequence.segment.key and sequence.segment.key.method != "NONE":
|
||||
try:
|
||||
decryptor = self.create_decryptor(sequence.segment.key,
|
||||
sequence.num)
|
||||
except StreamError as err:
|
||||
self.logger.error("Failed to create decryptor: {0}", err)
|
||||
self.close()
|
||||
return
|
||||
|
||||
for chunk in res.iter_content(chunk_size):
|
||||
# If the input data is not a multiple of 16, cut off any garbage
|
||||
garbage_len = len(chunk) % 16
|
||||
if garbage_len:
|
||||
self.logger.debug("Cutting off {0} bytes of garbage "
|
||||
"before decrypting", garbage_len)
|
||||
decrypted_chunk = decryptor.decrypt(chunk[:-garbage_len])
|
||||
else:
|
||||
decrypted_chunk = decryptor.decrypt(chunk)
|
||||
self.reader.buffer.write(decrypted_chunk)
|
||||
else:
|
||||
for chunk in res.iter_content(chunk_size):
|
||||
self.reader.buffer.write(chunk)
|
||||
except StreamError as err:
|
||||
self.logger.error("Failed to open segment {0}: {1}", sequence.num, err)
|
||||
return self.write(sequence,
|
||||
self.fetch(sequence, retries=self.retries),
|
||||
chunk_size=chunk_size,
|
||||
retries=retries - 1)
|
||||
def write(self, sequence, res, chunk_size=8192):
|
||||
if sequence.segment.key and sequence.segment.key.method != "NONE":
|
||||
try:
|
||||
decryptor = self.create_decryptor(sequence.segment.key,
|
||||
sequence.num)
|
||||
except StreamError as err:
|
||||
self.logger.error("Failed to create decryptor: {0}", err)
|
||||
self.close()
|
||||
return
|
||||
|
||||
for chunk in res.iter_content(chunk_size):
|
||||
# If the input data is not a multiple of 16, cut off any garbage
|
||||
garbage_len = len(chunk) % 16
|
||||
if garbage_len:
|
||||
self.logger.debug("Cutting off {0} bytes of garbage "
|
||||
"before decrypting", garbage_len)
|
||||
decrypted_chunk = decryptor.decrypt(chunk[:-garbage_len])
|
||||
else:
|
||||
decrypted_chunk = decryptor.decrypt(chunk)
|
||||
self.reader.buffer.write(decrypted_chunk)
|
||||
else:
|
||||
for chunk in res.iter_content(chunk_size):
|
||||
self.reader.buffer.write(chunk)
|
||||
|
||||
self.logger.debug("Download of segment {0} complete", sequence.num)
|
||||
|
||||
|
@ -139,6 +120,7 @@ class HLSStreamWorker(SegmentedStreamWorker):
|
|||
self.playlist_sequences = []
|
||||
self.playlist_reload_time = 15
|
||||
self.live_edge = self.session.options.get("hls-live-edge")
|
||||
self.playlist_reload_retries = self.session.options.get("hls-playlist-reload-attempts")
|
||||
|
||||
self.reload_playlist()
|
||||
|
||||
|
@ -150,8 +132,8 @@ class HLSStreamWorker(SegmentedStreamWorker):
|
|||
self.logger.debug("Reloading playlist")
|
||||
res = self.session.http.get(self.stream.url,
|
||||
exception=StreamError,
|
||||
retries=self.playlist_reload_retries,
|
||||
**self.reader.request_params)
|
||||
|
||||
try:
|
||||
playlist = hls_playlist.load(res.text, res.url)
|
||||
except ValueError as err:
|
||||
|
@ -177,9 +159,6 @@ class HLSStreamWorker(SegmentedStreamWorker):
|
|||
if first_sequence.segment.key and first_sequence.segment.key.method != "NONE":
|
||||
self.logger.debug("Segments in this playlist are encrypted")
|
||||
|
||||
if not CAN_DECRYPT:
|
||||
raise StreamError("Need pyCrypto or pycryptodome installed to decrypt this stream")
|
||||
|
||||
self.playlist_changed = ([s.num for s in self.playlist_sequences] !=
|
||||
[s.num for s in sequences])
|
||||
self.playlist_reload_time = (playlist.target_duration or
|
||||
|
@ -366,6 +345,8 @@ class HLSStream(HTTPStream):
|
|||
if check_streams:
|
||||
try:
|
||||
session_.http.get(playlist.uri, **request_params)
|
||||
except KeyboardInterrupt:
|
||||
raise
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
|
|
|
@ -600,6 +600,17 @@ transport.add_argument(
|
|||
Default is 3.
|
||||
"""
|
||||
)
|
||||
transport.add_argument(
|
||||
"--hls-playlist-reload-attempts",
|
||||
type=num(int, min=0),
|
||||
metavar="ATTEMPTS",
|
||||
help="""
|
||||
How many attempts should be done to reload the HLS playlist
|
||||
before giving up.
|
||||
|
||||
Default is 3.
|
||||
"""
|
||||
)
|
||||
transport.add_argument(
|
||||
"--hls-segment-threads",
|
||||
type=num(int, max=10),
|
||||
|
|
|
@ -707,6 +707,9 @@ def setup_options():
|
|||
if args.hls_segment_attempts:
|
||||
streamlink.set_option("hls-segment-attempts", args.hls_segment_attempts)
|
||||
|
||||
if args.hls_playlist_reload_attempts:
|
||||
streamlink.set_option("hls-playlist-reload-attempts", args.hls_playlist_reload_attempts)
|
||||
|
||||
if args.hls_segment_threads:
|
||||
streamlink.set_option("hls-segment-threads", args.hls_segment_threads)
|
||||
|
||||
|
|
Loading…
Reference in New Issue