Boost the test coverage a bit (#362)

* tests: improving test coverage

* tests: improving test coverage of utils

* tests: ensure that all the plugins can be loaded

* tests: simple parse test for m3u8

* tests: mock tvplayer plugin requests

* tests: use mock from unittest where available

* test_hls_playlist: all strings are unicode

* plugins.{ine,atresplayer}: fix bug in regex for py26
This commit is contained in:
beardypig 2017-01-05 17:27:15 +00:00 committed by Forrest
parent 7e93d0c8df
commit d2717041c8
16 changed files with 527 additions and 21 deletions

View File

@ -8,6 +8,7 @@ omit =
*/python?.?/*
*__init__*
src/streamlink/packages/*
src/streamlink_cli/packages/*
exclude_lines =
pragma: no cover

3
.gitignore vendored
View File

@ -27,3 +27,6 @@ lib/
local/
share/
pip-selfcheck.json
# coverage
htmlcov

View File

@ -13,7 +13,7 @@ class AtresPlayer(Plugin):
url_re = re.compile(r"https?://(?:www.)?atresplayer.com/directos/television/(\w+)/?")
player_re = re.compile(r"""div.*?directo=(\d+)""")
stream_api = "https://servicios.atresplayer.com/api/urlVideoLanguage/v3/{id}/web/{id}|{time}|{hash}/es.xml"
manifest_re = re.compile(r"<resultDes>(.*)?</resultDes>")
manifest_re = re.compile(r"<resultDes>(.*?)</resultDes>")
@classmethod
def can_handle_url(cls, url):

View File

@ -12,7 +12,7 @@ from streamlink.stream import HLSStream
class INE(Plugin):
url_re = re.compile(r"""https://streaming.ine.com/play\#?/
([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})/?
(.*)?""", re.VERBOSE)
(.*?)""", re.VERBOSE)
play_url = "https://streaming.ine.com/play/{vid}/watch"
js_re = re.compile(r'''script type="text/javascript" src="(https://content.jwplatform.com/players/.*?)"''')
jwplayer_re = re.compile(r'''jwplayer\(".*?"\).setup\((\{.*\})\);''', re.DOTALL)

View File

@ -33,16 +33,17 @@ class TVPlayer(Plugin):
res = http.get(self.url, headers={"User-Agent": TVPlayer._user_agent})
stream_attrs = dict((k, v.strip('"')) for k, v in TVPlayer._stream_attrs_.findall(res.text))
# get the stream urls
res = http.post(TVPlayer.API_URL, data=dict(id=stream_attrs["resourceId"],
validate=stream_attrs["validate"],
platform=stream_attrs["platform"]))
if "resourceId" in stream_attrs and "validate" in stream_attrs and "platform" in stream_attrs:
# get the stream urls
res = http.post(TVPlayer.API_URL, data=dict(id=stream_attrs["resourceId"],
validate=stream_attrs["validate"],
platform=stream_attrs["platform"]))
stream_data = http.json(res, schema=TVPlayer._stream_schema)
stream_data = http.json(res, schema=TVPlayer._stream_schema)
return HLSStream.parse_variant_playlist(self.session,
stream_data["tvplayer"]["response"]["stream"],
headers={'user-agent': TVPlayer._user_agent})
return HLSStream.parse_variant_playlist(self.session,
stream_data["tvplayer"]["response"]["stream"],
headers={'user-agent': TVPlayer._user_agent})
__plugin__ = TVPlayer

View File

@ -4,7 +4,7 @@ import zlib
try:
import xml.etree.cElementTree as ET
except ImportError:
except ImportError: # pragma: no cover
import xml.etree.ElementTree as ET
from .compat import urljoin, urlparse, parse_qsl, is_py2
@ -139,7 +139,8 @@ def rtmpparse(url):
import requests
def urlget(url, *args, **kwargs):
def urlget(url, *args, **kwargs): # pragma: no cover
"""This function is deprecated."""
data = kwargs.pop("data", None)
exception = kwargs.pop("exception", PluginError)
@ -167,10 +168,11 @@ def urlget(url, *args, **kwargs):
return res
urlopen = urlget
def urlresolve(url):
def urlresolve(url): # pragma: no cover
"""This function is deprecated."""
res = urlget(url, stream=True, allow_redirects=False)
@ -180,12 +182,12 @@ def urlresolve(url):
return url
def res_xml(res, *args, **kw):
def res_xml(res, *args, **kw): # pragma: no cover
"""This function is deprecated."""
return parse_xml(res.text, *args, **kw)
def res_json(res, jsontype="JSON", exception=PluginError):
def res_json(res, jsontype="JSON", exception=PluginError): # pragma: no cover
"""This function is deprecated."""
try:
jsondata = res.json()
@ -200,12 +202,14 @@ def res_json(res, jsontype="JSON", exception=PluginError):
return jsondata
import hmac
import hashlib
SWF_KEY = b"Genuine Adobe Flash Player 001"
def swfverify(url):
def swfverify(url): # pragma: no cover
"""This function is deprecated."""
res = urlopen(url)
swf = swfdecompress(res.content)

View File

@ -1,6 +1,6 @@
from io import BytesIO
from itertools import repeat
from streamlink import NoStreamsError
from streamlink.plugins import Plugin
from streamlink.options import Options
from streamlink.stream import *
@ -25,6 +25,11 @@ class TestPlugin(Plugin):
return "test.se" in url
def _get_streams(self):
if "empty" in self.url:
return
if "NoStreamsError" in self.url:
raise NoStreamsError(self.url)
streams = {}
streams["test"] = TestStream(self.session)
streams["rtmp"] = RTMPStream(self.session, dict(rtmp="rtmp://test.se"))

View File

@ -1,6 +1,7 @@
import unittest
from streamlink.buffers import Buffer
from streamlink.buffers import Buffer, RingBuffer
class TestBuffer(unittest.TestCase):
def setUp(self):
@ -66,7 +67,51 @@ class TestBuffer(unittest.TestCase):
data[:] = b"reused!!"
self.assertEqual(self.buffer.read(), original)
def test_read_empty(self):
self.assertRaises(
StopIteration,
lambda: next(self.buffer._iterate_chunks(10)))
if __name__ == "__main__":
unittest.main()
class TestRingBuffer(unittest.TestCase):
BUFFER_SIZE = 8192*4
def setUp(self):
self.buffer = RingBuffer(size=self.BUFFER_SIZE)
def test_write(self):
self.buffer.write(b"1" * 8192)
self.buffer.write(b"2" * 4096)
self.assertEqual(self.buffer.length, 8192 + 4096)
def test_read(self):
self.buffer.write(b"1" * 8192)
self.buffer.write(b"2" * 4096)
self.assertEqual(self.buffer.length, 8192 + 4096)
self.assertEqual(self.buffer.read(4096), b"1" * 4096)
self.assertEqual(self.buffer.read(4096), b"1" * 4096)
self.assertEqual(self.buffer.read(), b"2" * 4096)
self.assertEqual(self.buffer.length, 0)
def test_read_timeout(self):
self.assertRaises(
IOError,
self.buffer.read, timeout=0.1)
def test_write_after_close(self):
self.buffer.close()
self.buffer.write(b"1" * 8192)
self.assertEqual(self.buffer.length, 0)
self.assertTrue(self.buffer.closed)
def test_resize(self):
self.assertEqual(self.buffer.buffer_size, self.BUFFER_SIZE)
self.buffer.resize(self.BUFFER_SIZE*2)
self.assertEqual(self.buffer.buffer_size, self.BUFFER_SIZE*2)
def test_free(self):
self.assertEqual(self.buffer.free, self.BUFFER_SIZE)
self.buffer.write(b'1' * 100)
self.assertEqual(self.buffer.free, self.BUFFER_SIZE-100)

77
tests/test_cache.py Normal file
View File

@ -0,0 +1,77 @@
import sys
import unittest
import tempfile
import os.path
import streamlink.cache
from shutil import rmtree
try:
from unittest.mock import patch
except ImportError:
from mock import patch
is_py2 = (sys.version_info[0] == 2)
class TestCache(unittest.TestCase):
def setUp(self):
self.tmp_dir = tempfile.mkdtemp("streamlink-test")
streamlink.cache.cache_dir = self.tmp_dir
self.cache = streamlink.cache.Cache("cache.json")
def tearDown(self):
rmtree(self.tmp_dir)
def test_get_no_file(self):
self.assertEqual(self.cache.get("missing-value"), None)
self.assertEqual(self.cache.get("missing-value", default="default"), "default")
def test_put_get(self):
self.cache.set("value", 1)
self.assertEqual(self.cache.get("value"), 1)
def test_put_get_prefix(self):
self.cache.key_prefix = "test"
self.cache.set("value", 1)
self.assertEqual(self.cache.get("value"), 1)
def test_key_prefix(self):
self.cache.key_prefix = "test"
self.cache.set("value", 1)
self.assertTrue("test:value" in self.cache._cache)
self.assertEqual(1, self.cache._cache["test:value"]["value"])
@patch('os.path.exists', return_value=True)
def test_load_fail(self, exists_mock):
if is_py2:
patch('__builtin__.open', side_effect=IOError)
else:
patch('streamlink.cache.open', side_effect=IOError)
self.cache._load()
self.assertEqual({}, self.cache._cache)
def test_expired(self):
self.cache.set("value", 10, expires=-1)
self.assertEqual(None, self.cache.get("value"))
def test_create_directory(self):
try:
streamlink.cache.cache_dir = os.path.join(tempfile.gettempdir(), "streamlink-test")
cache = streamlink.cache.Cache("cache.json")
self.assertFalse(os.path.exists(cache.filename))
cache.set("value", 10)
self.assertTrue(os.path.exists(cache.filename))
finally:
rmtree(streamlink.cache.cache_dir, ignore_errors=True)
@patch('os.makedirs', side_effect=OSError)
def test_create_directory_fail(self, makedirs):
try:
streamlink.cache.cache_dir = os.path.join(tempfile.gettempdir(), "streamlink-test")
cache = streamlink.cache.Cache("cache.json")
self.assertFalse(os.path.exists(cache.filename))
cache.set("value", 10)
self.assertFalse(os.path.exists(cache.filename))
finally:
rmtree(streamlink.cache.cache_dir, ignore_errors=True)

51
tests/test_cli_main.py Normal file
View File

@ -0,0 +1,51 @@
import tempfile
import sys
if sys.version_info[0:2] == (2, 6):
import unittest2 as unittest
else:
import unittest
import os.path
import streamlink_cli.main
try:
from unittest.mock import Mock, patch
except ImportError:
from mock import Mock, patch
from streamlink_cli.main import check_file_output
from streamlink_cli.output import FileOutput
class TestCLIMain(unittest.TestCase):
def test_check_file_output(self):
streamlink_cli.main.console = Mock()
self.assertIsInstance(check_file_output("test", False), FileOutput)
def test_check_file_output_exists(self):
tmpfile = tempfile.NamedTemporaryFile()
try:
streamlink_cli.main.console = console = Mock()
console.ask.return_value = "y"
self.assertTrue(os.path.exists(tmpfile.name))
self.assertIsInstance(check_file_output(tmpfile.name, False), FileOutput)
finally:
tmpfile.close()
def test_check_file_output_exists_force(self):
tmpfile = tempfile.NamedTemporaryFile()
try:
streamlink_cli.main.console = console = Mock()
self.assertTrue(os.path.exists(tmpfile.name))
self.assertIsInstance(check_file_output(tmpfile.name, True), FileOutput)
finally:
tmpfile.close()
@patch('sys.exit')
def test_check_file_output_exists_no(self, sys_exit):
tmpfile = tempfile.NamedTemporaryFile()
try:
streamlink_cli.main.console = console = Mock()
console.ask.return_value = "n"
self.assertTrue(os.path.exists(tmpfile.name))
check_file_output(tmpfile.name, False)
sys_exit.assert_called_with()
finally:
tmpfile.close()

View File

@ -6,7 +6,10 @@ else:
import os.path
import streamlink_cli.main
from mock import patch, ANY
try:
from unittest.mock import patch, ANY
except ImportError:
from mock import patch, ANY
from streamlink import Streamlink
from streamlink_cli.compat import is_win32

View File

@ -0,0 +1,95 @@
# coding=utf-8
from __future__ import unicode_literals
import sys
from streamlink.stream.hls_playlist import load, StreamInfo, Resolution, Media
if sys.version_info[0:2] == (2, 6):
import unittest2 as unittest
else:
import unittest
M3U8 = """
#EXTM3U
#EXT-X-MEDIA:TYPE=AUDIO,GROUP-ID="stereo",LANGUAGE="en",NAME="English",DEFAULT=YES,AUTOSELECT=YES,URI="audio/stereo/en/128kbit.m3u8"
#EXT-X-MEDIA:TYPE=AUDIO,GROUP-ID="stereo",LANGUAGE="dubbing",NAME="Dubbing",DEFAULT=NO,AUTOSELECT=YES,URI="audio/stereo/none/128kbit.m3u8"
#EXT-X-MEDIA:TYPE=AUDIO,GROUP-ID="surround",LANGUAGE="en",NAME="English",DEFAULT=YES,AUTOSELECT=YES,URI="audio/surround/en/320kbit.m3u8"
#EXT-X-MEDIA:TYPE=AUDIO,GROUP-ID="surround",LANGUAGE="dubbing",NAME="Dubbing",DEFAULT=NO,AUTOSELECT=YES,URI="audio/stereo/none/128kbit.m3u8"
#EXT-X-MEDIA:TYPE=SUBTITLES,GROUP-ID="subs",NAME="Deutsch",DEFAULT=NO,AUTOSELECT=YES,FORCED=NO,LANGUAGE="de",URI="subtitles_de.m3u8"
#EXT-X-MEDIA:TYPE=SUBTITLES,GROUP-ID="subs",NAME="English",DEFAULT=YES,AUTOSELECT=YES,FORCED=NO,LANGUAGE="en",URI="subtitles_en.m3u8"
#EXT-X-MEDIA:TYPE=SUBTITLES,GROUP-ID="subs",NAME="Espanol",DEFAULT=NO,AUTOSELECT=YES,FORCED=NO,LANGUAGE="es",URI="subtitles_es.m3u8"
#EXT-X-MEDIA:TYPE=SUBTITLES,GROUP-ID="subs",NAME="Fran\xc3\xa7ais",DEFAULT=NO,AUTOSELECT=YES,FORCED=NO,LANGUAGE="fr",URI="subtitles_fr.m3u8"
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=258157,CODECS="avc1.4d400d,mp4a.40.2",AUDIO="stereo",RESOLUTION=422x180,SUBTITLES="subs"
video/250kbit.m3u8
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=520929,CODECS="avc1.4d4015,mp4a.40.2",AUDIO="stereo",RESOLUTION=638x272,SUBTITLES="subs"
video/500kbit.m3u8
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=831270,CODECS="avc1.4d4015,mp4a.40.2",AUDIO="stereo",RESOLUTION=638x272,SUBTITLES="subs"
video/800kbit.m3u8
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=1144430,CODECS="avc1.4d401f,mp4a.40.2",AUDIO="surround",RESOLUTION=958x408,SUBTITLES="subs"
video/1100kbit.m3u8
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=1558322,CODECS="avc1.4d401f,mp4a.40.2",AUDIO="surround",RESOLUTION=1277x554,SUBTITLES="subs"
video/1500kbit.m3u8
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=4149264,CODECS="avc1.4d4028,mp4a.40.2",AUDIO="surround",RESOLUTION=1921x818,SUBTITLES="subs"
video/4000kbit.m3u8
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=6214307,CODECS="avc1.4d4028,mp4a.40.2",AUDIO="surround",RESOLUTION=1921x818,SUBTITLES="subs"
video/6000kbit.m3u8
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=10285391,CODECS="avc1.4d4033,mp4a.40.2",AUDIO="surround",RESOLUTION=4096x1744,SUBTITLES="subs"
video/10000kbit.m3u8
"""
class TestHLSPlaylist(unittest.TestCase):
def test_load(self):
playlist = load(M3U8, "http://test.se/")
self.assertEqual(playlist.media,
[Media(uri='http://test.se/audio/stereo/en/128kbit.m3u8', type='AUDIO', group_id='stereo', language='en',
name='English', default=True, autoselect=True, forced=False, characteristics=None),
Media(uri='http://test.se/audio/stereo/none/128kbit.m3u8', type='AUDIO', group_id='stereo',
language='dubbing', name='Dubbing', default=False, autoselect=True, forced=False,
characteristics=None),
Media(uri='http://test.se/audio/surround/en/320kbit.m3u8', type='AUDIO', group_id='surround', language='en',
name='English', default=True, autoselect=True, forced=False, characteristics=None),
Media(uri='http://test.se/audio/stereo/none/128kbit.m3u8', type='AUDIO', group_id='surround',
language='dubbing', name='Dubbing', default=False, autoselect=True, forced=False,
characteristics=None),
Media(uri='http://test.se/subtitles_de.m3u8', type='SUBTITLES', group_id='subs', language='de',
name='Deutsch', default=False, autoselect=True, forced=False, characteristics=None),
Media(uri='http://test.se/subtitles_en.m3u8', type='SUBTITLES', group_id='subs', language='en',
name='English', default=True, autoselect=True, forced=False, characteristics=None),
Media(uri='http://test.se/subtitles_es.m3u8', type='SUBTITLES', group_id='subs', language='es',
name='Espanol', default=False, autoselect=True, forced=False, characteristics=None),
Media(uri='http://test.se/subtitles_fr.m3u8', type='SUBTITLES', group_id='subs', language='fr',
name='Fran\xc3\xa7ais', default=False, autoselect=True, forced=False,
characteristics=None)])
self.assertEqual([p.stream_info for p in playlist.playlists],
[StreamInfo(bandwidth=258157.0, program_id=1, codecs=['avc1.4d400d', 'mp4a.40.2'],
resolution=Resolution(width=422, height=180), audio='stereo', video=None,
subtitles='subs'),
StreamInfo(bandwidth=520929.0, program_id=1, codecs=['avc1.4d4015', 'mp4a.40.2'],
resolution=Resolution(width=638, height=272), audio='stereo', video=None,
subtitles='subs'),
StreamInfo(bandwidth=831270.0, program_id=1, codecs=['avc1.4d4015', 'mp4a.40.2'],
resolution=Resolution(width=638, height=272), audio='stereo', video=None,
subtitles='subs'),
StreamInfo(bandwidth=1144430.0, program_id=1, codecs=['avc1.4d401f', 'mp4a.40.2'],
resolution=Resolution(width=958, height=408), audio='surround', video=None,
subtitles='subs'),
StreamInfo(bandwidth=1558322.0, program_id=1, codecs=['avc1.4d401f', 'mp4a.40.2'],
resolution=Resolution(width=1277, height=554), audio='surround', video=None,
subtitles='subs'),
StreamInfo(bandwidth=4149264.0, program_id=1, codecs=['avc1.4d4028', 'mp4a.40.2'],
resolution=Resolution(width=1921, height=818), audio='surround', video=None,
subtitles='subs'),
StreamInfo(bandwidth=6214307.0, program_id=1, codecs=['avc1.4d4028', 'mp4a.40.2'],
resolution=Resolution(width=1921, height=818), audio='surround', video=None,
subtitles='subs'),
StreamInfo(bandwidth=10285391.0, program_id=1, codecs=['avc1.4d4033', 'mp4a.40.2'],
resolution=Resolution(width=4096, height=1744), audio='surround', video=None,
subtitles='subs')])

View File

@ -1,6 +1,12 @@
import json
import unittest
try:
from unittest.mock import patch, Mock, ANY
except ImportError:
from mock import patch, Mock, ANY
from streamlink.plugins.tvplayer import TVPlayer
from streamlink.stream import HLSStream
class TestPluginTVPlayer(unittest.TestCase):
@ -19,3 +25,60 @@ class TestPluginTVPlayer(unittest.TestCase):
self.assertFalse(TVPlayer.can_handle_url("http://www.tvplayer.com/"))
self.assertFalse(TVPlayer.can_handle_url("http://www.tvcatchup.com/"))
self.assertFalse(TVPlayer.can_handle_url("http://www.youtube.com/"))
@patch('streamlink.plugins.tvplayer.http')
@patch('streamlink.plugins.tvplayer.HLSStream')
def test_get_streams(self, hlsstream, mock_http):
api_data = {
"tvplayer": {
"status": "200 OK",
"response": {
"stream": "http://test.se/stream1"
}
}
}
page_resp = Mock()
page_resp.text = u"""
var validate = "foo";
var resourceId = "1234";
var platform = "test";
"""
api_resp = Mock()
api_resp.text = json.dumps(api_data)
mock_http.get.return_value = page_resp
mock_http.post.return_value = api_resp
mock_http.json.return_value = api_data
hlsstream.parse_variant_playlist.return_value = {"test": HLSStream(None, "http://test.se/stream1")}
plugin = TVPlayer("http://tvplayer.com/watch/dave")
streams = plugin.get_streams()
self.assertTrue("test" in streams)
# test the url is used correctly
mock_http.get.assert_called_with("http://tvplayer.com/watch/dave", headers=ANY)
# test that the correct API call is made
mock_http.post.assert_called_with("http://api.tvplayer.com/api/v2/stream/live", data=dict(id=u"1234",
validate=u"foo",
platform=u"test"))
# test that the correct URL is used for the HLSStream
hlsstream.parse_variant_playlist.assert_called_with(ANY, "http://test.se/stream1", headers=ANY)
@patch('streamlink.plugins.tvplayer.http')
def test_get_invalid_page(self, mock_http):
page_resp = Mock()
page_resp.text = u"""
var validate = "foo";
var resourceId = "1234";
"""
mock_http.get.return_value = page_resp
plugin = TVPlayer("http://tvplayer.com/watch/dave")
streams = plugin.get_streams()
self.assertEqual({}, streams)
# test the url is used correctly
mock_http.get.assert_called_with("http://tvplayer.com/watch/dave", headers=ANY)

43
tests/test_plugins.py Normal file
View File

@ -0,0 +1,43 @@
import pkgutil
import sys
import imp
from streamlink import Streamlink
if sys.version_info[0:2] == (2, 6):
import unittest2 as unittest
else:
import unittest
import streamlink.plugins
import os.path
class PluginTestMeta(type):
def __new__(mcs, name, bases, dict):
plugin_path = os.path.dirname(streamlink.plugins.__file__)
plugins = []
for loader, pname, ispkg in pkgutil.iter_modules([plugin_path]):
file, pathname, desc = imp.find_module(pname, [plugin_path])
module = imp.load_module(pname, file, pathname, desc)
if hasattr(module, "__plugin__"):
plugins.append((pname, file, pathname, desc))
session = Streamlink()
def gentest(pname, file, pathname, desc):
def load_plugin_test(self):
session.load_plugin(pname, file, pathname, desc)
return load_plugin_test
for pname, file, pathname, desc in plugins:
dict['test_{0}_load'.format(pname)] = gentest(pname, file, pathname, desc)
return type.__new__(mcs, name, bases, dict)
class TestPlugins(unittest.TestCase):
"""
Test that an instance of each plugin can be created.
"""
__metaclass__ = PluginTestMeta

View File

@ -0,0 +1,33 @@
import os.path
import unittest
try:
from unittest.mock import patch
except ImportError:
from mock import patch
from streamlink import Streamlink
from streamlink.api import streams
PluginPath = os.path.join(os.path.dirname(__file__), "plugins")
def get_session():
s = Streamlink()
s.load_plugins(PluginPath)
return s
class TestStreamlinkAPI(unittest.TestCase):
@patch('streamlink.api.Streamlink', side_effect=get_session)
def test_find_test_plugin(self, session):
self.assertTrue(
"rtmp" in streams("test.se")
)
@patch('streamlink.api.Streamlink', side_effect=get_session)
def test_no_streams_exception(self, session):
self.assertEqual({}, streams("test.se/NoStreamsError"))
@patch('streamlink.api.Streamlink', side_effect=get_session)
def test_no_streams(self, session):
self.assertEqual({}, streams("test.se/empty"))

82
tests/test_utils.py Normal file
View File

@ -0,0 +1,82 @@
import sys
from streamlink.plugin.api.validate import xml_element, text
try:
import xml.etree.cElementTree as ET
except ImportError:
import xml.etree.ElementTree as ET
from streamlink import PluginError
from streamlink.plugin.api import validate
from streamlink.utils import *
if sys.version_info[0:2] == (2, 6):
import unittest2 as unittest
else:
import unittest
class TestUtil(unittest.TestCase):
def test_verifyjson(self):
self.assertEqual(verifyjson({"test": 1}, "test"),
1)
self.assertRaises(PluginError, verifyjson, None, "test")
self.assertRaises(PluginError, verifyjson, {}, "test")
def test_absolute_url(self):
self.assertEqual("http://test.se/test",
absolute_url("http://test.se", "/test"))
self.assertEqual("http://test2.se/test",
absolute_url("http://test.se", "http://test2.se/test"))
def test_prepend_www(self):
self.assertEqual("http://www.test.se/test",
prepend_www("http://test.se/test"))
self.assertEqual("http://www.test.se",
prepend_www("http://www.test.se"))
def test_parse_json(self):
self.assertEqual({}, parse_json("{}"))
self.assertEqual({"test": 1}, parse_json("""{"test": 1}"""))
self.assertEqual({"test": 1}, parse_json("""{"test": 1}""", schema=validate.Schema({"test": 1})))
self.assertRaises(PluginError, parse_json, """{"test: 1}""")
self.assertRaises(IOError, parse_json, """{"test: 1}""", exception=IOError)
self.assertRaises(PluginError, parse_json, """{"test: 1}"""*10)
def test_parse_xml(self):
expected = ET.Element("test", {"foo": "bar"})
actual = parse_xml(u"""<test foo="bar"/>""", ignore_ns=True)
self.assertEqual(expected.tag, actual.tag)
self.assertEqual(expected.attrib, actual.attrib)
def test_parse_xml_ns_ignore(self):
expected = ET.Element("test", {"foo": "bar"})
actual = parse_xml(u"""<test foo="bar" xmlns="foo:bar"/>""", ignore_ns=True)
self.assertEqual(expected.tag, actual.tag)
self.assertEqual(expected.attrib, actual.attrib)
def test_parse_xml_ns(self):
expected = ET.Element("{foo:bar}test", {"foo": "bar"})
actual = parse_xml(u"""<h:test foo="bar" xmlns:h="foo:bar"/>""")
self.assertEqual(expected.tag, actual.tag)
self.assertEqual(expected.attrib, actual.attrib)
def test_parse_xml_fail(self):
self.assertRaises(PluginError,
parse_xml, u"1"*1000)
self.assertRaises(IOError,
parse_xml, u"1"*1000, exception=IOError)
def test_parse_xml_validate(self):
expected = ET.Element("test", {"foo": "bar"})
actual = parse_xml(u"""<test foo="bar"/>""",
schema=validate.Schema(xml_element(tag="test", attrib={"foo": text})))
self.assertEqual(expected.tag, actual.tag)
self.assertEqual(expected.attrib, actual.attrib)
def test_parse_qsd(self):
self.assertEqual(
{"test": "1", "foo": "bar"},
parse_qsd("test=1&foo=bar", schema=validate.Schema({"test": validate.text, "foo": "bar"})))