Rearrange a few things.

- Clean up all the package __init__.py files
  and move code inside them into separate files.
- Add metadata to main __init__.py file.
- Move exceptions into a separate file.
- Let setup.py fetch version from source instead.
This commit is contained in:
Christopher Rosell 2013-02-08 02:00:44 +01:00
parent 7826f7c63d
commit c9a81522ea
19 changed files with 537 additions and 478 deletions

View File

@ -6,31 +6,16 @@
livestreamer documentation
==================================
Livestreamer is a library that can be used to retrieve information and stream data from
various livestreaming services, such as Twitch, Own3D or UStream.
.. automodule:: livestreamer
Exceptions
----------
The :mod:`livstreamer` module defines four exceptions:
The :mod:`livstreamer` module defines three exceptions:
.. exception:: PluginError
Common base class for the plugin related exceptions. It inherits
:exc:`Exception`.
.. exception:: NoPluginError
This exception is triggered when no plugin can found when calling :meth:`Livestreamer.resolve_url`.
It inherits :exc:`PluginError`.
.. exception:: StreamError
Common base class for stream related exceptions. It inherits
:exc:`Exception`.
.. autoexception:: PluginError
.. autoexception:: NoPluginError
.. autoexception:: StreamError
The livestreamer session
------------------------

View File

@ -1,10 +1,9 @@
#!/usr/bin/env python
from setuptools import setup
from sys import version_info
from os import name as os_name
from sys import version_info, path as sys_path
from os.path import abspath, dirname, join
version = "1.4.1"
deps = ["requests>=1.0,<2.0"]
packages = ["livestreamer",
"livestreamer.stream",
@ -17,13 +16,19 @@ if (version_info[0] == 2 and version_info[1] < 7) or \
(version_info[0] == 3 and version_info[1] < 2):
deps.append("argparse")
srcdir = join(dirname(abspath(__file__)), "src/")
sys_path.insert(0, srcdir)
import livestreamer
setup(name="livestreamer",
version=version,
description="CLI program that launches streams from various streaming services in a custom video player",
version=livestreamer.__version__,
description="CLI program that launches streams from various streaming services in a custom video player.",
url="https://github.com/chrippa/livestreamer",
author="Christopher Rosell",
author_email="chrippa@tanuki.se",
license="BSD",
license="Simplified BSD",
packages=packages,
package_dir={ "": "src" },
entry_points={

View File

@ -1,118 +1,22 @@
from . import plugins
from .compat import urlparse, is_win32
from .logger import Logger
from .options import Options
from .plugins import PluginError, NoStreamsError, NoPluginError
from .stream import StreamError
"""
import pkgutil
import imp
Livestreamer is a CLI program that launches live streams from various
streaming services in a custom video player but also provides an API
that allows you to interact with the stream data in your own application.
class Livestreamer(object):
"""
A Livestreamer session is used to keep track of plugins,
options and log settings.
"""
"""
def __init__(self):
self.options = Options({
"rtmpdump": is_win32 and "rtmpdump.exe" or "rtmpdump",
"rtmpdump-proxy": None,
"ringbuffer-size": 8192*4,
"hds-live-edge": 10.0,
"hds-fragment-buffer": 10,
"errorlog": False,
})
self.plugins = {}
self.logger = Logger()
self.load_builtin_plugins()
def set_option(self, key, value):
"""Set option *key* to *value*."""
self.options.set(key, value)
def get_option(self, key):
"""Return option *key*"""
return self.options.get(key)
def set_plugin_option(self, plugin, key, value):
"""Set plugin option *key* to *value* for the plugin *plugin*."""
if plugin in self.plugins:
plugin = self.plugins[plugin]
plugin.set_option(key, value)
def get_plugin_option(self, plugin, key):
"""Return plugin option *key* for the plugin *plugin*."""
if plugin in self.plugins:
plugin = self.plugins[plugin]
return plugin.get_option(key)
def set_loglevel(self, level):
"""
Set the log level to *level*.
Valid levels are: none, error, warning, info, debug.
"""
self.logger.set_level(level)
def set_logoutput(self, output):
"""
Set the log output to *output*. Expects a file like
object with a write method.
"""
self.logger.set_output(output)
def resolve_url(self, url):
"""
Attempt to find the correct plugin for *url* and return it.
Raises :exc:`NoPluginError` on failure.
"""
parsed = urlparse(url)
if len(parsed.scheme) == 0:
url = "http://" + url
for name, plugin in self.plugins.items():
if plugin.can_handle_url(url):
obj = plugin(url)
return obj
raise NoPluginError
def get_plugins(self):
"""
Returns the loaded plugins for the session.
"""
return self.plugins
def load_builtin_plugins(self):
self.load_plugins(plugins.__path__[0])
def load_plugins(self, path):
"""
Attempt to load plugins from the *path* directory.
"""
for loader, name, ispkg in pkgutil.iter_modules([path]):
file, pathname, desc = imp.find_module(name, [path])
self.load_plugin(name, file, pathname, desc)
def load_plugin(self, name, file, pathname, desc):
module = imp.load_module(name, file, pathname, desc)
if hasattr(module, "__plugin__"):
plugin = getattr(module, "__plugin__")
plugin.module = getattr(module, "__name__")
plugin.session = self
self.plugins[plugin.module] = plugin
if file:
file.close()
@property
def version(self):
return __version__
__all__ = ["PluginError", "NoStreamsError", "NoPluginError", "StreamError",
"Livestreamer"]
__title__ = "livestreamer"
__version__ = "1.4.1"
__license__ = "Simplified BSD"
__author__ = "Christopher Rosell"
__copyright__ = "Copyright 2011-2013 Christopher Rosell"
__credits__ = ["Christopher Rosell", "Athanasios Oikonomou",
"Gaspard Jankowiak", "Dominik Dabrowski",
"Toad King", "Niall McAndrew"]
from .exceptions import (PluginError, NoStreamsError,
NoPluginError, StreamError)
from .session import Livestreamer

View File

@ -6,7 +6,8 @@ import os
import sys
import subprocess
from livestreamer import *
from livestreamer import (Livestreamer, StreamError, PluginError,
NoPluginError)
from livestreamer.compat import input, stdout, file, is_win32
from livestreamer.stream import StreamProcess
from livestreamer.utils import ArgumentParser, JSONEncoder, NamedPipe

View File

@ -0,0 +1,23 @@
class PluginError(Exception):
""" Plugin related errors. """
class NoStreamsError(PluginError):
def __init__(self, url):
PluginError.__init__(self, ("No streams found on this URL: {0}").format(url))
class NoPluginError(PluginError):
"""
This exception is triggered when no plugin can found when
calling :meth:`Livestreamer.resolve_url`.
Inherits :exc:`PluginError`.
"""
class StreamError(Exception):
""" Stream related errors. """
__all__ = ["PluginError", "NoPluginError", "NoStreamsError", "StreamError"]

View File

@ -1,6 +1,10 @@
class Options(object):
def __init__(self, defaults={}):
self.options = defaults
def __init__(self, defaults=None):
if not defaults:
defaults = {}
self.defaults = defaults
self.options = defaults.copy()
def set(self, key, value):
self.options[key] = value

128
src/livestreamer/plugin.py Normal file
View File

@ -0,0 +1,128 @@
from .exceptions import NoStreamsError
from .options import Options
import re
SpecialQualityWeights = {
"live": 1080,
"hd": 1080,
"ehq": 720,
"hq": 576,
"sd": 576,
"sq": 360,
"iphonehigh": 230,
"iphonelow": 170,
}
def qualityweight(quality):
if quality in SpecialQualityWeights:
return SpecialQualityWeights[quality]
match = re.match("^(\d+)([k]|[p])$", quality)
if match:
if match.group(2) == "k":
bitrate = int(match.group(1))
# These calculations are very rough
if bitrate > 2000:
return bitrate / 3.4
elif bitrate > 1000:
return bitrate / 2.6
else:
return bitrate / 1.7
elif match.group(2) == "p":
return int(match.group(1))
return 0
class Plugin(object):
"""
A plugin can retrieve stream information from the *url* specified.
"""
options = Options()
def __init__(self, url):
self.url = url
self.logger = self.session.logger.new_module("plugin." + self.module)
@classmethod
def can_handle_url(cls, url):
raise NotImplementedError
@classmethod
def set_option(cls, key, value):
cls.options.set(key, value)
@classmethod
def get_option(cls, key):
return cls.options.get(key)
def get_streams(self, priority=["rtmp", "hls", "hds", "http",
"akamaihd"]):
"""
Retrieves and returns a :class:`dict` containing the streams.
The key is the name of the stream, most commonly the quality.
The value is a :class:`Stream` object.
Can contain the synonyms *best* and *worst* which points to the streams
which are likely to be of highest and lowest quality respectively.
The *priority* argument decides which stream type to use
when there is multiple streams with the same name.
*Changed in version 1.4.2:* Added *priority* argument.
"""
try:
ostreams = self._get_streams()
except NoStreamsError:
return {}
streams = {}
def sort_priority(s):
n = type(s).shortname()
try:
p = priority.index(n)
except ValueError:
p = 99
return p
for name, stream in ostreams.items():
if isinstance(stream, list):
sstream = sorted(stream, key=sort_priority)
for i, stream in enumerate(sstream):
if i == 0:
sname = name
else:
sname = type(stream).shortname()
sname = "{0}_{1}".format(name, sname)
streams[sname] = stream
else:
streams[name] = stream
sort = sorted(filter(qualityweight, streams.keys()),
key=qualityweight)
if len(sort) > 0:
best = sort[-1]
worst = sort[0]
streams["best"] = streams[best]
streams["worst"] = streams[worst]
return streams
def _get_streams(self):
raise NotImplementedError
__all__ = ["Plugin"]

View File

@ -1,139 +1,8 @@
from livestreamer.options import Options
"""
New plugins should use livestreamer.plugin.Plugin instead
of this module, but this is kept here for backwards
compatibility.
"""
import re
SpecialQualityWeights = {
"live": 1080,
"hd": 1080,
"ehq": 720,
"hq": 576,
"sd": 576,
"sq": 360,
"iphonehigh": 230,
"iphonelow": 170,
}
def qualityweight(quality):
if quality in SpecialQualityWeights:
return SpecialQualityWeights[quality]
match = re.match("^(\d+)([k]|[p])$", quality)
if match:
if match.group(2) == "k":
bitrate = int(match.group(1))
# These calculations are very rough
if bitrate > 2000:
return bitrate / 3.4
elif bitrate > 1000:
return bitrate / 2.6
else:
return bitrate / 1.7
elif match.group(2) == "p":
return int(match.group(1))
return 0
class Plugin(object):
"""
A plugin can retrieve stream information from the *url* specified.
"""
options = Options()
def __init__(self, url):
self.url = url
self.logger = self.session.logger.new_module("plugin." + self.module)
@classmethod
def can_handle_url(cls, url):
raise NotImplementedError
@classmethod
def set_option(cls, key, value):
cls.options.set(key, value)
@classmethod
def get_option(cls, key):
return cls.options.get(key)
def get_streams(self, priority=["rtmp", "hls", "hds", "http",
"akamaihd"]):
"""
Retrieves and returns a :class:`dict` containing the streams.
The key is the name of the stream, most commonly the quality.
The value is a :class:`Stream` object.
Can contain the synonyms *best* and *worst* which points to the streams
which are likely to be of highest respecticly lowest quality.
The *priority* argument decides which stream type to use
when there is multiple streams with the same name.
*Changed in version 1.4.2:* Added *priority* argument.
"""
try:
ostreams = self._get_streams()
except NoStreamsError:
return {}
streams = {}
def sort_priority(s):
n = type(s).shortname()
try:
p = priority.index(n)
except ValueError:
p = 99
return p
for name, stream in ostreams.items():
if isinstance(stream, list):
sstream = sorted(stream, key=sort_priority)
for i, stream in enumerate(sstream):
if i == 0:
sname = name
else:
sname = type(stream).shortname()
sname = "{0}_{1}".format(name, sname)
streams[sname] = stream
else:
streams[name] = stream
sort = sorted(filter(qualityweight, streams.keys()),
key=qualityweight)
if len(sort) > 0:
best = sort[-1]
worst = sort[0]
streams["best"] = streams[best]
streams["worst"] = streams[worst]
return streams
def _get_streams(self):
raise NotImplementedError
class PluginError(Exception):
pass
class NoStreamsError(PluginError):
def __init__(self, url):
PluginError.__init__(self, ("No streams found on this URL: {0}").format(url))
class NoPluginError(PluginError):
pass
__all__ = ["Plugin", "PluginError", "NoStreamsError", "NoPluginError"]
from ..exceptions import PluginError, NoStreamsError, NoPluginError
from ..plugin import Plugin

117
src/livestreamer/session.py Normal file
View File

@ -0,0 +1,117 @@
from . import plugins, __version__
from .compat import urlparse, is_win32
from .exceptions import NoPluginError
from .logger import Logger
from .options import Options
import pkgutil
import imp
class Livestreamer(object):
"""
A Livestreamer session is used to keep track of plugins,
options and log settings.
"""
def __init__(self):
self.options = Options({
"rtmpdump": is_win32 and "rtmpdump.exe" or "rtmpdump",
"rtmpdump-proxy": None,
"ringbuffer-size": 8192*4,
"hds-live-edge": 10.0,
"hds-fragment-buffer": 10,
"errorlog": False,
})
self.plugins = {}
self.logger = Logger()
self.load_builtin_plugins()
def set_option(self, key, value):
"""Set option *key* to *value*."""
self.options.set(key, value)
def get_option(self, key):
"""Return option *key*"""
return self.options.get(key)
def set_plugin_option(self, plugin, key, value):
"""Set plugin option *key* to *value* for the plugin *plugin*."""
if plugin in self.plugins:
plugin = self.plugins[plugin]
plugin.set_option(key, value)
def get_plugin_option(self, plugin, key):
"""Return plugin option *key* for the plugin *plugin*."""
if plugin in self.plugins:
plugin = self.plugins[plugin]
return plugin.get_option(key)
def set_loglevel(self, level):
"""
Set the log level to *level*.
Valid levels are: none, error, warning, info, debug.
"""
self.logger.set_level(level)
def set_logoutput(self, output):
"""
Set the log output to *output*. Expects a file like
object with a write method.
"""
self.logger.set_output(output)
def resolve_url(self, url):
"""
Attempt to find the correct plugin for *url* and return it.
Raises :exc:`NoPluginError` on failure.
"""
parsed = urlparse(url)
if len(parsed.scheme) == 0:
url = "http://" + url
for name, plugin in self.plugins.items():
if plugin.can_handle_url(url):
obj = plugin(url)
return obj
raise NoPluginError
def get_plugins(self):
"""
Returns the loaded plugins for the session.
"""
return self.plugins
def load_builtin_plugins(self):
self.load_plugins(plugins.__path__[0])
def load_plugins(self, path):
"""
Attempt to load plugins from the *path* directory.
"""
for loader, name, ispkg in pkgutil.iter_modules([path]):
file, pathname, desc = imp.find_module(name, [path])
self.load_plugin(name, file, pathname, desc)
def load_plugin(self, name, file, pathname, desc):
module = imp.load_module(name, file, pathname, desc)
if hasattr(module, "__plugin__"):
plugin = getattr(module, "__plugin__")
plugin.module = getattr(module, "__name__")
plugin.session = self
self.plugins[plugin.module] = plugin
if file:
file.close()
@property
def version(self):
return __version__
__all__ = ["Livestreamer"]

View File

@ -1,194 +1,10 @@
from ..compat import bytes, str
from ..packages import pbs as sh
from ..utils import RingBuffer
from threading import Thread
import io
import json
import os
import time
import tempfile
class StreamError(Exception):
pass
class Stream(object):
__shortname__ = "stream"
"""
This is a base class that should be inherited when implementing
different stream types. Should only be used directly from plugins.
"""
def __init__(self, session):
self.session = session
def __repr__(self):
return "<Stream()>"
def __json__(self):
return dict(type=Stream.shortname())
def open(self):
"""
Opens a connection to the stream.
Returns a file-like object than can be used to read data.
Raises :exc:`StreamError` on failure.
"""
raise NotImplementedError
@property
def json(self):
obj = self.__json__()
return json.dumps(obj)
@classmethod
def shortname(cls):
return cls.__shortname__
class StreamIOWrapper(io.IOBase):
"""Wraps file-like objects that are not inheriting from IOBase"""
def __init__(self, fd):
self.fd = fd
def read(self, size=-1):
return self.fd.read(size)
def close(self):
if hasattr(self.fd, "close"):
self.fd.close()
class StreamIOThreadWrapper(io.IOBase):
"""
Wraps a file-like object in a thread.
Useful for getting control over read timeout where
timeout handling is missing or out of our control.
"""
class Filler(Thread):
def __init__(self, fd, buffer):
Thread.__init__(self)
self.error = None
self.fd = fd
self.buffer = buffer
self.daemon = True
self.running = False
def run(self):
self.running = True
while self.running:
try:
data = self.fd.read(8192)
except IOError as error:
self.error = error
break
if len(data) == 0:
break
self.buffer.write(data)
self.stop()
def stop(self):
self.running = False
self.buffer.close()
if hasattr(self.fd, "close"):
try:
self.fd.close()
except Exception:
pass
def __init__(self, session, fd, timeout=30):
self.buffer = RingBuffer(session.get_option("ringbuffer-size"))
self.fd = fd
self.timeout = timeout
self.filler = StreamIOThreadWrapper.Filler(self.fd, self.buffer)
self.filler.start()
def read(self, size=-1):
if self.filler.error and self.buffer.length == 0:
raise self.filler.error
return self.buffer.read(size, block=self.filler.is_alive(),
timeout=self.timeout)
def close(self):
self.filler.stop()
if self.filler.is_alive():
self.filler.join()
class StreamProcess(Stream):
def __init__(self, session, params={}, timeout=30):
Stream.__init__(self, session)
self.params = params
self.errorlog = self.session.options.get("errorlog")
self.timeout = timeout
def open(self):
cmd = self._check_cmd()
params = self.params.copy()
params["_bg"] = True
if self.errorlog:
tmpfile = tempfile.NamedTemporaryFile(prefix="livestreamer",
suffix=".err", delete=False)
params["_err"] = tmpfile
else:
params["_err"] = open(os.devnull, "wb")
stream = cmd(**params)
# Wait 0.5 seconds to see if program exited prematurely
time.sleep(0.5)
process_alive = stream.process.returncode is None
if not process_alive:
if self.errorlog:
raise StreamError(("Error while executing subprocess, error output logged to: {0}").format(tmpfile.name))
else:
raise StreamError("Error while executing subprocess")
return StreamIOThreadWrapper(self.session, stream.process.stdout,
timeout=self.timeout)
def _check_cmd(self):
try:
cmd = sh.create_command(self.cmd)
except sh.CommandNotFound as err:
raise StreamError(("Unable to find {0} command").format(str(err)))
return cmd
def cmdline(self):
cmd = self._check_cmd()
return str(cmd.bake(**self.params))
@classmethod
def is_usable(cls, cmd):
try:
cmd = sh.create_command(cmd)
except sh.CommandNotFound as err:
return False
return True
from ..exceptions import StreamError
from .stream import Stream
from .akamaihd import AkamaiHDStream
from .hds import HDSStream
from .hls import HLSStream
from .http import HTTPStream
from .rtmpdump import RTMPStream
__all__ = ["StreamError", "Stream", "StreamProcess", "StreamIOWrapper",
"AkamaiHDStream", "HLSStream", "HDSStream", "HTTPStream", "RTMPStream"]
from .streamprocess import StreamProcess
from .wrappers import StreamIOWrapper, StreamIOThreadWrapper

View File

@ -1,7 +1,6 @@
#!/usr/bin/env python
from . import Stream, StreamError
from .stream import Stream
from ..compat import str, bytes, urlparse
from ..exceptions import StreamError
from ..utils import Buffer, swfdecompress, swfverify, urlget, urlopen
from ..packages.flashmedia import FLV, FLVError
@ -136,7 +135,10 @@ class AkamaiHDStreamIO(io.IOBase):
self.send_control("sendingNewToken", headers=headers,
swf=self.swf)
def send_control(self, cmd, headers={}, **params):
def send_control(self, cmd, headers=None, **params):
if not headers:
headers = {}
url = self.ControlURLFormat.format(host=self.host,
streamname=self.streamname)

View File

@ -1,5 +1,6 @@
from . import Stream, StreamError
from .stream import Stream
from ..compat import urljoin, urlparse, bytes, queue, range
from ..exceptions import StreamError
from ..utils import absolute_url, urlget, res_xml, get_node_text, RingBuffer
from io import BytesIO, IOBase

View File

@ -1,6 +1,7 @@
from . import Stream, StreamError
from ..utils import urlget, RingBuffer, absolute_url
from .stream import Stream
from ..compat import urljoin, queue
from ..exceptions import StreamError
from ..utils import urlget, RingBuffer, absolute_url
from time import time, sleep
from threading import Lock, Thread, Timer

View File

@ -1,4 +1,7 @@
from . import Stream, StreamIOWrapper, StreamError
from .stream import Stream
from .wrappers import StreamIOWrapper
from ..exceptions import StreamError
from ..utils import urlget
from requests import Request

View File

@ -1,7 +1,8 @@
from . import StreamProcess, StreamError
from .streamprocess import StreamProcess
from ..compat import str, urljoin
from ..utils import rtmpparse
from ..exceptions import StreamError
from ..packages import pbs as sh
from ..utils import rtmpparse
from time import sleep

View File

@ -0,0 +1,38 @@
import json
class Stream(object):
__shortname__ = "stream"
"""
This is a base class that should be inherited when implementing
different stream types. Should only be created by plugins.
"""
def __init__(self, session):
self.session = session
def __repr__(self):
return "<Stream()>"
def __json__(self):
return dict(type=Stream.shortname())
def open(self):
"""
Opens a connection to the stream.
Returns a file-like object that can be used to read data.
Raises :exc:`StreamError` on failure.
"""
raise NotImplementedError
@property
def json(self):
obj = self.__json__()
return json.dumps(obj)
@classmethod
def shortname(cls):
return cls.__shortname__
__all__ = ["Stream"]

View File

@ -0,0 +1,73 @@
from .stream import Stream
from .wrappers import StreamIOThreadWrapper
from ..compat import bytes, str
from ..exceptions import StreamError
from ..packages import pbs as sh
import os
import time
import tempfile
class StreamProcess(Stream):
def __init__(self, session, params=None, timeout=30):
Stream.__init__(self, session)
if not params:
params = {}
self.params = params
self.errorlog = self.session.options.get("errorlog")
self.timeout = timeout
def open(self):
cmd = self._check_cmd()
params = self.params.copy()
params["_bg"] = True
if self.errorlog:
tmpfile = tempfile.NamedTemporaryFile(prefix="livestreamer",
suffix=".err", delete=False)
params["_err"] = tmpfile
else:
params["_err"] = open(os.devnull, "wb")
stream = cmd(**params)
# Wait 0.5 seconds to see if program exited prematurely
time.sleep(0.5)
process_alive = stream.process.returncode is None
if not process_alive:
if self.errorlog:
raise StreamError(("Error while executing subprocess, error output logged to: {0}").format(tmpfile.name))
else:
raise StreamError("Error while executing subprocess")
return StreamIOThreadWrapper(self.session, stream.process.stdout,
timeout=self.timeout)
def _check_cmd(self):
try:
cmd = sh.create_command(self.cmd)
except sh.CommandNotFound as err:
raise StreamError(("Unable to find {0} command").format(str(err)))
return cmd
def cmdline(self):
cmd = self._check_cmd()
return str(cmd.bake(**self.params))
@classmethod
def is_usable(cls, cmd):
try:
cmd = sh.create_command(cmd)
except sh.CommandNotFound as err:
return False
return True
__all__ = ["StreamProcess"]

View File

@ -0,0 +1,88 @@
from ..utils import RingBuffer
from threading import Thread
import io
class StreamIOWrapper(io.IOBase):
"""Wraps file-like objects that are not inheriting from IOBase"""
def __init__(self, fd):
self.fd = fd
def read(self, size=-1):
return self.fd.read(size)
def close(self):
if hasattr(self.fd, "close"):
self.fd.close()
class StreamIOThreadWrapper(io.IOBase):
"""
Wraps a file-like object in a thread.
Useful for getting control over read timeout where
timeout handling is missing or out of our control.
"""
class Filler(Thread):
def __init__(self, fd, buffer):
Thread.__init__(self)
self.error = None
self.fd = fd
self.buffer = buffer
self.daemon = True
self.running = False
def run(self):
self.running = True
while self.running:
try:
data = self.fd.read(8192)
except IOError as error:
self.error = error
break
if len(data) == 0:
break
self.buffer.write(data)
self.stop()
def stop(self):
self.running = False
self.buffer.close()
if hasattr(self.fd, "close"):
try:
self.fd.close()
except Exception:
pass
def __init__(self, session, fd, timeout=30):
self.buffer = RingBuffer(session.get_option("ringbuffer-size"))
self.fd = fd
self.timeout = timeout
self.filler = StreamIOThreadWrapper.Filler(self.fd, self.buffer)
self.filler.start()
def read(self, size=-1):
if self.filler.error and self.buffer.length == 0:
raise self.filler.error
return self.buffer.read(size, block=self.filler.is_alive(),
timeout=self.timeout)
def close(self):
self.filler.stop()
if self.filler.is_alive():
self.filler.join()
__all__ = ["StreamIOWrapper", "StreamIOThreadWrapper"]

View File

@ -1,5 +1,5 @@
from .compat import bytes, is_win32, urljoin, urlparse, parse_qsl
from .plugins import PluginError
from .exceptions import PluginError
from threading import Event, Lock