Merge pull request #80 from nathom/master

Switched to Black formatting, and a small change
This commit is contained in:
Vitiko 2021-03-08 21:18:31 -04:00 committed by GitHub
commit 4a3ae133a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 294 additions and 182 deletions

7
.flake8 Normal file
View File

@ -0,0 +1,7 @@
[flake8]
extend-ignore = E203, E266, E501
# line length is intentionally set to 80 here because black uses Bugbear
# See https://github.com/psf/black/blob/master/docs/the_black_code_style.md#line-length for more details
max-line-length = 80
max-complexity = 18
select = B,C,E,F,W,T4,B9

View File

@ -58,6 +58,7 @@ def reset_config(config_file):
config["DEFAULT"]["folder_format"] = "{artist} - {album} ({year}) " config["DEFAULT"]["folder_format"] = "{artist} - {album} ({year}) "
"[{bit_depth}B-{sampling_rate}kHz]" "[{bit_depth}B-{sampling_rate}kHz]"
config["DEFAULT"]["track_format"] = "{tracknumber}. {tracktitle}" config["DEFAULT"]["track_format"] = "{tracknumber}. {tracktitle}"
config["DEFAULT"]["smart_discography"] = "false"
with open(config_file, "w") as configfile: with open(config_file, "w") as configfile:
config.write(configfile) config.write(configfile)
logging.info( logging.info(
@ -102,16 +103,22 @@ def main():
no_database = config.getboolean("DEFAULT", "no_database") no_database = config.getboolean("DEFAULT", "no_database")
app_id = config["DEFAULT"]["app_id"] app_id = config["DEFAULT"]["app_id"]
if ("folder_format" not in config["DEFAULT"] if (
or "track_format" not in config["DEFAULT"]): "folder_format" not in config["DEFAULT"]
logging.info(f'{YELLOW}Config file does not include format string,' or "track_format" not in config["DEFAULT"]
' updating...') or "smart_discography" not in config["DEFAULT"]
):
logging.info(
f"{YELLOW}Config file does not include some settings, updating..."
)
config["DEFAULT"]["folder_format"] = "{artist} - {album} ({year}) " config["DEFAULT"]["folder_format"] = "{artist} - {album} ({year}) "
"[{bit_depth}B-{sampling_rate}kHz]" "[{bit_depth}B-{sampling_rate}kHz]"
config["DEFAULT"]["track_format"] = "{tracknumber}. {tracktitle}" config["DEFAULT"]["track_format"] = "{tracknumber}. {tracktitle}"
with open(CONFIG_FILE, 'w') as cf: config["DEFAULT"]["smart_discography"] = "false"
with open(CONFIG_FILE, "w") as cf:
config.write(cf) config.write(cf)
smart_discography = config.getboolean("DEFAULT", "smart_discography")
folder_format = config["DEFAULT"]["folder_format"] folder_format = config["DEFAULT"]["folder_format"]
track_format = config["DEFAULT"]["track_format"] track_format = config["DEFAULT"]["track_format"]
@ -148,10 +155,9 @@ def main():
cover_og_quality=arguments.og_cover or og_cover, cover_og_quality=arguments.og_cover or og_cover,
no_cover=arguments.no_cover or no_cover, no_cover=arguments.no_cover or no_cover,
downloads_db=None if no_database or arguments.no_db else QOBUZ_DB, downloads_db=None if no_database or arguments.no_db else QOBUZ_DB,
folder_format=arguments.folder_format folder_format=arguments.folder_format or folder_format,
if arguments.folder_format is not None else folder_format, track_format=arguments.track_format or track_format,
track_format=arguments.track_format smart_discography=arguments.smart_discography or smart_discography,
if arguments.track_format is not None else track_format,
) )
qobuz.initialize_client(email, password, app_id, secrets) qobuz.initialize_client(email, password, app_id, secrets)

View File

@ -105,17 +105,27 @@ def add_common_arg(custom_parser, default_folder, default_quality):
custom_parser.add_argument( custom_parser.add_argument(
"-ff", "-ff",
"--folder-format", "--folder-format",
metavar='PATTERN', metavar="PATTERN",
help='pattern for formatting folder names, e.g ' help="""pattern for formatting folder names, e.g
'"{artist} - {album} ({year})". available keys: artist, ' "{artist} - {album} ({year})". available keys: artist,
'albumartist, album, year, sampling_rate, bit_rate, tracktitle. ' albumartist, album, year, sampling_rate, bit_rate, tracktitle, version.
'cannot contain characters used by the system, which includes /:<>', cannot contain characters used by the system, which includes /:<>""",
) )
custom_parser.add_argument( custom_parser.add_argument(
"-tf", "-tf",
"--track-format", "--track-format",
metavar='PATTERN', metavar="PATTERN",
help='pattern for formatting track names. see `folder-format`.', help="pattern for formatting track names. see `folder-format`.",
)
# TODO: add customization options
custom_parser.add_argument(
"-s",
"--smart-discography",
action="store_true",
help="""Try to filter out spam-like albums when requesting an artist's
discography, and other optimizations. Filters albums not made by requested
artist, and deluxe/live/collection albums. Gives preference to remastered
albums, high bit depth/dynamic range, and low sampling rates (to save space).""",
) )

View File

@ -4,6 +4,7 @@ import re
import string import string
import sys import sys
import time import time
from typing import Tuple
import requests import requests
from bs4 import BeautifulSoup as bso from bs4 import BeautifulSoup as bso
@ -21,8 +22,12 @@ WEB_URL = "https://play.qobuz.com/"
ARTISTS_SELECTOR = "td.chartlist-artist > a" ARTISTS_SELECTOR = "td.chartlist-artist > a"
TITLE_SELECTOR = "td.chartlist-name > a" TITLE_SELECTOR = "td.chartlist-name > a"
EXTENSIONS = (".mp3", ".flac") EXTENSIONS = (".mp3", ".flac")
QUALITIES = {5: "5 - MP3", 6: "6 - FLAC", QUALITIES = {
7: "7 - 24B<96kHz", 27: "27 - 24B>96kHz"} 5: "5 - MP3",
6: "6 - 16 bit, 44.1kHz",
7: "7 - 24 bit, <96kHz",
27: "27 - 24 bit, >96kHz",
}
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -33,8 +38,7 @@ class PartialFormatter(string.Formatter):
def get_field(self, field_name, args, kwargs): def get_field(self, field_name, args, kwargs):
try: try:
val = super(PartialFormatter, self).get_field(field_name, val = super(PartialFormatter, self).get_field(field_name, args, kwargs)
args, kwargs)
except (KeyError, AttributeError): except (KeyError, AttributeError):
val = None, field_name val = None, field_name
return val return val
@ -65,9 +69,10 @@ class QobuzDL:
cover_og_quality=False, cover_og_quality=False,
no_cover=False, no_cover=False,
downloads_db=None, downloads_db=None,
folder_format='{artist} - {album} ({year}) [{bit_depth}B-' folder_format="{artist} - {album} ({year}) [{bit_depth}B-"
'{sampling_rate}kHz]', "{sampling_rate}kHz]",
track_format='{tracknumber}. {tracktitle}', track_format="{tracknumber}. {tracktitle}",
smart_discography=False,
): ):
self.directory = self.create_dir(directory) self.directory = self.create_dir(directory)
self.quality = quality self.quality = quality
@ -83,10 +88,11 @@ class QobuzDL:
self.downloads_db = create_db(downloads_db) if downloads_db else None self.downloads_db = create_db(downloads_db) if downloads_db else None
self.folder_format = folder_format self.folder_format = folder_format
self.track_format = track_format self.track_format = track_format
self.smart_discography = smart_discography
def initialize_client(self, email, pwd, app_id, secrets): def initialize_client(self, email, pwd, app_id, secrets):
self.client = qopy.Client(email, pwd, app_id, secrets) self.client = qopy.Client(email, pwd, app_id, secrets)
logger.info(f"{YELLOW}Set quality: {QUALITIES[int(self.quality)]}\n") logger.info(f"{YELLOW}Set max quality: {QUALITIES[int(self.quality)]}\n")
def get_tokens(self): def get_tokens(self):
spoofer = spoofbuz.Spoofer() spoofer = spoofbuz.Spoofer()
@ -100,30 +106,22 @@ class QobuzDL:
os.makedirs(fix, exist_ok=True) os.makedirs(fix, exist_ok=True)
return fix return fix
def get_id(self, url): def get_url_info(self, url: str) -> Tuple[str, str]:
return re.match( """Returns the type of the url and the id.
r"https?://(?:w{0,3}|play|open)\.qobuz\.com/(?:(?:album|track"
r"|artist|playlist|label)/|[a-z]{2}-[a-z]{2}/album/-?\w+(?:-\w+)*"
r"-?/|user/library/favorites/)(\w+)",
url,
).group(1)
def get_type(self, url): Compatible with urls of the form:
if re.match(r'https?', url) is not None: https://www.qobuz.com/us-en/{type}/{name}/{id}
url_type = url.split('/')[3] https://open.qobuz.com/{type}/{id}
if url_type not in ['album', 'artist', 'playlist', https://play.qobuz.com/{type}/{id}
'track', 'label']: /us-en/{type}/-/{id}
if url_type == "user": """
url_type = url.split('/')[-1]
else: r = re.search(
# url is from Qobuz store r"(?:https:\/\/(?:w{3}|open|play)\.qobuz\.com)?(?:\/[a-z]{2}-[a-z]{2})"
# e.g. "https://www.qobuz.com/us-en/album/..." r"?\/(album|artist|track|playlist|label)(?:\/[-\w\d]+)?\/([\w\d]+)",
url_type = url.split('/')[4] url,
else: )
# url missing base return r.groups()
# e.g. "/us-en/album/{artist}/{id}"
url_type = url.split('/')[2]
return url_type
def download_from_id(self, item_id, album=True, alt_path=None): def download_from_id(self, item_id, album=True, alt_path=None):
if handle_download_id(self.downloads_db, item_id, add_id=False): if handle_download_id(self.downloads_db, item_id, add_id=False):
@ -146,7 +144,7 @@ class QobuzDL:
self.cover_og_quality, self.cover_og_quality,
self.no_cover, self.no_cover,
folder_format=self.folder_format, folder_format=self.folder_format,
track_format=self.track_format track_format=self.track_format,
) )
handle_download_id(self.downloads_db, item_id, add_id=True) handle_download_id(self.downloads_db, item_id, add_id=True)
except (requests.exceptions.RequestException, NonStreamable) as e: except (requests.exceptions.RequestException, NonStreamable) as e:
@ -170,13 +168,11 @@ class QobuzDL:
"track": {"album": False, "func": None, "iterable_key": None}, "track": {"album": False, "func": None, "iterable_key": None},
} }
try: try:
url_type = self.get_type(url) url_type, item_id = self.get_url_info(url)
type_dict = possibles[url_type] type_dict = possibles[url_type]
item_id = self.get_id(url)
except (KeyError, IndexError): except (KeyError, IndexError):
logger.info( logger.info(
f'{RED}Invalid url: "{url}". Use urls from ' f'{RED}Invalid url: "{url}". Use urls from ' "https://play.qobuz.com!"
'https://play.qobuz.com!'
) )
return return
if type_dict["func"]: if type_dict["func"]:
@ -189,8 +185,19 @@ class QobuzDL:
new_path = self.create_dir( new_path = self.create_dir(
os.path.join(self.directory, sanitize_filename(content_name)) os.path.join(self.directory, sanitize_filename(content_name))
) )
items = [item[type_dict["iterable_key"]]["items"]
for item in content][0] if self.smart_discography and url_type == "artist":
# change `save_space` and `skip_extras` for customization
items = self._smart_discography_filter(
content,
save_space=True,
skip_extras=True,
)
else:
items = [item[type_dict["iterable_key"]]["items"] for item in content][
0
]
logger.info(f"{YELLOW}{len(items)} downloads in queue") logger.info(f"{YELLOW}{len(items)} downloads in queue")
for item in items: for item in items:
self.download_from_id( self.download_from_id(
@ -242,8 +249,7 @@ class QobuzDL:
f"{YELLOW}qobuz-dl will attempt to download the first " f"{YELLOW}qobuz-dl will attempt to download the first "
f"{self.lucky_limit} results." f"{self.lucky_limit} results."
) )
results = self.search_by_type(query, self.lucky_type, results = self.search_by_type(query, self.lucky_type, self.lucky_limit, True)
self.lucky_limit, True)
if download: if download:
self.download_list_of_urls(results) self.download_list_of_urls(results)
@ -306,8 +312,7 @@ class QobuzDL:
) )
url = "{}{}/{}".format(WEB_URL, item_type, i.get("id", "")) url = "{}{}/{}".format(WEB_URL, item_type, i.get("id", ""))
item_list.append({"text": text, "url": url} if not lucky item_list.append({"text": text, "url": url} if not lucky else url)
else url)
return item_list return item_list
except (KeyError, IndexError): except (KeyError, IndexError):
logger.info(f"{RED}Invalid type: {item_type}") logger.info(f"{RED}Invalid type: {item_type}")
@ -319,7 +324,7 @@ class QobuzDL:
except (ImportError, ModuleNotFoundError): except (ImportError, ModuleNotFoundError):
if os.name == "nt": if os.name == "nt":
sys.exit( sys.exit(
'Please install curses with ' "Please install curses with "
'"pip3 install windows-curses" to continue' '"pip3 install windows-curses" to continue'
) )
raise raise
@ -339,15 +344,15 @@ class QobuzDL:
try: try:
item_types = ["Albums", "Tracks", "Artists", "Playlists"] item_types = ["Albums", "Tracks", "Artists", "Playlists"]
selected_type = pick(item_types, selected_type = pick(item_types, "I'll search for:\n[press Intro]")[0][
"I'll search for:\n[press Intro]" :-1
)[0][:-1].lower() ].lower()
logger.info(f"{YELLOW}Ok, we'll search for " logger.info(f"{YELLOW}Ok, we'll search for " f"{selected_type}s{RESET}")
f"{selected_type}s{RESET}")
final_url_list = [] final_url_list = []
while True: while True:
query = input(f"{CYAN}Enter your search: [Ctrl + c to quit]\n" query = input(
f"-{DF} ") f"{CYAN}Enter your search: [Ctrl + c to quit]\n" f"-{DF} "
)
logger.info(f"{YELLOW}Searching...{RESET}") logger.info(f"{YELLOW}Searching...{RESET}")
options = self.search_by_type( options = self.search_by_type(
query, selected_type, self.interactive_limit query, selected_type, self.interactive_limit
@ -369,8 +374,7 @@ class QobuzDL:
options_map_func=get_title_text, options_map_func=get_title_text,
) )
if len(selected_items) > 0: if len(selected_items) > 0:
[final_url_list.append(i[0]["url"]) [final_url_list.append(i[0]["url"]) for i in selected_items]
for i in selected_items]
y_n = pick( y_n = pick(
["Yes", "No"], ["Yes", "No"],
"Items were added to queue to be downloaded. " "Items were added to queue to be downloaded. "
@ -427,13 +431,13 @@ class QobuzDL:
pl_title = sanitize_filename(soup.select_one("h1").text) pl_title = sanitize_filename(soup.select_one("h1").text)
pl_directory = os.path.join(self.directory, pl_title) pl_directory = os.path.join(self.directory, pl_title)
logger.info( logger.info(
f"{YELLOW}Downloading playlist: {pl_title} " f"{YELLOW}Downloading playlist: {pl_title} " f"({len(track_list)} tracks)"
f"({len(track_list)} tracks)"
) )
for i in track_list: for i in track_list:
track_id = self.get_id(self.search_by_type(i, "track", 1, track_id = self.get_url_info(
lucky=True)[0]) self.search_by_type(i, "track", 1, lucky=True)[0]
)[1]
if track_id: if track_id:
self.download_from_id(track_id, False, pl_directory) self.download_from_id(track_id, False, pl_directory)
@ -465,8 +469,7 @@ class QobuzDL:
if not audio_files or len(audio_files) != len(audio_rel_files): if not audio_files or len(audio_files) != len(audio_rel_files):
continue continue
for audio_rel_file, audio_file in zip(audio_rel_files, for audio_rel_file, audio_file in zip(audio_rel_files, audio_files):
audio_files):
try: try:
pl_item = ( pl_item = (
EasyMP3(audio_file) EasyMP3(audio_file)
@ -486,3 +489,87 @@ class QobuzDL:
if len(track_list) > 1: if len(track_list) > 1:
with open(os.path.join(pl_directory, pl_name), "w") as pl: with open(os.path.join(pl_directory, pl_name), "w") as pl:
pl.write("\n\n".join(track_list)) pl.write("\n\n".join(track_list))
def _smart_discography_filter(
self, contents: list, save_space: bool = False, skip_extras: bool = False
) -> list:
"""When downloading some artists' discography, many random and spam-like
albums can get downloaded. This helps filter those out to just get the good stuff.
This function removes:
* albums by other artists, which may contain a feature from the requested artist
* duplicate albums in different qualities
* (optionally) removes collector's, deluxe, live albums
:param list contents: contents returned by qobuz API
:param bool save_space: choose highest bit depth, lowest sampling rate
:param bool remove_extras: remove albums with extra material (i.e. live, deluxe,...)
:returns: filtered items list
"""
# for debugging
def print_album(album: dict) -> None:
logger.debug(
f"{album['title']} - {album.get('version', '~~')} ({album['maximum_bit_depth']}/{album['maximum_sampling_rate']} by {album['artist']['name']}) {album['id']}"
)
TYPE_REGEXES = {
"remaster": r"(?i)(re)?master(ed)?",
"extra": r"(?i)(anniversary|deluxe|live|collector|demo|expanded)",
}
def is_type(album_t: str, album: dict) -> bool:
"""Check if album is of type `album_t`"""
version = album.get("version", "")
title = album.get("title", "")
regex = TYPE_REGEXES[album_t]
return re.search(regex, f"{title} {version}") is not None
def essence(album: dict) -> str:
"""Ignore text in parens/brackets, return all lowercase.
Used to group two albums that may be named similarly, but not exactly
the same.
"""
r = re.match(r"([^\(]+)(?:\s*[\(\[][^\)][\)\]])*", album)
return r.group(1).strip().lower()
requested_artist = contents[0]["name"]
items = [item["albums"]["items"] for item in contents][0]
# use dicts to group duplicate albums together by title
title_grouped = dict()
for item in items:
if (t := essence(item["title"])) not in title_grouped:
title_grouped[t] = []
title_grouped[t].append(item)
items = []
for albums in title_grouped.values():
best_bit_depth = max(a["maximum_bit_depth"] for a in albums)
get_best = min if save_space else max
best_sampling_rate = get_best(
a["maximum_sampling_rate"]
for a in albums
if a["maximum_bit_depth"] == best_bit_depth
)
remaster_exists = any(is_type("remaster", a) for a in albums)
def is_valid(album: dict) -> bool:
return (
album["maximum_bit_depth"] == best_bit_depth
and album["maximum_sampling_rate"] == best_sampling_rate
and album["artist"]["name"] == requested_artist
and not ( # states that are not allowed
(remaster_exists and not is_type("remaster", album))
or (skip_extras and is_type("extra", album))
)
)
filtered = tuple(filter(is_valid, albums))
# most of the time, len is 0 or 1.
# if greater, it is a complete duplicate,
# so it doesn't matter which is chosen
if len(filtered) >= 1:
items.append(filtered[0])
return items

View File

@ -13,14 +13,14 @@ from qobuz_dl.exceptions import NonStreamable
QL_DOWNGRADE = "FormatRestrictedByFormatAvailability" QL_DOWNGRADE = "FormatRestrictedByFormatAvailability"
# used in case of error # used in case of error
DEFAULT_FORMATS = { DEFAULT_FORMATS = {
'MP3': [ "MP3": [
'{artist} - {album} ({year}) [MP3]', "{artist} - {album} ({year}) [MP3]",
'{tracknumber}. {tracktitle}', "{tracknumber}. {tracktitle}",
],
"Unknown": [
"{artist} - {album}",
"{tracknumber}. {tracktitle}",
], ],
'Unknown': [
'{artist} - {album}',
'{tracknumber}. {tracktitle}',
]
} }
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -43,16 +43,16 @@ def tqdm_download(url, fname, track_name):
def get_description(u: dict, track_title, multiple=None): def get_description(u: dict, track_title, multiple=None):
downloading_title = f'{track_title} ' downloading_title = f"{track_title} "
f'[{u["bit_depth"]}/{u["sampling_rate"]}]' f'[{u["bit_depth"]}/{u["sampling_rate"]}]'
if multiple: if multiple:
downloading_title = f"[Disc {multiple}] {downloading_title}" downloading_title = f"[Disc {multiple}] {downloading_title}"
return downloading_title return downloading_title
def get_format(client, item_dict, def get_format(
quality, is_track_id=False, client, item_dict, quality, is_track_id=False, track_url_dict=None
track_url_dict=None) -> Tuple[str, bool, int, int]: ) -> Tuple[str, bool, int, int]:
quality_met = True quality_met = True
if int(quality) == 5: if int(quality) == 5:
return ("MP3", quality_met, None, None) return ("MP3", quality_met, None, None)
@ -69,8 +69,7 @@ def get_format(client, item_dict,
restrictions = new_track_dict.get("restrictions") restrictions = new_track_dict.get("restrictions")
if isinstance(restrictions, list): if isinstance(restrictions, list):
if any( if any(
restriction.get("code") == QL_DOWNGRADE restriction.get("code") == QL_DOWNGRADE for restriction in restrictions
for restriction in restrictions
): ):
quality_met = False quality_met = False
@ -119,7 +118,7 @@ def download_and_tag(
is_mp3, is_mp3,
embed_art=False, embed_art=False,
multiple=None, multiple=None,
track_format='{tracknumber}. {tracktitle}', track_format="{tracknumber}. {tracktitle}",
): ):
""" """
Download and tag a file Download and tag a file
@ -155,14 +154,15 @@ def download_and_tag(
track_title = track_metadata.get("title") track_title = track_metadata.get("title")
artist = _safe_get(track_metadata, "performer", "name") artist = _safe_get(track_metadata, "performer", "name")
filename_attr = { filename_attr = {
'artist': artist, "artist": artist,
'albumartist': _safe_get(track_metadata, "album", "artist", "name", "albumartist": _safe_get(
default=artist), track_metadata, "album", "artist", "name", default=artist
'bit_depth': track_metadata['maximum_bit_depth'], ),
'sampling_rate': track_metadata['maximum_sampling_rate'], "bit_depth": track_metadata["maximum_bit_depth"],
'tracktitle': track_title, "sampling_rate": track_metadata["maximum_sampling_rate"],
'version': track_metadata.get("version"), "tracktitle": track_title,
'tracknumber': f"{track_metadata['track_number']:02}" "version": track_metadata.get("version"),
"tracknumber": f"{track_metadata['track_number']:02}",
} }
# track_format is a format string # track_format is a format string
# e.g. '{tracknumber}. {artist} - {tracktitle}' # e.g. '{tracknumber}. {artist} - {tracktitle}'
@ -201,9 +201,8 @@ def download_id_by_type(
downgrade_quality=True, downgrade_quality=True,
cover_og_quality=False, cover_og_quality=False,
no_cover=False, no_cover=False,
folder_format='{artist} - {album} ({year}) ' folder_format="{artist} - {album} ({year}) " "[{bit_depth}B-{sampling_rate}kHz]",
'[{bit_depth}B-{sampling_rate}kHz]', track_format="{tracknumber}. {tracktitle}",
track_format='{tracknumber}. {tracktitle}',
): ):
""" """
Download and get metadata by ID and type (album or track) Download and get metadata by ID and type (album or track)
@ -243,43 +242,39 @@ def download_id_by_type(
if not downgrade_quality and not quality_met: if not downgrade_quality and not quality_met:
logger.info( logger.info(
f"{OFF}Skipping {album_title} as it doesn't " f"{OFF}Skipping {album_title} as it doesn't " "meet quality requirement"
"meet quality requirement"
) )
return return
logger.info(f"\n{YELLOW}Downloading: {album_title}\n" logger.info(
f"Quality: {file_format}\n") f"\n{YELLOW}Downloading: {album_title}\nQuality: {file_format} ({bit_depth}/{sampling_rate})\n"
album_attr = {
'artist': meta["artist"]["name"],
'album': album_title,
'year': meta["release_date_original"].split("-")[0],
'format': file_format,
'bit_depth': bit_depth,
'sampling_rate': sampling_rate
}
folder_format, track_format = _clean_format_str(folder_format,
track_format,
file_format)
sanitized_title = sanitize_filename(
folder_format.format(**album_attr)
) )
album_attr = {
"artist": meta["artist"]["name"],
"album": album_title,
"year": meta["release_date_original"].split("-")[0],
"format": file_format,
"bit_depth": bit_depth,
"sampling_rate": sampling_rate,
}
folder_format, track_format = _clean_format_str(
folder_format, track_format, file_format
)
sanitized_title = sanitize_filename(folder_format.format(**album_attr))
dirn = os.path.join(path, sanitized_title) dirn = os.path.join(path, sanitized_title)
os.makedirs(dirn, exist_ok=True) os.makedirs(dirn, exist_ok=True)
if no_cover: if no_cover:
logger.info(f"{OFF}Skipping cover") logger.info(f"{OFF}Skipping cover")
else: else:
get_extra(meta["image"]["large"], dirn, get_extra(meta["image"]["large"], dirn, og_quality=cover_og_quality)
og_quality=cover_og_quality)
if "goodies" in meta: if "goodies" in meta:
try: try:
get_extra(meta["goodies"][0]["url"], dirn, "booklet.pdf") get_extra(meta["goodies"][0]["url"], dirn, "booklet.pdf")
except: # noqa except: # noqa
pass pass
media_numbers = [track["media_number"] for track in media_numbers = [track["media_number"] for track in meta["tracks"]["items"]]
meta["tracks"]["items"]]
is_multiple = True if len([*{*media_numbers}]) > 1 else False is_multiple = True if len([*{*media_numbers}]) > 1 else False
for i in meta["tracks"]["items"]: for i in meta["tracks"]["items"]:
parse = client.get_track_url(i["id"], quality) parse = client.get_track_url(i["id"], quality)
@ -307,13 +302,14 @@ def download_id_by_type(
meta = client.get_track_meta(item_id) meta = client.get_track_meta(item_id)
track_title = get_title(meta) track_title = get_title(meta)
logger.info(f"\n{YELLOW}Downloading: {track_title}") logger.info(f"\n{YELLOW}Downloading: {track_title}")
format_info = get_format(client, meta, quality, format_info = get_format(
is_track_id=True, track_url_dict=parse) client, meta, quality, is_track_id=True, track_url_dict=parse
)
file_format, quality_met, bit_depth, sampling_rate = format_info file_format, quality_met, bit_depth, sampling_rate = format_info
folder_format, track_format = _clean_format_str(folder_format, folder_format, track_format = _clean_format_str(
track_format, folder_format, track_format, bit_depth
bit_depth) )
if not downgrade_quality and not quality_met: if not downgrade_quality and not quality_met:
logger.info( logger.info(
@ -322,15 +318,13 @@ def download_id_by_type(
) )
return return
track_attr = { track_attr = {
'artist': meta["album"]["artist"]["name"], "artist": meta["album"]["artist"]["name"],
'tracktitle': track_title, "tracktitle": track_title,
'year': meta["album"]["release_date_original"].split("-")[0], "year": meta["album"]["release_date_original"].split("-")[0],
'bit_depth': bit_depth, "bit_depth": bit_depth,
'sampling_rate': sampling_rate "sampling_rate": sampling_rate,
} }
sanitized_title = sanitize_filename( sanitized_title = sanitize_filename(folder_format.format(**track_attr))
folder_format.format(**track_attr)
)
dirn = os.path.join(path, sanitized_title) dirn = os.path.join(path, sanitized_title)
os.makedirs(dirn, exist_ok=True) os.makedirs(dirn, exist_ok=True)
@ -338,13 +332,20 @@ def download_id_by_type(
logger.info(f"{OFF}Skipping cover") logger.info(f"{OFF}Skipping cover")
else: else:
get_extra( get_extra(
meta["album"]["image"]["large"], dirn, meta["album"]["image"]["large"], dirn, og_quality=cover_og_quality
og_quality=cover_og_quality
) )
is_mp3 = True if int(quality) == 5 else False is_mp3 = True if int(quality) == 5 else False
download_and_tag(dirn, count, parse, meta, download_and_tag(
meta, True, is_mp3, embed_art, dirn,
track_format=track_format) count,
parse,
meta,
meta,
True,
is_mp3,
embed_art,
track_format=track_format,
)
else: else:
logger.info(f"{OFF}Demo. Skipping") logger.info(f"{OFF}Demo. Skipping")
logger.info(f"{GREEN}Completed") logger.info(f"{GREEN}Completed")
@ -352,25 +353,28 @@ def download_id_by_type(
# ----------- Utilities ----------- # ----------- Utilities -----------
def _clean_format_str(folder: str, track: str,
file_format: str) -> Tuple[str, str]: def _clean_format_str(folder: str, track: str, file_format: str) -> Tuple[str, str]:
'''Cleans up the format strings, avoids errors """Cleans up the format strings, avoids errors
with MP3 files. with MP3 files.
''' """
final = [] final = []
for i, fs in enumerate((folder, track)): for i, fs in enumerate((folder, track)):
if fs.endswith('.mp3'): if fs.endswith(".mp3"):
fs = fs[:-4] fs = fs[:-4]
elif fs.endswith('.flac'): elif fs.endswith(".flac"):
fs = fs[:-5] fs = fs[:-5]
fs = fs.strip() fs = fs.strip()
# default to pre-chosen string if format is invalid # default to pre-chosen string if format is invalid
if (file_format in ('MP3', 'Unknown') and if file_format in ("MP3", "Unknown") and (
'bit_depth' in file_format or 'sampling_rate' in file_format): "bit_depth" in fs or "sampling_rate" in fs
):
default = DEFAULT_FORMATS[file_format][i] default = DEFAULT_FORMATS[file_format][i]
logger.error(f'{RED}invalid format string for format {file_format}' logger.error(
f'. defaulting to {default}') f"{RED}invalid format string for format {file_format}"
f". defaulting to {default}"
)
fs = default fs = default
final.append(fs) final.append(fs)
@ -378,18 +382,18 @@ def _clean_format_str(folder: str, track: str,
def _safe_get(d: dict, *keys, default=None): def _safe_get(d: dict, *keys, default=None):
'''A replacement for chained `get()` statements on dicts: """A replacement for chained `get()` statements on dicts:
>>> d = {'foo': {'bar': 'baz'}} >>> d = {'foo': {'bar': 'baz'}}
>>> _safe_get(d, 'baz') >>> _safe_get(d, 'baz')
None None
>>> _safe_get(d, 'foo', 'bar') >>> _safe_get(d, 'foo', 'bar')
'baz' 'baz'
''' """
curr = d curr = d
res = default res = default
for key in keys: for key in keys:
res = curr.get(key, default) res = curr.get(key, default)
if res == default or not hasattr(res, '__getitem__'): if res == default or not hasattr(res, "__getitem__"):
return res return res
else: else:
curr = res curr = res

View File

@ -1,3 +1,4 @@
import re
import os import os
import logging import logging
@ -9,7 +10,7 @@ logger = logging.getLogger(__name__)
# unicode symbols # unicode symbols
COPYRIGHT, PHON_COPYRIGHT = '\u2117', '\u00a9' COPYRIGHT, PHON_COPYRIGHT = "\u2117", "\u00a9"
# if a metadata block exceeds this, mutagen will raise error # if a metadata block exceeds this, mutagen will raise error
# and the file won't be tagged # and the file won't be tagged
FLAC_MAX_BLOCKSIZE = 16777215 FLAC_MAX_BLOCKSIZE = 16777215
@ -28,27 +29,25 @@ def get_title(track_dict):
def _format_copyright(s: str) -> str: def _format_copyright(s: str) -> str:
s = s.replace('(P)', PHON_COPYRIGHT) s = s.replace("(P)", PHON_COPYRIGHT)
s = s.replace('(C)', COPYRIGHT) s = s.replace("(C)", COPYRIGHT)
return s return s
def _format_genres(genres: list) -> str: def _format_genres(genres: list) -> str:
'''Fixes the weirdly formatted genre lists returned by the API. """Fixes the weirdly formatted genre lists returned by the API.
>>> g = ['Pop/Rock', 'Pop/Rock→Rock', 'Pop/Rock→Rock→Alternatif et Indé'] >>> g = ['Pop/Rock', 'Pop/Rock→Rock', 'Pop/Rock→Rock→Alternatif et Indé']
>>> _format_genres(g) >>> _format_genres(g)
'Pop/Rock, Rock, Alternatif et Indé' 'Pop, Rock, Alternatif et Indé'
''' """
genres = re.findall(r"([^\u2192\/]+)", "/".join(genres))
if genres == []: no_repeats = []
return '' [no_repeats.append(g) for g in genres if g not in no_repeats]
else: return ", ".join(no_repeats)
return ', '.join(genres[-1].split('\u2192'))
# Use KeyError catching instead of dict.get to avoid empty tags # Use KeyError catching instead of dict.get to avoid empty tags
def tag_flac(filename, root_dir, final_name, d, album, def tag_flac(filename, root_dir, final_name, d, album, istrack=True, em_image=False):
istrack=True, em_image=False):
""" """
Tag a FLAC file Tag a FLAC file
@ -116,8 +115,10 @@ def tag_flac(filename, root_dir, final_name, d, album,
# rest of the metadata still gets embedded # rest of the metadata still gets embedded
# when the image size is too big # when the image size is too big
if os.path.getsize(cover_image) > FLAC_MAX_BLOCKSIZE: if os.path.getsize(cover_image) > FLAC_MAX_BLOCKSIZE:
raise Exception("downloaded cover size too large to embed. " raise Exception(
"turn off `og_cover` to avoid error") "downloaded cover size too large to embed. "
"turn off `og_cover` to avoid error"
)
image = Picture() image = Picture()
image.type = 3 image.type = 3
@ -133,8 +134,7 @@ def tag_flac(filename, root_dir, final_name, d, album,
os.rename(filename, final_name) os.rename(filename, final_name)
def tag_mp3(filename, root_dir, final_name, d, album, def tag_mp3(filename, root_dir, final_name, d, album, istrack=True, em_image=False):
istrack=True, em_image=False):
""" """
Tag an mp3 file Tag an mp3 file
@ -159,7 +159,7 @@ def tag_mp3(filename, root_dir, final_name, d, album,
"label": id3.TPUB, "label": id3.TPUB,
"performer": id3.TOPE, "performer": id3.TOPE,
"title": id3.TIT2, "title": id3.TIT2,
"year": id3.TYER "year": id3.TYER,
} }
try: try:
audio = id3.ID3(filename) audio = id3.ID3(filename)
@ -168,19 +168,19 @@ def tag_mp3(filename, root_dir, final_name, d, album,
# temporarily holds metadata # temporarily holds metadata
tags = dict() tags = dict()
tags['title'] = get_title(d) tags["title"] = get_title(d)
try: try:
tags['label'] = album["label"]["name"] tags["label"] = album["label"]["name"]
except KeyError: except KeyError:
pass pass
try: try:
tags['artist'] = d["performer"]["name"] tags["artist"] = d["performer"]["name"]
except KeyError: except KeyError:
if istrack: if istrack:
tags['artist'] = d["album"]["artist"]["name"] tags["artist"] = d["album"]["artist"]["name"]
else: else:
tags['artist'] = album["artist"]["name"] tags["artist"] = album["artist"]["name"]
if istrack: if istrack:
tags["genre"] = _format_genres(d["album"]["genres_list"]) tags["genre"] = _format_genres(d["album"]["genres_list"])
@ -197,12 +197,10 @@ def tag_mp3(filename, root_dir, final_name, d, album,
tags["copyright"] = _format_copyright(album["copyright"]) tags["copyright"] = _format_copyright(album["copyright"])
tracktotal = str(album["tracks_count"]) tracktotal = str(album["tracks_count"])
tags['year'] = tags['date'][:4] tags["year"] = tags["date"][:4]
audio['TRCK'] = id3.TRCK(encoding=3, audio["TRCK"] = id3.TRCK(encoding=3, text=f'{d["track_number"]}/{tracktotal}')
text=f'{d["track_number"]}/{tracktotal}') audio["TPOS"] = id3.TPOS(encoding=3, text=str(d["media_number"]))
audio['TPOS'] = id3.TPOS(encoding=3,
text=str(d["media_number"]))
# write metadata in `tags` to file # write metadata in `tags` to file
for k, v in tags.items(): for k, v in tags.items():
@ -219,8 +217,8 @@ def tag_mp3(filename, root_dir, final_name, d, album,
else: else:
cover_image = multi_emb_image cover_image = multi_emb_image
with open(cover_image, 'rb') as cover: with open(cover_image, "rb") as cover:
audio.add(id3.APIC(3, 'image/jpeg', 3, '', cover.read())) audio.add(id3.APIC(3, "image/jpeg", 3, "", cover.read()))
audio.save(filename, 'v2_version=3') audio.save(filename, "v2_version=3")
os.rename(filename, final_name) os.rename(filename, final_name)