chore: add "A" rules to ruff config

This commit is contained in:
bastimeyer 2023-02-20 22:30:07 +01:00 committed by Forrest
parent ba78ad2ec3
commit 85ea235d13
11 changed files with 89 additions and 53 deletions

View File

@ -50,23 +50,23 @@ class ArgparseDirective(Directive):
_headlines = ["^", "~"]
def process_help(self, help):
def process_help(self, helptext):
# Dedent the help to make sure we are always dealing with
# non-indented text.
help = dedent(help)
helptext = dedent(helptext)
help = _inline_code_block_re.sub(
helptext = _inline_code_block_re.sub(
lambda m: (
":code:`{0}`".format(m.group(1).replace("\\", "\\\\"))
),
help,
helptext,
)
help = _example_inline_code_block_re.sub(r":code:`\1`", help)
helptext = _example_inline_code_block_re.sub(r":code:`\1`", helptext)
# Replace option references with links.
# Do this before indenting blocks and notes.
help = _option_line_re.sub(
helptext = _option_line_re.sub(
lambda m: (
_option_re.sub(
lambda m2: (
@ -77,34 +77,34 @@ class ArgparseDirective(Directive):
m.group(1),
)
),
help,
helptext,
)
# Create simple blocks.
help = _block_re.sub("::\n\n ", help)
helptext = _block_re.sub("::\n\n ", helptext)
# Boldify the default value.
help = _default_re.sub(r"Default is: **\1**.\n", help)
helptext = _default_re.sub(r"Default is: **\1**.\n", helptext)
# Create note directives from "Note: " paragraphs.
help = _note_re.sub(
helptext = _note_re.sub(
lambda m: ".. note::\n\n" + indent(m.group(1)) + "\n\n",
help,
helptext,
)
# workaround to replace %(prog)s with streamlink
help = _prog_re.sub("streamlink", help)
helptext = _prog_re.sub("streamlink", helptext)
# fix escaped chars for percent-formatted argparse help strings
help = _percent_re.sub("%", help)
helptext = _percent_re.sub("%", helptext)
# create cross-link for the "Metadata variables" section
help = _cli_metadata_variables_section_cross_link_re.sub(
helptext = _cli_metadata_variables_section_cross_link_re.sub(
"the \":ref:`Metadata variables <cli/metadata:Variables>`\" section",
help,
helptext,
)
return indent(help)
return indent(helptext)
def generate_group_rst(self, group):
for action in group._group_actions:

View File

@ -79,10 +79,8 @@ select = [
"PLW",
# isort
"I",
# flake8-tidy-imports
"TID",
# flake8-quotes
"Q",
# flake8-builtins
"A",
# flake8-commas
"COM",
# flake8-comprehensions
@ -93,8 +91,13 @@ select = [
"PIE",
# flake8-pytest-style
"PT",
# flake8-quotes
"Q",
# flake8-tidy-imports
"TID",
]
extend-ignore = [
"A003", # builtin-attribute-shadowing
"C408", # unnecessary-collection-call
"ISC003", # explicit-string-concatenation
]

View File

@ -184,7 +184,7 @@ def basicConfig(
filemode: str = "a",
stream: Optional[IO] = None,
level: Optional[str] = None,
format: str = FORMAT_BASE,
format: str = FORMAT_BASE, # noqa: A002 # TODO: rename to "fmt" (breaking)
style: str = FORMAT_STYLE, # TODO: py38: Literal["%", "{", "$"]
datefmt: str = FORMAT_DATE,
remove_base: Optional[List[str]] = None,

View File

@ -1,7 +1,7 @@
from typing import TYPE_CHECKING
# noinspection PyPep8Naming,PyShadowingBuiltins
from streamlink.plugin.api.validate._schemas import (
from streamlink.plugin.api.validate._schemas import ( # noqa: A001
AllSchema as all,
AnySchema as any,
AttrSchema as attr,
@ -22,7 +22,7 @@ from streamlink.plugin.api.validate._validate import (
)
# noinspection PyShadowingBuiltins
from streamlink.plugin.api.validate._validators import (
from streamlink.plugin.api.validate._validators import ( # noqa: A001
validator_contains as contains,
validator_endswith as endswith,
validator_filter as filter,

View File

@ -87,7 +87,7 @@ class AdultSwim(Plugin):
validate.filter(lambda k, v: k.startswith("Video:")),
)
def _get_stream_data(self, id):
def _get_stream_data(self, streamid):
res = self.session.http.get(self.url)
m = self.json_data_re.search(res.text)
if m and m.group(1):
@ -96,9 +96,8 @@ class AdultSwim(Plugin):
raise PluginError("Failed to get json_data")
for stream in streams:
if "id" in stream:
if id == stream["id"] and "stream" in stream:
return stream["stream"]
if "id" in stream and streamid == stream["id"] and "stream" in stream:
return stream["stream"]
def _get_video_data(self, slug):
m = self.truncate_url_re.search(self.url)

View File

@ -73,8 +73,8 @@ class Booyah(Plugin):
res = self.session.http.post(self.auth_api_url)
self.session.http.json(res, self.auth_schema)
def get_vod(self, id):
res = self.session.http.get(self.vod_api_url.format(id))
def get_vod(self, vodid):
res = self.session.http.get(self.vod_api_url.format(vodid))
user_data = self.session.http.json(res, schema=self.vod_schema)
self.author = user_data["user"]["nickname"]
@ -93,8 +93,8 @@ class Booyah(Plugin):
stream["stream_url"],
)
def get_live(self, id):
res = self.session.http.get(self.live_api_url.format(id))
def get_live(self, liveid):
res = self.session.http.get(self.live_api_url.format(liveid))
user_data = self.session.http.json(res, schema=self.live_schema)
if user_data["channel"]["is_streaming"]:

View File

@ -44,7 +44,7 @@ class PlutoHLSStream(HLSStream):
)/?$
""", re.VERBOSE))
class Pluto(Plugin):
def _get_api_data(self, type, slug, filter=None):
def _get_api_data(self, kind, slug, slugfilter=None):
log.debug(f"slug={slug}")
app_version = self.session.http.get(self.url, schema=validate.Schema(
validate.parse_html(),
@ -67,7 +67,7 @@ class Pluto(Plugin):
"deviceType": "web",
"clientID": str(uuid4()),
"clientModelNumber": "1.0",
type: slug,
kind: slug,
},
schema=validate.Schema(
validate.parse_json(), {
@ -91,14 +91,17 @@ class Pluto(Plugin):
"path": str,
},
validate.optional("seasons"): [{
"episodes": validate.all([{
"name": str,
"_id": str,
"slug": str,
"stitched": {
"path": str,
},
}], validate.filter(lambda k: filter and k["slug"] == filter)),
"episodes": validate.all(
[{
"name": str,
"_id": str,
"slug": str,
"stitched": {
"path": str,
},
}],
validate.filter(lambda k: slugfilter and k["slug"] == slugfilter),
),
}],
}],
"sessionToken": str,
@ -131,7 +134,7 @@ class Pluto(Plugin):
path = media["stitched"]["path"]
elif m["slug_series"] and m["slug_episode"]:
data = self._get_api_data("episodeSlugs", m["slug_series"], filter=m["slug_episode"])
data = self._get_api_data("episodeSlugs", m["slug_series"], slugfilter=m["slug_episode"])
media = self._get_media_data(data, "VOD", m["slug_series"])
if not media or "seasons" not in media:
return

View File

@ -216,9 +216,9 @@ class TVP(Plugin):
log.error("The content is not available in your region")
return
for format in data.get("formats"):
if format.get("mimeType") == "application/x-mpegurl":
return HLSStream.parse_variant_playlist(self.session, format.get("url"))
for formatitem in data.get("formats"):
if formatitem.get("mimeType") == "application/x-mpegurl":
return HLSStream.parse_variant_playlist(self.session, formatitem.get("url"))
def _get_streams(self):
if self.matches["tvp_info"]:

View File

@ -16,8 +16,10 @@ log = logging.getLogger(__name__)
epoch_start = datetime.datetime(1970, 1, 1, tzinfo=utc)
# TODO: use NamedTuple or dataclass
class Segment:
def __init__(self, url, duration, init=False, content=True, available_at=epoch_start, range=None):
# noinspection PyShadowingBuiltins
def __init__(self, url, duration, init=False, content=True, available_at=epoch_start, range=None): # noqa: A002
self.url = url
self.duration = duration
self.init = init

View File

@ -58,7 +58,8 @@ def keyvalue(value):
return match.group("key", "value")
def num(type, min=None, max=None):
# noinspection PyShadowingBuiltins
def num(type, min=None, max=None): # noqa: A002
def func(value):
value = type(value)

View File

@ -57,9 +57,9 @@ DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
class TagDateRangeAd(Tag):
def __init__(self, start=DATETIME_BASE, duration=1, id="stitched-ad-1234", classname="twitch-stitched-ad", custom=None):
def __init__(self, start=DATETIME_BASE, duration=1, attrid="stitched-ad-1234", classname="twitch-stitched-ad", custom=None):
attrs = {
"ID": self.val_quoted_string(id),
"ID": self.val_quoted_string(attrid),
"CLASS": self.val_quoted_string(classname),
"START-DATE": self.val_quoted_string(start.strftime(DATETIME_FORMAT)),
"DURATION": duration,
@ -129,7 +129,14 @@ class TestTwitchHLSStream(TestMixinStreamHLS, unittest.TestCase):
return session
def test_hls_disable_ads_daterange_unknown(self):
daterange = TagDateRangeAd(start=DATETIME_BASE, duration=1, id="foo", classname="bar", custom=None)
daterange = TagDateRangeAd(
start=DATETIME_BASE,
duration=1,
attrid="foo",
classname="bar",
custom=None,
)
thread, segments = self.subject([
Playlist(0, [daterange, Segment(0), Segment(1)], end=True),
], disable_ads=True, low_latency=False)
@ -140,7 +147,14 @@ class TestTwitchHLSStream(TestMixinStreamHLS, unittest.TestCase):
assert all(self.called(s) for s in segments.values()), "Downloads all segments"
def test_hls_disable_ads_daterange_by_class(self):
daterange = TagDateRangeAd(start=DATETIME_BASE, duration=1, id="foo", classname="twitch-stitched-ad", custom=None)
daterange = TagDateRangeAd(
start=DATETIME_BASE,
duration=1,
attrid="foo",
classname="twitch-stitched-ad",
custom=None,
)
thread, segments = self.subject([
Playlist(0, [daterange, Segment(0), Segment(1)], end=True),
], disable_ads=True, low_latency=False)
@ -151,7 +165,14 @@ class TestTwitchHLSStream(TestMixinStreamHLS, unittest.TestCase):
assert all(self.called(s) for s in segments.values()), "Downloads all segments"
def test_hls_disable_ads_daterange_by_id(self):
daterange = TagDateRangeAd(start=DATETIME_BASE, duration=1, id="stitched-ad-1234", classname="/", custom=None)
daterange = TagDateRangeAd(
start=DATETIME_BASE,
duration=1,
attrid="stitched-ad-1234",
classname="/",
custom=None,
)
thread, segments = self.subject([
Playlist(0, [daterange, Segment(0), Segment(1)], end=True),
], disable_ads=True, low_latency=False)
@ -162,7 +183,14 @@ class TestTwitchHLSStream(TestMixinStreamHLS, unittest.TestCase):
assert all(self.called(s) for s in segments.values()), "Downloads all segments"
def test_hls_disable_ads_daterange_by_attr(self):
daterange = TagDateRangeAd(start=DATETIME_BASE, duration=1, id="foo", classname="/", custom={"X-TV-TWITCH-AD-URL": "/"})
daterange = TagDateRangeAd(
start=DATETIME_BASE,
duration=1,
attrid="foo",
classname="/",
custom={"X-TV-TWITCH-AD-URL": "/"},
)
thread, segments = self.subject([
Playlist(0, [daterange, Segment(0), Segment(1)], end=True),
], disable_ads=True, low_latency=False)