Rewritten StreamProcess class (#441)

* Remove the unmaintained vendored pbs package

stream.streamprocess: remove use of pbs from StreamProcess in favour of subprocess
stream.rtmpdump: update to use the StreamProcess base class
packages.pbs: remove the now unused pbs module
stream.ffmpegmux: remove use of pbs in FFMPEGMuxer
cli.utils.stream: handle list parameters when making an rtmp url

* stream.rtmpdump: set rtmpdump logging settings based on streamlink logging settings
This commit is contained in:
beardypig 2017-01-27 17:35:24 +00:00 committed by Forrest
parent 9ce489dd76
commit 2cc12ec2fb
7 changed files with 275 additions and 627 deletions

View File

@ -6,6 +6,16 @@ is_py3 = (sys.version_info[0] == 3)
is_py33 = (sys.version_info[0] == 3 and sys.version_info[1] == 3)
is_win32 = os.name == "nt"
# win/nix compatible devnull
try:
from subprocess import DEVNULL
def devnull():
return DEVNULL
except ImportError:
def devnull():
return open(os.path.devnull, 'w')
if is_py2:
_str = str
str = unicode
@ -29,7 +39,12 @@ except ImportError:
from urllib import quote, unquote, urlencode
import Queue as queue
try:
from shutil import which
except ImportError:
from backports.shutil_which import which
__all__ = ["is_py2", "is_py3", "is_py33", "is_win32", "str", "bytes",
"urlparse", "urlunparse", "urljoin", "parse_qsl", "quote",
"unquote", "queue", "range", "urlencode"]
"unquote", "queue", "range", "urlencode", "devnull", "which"]

View File

@ -1,481 +0,0 @@
#===============================================================================
# Copyright (C) 2011-2012 by Andrew Moffat
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#===============================================================================
import subprocess as subp
import sys
import traceback
import os
import re
from glob import glob as original_glob
from types import ModuleType
from functools import partial
import warnings
import platform
__version__ = "0.110"
__project_url__ = "https://github.com/amoffat/pbs"
IS_PY3 = sys.version_info[0] == 3
if IS_PY3:
raw_input = input
unicode = str
else:
pass
DEFAULT_ENCODING = "utf-8"
class ErrorReturnCode(Exception):
truncate_cap = 200
def __init__(self, full_cmd, stdout, stderr):
self.full_cmd = full_cmd
self.stdout = stdout
self.stderr = stderr
if self.stdout is None:
tstdout = "<redirected>"
else:
tstdout = self.stdout[:self.truncate_cap]
out_delta = len(self.stdout) - len(tstdout)
if out_delta:
tstdout += ("... (%d more, please see e.stdout)" % out_delta).encode()
if self.stderr is None:
tstderr = "<redirected>"
else:
tstderr = self.stderr[:self.truncate_cap]
err_delta = len(self.stderr) - len(tstderr)
if err_delta:
tstderr += ("... (%d more, please see e.stderr)" % err_delta).encode()
msg = "\n\nRan: %r\n\nSTDOUT:\n\n %s\n\nSTDERR:\n\n %s" %\
(full_cmd, tstdout.decode(DEFAULT_ENCODING, "replace"),
tstderr.decode(DEFAULT_ENCODING, "replace"))
super(ErrorReturnCode, self).__init__(msg)
class CommandNotFound(Exception):
pass
rc_exc_regex = re.compile("ErrorReturnCode_(\d+)")
rc_exc_cache = {}
def get_rc_exc(rc):
rc = int(rc)
try:
return rc_exc_cache[rc]
except KeyError:
pass
name = "ErrorReturnCode_%d" % rc
exc = type(name, (ErrorReturnCode,), {})
rc_exc_cache[rc] = exc
return exc
def which(program):
def is_exe(fpath):
return os.path.exists(fpath) and os.access(fpath, os.X_OK)
fpath, fname = os.path.split(program)
if fpath:
if is_exe(program):
return program
else:
for path in os.environ["PATH"].split(os.pathsep):
exe_file = os.path.join(path, program)
if is_exe(exe_file):
return exe_file
return None
def resolve_program(program):
path = which(program)
if not path:
# our actual command might have a dash in it, but we can't call
# that from python (we have to use underscores), so we'll check
# if a dash version of our underscore command exists and use that
# if it does
if "_" in program:
path = which(program.replace("_", "-"))
if not path:
return None
return path
def glob(arg):
return original_glob(arg) or arg
def create_command(cmd):
return Command._create(cmd)
class RunningCommand(object):
def __init__(self, command_ran, process, call_args, stdin=None):
self.command_ran = command_ran
self.process = process
self._stdout = None
self._stderr = None
self.call_args = call_args
# we're running in the background, return self and let us lazily
# evaluate
if self.call_args["bg"]:
return
# we're running this command as a with context, don't do anything
# because nothing was started to run from Command.__call__
if self.call_args["with"]:
return
# run and block
if stdin:
stdin = stdin.encode(DEFAULT_ENCODING)
self._stdout, self._stderr = self.process.communicate(stdin)
self._handle_exit_code(self.process.wait())
def __enter__(self):
# we don't actually do anything here because anything that should
# have been done would have been done in the Command.__call__ call.
# essentially all that has to happen is the comand be pushed on
# the prepend stack.
pass
def __exit__(self, typ, value, traceback):
if self.call_args["with"] and Command._prepend_stack:
Command._prepend_stack.pop()
def __str__(self):
if IS_PY3:
return self.__unicode__()
else:
return unicode(self).encode(DEFAULT_ENCODING)
def __unicode__(self):
if self.process:
if self.call_args["bg"]:
self.wait()
if self._stdout:
return self.stdout()
else:
return ""
def __eq__(self, other):
return unicode(self) == unicode(other)
__hash__ = None # Avoid DeprecationWarning in Python < 3
def __contains__(self, item):
return item in str(self)
def __getattr__(self, p):
# let these three attributes pass through to the Popen object
if p in ("send_signal", "terminate", "kill"):
if self.process:
return getattr(self.process, p)
else:
raise AttributeError
return getattr(unicode(self), p)
def __repr__(self):
return "<RunningCommand %r, pid:%d, special_args:%r" % (
self.command_ran, self.process.pid, self.call_args)
def __long__(self):
return long(str(self).strip())
def __float__(self):
return float(str(self).strip())
def __int__(self):
return int(str(self).strip())
def stdout(self):
if self.call_args["bg"]:
self.wait()
return self._stdout.decode(DEFAULT_ENCODING, "replace")
def stderr(self):
if self.call_args["bg"]:
self.wait()
return self._stderr.decode(DEFAULT_ENCODING, "replace")
def wait(self):
self._stdout, self._stderr = self.process.communicate()
self._handle_exit_code(self.process.wait())
return str(self)
def _handle_exit_code(self, rc):
if rc not in self.call_args["ok_code"]:
raise get_rc_exc(rc)(self.command_ran, self._stdout, self._stderr)
def __len__(self):
return len(str(self))
class Command(object):
_prepend_stack = []
call_args = {
"fg": False, # run command in foreground
"bg": False, # run command in background
"with": False, # prepend the command to every command after it
"out": None, # redirect STDOUT
"err": None, # redirect STDERR
"err_to_out": None, # redirect STDERR to STDOUT
"in": None,
"env": os.environ,
"cwd": None,
# this is for commands that may have a different exit status than the
# normal 0. this can either be an integer or a list/tuple of ints
"ok_code": 0,
}
@classmethod
def _create(cls, program):
path = resolve_program(program)
if not path:
raise CommandNotFound(program)
return cls(path)
def __init__(self, path):
self._path = path
self._partial = False
self._partial_baked_args = []
self._partial_call_args = {}
def __getattribute__(self, name):
# convenience
getattr = partial(object.__getattribute__, self)
if name.startswith("_"):
return getattr(name)
if name == "bake":
return getattr("bake")
return getattr("bake")(name)
@staticmethod
def _extract_call_args(kwargs):
kwargs = kwargs.copy()
call_args = Command.call_args.copy()
for parg, default in call_args.items():
key = "_" + parg
if key in kwargs:
call_args[parg] = kwargs[key]
del kwargs[key]
return call_args, kwargs
def _format_arg(self, arg):
if IS_PY3:
arg = str(arg)
else:
try:
arg = unicode(arg, DEFAULT_ENCODING).encode(DEFAULT_ENCODING)
except TypeError:
arg = unicode(arg).encode(DEFAULT_ENCODING)
if self._partial:
escaped = arg.replace('"', '\\"')
escaped = escaped.replace("$", "\$")
escaped = escaped.replace("`", "\`")
arg = '"{0}"'.format(escaped)
return arg
def _compile_args(self, args, kwargs):
processed_args = []
# aggregate positional args
for arg in args:
if isinstance(arg, (list, tuple)):
if not arg:
warnings.warn("Empty list passed as an argument to %r. \
If you're using glob.glob(), please use pbs.glob() instead." % self.path, stacklevel=3)
for sub_arg in arg:
processed_args.append(self._format_arg(sub_arg))
else:
processed_args.append(self._format_arg(arg))
# aggregate the keyword arguments
for k, v in kwargs.items():
# we're passing a short arg as a kwarg, example:
# cut(d="\t")
if len(k) == 1:
processed_args.append("-" + k)
if v is not True:
processed_args.append(self._format_arg(v))
# we're doing a long arg
else:
k = k.replace("_", "-")
if v is True:
processed_args.append("--" + k)
else:
processed_args.append("--%s=%s" % (k, self._format_arg(v)))
return processed_args
def bake(self, *args, **kwargs):
fn = Command(self._path)
fn._partial = True
call_args, kwargs = self._extract_call_args(kwargs)
pruned_call_args = call_args
for k, v in Command.call_args.items():
try:
if pruned_call_args[k] == v:
del pruned_call_args[k]
except KeyError:
continue
fn._partial_call_args.update(self._partial_call_args)
fn._partial_call_args.update(pruned_call_args)
fn._partial_baked_args.extend(self._partial_baked_args)
fn._partial_baked_args.extend(fn._compile_args(args, kwargs))
return fn
def __str__(self):
if IS_PY3:
return self.__unicode__()
else:
return unicode(self).encode(DEFAULT_ENCODING)
def __repr__(self):
return str(self)
def __unicode__(self):
baked_args = " ".join(self._partial_baked_args)
if baked_args:
baked_args = " " + baked_args
return self._path + baked_args
def __eq__(self, other):
try:
return str(self) == str(other)
except Exception:
return False
__hash__ = None # Avoid DeprecationWarning in Python < 3
def __enter__(self):
Command._prepend_stack.append([self._path])
def __exit__(self, typ, value, traceback):
Command._prepend_stack.pop()
def __call__(self, *args, **kwargs):
kwargs = kwargs.copy()
args = list(args)
cmd = []
# aggregate any with contexts
for prepend in self._prepend_stack:
cmd.extend(prepend)
cmd.append(self._path)
call_args, kwargs = self._extract_call_args(kwargs)
call_args.update(self._partial_call_args)
# here we normalize the ok_code to be something we can do
# "if return_code in call_args["ok_code"]" on
if not isinstance(call_args["ok_code"], (tuple, list)):
call_args["ok_code"] = [call_args["ok_code"]]
# set pipe to None if we're outputting straight to CLI
pipe = None if call_args["fg"] else subp.PIPE
# check if we're piping via composition
input_stream = pipe
input_data = None
if args:
first_arg = args.pop(0)
if isinstance(first_arg, RunningCommand):
# it makes sense that if the input pipe of a command is running
# in the background, then this command should run in the
# background as well
if first_arg.call_args["bg"]:
call_args["bg"] = True
input_stream = first_arg.process.stdout
else:
input_data = first_arg.stdout()
else:
args.insert(0, first_arg)
processed_args = self._compile_args(args, kwargs)
# makes sure our arguments are broken up correctly
split_args = self._partial_baked_args + processed_args
final_args = split_args
cmd.extend(final_args)
command_ran = " ".join(cmd)
# with contexts shouldn't run at all yet, they prepend
# to every command in the context
if call_args["with"]:
Command._prepend_stack.append(cmd)
return RunningCommand(command_ran, None, call_args)
# stdin from string
input = call_args["in"]
if input:
input_data = input
# stdout redirection
stdout = pipe
out = call_args["out"]
if out:
if hasattr(out, "write"):
stdout = out
else:
stdout = open(str(out), "w")
# stderr redirection
stderr = pipe
err = call_args["err"]
if err:
if hasattr(err, "write"):
stderr = err
else:
stderr = open(str(err), "w")
if call_args["err_to_out"]:
stderr = subp.STDOUT
# leave shell=False
process = subp.Popen(cmd, shell=False, env=call_args["env"],
cwd=call_args["cwd"],
stdin=input_stream, stdout=stdout, stderr=stderr)
return RunningCommand(command_ran, process, call_args, input_data)

View File

@ -5,21 +5,10 @@ import threading
import subprocess
import sys
from streamlink import StreamError
from streamlink.packages import pbs
from streamlink.packages.pbs import CommandNotFound
from streamlink.stream import Stream
from streamlink.stream.stream import StreamIO
from streamlink.utils import NamedPipe
try:
from subprocess import DEVNULL
def devnull():
return DEVNULL
except ImportError:
def devnull():
return open(os.path.devnull, 'w')
from streamlink.compat import devnull, which
class MuxedStream(Stream):
__shortname__ = "muxed-stream"
@ -40,7 +29,7 @@ class MuxedStream(Stream):
return FFMPEGMuxer.is_usable(session)
class FFMPEGMuxer(object):
class FFMPEGMuxer(StreamIO):
__commands__ = ['ffmpeg', 'ffmpeg.exe', 'avconv', 'avconv.exe']
@staticmethod
@ -124,11 +113,8 @@ class FFMPEGMuxer(object):
if session.options.get("ffmpeg-ffmpeg"):
command.append(session.options.get("ffmpeg-ffmpeg"))
for cmd in command or cls.__commands__:
try:
pbs.create_command(cmd)
if which(cmd):
return cmd
except CommandNotFound:
continue
def read(self, size=-1):
data = self.process.stdout.read(size)

View File

@ -2,11 +2,12 @@ import re
from time import sleep
from .streamprocess import StreamProcess
from ..compat import str
from ..exceptions import StreamError
from ..packages import pbs as sh
from ..utils import rtmpparse
import subprocess
from streamlink.stream.streamprocess import StreamProcess
from streamlink.compat import str, which
from streamlink.exceptions import StreamError
from streamlink.utils import rtmpparse
class RTMPStream(StreamProcess):
@ -18,107 +19,100 @@ class RTMPStream(StreamProcess):
"""
__shortname__ = "rtmp"
logging_parameters = ("quiet", "verbose", "debug", "q", "V", "z")
def __init__(self, session, params, redirect=False):
StreamProcess.__init__(self, session, params)
def __init__(self, session, params, redirect=False, **kwargs):
StreamProcess.__init__(self, session, params=params, **kwargs)
self.cmd = self.session.options.get("rtmp-rtmpdump")
self.timeout = self.session.options.get("rtmp-timeout")
self.redirect = redirect
self.logger = session.logger.new_module("stream.rtmp")
# set rtmpdump logging level
if self.session.options.get("subprocess-errorlog-path") or \
self.session.options.get("subprocess-errorlog"):
# disable any current logging level
for p in self.logging_parameters:
self.parameters.pop(p, None)
if self.session.logger.Levels[self.session.logger.level] == "debug":
self.parameters["debug"] = True
else:
self.parameters["verbose"] = True
def __repr__(self):
return ("<RTMPStream({0!r}, redirect={1!r}>").format(self.params,
self.redirect)
return "<RTMPStream({0!r}, redirect={1!r}>".format(self.parameters,
self.redirect)
def __json__(self):
return dict(type=RTMPStream.shortname(), params=self.params)
return dict(type=RTMPStream.shortname(),
args=self.arguments,
params=self.parameters)
def open(self):
if self.session.options.get("rtmp-proxy"):
if not self._supports_param("socks"):
raise StreamError("Installed rtmpdump does not support --socks argument")
self.params["socks"] = self.session.options.get("rtmp-proxy")
self.parameters["socks"] = self.session.options.get("rtmp-proxy")
if "jtv" in self.params and not self._supports_param("jtv"):
if "jtv" in self.parameters and not self._supports_param("jtv"):
raise StreamError("Installed rtmpdump does not support --jtv argument")
if "weeb" in self.params and not self._supports_param("weeb"):
if "weeb" in self.parameters and not self._supports_param("weeb"):
raise StreamError("Installed rtmpdump does not support --weeb argument")
if self.redirect:
self._check_redirect()
self.params["flv"] = "-"
self.parameters["flv"] = "-"
return StreamProcess.open(self)
def _check_redirect(self, timeout=20):
cmd = self._check_cmd()
params = self.params.copy()
params = self.parameters.copy()
# remove any existing logging parameters
for p in self.logging_parameters:
params.pop(p, None)
# and explicitly set verbose
params["verbose"] = True
params["_bg"] = True
self.logger.debug("Attempting to find tcURL redirect")
stream = cmd(**params)
elapsed = 0
process_alive = True
while elapsed < timeout and process_alive:
stream.process.poll()
process_alive = stream.process.returncode is None
sleep(0.25)
elapsed += 0.25
if process_alive:
try:
stream.process.kill()
except Exception:
pass
stream.process.wait()
try:
stderr = stream.stderr()
except sh.ErrorReturnCode as err:
self._update_redirect(err.stderr)
process = self.spawn(params, timeout=timeout, stderr=subprocess.PIPE)
self._update_redirect(process.stderr.read())
def _update_redirect(self, stderr):
tcurl, redirect = None, None
stderr = str(stderr, "utf8")
m = re.search("DEBUG: Property: <Name:\s+redirect,\s+STRING:\s+(\w+://.+?)>", stderr)
m = re.search(r"DEBUG: Property: <Name:\s+redirect,\s+STRING:\s+(\w+://.+?)>", stderr)
if m:
redirect = m.group(1)
if redirect:
self.logger.debug("Found redirect tcUrl: {0}", redirect)
if "rtmp" in self.params:
tcurl, playpath = rtmpparse(self.params["rtmp"])
if "rtmp" in self.parameters:
tcurl, playpath = rtmpparse(self.parameters["rtmp"])
if playpath:
rtmp = "{redirect}/{playpath}".format(redirect=redirect, playpath=playpath)
else:
rtmp = redirect
self.params["rtmp"] = rtmp
self.parameters["rtmp"] = rtmp
if "tcUrl" in self.params:
self.params["tcUrl"] = redirect
def _supports_param(self, param):
cmd = self._check_cmd()
if "tcUrl" in self.parameters:
self.parameters["tcUrl"] = redirect
def _supports_param(self, param, timeout=5.0):
try:
help = cmd(help=True, _err_to_out=True)
except sh.ErrorReturnCode as err:
err = str(err.stdout, "ascii")
raise StreamError("Error while checking rtmpdump compatibility: {0}".format(err))
rtmpdump = self.spawn(dict(help=True), timeout=timeout, stderr=subprocess.PIPE)
except StreamError as err:
raise StreamError("Error while checking rtmpdump compatibility: {0}".format(err.message))
for line in help.splitlines():
m = re.match("^--(\w+)", line)
for line in rtmpdump.stderr.readlines():
m = re.match(r"^--(\w+)", str(line, "ascii"))
if not m:
continue
@ -132,4 +126,4 @@ class RTMPStream(StreamProcess):
def is_usable(cls, session):
cmd = session.options.get("rtmp-rtmpdump")
return StreamProcess.is_usable(cmd)
return which(cmd) is not None

View File

@ -1,100 +1,166 @@
from .stream import Stream
from .wrappers import StreamIOThreadWrapper
from ..compat import str
from ..exceptions import StreamError
from ..packages import pbs as sh
try:
from shutil import which
except ImportError:
from backports.shutil_which import which
import subprocess
from operator import itemgetter
from streamlink.stream import Stream
from streamlink.stream.wrappers import StreamIOThreadWrapper
from streamlink.compat import devnull, which
from streamlink.exceptions import StreamError
import os
import time
import tempfile
class StreamProcessIO(StreamIOThreadWrapper):
def __init__(self, session, process, **kwargs):
def __init__(self, session, process, fd, **kwargs):
self.process = process
StreamIOThreadWrapper.__init__(self, session,
process.stdout,
**kwargs)
super(StreamProcessIO, self).__init__(session, fd, **kwargs)
def close(self):
try:
self.process.kill()
except Exception:
pass
StreamIOThreadWrapper.close(self)
finally:
super(StreamProcessIO, self).close()
class StreamProcess(Stream):
def __init__(self, session, params=None, timeout=60.0):
Stream.__init__(self, session)
def __init__(self, session, params=None, args=None, timeout=60.0):
"""
if not params:
params = {}
:param session: Streamlink session
:param params: keyword arguments mapped to process argument
:param args: positional arguments
:param timeout: timeout for process
"""
super(StreamProcess, self).__init__(session)
self.cmd = None
self.parameters = params or {}
self.arguments = args or []
self.timeout = timeout
self.logger = session.logger.new_module("stream.process")
self.params = params
self.errorlog = self.session.options.get("subprocess-errorlog")
self.errorlog_path = self.session.options.get("subprocess-errorlog-path")
self.timeout = timeout
def open(self):
cmd = self._check_cmd()
params = self.params.copy()
params["_bg"] = True
if self.errorlog_path:
params["_err"] = open(self.errorlog_path, "w")
self.stderr = open(self.errorlog_path, "w")
elif self.errorlog:
tmpfile = tempfile.NamedTemporaryFile(prefix="streamlink",
suffix=".err", delete=False)
params["_err"] = tmpfile
self.stderr = tempfile.NamedTemporaryFile(prefix="streamlink", suffix=".err", delete=False)
else:
params["_err"] = open(os.devnull, "wb")
self.stderr = devnull()
with params["_err"]:
stream = cmd(**params)
@property
def params(self):
return self.parameters
@classmethod
def is_usable(cls, session):
raise NotImplementedError
def open(self):
process = self.spawn(self.parameters, self.arguments)
# Wait 0.5 seconds to see if program exited prematurely
time.sleep(0.5)
process_alive = stream.process.returncode is None
if not process_alive:
if self.errorlog:
if not process.poll() is None:
if hasattr(self.stderr, "name"):
raise StreamError(("Error while executing subprocess, "
"error output logged to: {0}").format(tmpfile.name))
"error output logged to: {0}").format(self.stderr.name))
else:
raise StreamError("Error while executing subprocess")
return StreamProcessIO(self.session, stream.process,
timeout=self.timeout)
return StreamProcessIO(self.session, process, process.stdout, timeout=self.timeout)
@classmethod
def bake(cls, cmd, parameters=None, arguments=None, short_option_prefix="-", long_option_prefix="--"):
cmdline = [cmd]
parameters = parameters or {}
arguments = arguments or []
def to_option(key):
if len(key) == 1: # short argument
return "{0}{1}".format(short_option_prefix, key)
else: # long argument
return "{0}{1}".format(long_option_prefix, key.replace("_", "-"))
# sorted for stability
for k, v in sorted(parameters.items(), key=itemgetter(0)):
if not isinstance(v, list): # long argument
cmdline.append(to_option(k))
if v is not True:
cmdline.append("{0}".format(v))
else: # duplicate the argument if given a list of values
for sv in v:
cmdline.append(to_option(k))
cmdline.append("{0}".format(sv))
# positional arguments last
cmdline.extend(arguments)
return cmdline
def spawn(self, parameters=None, arguments=None, stderr=None, timeout=None, short_option_prefix="-", long_option_prefix="--"):
"""
Spawn the process defined in `cmd`
parameters is converted to options the short and long option prefixes
if a list is given as the value, the parameter is repeated with each
value
If timeout is set the spawn will block until the process returns or
the timeout expires.
:param parameters: optional parameters
:param arguments: positional arguments
:param stderr: where to redirect stderr to
:param timeout: timeout for short lived process
:param long_option_prefix: option prefix, default -
:param short_option_prefix: long option prefix, default --
:return: spawned process
"""
stderr = stderr or self.stderr
cmd = self.bake(self._check_cmd(), parameters, arguments, short_option_prefix, long_option_prefix)
self.logger.debug("Spawning command: {}", subprocess.list2cmdline(cmd))
try:
process = subprocess.Popen(cmd, stderr=stderr, stdout=subprocess.PIPE)
except (OSError, IOError) as err:
raise StreamError("Failed to start process: {} ({})".format(self._check_cmd(), str(err)))
if timeout:
elapsed = 0
while elapsed < timeout and not process.poll():
time.sleep(0.25)
elapsed += 0.25
# kill after the timeout has expired and the process still hasn't ended
if not process.poll():
try:
self.logger.debug("Process timeout expired ({}s), killing process".format(timeout))
process.kill()
except Exception:
pass
process.wait()
return process
def cmdline(self):
return subprocess.list2cmdline(self.bake(self._check_cmd(), self.parameters, self.arguments))
def _check_cmd(self):
try:
cmd = sh.create_command(which(self.cmd) or self.cmd)
except sh.CommandNotFound as err:
raise StreamError("Unable to find {0} command".format(err))
if not self.cmd:
raise StreamError("`cmd' attribute not set")
cmd = which(self.cmd)
if not cmd:
raise StreamError("Unable to find `{0}' command".format(self.cmd))
return cmd
def cmdline(self):
cmd = self._check_cmd()
return str(cmd.bake(**self.params))
@classmethod
def is_usable(cls, cmd):
try:
cmd = sh.create_command(which(cmd) or cmd)
except sh.CommandNotFound:
return False
return True
__all__ = ["StreamProcess"]

View File

@ -1,3 +1,16 @@
def escape_librtmp(value):
if isinstance(value, bool):
value = "1" if value else "0"
if isinstance(value, int):
value = str(value)
# librtmp expects some characters to be escaped
value = value.replace("\\", "\\5c")
value = value.replace(" ", "\\20")
value = value.replace('"', "\\22")
return value
def stream_to_url(stream):
stream_type = type(stream).shortname()
@ -18,17 +31,11 @@ def stream_to_url(stream):
stream_params.pop("swfsize", None)
for key, value in stream_params.items():
if isinstance(value, bool):
value = str(int(value))
if isinstance(value, int):
value = str(value)
# librtmp expects some characters to be escaped
value = value.replace("\\", "\\5c")
value = value.replace(" ", "\\20")
value = value.replace('"', "\\22")
params.append("{0}={1}".format(key, value))
if isinstance(value, list):
for svalue in value:
params.append("{0}={1}".format(key, escape_librtmp(svalue)))
else:
params.append("{0}={1}".format(key, escape_librtmp(value)))
url = " ".join(params)

View File

@ -0,0 +1,61 @@
import unittest
import pytest
from mock import patch
from streamlink import StreamError
from streamlink import Streamlink
from streamlink.stream import StreamProcess
@pytest.mark.parametrize("parameters,arguments,expected", [
(dict(h=True), None, ["test", "-h"]),
(dict(foo="bar"), None, ["test", "--foo", "bar"]),
(dict(L="big"), None, ["test", "-L", "big"]),
(None, ["foo", "bar"], ["test", "foo", "bar"]),
(dict(extra="nothing", verbose=True, L="big"), None, ["test", "-L", "big", "--extra", "nothing", "--verbose"]),
(dict(extra=["a", "b", "c"]), None, ["test", "--extra", "a", "--extra", "b", "--extra", "c"]),
(dict(e=["a", "b", "c"]), None, ["test", "-e", "a", "-e", "b", "-e", "c"]),
])
def test_bake(parameters, arguments, expected):
assert expected == StreamProcess.bake("test", parameters or {}, arguments or [])
class TestStreamProcess(unittest.TestCase):
def test_bake_different_prefix(self):
self.assertEqual(["test", "/H", "/foo", "bar", "/help"],
StreamProcess.bake("test", dict(help=True, H=True, foo="bar"),
long_option_prefix="/", short_option_prefix="/"))
self.assertEqual(["test", "/?"],
StreamProcess.bake("test", {"?": True},
long_option_prefix="/", short_option_prefix="/"))
def test_check_cmd_none(self):
s = StreamProcess(Streamlink())
self.assertRaises(StreamError, s._check_cmd)
@patch('streamlink.stream.streamprocess.which')
def test_check_cmd_cat(self, which):
s = StreamProcess(Streamlink())
which.return_value = s.cmd = "test"
self.assertEqual("test", s._check_cmd())
@patch('streamlink.stream.streamprocess.which')
def test_check_cmd_nofound(self, which):
s = StreamProcess(Streamlink())
s.cmd = "test"
which.return_value = None
self.assertRaises(StreamError, s._check_cmd)
@patch('streamlink.stream.streamprocess.which')
def test_check_cmdline(self, which):
s = StreamProcess(Streamlink(), params=dict(help=True))
which.return_value = s.cmd = "test"
self.assertEqual("test --help", s.cmdline())
@patch('streamlink.stream.streamprocess.which')
def test_check_cmdline_long(self, which):
s = StreamProcess(Streamlink(), params=dict(out_file="test file.txt"))
which.return_value = s.cmd = "test"
self.assertEqual("test --out-file \"test file.txt\"", s.cmdline())