mirror of https://github.com/streamlink/streamlink
255 lines
8.3 KiB
Python
255 lines
8.3 KiB
Python
import abc
|
|
import ast
|
|
import pkgutil
|
|
import re
|
|
import tokenize
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Iterator, List, Optional
|
|
|
|
from docutils import nodes
|
|
from docutils.parsers.rst import Directive
|
|
from docutils.statemachine import ViewList
|
|
from sphinx.errors import ExtensionError
|
|
from sphinx.util.nodes import nested_parse_with_titles
|
|
|
|
from streamlink import plugins as streamlink_plugins
|
|
|
|
|
|
class IDatalistItem(abc.ABC):
    """Interface for an object that can render itself as a sequence of text lines."""

    @abc.abstractmethod
    def generate(self) -> Iterator[str]:
        """Yield the output lines of this item.

        Concrete subclasses must override this method; the base implementation
        only raises ``NotImplementedError``.
        """
        raise NotImplementedError()
|
|
|
|
|
|
class IMetadataItem(IDatalistItem):
    """Interface for a data-list item whose content is supplied through ``set``."""

    @abc.abstractmethod
    def set(self, value: Any) -> None:
        """Store ``value`` on this item.

        Concrete subclasses must override this method; the base implementation
        only raises ``NotImplementedError``.
        """
        raise NotImplementedError()
|
|
|
|
|
|
class MetadataItem(IMetadataItem):
    """Single-valued metadata field, rendered as ``:<title>: <value>``."""

    def __init__(self, title: str):
        # no value until ``set`` has been called; ``generate`` emits nothing until then
        self.value: Optional[str] = None
        self.title = title

    def set(self, value: str) -> None:
        """Replace the stored value with ``value``."""
        self.value = value

    def generate(self) -> Iterator[str]:
        """Yield the single field line, or nothing while the value is still ``None``."""
        if self.value is not None:
            yield f":{self.title}: {self.value}"
|
|
|
|
|
|
class MetadataList(IMetadataItem):
    """Multi-valued metadata field, rendered as a field whose body is a bullet list."""

    def __init__(self, title: str):
        self.value: List[str] = []
        self.title = title

    def set(self, value: str) -> None:
        """Append ``value`` to the list; earlier entries are kept."""
        self.value.append(value)

    def generate(self) -> Iterator[str]:
        """Yield one bullet line per stored entry, or nothing when the list is empty.

        The first line carries the ``:<title>:`` field marker. Every following
        line is padded with as many spaces as the marker is wide, so all the
        bullets start in the same column.
        """
        if not self.value:
            return

        marker = f":{self.title}:"
        padding = " " * len(marker)
        for position in range(len(self.value)):
            lead = marker if position == 0 else padding
            yield f"{lead} - {' '.join(self.get_item(position))}"

    def get_item(self, idx: int) -> Iterator[str]:
        """Yield the text fragments of entry ``idx``; ``generate`` joins them with spaces."""
        entry = self.value[idx]
        yield entry
|
|
|
|
|
|
class MetadataMetadataList(MetadataList):
    """The "Metadata" list, whose entries start with a metadata variable name."""

    def __init__(self):
        super().__init__("Metadata")

    def get_item(self, idx: int) -> Iterator[str]:
        """Yield a cross-reference for the entry's first word, then the rest of the entry.

        The entry is cut at its first space. The leading word is wrapped in a
        ``:ref:`` role pointing at ``cli/metadata:Variables``. If the entry
        contained a space, whatever followed it is yielded prefixed with "- ".
        """
        variable, separator, remainder = str(self.value[idx]).partition(" ")
        yield f":ref:`{variable} <cli/metadata:Variables>`"
        if separator:
            yield f"- {remainder}"
|
|
|
|
|
|
class PluginOnGithub(IDatalistItem):
    """Renders the ``:GitHub:`` field with links to a plugin's source file and issue search."""

    # URL templates; ``{name}`` is replaced with the plugin name
    url_source = "https://github.com/streamlink/streamlink/blob/master/src/streamlink/plugins/{name}.py"
    url_issues = "https://github.com/streamlink/streamlink/issues?q=is%3Aissue+sort%3Aupdated-desc+plugins.{name}"

    def __init__(self, pluginname: str):
        self.pluginname = pluginname

    def generate(self) -> Iterator[str]:
        """Yield one line holding the "Source" and "Issues" hyperlinks, separated by a comma."""
        targets = (
            ("Source", self.url_source),
            ("Issues", self.url_issues),
        )
        links = ", ".join(
            f"`{label} <{template.format(name=self.pluginname)}>`__"
            for label, template in targets
        )
        yield f":GitHub: {links}"
|
|
|
|
|
|
class PluginArguments(ast.NodeVisitor, IDatalistItem):
    """Collects the CLI option names a plugin declares and renders them as ``:Arguments:``.

    The plugin's AST is walked once at construction time. Every
    ``@pluginargument(...)`` decorator found on a class that lists ``Plugin``
    as a direct base contributes one entry to ``self.arguments``.
    """

    def __init__(self, pluginname: str, pluginast: ast.AST):
        """
        :param pluginname: plugin name, used as the prefix of derived option names
        :param pluginast: AST of the plugin source; it is visited immediately
        """
        super().__init__()
        self.pluginname = pluginname
        self.arguments: List[str] = []
        self.visit(pluginast)

    def generate(self) -> Iterator[str]:
        """Yield one ``:option:`` bullet per collected argument, or nothing if there are none."""
        if not self.arguments:
            return

        first, *remaining = self.arguments
        # The padding is exactly as wide as ":Arguments: ", so the bullet of each
        # continuation line starts in the same column as the bullet on the first line.
        indent = " " * len(":Arguments: ")
        yield f":Arguments: - :option:`--{first}`"
        for arg in remaining:
            yield f"{indent}- :option:`--{arg}`"

    def visit_ClassDef(self, node: ast.ClassDef) -> None:
        """Record the plugin arguments declared via decorators on a ``Plugin`` subclass.

        Classes that do not list the bare name ``Plugin`` among their bases are
        ignored. ``generic_visit`` is not called, so classes nested inside a
        class body are not inspected.
        """
        if not self._is_plugin_class(node):
            return

        for decorator in node.decorator_list:
            if not self._is_pluginargument(decorator) or self._is_suppressed(decorator):
                continue

            # an explicit ``argument_name`` is used verbatim, without the plugin-name prefix
            custom_name = self._constant_keyword(decorator, "argument_name")
            if custom_name:
                self.arguments.append(custom_name)
                continue

            # otherwise the name comes from the ``name`` keyword or the first positional argument
            name = self._constant_keyword(decorator, "name") or self._first_positional(decorator)
            if name:
                self.arguments.append(f"{self.pluginname}-{name}")

    @staticmethod
    def _is_plugin_class(node: ast.ClassDef) -> bool:
        """Return whether one of the class bases is the plain name ``Plugin``."""
        return any(isinstance(base, ast.Name) and base.id == "Plugin" for base in node.bases)

    @staticmethod
    def _is_pluginargument(decorator: ast.expr) -> bool:
        """Return whether ``decorator`` is a ``pluginargument(...)`` call with at least one argument."""
        return (
            isinstance(decorator, ast.Call)
            and isinstance(decorator.func, ast.Name)
            and decorator.func.id == "pluginargument"
            and bool(decorator.args or decorator.keywords)
        )

    @staticmethod
    def _is_suppressed(decorator: ast.Call) -> bool:
        """Return whether the decorator passes ``help=argparse.SUPPRESS``."""
        return any(
            kw.arg == "help"
            and isinstance(kw.value, ast.Attribute)
            and kw.value.attr == "SUPPRESS"
            and isinstance(kw.value.value, ast.Name)
            and kw.value.value.id == "argparse"
            for kw in decorator.keywords
        )

    @staticmethod
    def _constant_keyword(decorator: ast.Call, key: str) -> Any:
        """Return the literal value of the first ``key=<constant>`` keyword, or ``None``."""
        return next(
            (kw.value.value for kw in decorator.keywords if kw.arg == key and isinstance(kw.value, ast.Constant)),
            None,
        )

    @staticmethod
    def _first_positional(decorator: ast.Call) -> Any:
        """Return the literal value of the first positional argument, or ``None`` if absent or not a constant."""
        if decorator.args and isinstance(decorator.args[0], ast.Constant):
            return decorator.args[0].value
        return None
|
|
|
|
|
|
class PluginMetadata:
    """All documentation data of one plugin, rendered as a titled section."""

    def __init__(self, name: str, pluginast):
        """
        :param name: plugin name; used as the section title and passed to the additional items
        :param pluginast: AST of the plugin source, handed on to ``PluginArguments``
        """
        self.name: str = name
        # keyed by the metadata key accepted by ``set``; insertion order is the output order
        self.items: Dict[str, IMetadataItem] = {
            "description": MetadataItem("Description"),
            "url": MetadataList("URL(s)"),
            "type": MetadataItem("Type"),
            "metadata": MetadataMetadataList(),
            "region": MetadataItem("Region"),
            "account": MetadataItem("Account"),
            "notes": MetadataList("Notes"),
        }
        # items that are not set via metadata keys; rendered after ``self.items``
        self.additional: List[IDatalistItem] = [
            PluginArguments(name, pluginast),
            PluginOnGithub(name),
        ]

    def set(self, key: str, value: str) -> None:
        """Forward ``value`` to the item registered under ``key``.

        :raises KeyError: if ``key`` is not one of the known metadata keys
        """
        target = self.items.get(key)
        if target is None:
            raise KeyError(f"Invalid plugin metadata key '{key}' in plugin '{self.name}'")
        target.set(value)

    def generate(self) -> Iterator[str]:
        """Yield the section: title, dashed underline, blank line, all item lines, two blank lines."""
        underline = "-" * len(self.name)
        yield from (self.name, underline, "")
        for entry in (*self.items.values(), *self.additional):
            yield from entry.generate()
        yield from ("", "")
|
|
|
|
|
|
class PluginFinder:
    """Locates the plugin modules and extracts their documentation metadata."""

    # Matches one "$key value" line inside the leading metadata string. The
    # lookahead ends the value at the end of its line or before the next "$" line.
    _re_metadata_item = re.compile(r"\n\$(\w+) (.+)(?=\n\$|$)", re.MULTILINE)

    # module names that are skipped when collecting plugins
    protocol_plugins = [
        "http",
        "hls",
        "dash",
    ]

    def __init__(self):
        """Collect ``(name, path)`` pairs for the modules in the ``streamlink.plugins`` package directory.

        Modules whose name starts with ``common_`` or is listed in
        ``protocol_plugins`` are left out.
        """
        plugins_path = Path(streamlink_plugins.__path__[0])
        self.plugins = [
            (pname, plugins_path / f"{pname}.py")
            for finder, pname, ispkg in pkgutil.iter_modules([str(plugins_path)])
            if not pname.startswith("common_") and pname not in self.protocol_plugins
        ]

    def get_plugins(self) -> Iterator["PluginMetadata"]:
        """Yield the parsed metadata of every collected plugin that has a metadata header."""
        for pluginname, pluginfile in self.plugins:
            pluginmetadata = self._parse_plugin(pluginname, pluginfile)
            if pluginmetadata:
                yield pluginmetadata

    def _parse_plugin(self, pluginname: str, pluginfile: Path) -> Optional["PluginMetadata"]:
        """Parse one plugin file into a ``PluginMetadata`` object.

        :param pluginname: name of the plugin module
        :param pluginfile: path of the plugin's source file
        :return: the populated metadata, or ``None`` when the file's first token
                 is not a string containing at least one ``$key value`` line
        :raises ExtensionError: if reading, tokenizing or parsing the file fails,
                                or a metadata key is invalid
        """
        try:
            # tokenize.open() decodes the file with the encoding Python itself would use for
            # this source file (coding cookie / BOM, UTF-8 otherwise), not the locale's default
            with tokenize.open(pluginfile) as handle:
                # only the very first token is needed: it has to be the string / block comment
                # holding the metadata
                tokeninfo = next(tokenize.generate_tokens(handle.readline), None)
                if (
                    tokeninfo is None
                    or tokeninfo.type != tokenize.STRING
                    or not self._re_metadata_item.search(tokeninfo.string)
                ):
                    return None
                metadata = tokeninfo.string.strip()

                # continue reading the plugin file with the same handle
                pluginsource = handle.read()

            # build AST from plugin source for finding the used plugin arguments
            pluginast = ast.parse(pluginsource, str(pluginfile))

            pluginmetadata = PluginMetadata(pluginname, pluginast)
            for item in self._re_metadata_item.finditer(metadata):
                key, value = item.groups()
                pluginmetadata.set(key, value)

            return pluginmetadata

        except Exception as err:
            raise ExtensionError(f"Error while parsing plugin file {pluginfile.name}", err) from err
|
|
|
|
|
|
class PluginsDirective(Directive):
    """Directive that expands to one generated section per plugin found by ``PluginFinder``."""

    def run(self):
        """Generate the markup of all plugins, parse it and return the resulting nodes."""
        # temporary container for the parsed content; only its children are returned
        section = nodes.section()
        section.document = self.state.document

        lines = ViewList()
        for plugin in PluginFinder().get_plugins():
            for text in plugin.generate():
                # "plugins" is recorded as the source name of every generated line
                lines.append(text, "plugins")

        nested_parse_with_titles(self.state, lines, section)
        return section.children
|
|
|
|
|
|
def setup(app):
    """Extension entry point: register ``PluginsDirective`` under the directive name "plugins"."""
    directive_name = "plugins"
    app.add_directive(directive_name, PluginsDirective)
|