streamlink/docs/ext_plugins.py

255 lines
8.3 KiB
Python

import abc
import ast
import pkgutil
import re
import tokenize
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional
from docutils import nodes
from docutils.parsers.rst import Directive
from docutils.statemachine import ViewList
from sphinx.errors import ExtensionError
from sphinx.util.nodes import nested_parse_with_titles
from streamlink import plugins as streamlink_plugins
class IDatalistItem(abc.ABC):
@abc.abstractmethod
def generate(self) -> Iterator[str]:
raise NotImplementedError
class IMetadataItem(IDatalistItem):
@abc.abstractmethod
def set(self, value: Any) -> None:
raise NotImplementedError
class MetadataItem(IMetadataItem):
def __init__(self, title: str):
self.title = title
self.value: Optional[str] = None
def set(self, value: str) -> None:
self.value = value
def generate(self) -> Iterator[str]:
if self.value is None:
return
yield f":{self.title}: {self.value}"
class MetadataList(IMetadataItem):
def __init__(self, title: str):
self.title = title
self.value: List[str] = []
def set(self, value: str) -> None:
self.value.append(value)
def generate(self) -> Iterator[str]:
if not self.value:
return
yield f":{self.title}: - {' '.join(self.get_item(0))}"
indent = " " * len(f":{self.title}:")
for idx in range(1, len(self.value)):
yield f"{indent} - {' '.join(self.get_item(idx))}"
def get_item(self, idx: int) -> Iterator[str]:
yield self.value[idx]
class MetadataMetadataList(MetadataList):
def __init__(self):
super().__init__("Metadata")
def get_item(self, idx: int) -> Iterator[str]:
variable, *data = str(self.value[idx]).split(" ")
yield f":ref:`{variable} <cli/metadata:Variables>`"
if data:
yield " ".join(["-", *data])
class PluginOnGithub(IDatalistItem):
url_source = "https://github.com/streamlink/streamlink/blob/master/src/streamlink/plugins/{name}.py"
url_issues = "https://github.com/streamlink/streamlink/issues?q=is%3Aissue+sort%3Aupdated-desc+plugins.{name}"
def __init__(self, pluginname: str):
self.pluginname = pluginname
def generate(self) -> Iterator[str]:
source = f"`Source <{self.url_source.format(name=self.pluginname)}>`__"
issues = f"`Issues <{self.url_issues.format(name=self.pluginname)}>`__"
yield f":GitHub: {source}, {issues}"
class PluginArguments(ast.NodeVisitor, IDatalistItem):
def __init__(self, pluginname, pluginast):
super().__init__()
self.pluginname = pluginname
self.arguments = []
self.visit(pluginast)
def generate(self) -> Iterator[str]:
if not self.arguments:
return
indent = " " * len(":Arguments: ")
yield f":Arguments: - :option:`--{self.arguments[0]}`"
for arg in self.arguments[1:]:
yield f"{indent} - :option:`--{arg}`"
def visit_ClassDef(self, node: ast.ClassDef) -> None:
for base in node.bases:
if not isinstance(base, ast.Name):
continue
if base.id == "Plugin":
break
else:
return
for decorator in node.decorator_list:
if (
not isinstance(decorator, ast.Call)
or not isinstance(decorator.func, ast.Name)
or decorator.func.id != "pluginargument"
or (len(decorator.args) == 0 and len(decorator.keywords) == 0)
):
continue
if any(
True
for kw in decorator.keywords
if (
kw.arg == "help"
and type(kw.value) is ast.Attribute
and kw.value.attr == "SUPPRESS"
and type(kw.value.value) is ast.Name
and kw.value.value.id == "argparse"
)
):
continue
custom_name = next(
(kw.value.value for kw in decorator.keywords if kw.arg == "argument_name" and type(kw.value) is ast.Constant),
None,
)
if custom_name:
self.arguments.append(custom_name)
continue
name = next(
(kw.value.value for kw in decorator.keywords if kw.arg == "name" and type(kw.value) is ast.Constant),
None,
) or (
decorator.args
and type(decorator.args[0]) is ast.Constant
and decorator.args[0].value
)
if name:
self.arguments.append(f"{self.pluginname}-{name}")
class PluginMetadata:
def __init__(self, name: str, pluginast):
self.name: str = name
self.items: Dict[str, IMetadataItem] = dict(
description=MetadataItem("Description"),
url=MetadataList("URL(s)"),
type=MetadataItem("Type"),
metadata=MetadataMetadataList(),
region=MetadataItem("Region"),
account=MetadataItem("Account"),
notes=MetadataList("Notes"),
)
self.additional: List[IDatalistItem] = [
PluginArguments(name, pluginast),
PluginOnGithub(name),
]
def set(self, key: str, value: str) -> None:
if key not in self.items:
raise KeyError(f"Invalid plugin metadata key '{key}' in plugin '{self.name}'")
self.items[key].set(value)
def generate(self) -> Iterator[str]:
yield self.name
yield "-" * len(self.name)
yield ""
for metadata in self.items.values():
yield from metadata.generate()
for item in self.additional:
yield from item.generate()
yield ""
yield ""
class PluginFinder:
_re_metadata_item = re.compile(r"\n\$(\w+) (.+)(?=\n\$|$)", re.MULTILINE)
protocol_plugins = [
"http",
"hls",
"dash",
]
def __init__(self):
plugins_path = Path(streamlink_plugins.__path__[0])
self.plugins = [
(pname, plugins_path / f"{pname}.py")
for finder, pname, ispkg in pkgutil.iter_modules([str(plugins_path)])
if not pname.startswith("common_") and pname not in self.protocol_plugins
]
def get_plugins(self):
for pluginname, pluginfile in self.plugins:
pluginmetadata = self._parse_plugin(pluginname, pluginfile)
if pluginmetadata:
yield pluginmetadata
def _parse_plugin(self, pluginname: str, pluginfile: Path) -> Optional[PluginMetadata]:
with pluginfile.open() as handle:
# read until the first token has been parsed
for tokeninfo in tokenize.generate_tokens(handle.readline):
# the very first token needs to be a string / block comment with the metadata
if tokeninfo.type != tokenize.STRING or not self._re_metadata_item.search(tokeninfo.string):
return None
metadata = tokeninfo.string.strip()
break
try:
# continue reading the plugin file with the same handle
pluginsource = handle.read()
# build AST from plugin source for finding the used plugin arguments
pluginast = ast.parse(pluginsource, str(pluginfile))
pluginmetadata = PluginMetadata(pluginname, pluginast)
for item in self._re_metadata_item.finditer(metadata):
key, value = item.groups()
pluginmetadata.set(key, value)
return pluginmetadata
except Exception as err:
raise ExtensionError(f"Error while parsing plugin file {pluginfile.name}", err) from err
class PluginsDirective(Directive):
def run(self):
pluginfinder = PluginFinder()
node = nodes.section()
node.document = self.state.document
result = ViewList()
for pluginmetadata in pluginfinder.get_plugins():
for line in pluginmetadata.generate():
result.append(line, "plugins")
nested_parse_with_titles(self.state, result, node)
return node.children
def setup(app):
app.add_directive("plugins", PluginsDirective)