diff --git a/qobuz_dl/cli.py b/qobuz_dl/cli.py index 9ee0cb8..b0ac590 100644 --- a/qobuz_dl/cli.py +++ b/qobuz_dl/cli.py @@ -9,6 +9,7 @@ import qobuz_dl.spoofbuz as spoofbuz from qobuz_dl.color import GREEN, RED, YELLOW from qobuz_dl.commands import qobuz_dl_args from qobuz_dl.core import QobuzDL +from qobuz_dl.downloader import DEFAULT_FOLDER, DEFAULT_TRACK logging.basicConfig( level=logging.INFO, @@ -25,7 +26,7 @@ CONFIG_FILE = os.path.join(CONFIG_PATH, "config.ini") QOBUZ_DB = os.path.join(CONFIG_PATH, "qobuz_dl.db") -def reset_config(config_file): +def _reset_config(config_file): logging.info(f"{YELLOW}Creating config file: {config_file}") config = configparser.ConfigParser() config["DEFAULT"]["email"] = input("Enter your email:\n- ") @@ -55,9 +56,8 @@ def reset_config(config_file): spoofer = spoofbuz.Spoofer() config["DEFAULT"]["app_id"] = str(spoofer.getAppId()) config["DEFAULT"]["secrets"] = ",".join(spoofer.getSecrets().values()) - config["DEFAULT"]["folder_format"] = "{artist} - {album} ({year}) " - "[{bit_depth}B-{sampling_rate}kHz]" - config["DEFAULT"]["track_format"] = "{tracknumber}. {tracktitle}" + config["DEFAULT"]["folder_format"] = DEFAULT_FOLDER + config["DEFAULT"]["track_format"] = DEFAULT_TRACK config["DEFAULT"]["smart_discography"] = "false" with open(config_file, "w") as configfile: config.write(configfile) @@ -68,7 +68,7 @@ def reset_config(config_file): ) -def remove_leftovers(directory): +def _remove_leftovers(directory): directory = os.path.join(directory, "**", ".*.tmp") for i in glob.glob(directory, recursive=True): try: @@ -77,14 +77,41 @@ def remove_leftovers(directory): pass -def main(): +def _handle_commands(qobuz, arguments): + try: + if arguments.command == "dl": + qobuz.download_list_of_urls(arguments.SOURCE) + elif arguments.command == "lucky": + query = " ".join(arguments.QUERY) + qobuz.lucky_type = arguments.type + qobuz.lucky_limit = arguments.number + qobuz.lucky_mode(query) + else: + qobuz.interactive_limit = arguments.limit + qobuz.interactive() + + except KeyboardInterrupt: + logging.info( + f"{RED}Interrupted by user\n{YELLOW}Already downloaded items will " + "be skipped if you try to download the same releases again." + ) + + finally: + _remove_leftovers(qobuz.directory) + + +def _initial_checks(): if not os.path.isdir(CONFIG_PATH) or not os.path.isfile(CONFIG_FILE): os.makedirs(CONFIG_PATH, exist_ok=True) - reset_config(CONFIG_FILE) + _reset_config(CONFIG_FILE) if len(sys.argv) < 2: sys.exit(qobuz_dl_args().print_help()) + +def main(): + _initial_checks() + config = configparser.ConfigParser() config.read(CONFIG_FILE) @@ -102,22 +129,6 @@ def main(): no_cover = config.getboolean("DEFAULT", "no_cover") no_database = config.getboolean("DEFAULT", "no_database") app_id = config["DEFAULT"]["app_id"] - - if ( - "folder_format" not in config["DEFAULT"] - or "track_format" not in config["DEFAULT"] - or "smart_discography" not in config["DEFAULT"] - ): - logging.info( - f"{YELLOW}Config file does not include some settings, updating..." - ) - config["DEFAULT"]["folder_format"] = "{artist} - {album} ({year}) " - "[{bit_depth}B-{sampling_rate}kHz]" - config["DEFAULT"]["track_format"] = "{tracknumber}. {tracktitle}" - config["DEFAULT"]["smart_discography"] = "false" - with open(CONFIG_FILE, "w") as cf: - config.write(cf) - smart_discography = config.getboolean("DEFAULT", "smart_discography") folder_format = config["DEFAULT"]["folder_format"] track_format = config["DEFAULT"]["track_format"] @@ -128,15 +139,16 @@ def main(): arguments = qobuz_dl_args( default_quality, default_limit, default_folder ).parse_args() - except (KeyError, UnicodeDecodeError, configparser.Error): + except (KeyError, UnicodeDecodeError, configparser.Error) as error: arguments = qobuz_dl_args().parse_args() if not arguments.reset: sys.exit( - f"{RED}Your config file is corrupted! Run 'qobuz-dl -r' to fix this." + f"{RED}Your config file is corrupted: {error}! " + "Run 'qobuz-dl -r' to fix this." ) if arguments.reset: - sys.exit(reset_config(CONFIG_FILE)) + sys.exit(_reset_config(CONFIG_FILE)) if arguments.purge: try: @@ -161,26 +173,7 @@ def main(): ) qobuz.initialize_client(email, password, app_id, secrets) - try: - if arguments.command == "dl": - qobuz.download_list_of_urls(arguments.SOURCE) - elif arguments.command == "lucky": - query = " ".join(arguments.QUERY) - qobuz.lucky_type = arguments.type - qobuz.lucky_limit = arguments.number - qobuz.lucky_mode(query) - else: - qobuz.interactive_limit = arguments.limit - qobuz.interactive() - - except KeyboardInterrupt: - logging.info( - f"{RED}Interrupted by user\n{YELLOW}Already downloaded items will " - "be skipped if you try to download the same releases again." - ) - - finally: - remove_leftovers(qobuz.directory) + _handle_commands(qobuz, arguments) if __name__ == "__main__": diff --git a/qobuz_dl/core.py b/qobuz_dl/core.py index e27b23d..82bcc74 100644 --- a/qobuz_dl/core.py +++ b/qobuz_dl/core.py @@ -1,15 +1,9 @@ import logging import os -import re -import string import sys -import time -from typing import Tuple import requests from bs4 import BeautifulSoup as bso -from mutagen.flac import FLAC -from mutagen.mp3 import EasyMP3 from pathvalidate import sanitize_filename import qobuz_dl.spoofbuz as spoofbuz @@ -17,11 +11,18 @@ from qobuz_dl import downloader, qopy from qobuz_dl.color import CYAN, OFF, RED, YELLOW, DF, RESET from qobuz_dl.exceptions import NonStreamable from qobuz_dl.db import create_db, handle_download_id +from qobuz_dl.utils import ( + get_url_info, + make_m3u, + smart_discography_filter, + format_duration, + create_and_return_dir, + PartialFormatter, +) WEB_URL = "https://play.qobuz.com/" ARTISTS_SELECTOR = "td.chartlist-artist > a" TITLE_SELECTOR = "td.chartlist-name > a" -EXTENSIONS = (".mp3", ".flac") QUALITIES = { 5: "5 - MP3", 6: "6 - 16 bit, 44.1kHz", @@ -32,28 +33,6 @@ QUALITIES = { logger = logging.getLogger(__name__) -class PartialFormatter(string.Formatter): - def __init__(self, missing="n/a", bad_fmt="n/a"): - self.missing, self.bad_fmt = missing, bad_fmt - - def get_field(self, field_name, args, kwargs): - try: - val = super(PartialFormatter, self).get_field(field_name, args, kwargs) - except (KeyError, AttributeError): - val = None, field_name - return val - - def format_field(self, value, spec): - if not value: - return self.missing - try: - return super(PartialFormatter, self).format_field(value, spec) - except ValueError: - if self.bad_fmt: - return self.bad_fmt - raise - - class QobuzDL: def __init__( self, @@ -74,7 +53,7 @@ class QobuzDL: track_format="{tracknumber}. {tracktitle}", smart_discography=False, ): - self.directory = self.create_dir(directory) + self.directory = create_and_return_dir(directory) self.quality = quality self.embed_art = embed_art self.lucky_limit = lucky_limit @@ -101,28 +80,6 @@ class QobuzDL: secret for secret in spoofer.getSecrets().values() if secret ] # avoid empty fields - def create_dir(self, directory=None): - fix = os.path.normpath(directory) - os.makedirs(fix, exist_ok=True) - return fix - - def get_url_info(self, url: str) -> Tuple[str, str]: - """Returns the type of the url and the id. - - Compatible with urls of the form: - https://www.qobuz.com/us-en/{type}/{name}/{id} - https://open.qobuz.com/{type}/{id} - https://play.qobuz.com/{type}/{id} - /us-en/{type}/-/{id} - """ - - r = re.search( - r"(?:https:\/\/(?:w{3}|open|play)\.qobuz\.com)?(?:\/[a-z]{2}-[a-z]{2})" - r"?\/(album|artist|track|playlist|label)(?:\/[-\w\d]+)?\/([\w\d]+)", - url, - ) - return r.groups() - def download_from_id(self, item_id, album=True, alt_path=None): if handle_download_id(self.downloads_db, item_id, add_id=False): logger.info( @@ -132,20 +89,20 @@ class QobuzDL: ) return try: - downloader.download_id_by_type( + dloader = downloader.Download( self.client, item_id, alt_path or self.directory, - str(self.quality), - album, + int(self.quality), self.embed_art, self.ignore_singles_eps, self.quality_fallback, self.cover_og_quality, self.no_cover, - folder_format=self.folder_format, - track_format=self.track_format, + self.folder_format, + self.track_format, ) + dloader.download_id_by_type(not album) handle_download_id(self.downloads_db, item_id, add_id=True) except (requests.exceptions.RequestException, NonStreamable) as e: logger.error(f"{RED}Error getting release: {e}. Skipping...") @@ -168,7 +125,7 @@ class QobuzDL: "track": {"album": False, "func": None, "iterable_key": None}, } try: - url_type, item_id = self.get_url_info(url) + url_type, item_id = get_url_info(url) type_dict = possibles[url_type] except (KeyError, IndexError): logger.info( @@ -182,13 +139,13 @@ class QobuzDL: f"{YELLOW}Downloading all the music from {content_name} " f"({url_type})!" ) - new_path = self.create_dir( + new_path = create_and_return_dir( os.path.join(self.directory, sanitize_filename(content_name)) ) if self.smart_discography and url_type == "artist": # change `save_space` and `skip_extras` for customization - items = self._smart_discography_filter( + items = smart_discography_filter( content, save_space=True, skip_extras=True, @@ -205,8 +162,8 @@ class QobuzDL: True if type_dict["iterable_key"] == "albums" else False, new_path, ) - if url_type == "playlist": - self.make_m3u(new_path) + if url_type == "playlist" and not self.no_m3u_for_playlists: + make_m3u(new_path) else: self.download_from_id(item_id, type_dict["album"]) @@ -256,9 +213,6 @@ class QobuzDL: return results - def format_duration(self, duration): - return time.strftime("%H:%M:%S", time.gmtime(duration)) - def search_by_type(self, query, item_type, limit=10, lucky=False): if len(query) < 3: logger.info("{RED}Your search query is too short or invalid") @@ -307,7 +261,7 @@ class QobuzDL: text = "{} - {} [{}]".format( text, - self.format_duration(i["duration"]), + format_duration(i["duration"]), "HI-RES" if i["hires_streamable"] else "LOSSLESS", ) @@ -435,143 +389,11 @@ class QobuzDL: ) for i in track_list: - track_id = self.get_url_info( - self.search_by_type(i, "track", 1, lucky=True)[0] - )[1] + track_id = get_url_info(self.search_by_type(i, "track", 1, lucky=True)[0])[ + 1 + ] if track_id: self.download_from_id(track_id, False, pl_directory) - self.make_m3u(pl_directory) - - def make_m3u(self, pl_directory): - if self.no_m3u_for_playlists: - return - - track_list = ["#EXTM3U"] - rel_folder = os.path.basename(os.path.normpath(pl_directory)) - pl_name = rel_folder + ".m3u" - for local, dirs, files in os.walk(pl_directory): - dirs.sort() - audio_rel_files = [ - # os.path.abspath(os.path.join(local, file_)) - # os.path.join(rel_folder, - # os.path.basename(os.path.normpath(local)), - # file_) - os.path.join(os.path.basename(os.path.normpath(local)), file_) - for file_ in files - if os.path.splitext(file_)[-1] in EXTENSIONS - ] - audio_files = [ - os.path.abspath(os.path.join(local, file_)) - for file_ in files - if os.path.splitext(file_)[-1] in EXTENSIONS - ] - if not audio_files or len(audio_files) != len(audio_rel_files): - continue - - for audio_rel_file, audio_file in zip(audio_rel_files, audio_files): - try: - pl_item = ( - EasyMP3(audio_file) - if ".mp3" in audio_file - else FLAC(audio_file) - ) - title = pl_item["TITLE"][0] - artist = pl_item["ARTIST"][0] - length = int(pl_item.info.length) - index = "#EXTINF:{}, {} - {}\n{}".format( - length, artist, title, audio_rel_file - ) - except: # noqa - continue - track_list.append(index) - - if len(track_list) > 1: - with open(os.path.join(pl_directory, pl_name), "w") as pl: - pl.write("\n\n".join(track_list)) - - def _smart_discography_filter( - self, contents: list, save_space: bool = False, skip_extras: bool = False - ) -> list: - """When downloading some artists' discography, many random and spam-like - albums can get downloaded. This helps filter those out to just get the good stuff. - - This function removes: - * albums by other artists, which may contain a feature from the requested artist - * duplicate albums in different qualities - * (optionally) removes collector's, deluxe, live albums - - :param list contents: contents returned by qobuz API - :param bool save_space: choose highest bit depth, lowest sampling rate - :param bool remove_extras: remove albums with extra material (i.e. live, deluxe,...) - :returns: filtered items list - """ - - # for debugging - def print_album(album: dict) -> None: - logger.debug( - f"{album['title']} - {album.get('version', '~~')} ({album['maximum_bit_depth']}/{album['maximum_sampling_rate']} by {album['artist']['name']}) {album['id']}" - ) - - TYPE_REGEXES = { - "remaster": r"(?i)(re)?master(ed)?", - "extra": r"(?i)(anniversary|deluxe|live|collector|demo|expanded)", - } - - def is_type(album_t: str, album: dict) -> bool: - """Check if album is of type `album_t`""" - version = album.get("version", "") - title = album.get("title", "") - regex = TYPE_REGEXES[album_t] - return re.search(regex, f"{title} {version}") is not None - - def essence(album: dict) -> str: - """Ignore text in parens/brackets, return all lowercase. - Used to group two albums that may be named similarly, but not exactly - the same. - """ - r = re.match(r"([^\(]+)(?:\s*[\(\[][^\)][\)\]])*", album) - return r.group(1).strip().lower() - - requested_artist = contents[0]["name"] - items = [item["albums"]["items"] for item in contents][0] - - # use dicts to group duplicate albums together by title - title_grouped = dict() - for item in items: - title_ = essence(item["title"]) - if title_ not in title_grouped: # ? -# if (t := essence(item["title"])) not in title_grouped: - title_grouped[title_] = [] - title_grouped[title_].append(item) - - items = [] - for albums in title_grouped.values(): - best_bit_depth = max(a["maximum_bit_depth"] for a in albums) - get_best = min if save_space else max - best_sampling_rate = get_best( - a["maximum_sampling_rate"] - for a in albums - if a["maximum_bit_depth"] == best_bit_depth - ) - remaster_exists = any(is_type("remaster", a) for a in albums) - - def is_valid(album: dict) -> bool: - return ( - album["maximum_bit_depth"] == best_bit_depth - and album["maximum_sampling_rate"] == best_sampling_rate - and album["artist"]["name"] == requested_artist - and not ( # states that are not allowed - (remaster_exists and not is_type("remaster", album)) - or (skip_extras and is_type("extra", album)) - ) - ) - - filtered = tuple(filter(is_valid, albums)) - # most of the time, len is 0 or 1. - # if greater, it is a complete duplicate, - # so it doesn't matter which is chosen - if len(filtered) >= 1: - items.append(filtered[0]) - - return items + if not self.no_m3u_for_playlists: + make_m3u(pl_directory) diff --git a/qobuz_dl/downloader.py b/qobuz_dl/downloader.py index 2dcafb9..66ef450 100644 --- a/qobuz_dl/downloader.py +++ b/qobuz_dl/downloader.py @@ -23,9 +23,288 @@ DEFAULT_FORMATS = { ], } +DEFAULT_FOLDER = "{artist} - {album} ({year}) [{bit_depth}B-{sampling_rate}kHz]" +DEFAULT_TRACK = "{tracknumber}. {tracktitle}" + logger = logging.getLogger(__name__) +class Download: + def __init__( + self, + client, + item_id: str, + path: str, + quality: int, + embed_art: bool = False, + albums_only: bool = False, + downgrade_quality: bool = False, + cover_og_quality: bool = False, + no_cover: bool = False, + folder_format=None, + track_format=None, + ): + self.client = client + self.item_id = item_id + self.path = path + self.quality = quality + self.albums_only = albums_only + self.embed_art = embed_art + self.downgrade_quality = downgrade_quality + self.cover_og_quality = cover_og_quality + self.no_cover = no_cover + self.folder_format = folder_format or DEFAULT_FOLDER + self.track_format = track_format or DEFAULT_TRACK + + def download_id_by_type(self, track=True): + if not track: + self.download_release() + else: + self.download_track() + + def download_release(self): + count = 0 + meta = self.client.get_album_meta(self.item_id) + + if not meta.get("streamable"): + raise NonStreamable("This release is not streamable") + + if self.albums_only and ( + meta.get("release_type") != "album" + or meta.get("artist").get("name") == "Various Artists" + ): + logger.info(f'{OFF}Ignoring Single/EP/VA: {meta.get("title", "n/a")}') + return + + album_title = _get_title(meta) + + format_info = self._get_format(meta) + file_format, quality_met, bit_depth, sampling_rate = format_info + + if not self.downgrade_quality and not quality_met: + logger.info( + f"{OFF}Skipping {album_title} as it doesn't meet quality requirement" + ) + return + + logger.info( + f"\n{YELLOW}Downloading: {album_title}\nQuality: {file_format}" + f" ({bit_depth}/{sampling_rate})\n" + ) + album_attr = self._get_album_attr( + meta, album_title, file_format, bit_depth, sampling_rate + ) + folder_format, track_format = _clean_format_str( + self.folder_format, self.track_format, file_format + ) + sanitized_title = sanitize_filename(folder_format.format(**album_attr)) + dirn = os.path.join(self.path, sanitized_title) + os.makedirs(dirn, exist_ok=True) + + if self.no_cover: + logger.info(f"{OFF}Skipping cover") + else: + _get_extra(meta["image"]["large"], dirn, og_quality=self.cover_og_quality) + + if "goodies" in meta: + try: + _get_extra(meta["goodies"][0]["url"], dirn, "booklet.pdf") + except: # noqa + pass + media_numbers = [track["media_number"] for track in meta["tracks"]["items"]] + is_multiple = True if len([*{*media_numbers}]) > 1 else False + for i in meta["tracks"]["items"]: + parse = self.client.get_track_url(i["id"], fmt_id=self.quality) + if "sample" not in parse and parse["sampling_rate"]: + is_mp3 = True if int(self.quality) == 5 else False + self._download_and_tag( + dirn, + count, + parse, + i, + meta, + False, + is_mp3, + i["media_number"] if is_multiple else None, + ) + else: + logger.info(f"{OFF}Demo. Skipping") + count = count + 1 + logger.info(f"{GREEN}Completed") + + def download_track(self): + parse = self.client.get_track_url(self.item_id, self.quality) + + if "sample" not in parse and parse["sampling_rate"]: + meta = self.client.get_track_meta(self.item_id) + track_title = _get_title(meta) + logger.info(f"\n{YELLOW}Downloading: {track_title}") + format_info = self._get_format(meta, is_track_id=True, track_url_dict=parse) + file_format, quality_met, bit_depth, sampling_rate = format_info + + folder_format, track_format = _clean_format_str( + self.folder_format, self.track_format, str(bit_depth) + ) + + if not self.downgrade_quality and not quality_met: + logger.info( + f"{OFF}Skipping {track_title} as it doesn't " + "meet quality requirement" + ) + return + track_attr = self._get_track_attr( + meta, track_title, bit_depth, sampling_rate + ) + sanitized_title = sanitize_filename(folder_format.format(**track_attr)) + + dirn = os.path.join(self.path, sanitized_title) + os.makedirs(dirn, exist_ok=True) + if self.no_cover: + logger.info(f"{OFF}Skipping cover") + else: + _get_extra( + meta["album"]["image"]["large"], + dirn, + og_quality=self.cover_og_quality, + ) + is_mp3 = True if int(self.quality) == 5 else False + self._download_and_tag( + dirn, + 1, + parse, + meta, + meta, + True, + is_mp3, + self.embed_art, + ) + else: + logger.info(f"{OFF}Demo. Skipping") + logger.info(f"{GREEN}Completed") + + def _download_and_tag( + self, + root_dir, + tmp_count, + track_url_dict, + track_metadata, + album_or_track_metadata, + is_track, + is_mp3, + multiple=None, + ): + extension = ".mp3" if is_mp3 else ".flac" + + try: + url = track_url_dict["url"] + except KeyError: + logger.info(f"{OFF}Track not available for download") + return + + if multiple: + root_dir = os.path.join(root_dir, f"Disc {multiple}") + os.makedirs(root_dir, exist_ok=True) + + filename = os.path.join(root_dir, f".{tmp_count:02}.tmp") + + # Determine the filename + track_title = track_metadata.get("title") + artist = _safe_get(track_metadata, "performer", "name") + filename_attr = self._get_filename_attr(artist, track_metadata, track_title) + + # track_format is a format string + # e.g. '{tracknumber}. {artist} - {tracktitle}' + formatted_path = sanitize_filename(self.track_format.format(**filename_attr)) + final_file = os.path.join(root_dir, formatted_path)[:250] + extension + + if os.path.isfile(final_file): + logger.info(f"{OFF}{track_title} was already downloaded") + return + + desc = _get_description(track_url_dict, track_title, multiple) + tqdm_download(url, filename, desc) + tag_function = metadata.tag_mp3 if is_mp3 else metadata.tag_flac + try: + tag_function( + filename, + root_dir, + final_file, + track_metadata, + album_or_track_metadata, + is_track, + self.embed_art, + ) + except Exception as e: + logger.error(f"{RED}Error tagging the file: {e}", exc_info=True) + + @staticmethod + def _get_filename_attr(artist, track_metadata, track_title): + return { + "artist": artist, + "albumartist": _safe_get( + track_metadata, "album", "artist", "name", default=artist + ), + "bit_depth": track_metadata["maximum_bit_depth"], + "sampling_rate": track_metadata["maximum_sampling_rate"], + "tracktitle": track_title, + "version": track_metadata.get("version"), + "tracknumber": f"{track_metadata['track_number']:02}", + } + + @staticmethod + def _get_track_attr(meta, track_title, bit_depth, sampling_rate): + return { + "album": meta["album"]["title"], + "artist": meta["album"]["artist"]["name"], + "tracktitle": track_title, + "year": meta["album"]["release_date_original"].split("-")[0], + "bit_depth": bit_depth, + "sampling_rate": sampling_rate, + } + + @staticmethod + def _get_album_attr(meta, album_title, file_format, bit_depth, sampling_rate): + return { + "artist": meta["artist"]["name"], + "album": album_title, + "year": meta["release_date_original"].split("-")[0], + "format": file_format, + "bit_depth": bit_depth, + "sampling_rate": sampling_rate, + } + + def _get_format(self, item_dict, is_track_id=False, track_url_dict=None): + quality_met = True + if int(self.quality) == 5: + return ("MP3", quality_met, None, None) + track_dict = item_dict + if not is_track_id: + track_dict = item_dict["tracks"]["items"][0] + + try: + new_track_dict = ( + self.client.get_track_url(track_dict["id"], fmt_id=self.quality) + if not track_url_dict + else track_url_dict + ) + restrictions = new_track_dict.get("restrictions") + if isinstance(restrictions, list): + if any( + restriction.get("code") == QL_DOWNGRADE + for restriction in restrictions + ): + quality_met = False + + return ( + "FLAC", + quality_met, + new_track_dict["bit_depth"], + new_track_dict["sampling_rate"], + ) + except (KeyError, requests.exceptions.HTTPError): + return ("Unknown", quality_met, None, None) + + def tqdm_download(url, fname, track_name): r = requests.get(url, allow_redirects=True, stream=True) total = int(r.headers.get("content-length", 0)) @@ -42,48 +321,15 @@ def tqdm_download(url, fname, track_name): bar.update(size) -def get_description(u: dict, track_title, multiple=None): +def _get_description(item: dict, track_title, multiple=None): downloading_title = f"{track_title} " - f'[{u["bit_depth"]}/{u["sampling_rate"]}]' + f'[{item["bit_depth"]}/{item["sampling_rate"]}]' if multiple: downloading_title = f"[Disc {multiple}] {downloading_title}" return downloading_title -def get_format( - client, item_dict, quality, is_track_id=False, track_url_dict=None -) -> Tuple[str, bool, int, int]: - quality_met = True - if int(quality) == 5: - return ("MP3", quality_met, None, None) - track_dict = item_dict - if not is_track_id: - track_dict = item_dict["tracks"]["items"][0] - - try: - new_track_dict = ( - client.get_track_url(track_dict["id"], quality) - if not track_url_dict - else track_url_dict - ) - restrictions = new_track_dict.get("restrictions") - if isinstance(restrictions, list): - if any( - restriction.get("code") == QL_DOWNGRADE for restriction in restrictions - ): - quality_met = False - - return ( - "FLAC", - quality_met, - new_track_dict["bit_depth"], - new_track_dict["sampling_rate"], - ) - except (KeyError, requests.exceptions.HTTPError): - return ("Unknown", quality_met, None, None) - - -def get_title(item_dict): +def _get_title(item_dict): album_title = item_dict["title"] version = item_dict.get("version") if version: @@ -95,266 +341,18 @@ def get_title(item_dict): return album_title -def get_extra(i, dirn, extra="cover.jpg", og_quality=False): +def _get_extra(item, dirn, extra="cover.jpg", og_quality=False): extra_file = os.path.join(dirn, extra) if os.path.isfile(extra_file): logger.info(f"{OFF}{extra} was already downloaded") return tqdm_download( - i.replace("_600.", "_org.") if og_quality else i, + item.replace("_600.", "_org.") if og_quality else item, extra_file, extra, ) -# Download and tag a file -def download_and_tag( - root_dir, - tmp_count, - track_url_dict, - track_metadata, - album_or_track_metadata, - is_track, - is_mp3, - embed_art=False, - multiple=None, - track_format="{tracknumber}. {tracktitle}", -): - """ - Download and tag a file - - :param str root_dir: Root directory where the track will be stored - :param int tmp_count: Temporal download file number - :param dict track_url_dict: get_track_url dictionary from Qobuz client - :param dict track_metadata: Track item dictionary from Qobuz client - :param dict album_or_track_metadata: Album/track dict from Qobuz client - :param bool is_track - :param bool is_mp3 - :param bool embed_art: Embed cover art into file (FLAC-only) - :param str track_format format-string that determines file naming - :param multiple: Multiple disc integer - :type multiple: integer or None - """ - - extension = ".mp3" if is_mp3 else ".flac" - - try: - url = track_url_dict["url"] - except KeyError: - logger.info(f"{OFF}Track not available for download") - return - - if multiple: - root_dir = os.path.join(root_dir, f"Disc {multiple}") - os.makedirs(root_dir, exist_ok=True) - - filename = os.path.join(root_dir, f".{tmp_count:02}.tmp") - - # Determine the filename - track_title = track_metadata.get("title") - artist = _safe_get(track_metadata, "performer", "name") - filename_attr = { - "artist": artist, - "albumartist": _safe_get( - track_metadata, "album", "artist", "name", default=artist - ), - "bit_depth": track_metadata["maximum_bit_depth"], - "sampling_rate": track_metadata["maximum_sampling_rate"], - "tracktitle": track_title, - "version": track_metadata.get("version"), - "tracknumber": f"{track_metadata['track_number']:02}", - } - # track_format is a format string - # e.g. '{tracknumber}. {artist} - {tracktitle}' - formatted_path = sanitize_filename(track_format.format(**filename_attr)) - final_file = os.path.join(root_dir, formatted_path)[:250] + extension - - if os.path.isfile(final_file): - logger.info(f"{OFF}{track_title} was already downloaded") - return - - desc = get_description(track_url_dict, track_title, multiple) - tqdm_download(url, filename, desc) - tag_function = metadata.tag_mp3 if is_mp3 else metadata.tag_flac - try: - tag_function( - filename, - root_dir, - final_file, - track_metadata, - album_or_track_metadata, - is_track, - embed_art, - ) - except Exception as e: - logger.error(f"{RED}Error tagging the file: {e}", exc_info=True) - - -def download_id_by_type( - client, - item_id, - path, - quality, - album=False, - embed_art=False, - albums_only=False, - downgrade_quality=True, - cover_og_quality=False, - no_cover=False, - folder_format="{artist} - {album} ({year}) " "[{bit_depth}B-{sampling_rate}kHz]", - track_format="{tracknumber}. {tracktitle}", -): - """ - Download and get metadata by ID and type (album or track) - - :param Qopy client: qopy Client - :param int item_id: Qobuz item id - :param str path: The root directory where the item will be downloaded - :param int quality: Audio quality (5, 6, 7, 27) - :param bool album: album type or not - :param embed_art album: Embed cover art into files - :param bool albums_only: Ignore Singles, EPs and VA releases - :param bool downgrade: Skip releases not available in set quality - :param bool cover_og_quality: Download cover in its original quality - :param bool no_cover: Don't download cover art - :param str folder_format: format string that determines folder naming - :param str track_format: format string that determines track naming - """ - count = 0 - - if album: - meta = client.get_album_meta(item_id) - - if not meta.get("streamable"): - raise NonStreamable("This release is not streamable") - - if albums_only and ( - meta.get("release_type") != "album" - or meta.get("artist").get("name") == "Various Artists" - ): - logger.info(f'{OFF}Ignoring Single/EP/VA: {meta.get("title", "")}') - return - - album_title = get_title(meta) - - format_info = get_format(client, meta, quality) - file_format, quality_met, bit_depth, sampling_rate = format_info - - if not downgrade_quality and not quality_met: - logger.info( - f"{OFF}Skipping {album_title} as it doesn't " "meet quality requirement" - ) - return - - logger.info( - f"\n{YELLOW}Downloading: {album_title}\nQuality: {file_format} ({bit_depth}/{sampling_rate})\n" - ) - album_attr = { - "artist": meta["artist"]["name"], - "album": album_title, - "year": meta["release_date_original"].split("-")[0], - "format": file_format, - "bit_depth": bit_depth, - "sampling_rate": sampling_rate, - } - folder_format, track_format = _clean_format_str( - folder_format, track_format, file_format - ) - sanitized_title = sanitize_filename(folder_format.format(**album_attr)) - dirn = os.path.join(path, sanitized_title) - os.makedirs(dirn, exist_ok=True) - - if no_cover: - logger.info(f"{OFF}Skipping cover") - else: - get_extra(meta["image"]["large"], dirn, og_quality=cover_og_quality) - - if "goodies" in meta: - try: - get_extra(meta["goodies"][0]["url"], dirn, "booklet.pdf") - except: # noqa - pass - media_numbers = [track["media_number"] for track in meta["tracks"]["items"]] - is_multiple = True if len([*{*media_numbers}]) > 1 else False - for i in meta["tracks"]["items"]: - parse = client.get_track_url(i["id"], quality) - if "sample" not in parse and parse["sampling_rate"]: - is_mp3 = True if int(quality) == 5 else False - download_and_tag( - dirn, - count, - parse, - i, - meta, - False, - is_mp3, - embed_art, - i["media_number"] if is_multiple else None, - track_format=track_format, - ) - else: - logger.info(f"{OFF}Demo. Skipping") - count = count + 1 - else: - parse = client.get_track_url(item_id, quality) - - if "sample" not in parse and parse["sampling_rate"]: - meta = client.get_track_meta(item_id) - track_title = get_title(meta) - logger.info(f"\n{YELLOW}Downloading: {track_title}") - format_info = get_format( - client, meta, quality, is_track_id=True, track_url_dict=parse - ) - file_format, quality_met, bit_depth, sampling_rate = format_info - - folder_format, track_format = _clean_format_str( - folder_format, track_format, bit_depth - ) - - if not downgrade_quality and not quality_met: - logger.info( - f"{OFF}Skipping {track_title} as it doesn't " - "meet quality requirement" - ) - return - track_attr = { - "album": meta["album"]["title"], - "artist": meta["album"]["artist"]["name"], - "tracktitle": track_title, - "year": meta["album"]["release_date_original"].split("-")[0], - "bit_depth": bit_depth, - "sampling_rate": sampling_rate, - } - sanitized_title = sanitize_filename(folder_format.format(**track_attr)) - - dirn = os.path.join(path, sanitized_title) - os.makedirs(dirn, exist_ok=True) - if no_cover: - logger.info(f"{OFF}Skipping cover") - else: - get_extra( - meta["album"]["image"]["large"], dirn, og_quality=cover_og_quality - ) - is_mp3 = True if int(quality) == 5 else False - download_and_tag( - dirn, - count, - parse, - meta, - meta, - True, - is_mp3, - embed_art, - track_format=track_format, - ) - else: - logger.info(f"{OFF}Demo. Skipping") - logger.info(f"{GREEN}Completed") - - -# ----------- Utilities ----------- - - def _clean_format_str(folder: str, track: str, file_format: str) -> Tuple[str, str]: """Cleans up the format strings, avoids errors with MP3 files. diff --git a/qobuz_dl/metadata.py b/qobuz_dl/metadata.py index 2be1e3a..db5ba57 100644 --- a/qobuz_dl/metadata.py +++ b/qobuz_dl/metadata.py @@ -15,15 +15,31 @@ COPYRIGHT, PHON_COPYRIGHT = "\u2117", "\u00a9" # and the file won't be tagged FLAC_MAX_BLOCKSIZE = 16777215 +ID3_LEGEND = { + "album": id3.TALB, + "albumartist": id3.TPE2, + "artist": id3.TPE1, + "comment": id3.COMM, + "composer": id3.TCOM, + "copyright": id3.TCOP, + "date": id3.TDAT, + "genre": id3.TCON, + "isrc": id3.TSRC, + "label": id3.TPUB, + "performer": id3.TOPE, + "title": id3.TIT2, + "year": id3.TYER, +} -def get_title(track_dict): + +def _get_title(track_dict): title = track_dict["title"] version = track_dict.get("version") if version: title = f"{title} ({version})" # for classical works if track_dict.get("work"): - title = "{}: {}".format(track_dict["work"], title) + title = f"{track_dict['work']}: {title}" return title @@ -46,6 +62,50 @@ def _format_genres(genres: list) -> str: return ", ".join(no_repeats) +def _embed_flac_img(root_dir, audio: FLAC): + emb_image = os.path.join(root_dir, "cover.jpg") + multi_emb_image = os.path.join( + os.path.abspath(os.path.join(root_dir, os.pardir)), "cover.jpg" + ) + if os.path.isfile(emb_image): + cover_image = emb_image + else: + cover_image = multi_emb_image + + try: + # rest of the metadata still gets embedded + # when the image size is too big + if os.path.getsize(cover_image) > FLAC_MAX_BLOCKSIZE: + raise Exception( + "downloaded cover size too large to embed. " + "turn off `og_cover` to avoid error" + ) + + image = Picture() + image.type = 3 + image.mime = "image/jpeg" + image.desc = "cover" + with open(cover_image, "rb") as img: + image.data = img.read() + audio.add_picture(image) + except Exception as e: + logger.error(f"Error embedding image: {e}", exc_info=True) + + +def _embed_id3_img(root_dir, audio: id3.ID3): + emb_image = os.path.join(root_dir, "cover.jpg") + multi_emb_image = os.path.join( + os.path.abspath(os.path.join(root_dir, os.pardir)), "cover.jpg" + ) + if os.path.isfile(emb_image): + cover_image = emb_image + else: + cover_image = multi_emb_image + + with open(cover_image, "rb") as cover: + audio.add(id3.APIC(3, "image/jpeg", 3, "", cover.read())) + + # Use KeyError catching instead of dict.get to avoid empty tags def tag_flac(filename, root_dir, final_name, d, album, istrack=True, em_image=False): """ @@ -61,7 +121,7 @@ def tag_flac(filename, root_dir, final_name, d, album, istrack=True, em_image=Fa """ audio = FLAC(filename) - audio["TITLE"] = get_title(d) + audio["TITLE"] = _get_title(d) audio["TRACKNUMBER"] = str(d["track_number"]) # TRACK NUMBER @@ -73,18 +133,13 @@ def tag_flac(filename, root_dir, final_name, d, album, istrack=True, em_image=Fa except KeyError: pass - try: - audio["ARTIST"] = d["performer"]["name"] # TRACK ARTIST - except KeyError: - if istrack: - audio["ARTIST"] = d["album"]["artist"]["name"] # TRACK ARTIST - else: - audio["ARTIST"] = album["artist"]["name"] + artist_ = d.get("performer", {}).get("name") # TRACK ARTIST + if istrack: + audio["ARTIST"] = artist_ or d["album"]["artist"]["name"] # TRACK ARTIST + else: + audio["ARTIST"] = artist_ or album["artist"]["name"] - try: - audio["LABEL"] = album["label"]["name"] - except KeyError: - pass + audio["LABEL"] = album.get("label", {}).get("name", "n/a") if istrack: audio["GENRE"] = _format_genres(d["album"]["genres_list"]) @@ -102,33 +157,7 @@ def tag_flac(filename, root_dir, final_name, d, album, istrack=True, em_image=Fa audio["COPYRIGHT"] = _format_copyright(album.get("copyright", "n/a")) if em_image: - emb_image = os.path.join(root_dir, "cover.jpg") - multi_emb_image = os.path.join( - os.path.abspath(os.path.join(root_dir, os.pardir)), "cover.jpg" - ) - if os.path.isfile(emb_image): - cover_image = emb_image - else: - cover_image = multi_emb_image - - try: - # rest of the metadata still gets embedded - # when the image size is too big - if os.path.getsize(cover_image) > FLAC_MAX_BLOCKSIZE: - raise Exception( - "downloaded cover size too large to embed. " - "turn off `og_cover` to avoid error" - ) - - image = Picture() - image.type = 3 - image.mime = "image/jpeg" - image.desc = "cover" - with open(cover_image, "rb") as img: - image.data = img.read() - audio.add_picture(image) - except Exception as e: - logger.error(f"Error embedding image: {e}", exc_info=True) + _embed_flac_img(root_dir, audio) audio.save() os.rename(filename, final_name) @@ -146,21 +175,6 @@ def tag_mp3(filename, root_dir, final_name, d, album, istrack=True, em_image=Fal :param bool em_image: Embed cover art into file """ - id3_legend = { - "album": id3.TALB, - "albumartist": id3.TPE2, - "artist": id3.TPE1, - "comment": id3.COMM, - "composer": id3.TCOM, - "copyright": id3.TCOP, - "date": id3.TDAT, - "genre": id3.TCON, - "isrc": id3.TSRC, - "label": id3.TPUB, - "performer": id3.TOPE, - "title": id3.TIT2, - "year": id3.TYER, - } try: audio = id3.ID3(filename) except ID3NoHeaderError: @@ -168,19 +182,17 @@ def tag_mp3(filename, root_dir, final_name, d, album, istrack=True, em_image=Fal # temporarily holds metadata tags = dict() - tags["title"] = get_title(d) + tags["title"] = _get_title(d) try: tags["label"] = album["label"]["name"] except KeyError: pass - try: - tags["artist"] = d["performer"]["name"] - except KeyError: - if istrack: - tags["artist"] = d["album"]["artist"]["name"] - else: - tags["artist"] = album["artist"]["name"] + artist_ = d.get("performer", {}).get("name") # TRACK ARTIST + if istrack: + audio["artist"] = artist_ or d["album"]["artist"]["name"] # TRACK ARTIST + else: + audio["artist"] = artist_ or album["artist"]["name"] if istrack: tags["genre"] = _format_genres(d["album"]["genres_list"]) @@ -204,21 +216,11 @@ def tag_mp3(filename, root_dir, final_name, d, album, istrack=True, em_image=Fal # write metadata in `tags` to file for k, v in tags.items(): - id3tag = id3_legend[k] + id3tag = ID3_LEGEND[k] audio[id3tag.__name__] = id3tag(encoding=3, text=v) if em_image: - emb_image = os.path.join(root_dir, "cover.jpg") - multi_emb_image = os.path.join( - os.path.abspath(os.path.join(root_dir, os.pardir)), "cover.jpg" - ) - if os.path.isfile(emb_image): - cover_image = emb_image - else: - cover_image = multi_emb_image - - with open(cover_image, "rb") as cover: - audio.add(id3.APIC(3, "image/jpeg", 3, "", cover.read())) + _embed_id3_img(root_dir, audio) audio.save(filename, "v2_version=3") os.rename(filename, final_name) diff --git a/qobuz_dl/qopy.py b/qobuz_dl/qopy.py index e7dabab..0a64741 100644 --- a/qobuz_dl/qopy.py +++ b/qobuz_dl/qopy.py @@ -26,7 +26,7 @@ class Client: def __init__(self, email, pwd, app_id, secrets): logger.info(f"{YELLOW}Logging...") self.secrets = secrets - self.id = app_id + self.id = str(app_id) self.session = requests.Session() self.session.headers.update( { @@ -196,8 +196,9 @@ class Client: def cfg_setup(self): for secret in self.secrets: - if self.test_secret(secret): - self.sec = secret - break + if secret: + if self.test_secret(secret): + self.sec = secret + break if not hasattr(self, "sec"): raise InvalidAppSecretError("Invalid app secret.\n" + RESET) diff --git a/qobuz_dl/utils.py b/qobuz_dl/utils.py new file mode 100644 index 0000000..b125bd5 --- /dev/null +++ b/qobuz_dl/utils.py @@ -0,0 +1,190 @@ +import re +import string +import os +import logging +import time + +from mutagen.mp3 import EasyMP3 +from mutagen.flac import FLAC + +logger = logging.getLogger(__name__) + +EXTENSIONS = (".mp3", ".flac") + + +class PartialFormatter(string.Formatter): + def __init__(self, missing="n/a", bad_fmt="n/a"): + self.missing, self.bad_fmt = missing, bad_fmt + + def get_field(self, field_name, args, kwargs): + try: + val = super(PartialFormatter, self).get_field(field_name, args, kwargs) + except (KeyError, AttributeError): + val = None, field_name + return val + + def format_field(self, value, spec): + if not value: + return self.missing + try: + return super(PartialFormatter, self).format_field(value, spec) + except ValueError: + if self.bad_fmt: + return self.bad_fmt + raise + + +def make_m3u(pl_directory): + track_list = ["#EXTM3U"] + rel_folder = os.path.basename(os.path.normpath(pl_directory)) + pl_name = rel_folder + ".m3u" + for local, dirs, files in os.walk(pl_directory): + dirs.sort() + audio_rel_files = [ + os.path.join(os.path.basename(os.path.normpath(local)), file_) + for file_ in files + if os.path.splitext(file_)[-1] in EXTENSIONS + ] + audio_files = [ + os.path.abspath(os.path.join(local, file_)) + for file_ in files + if os.path.splitext(file_)[-1] in EXTENSIONS + ] + if not audio_files or len(audio_files) != len(audio_rel_files): + continue + + for audio_rel_file, audio_file in zip(audio_rel_files, audio_files): + try: + pl_item = ( + EasyMP3(audio_file) if ".mp3" in audio_file else FLAC(audio_file) + ) + title = pl_item["TITLE"][0] + artist = pl_item["ARTIST"][0] + length = int(pl_item.info.length) + index = "#EXTINF:{}, {} - {}\n{}".format( + length, artist, title, audio_rel_file + ) + except: # noqa + continue + track_list.append(index) + + if len(track_list) > 1: + with open(os.path.join(pl_directory, pl_name), "w") as pl: + pl.write("\n\n".join(track_list)) + + +def smart_discography_filter( + contents: list, save_space: bool = False, skip_extras: bool = False +) -> list: + """When downloading some artists' discography, many random and spam-like + albums can get downloaded. This helps filter those out to just get the good stuff. + + This function removes: + * albums by other artists, which may contain a feature from the requested artist + * duplicate albums in different qualities + * (optionally) removes collector's, deluxe, live albums + + :param list contents: contents returned by qobuz API + :param bool save_space: choose highest bit depth, lowest sampling rate + :param bool remove_extras: remove albums with extra material (i.e. live, deluxe,...) + :returns: filtered items list + """ + + # for debugging + def print_album(album: dict) -> None: + logger.debug( + f"{album['title']} - {album.get('version', '~~')} " + "({album['maximum_bit_depth']}/{album['maximum_sampling_rate']}" + " by {album['artist']['name']}) {album['id']}" + ) + + TYPE_REGEXES = { + "remaster": r"(?i)(re)?master(ed)?", + "extra": r"(?i)(anniversary|deluxe|live|collector|demo|expanded)", + } + + def is_type(album_t: str, album: dict) -> bool: + """Check if album is of type `album_t`""" + version = album.get("version", "") + title = album.get("title", "") + regex = TYPE_REGEXES[album_t] + return re.search(regex, f"{title} {version}") is not None + + def essence(album: dict) -> str: + """Ignore text in parens/brackets, return all lowercase. + Used to group two albums that may be named similarly, but not exactly + the same. + """ + r = re.match(r"([^\(]+)(?:\s*[\(\[][^\)][\)\]])*", album) + return r.group(1).strip().lower() + + requested_artist = contents[0]["name"] + items = [item["albums"]["items"] for item in contents][0] + + # use dicts to group duplicate albums together by title + title_grouped = dict() + for item in items: + title_ = essence(item["title"]) + if title_ not in title_grouped: # ? + # if (t := essence(item["title"])) not in title_grouped: + title_grouped[title_] = [] + title_grouped[title_].append(item) + + items = [] + for albums in title_grouped.values(): + best_bit_depth = max(a["maximum_bit_depth"] for a in albums) + get_best = min if save_space else max + best_sampling_rate = get_best( + a["maximum_sampling_rate"] + for a in albums + if a["maximum_bit_depth"] == best_bit_depth + ) + remaster_exists = any(is_type("remaster", a) for a in albums) + + def is_valid(album: dict) -> bool: + return ( + album["maximum_bit_depth"] == best_bit_depth + and album["maximum_sampling_rate"] == best_sampling_rate + and album["artist"]["name"] == requested_artist + and not ( # states that are not allowed + (remaster_exists and not is_type("remaster", album)) + or (skip_extras and is_type("extra", album)) + ) + ) + + filtered = tuple(filter(is_valid, albums)) + # most of the time, len is 0 or 1. + # if greater, it is a complete duplicate, + # so it doesn't matter which is chosen + if len(filtered) >= 1: + items.append(filtered[0]) + + return items + + +def format_duration(duration): + return time.strftime("%H:%M:%S", time.gmtime(duration)) + + +def create_and_return_dir(directory): + fix = os.path.normpath(directory) + os.makedirs(fix, exist_ok=True) + return fix + + +def get_url_info(url): + """Returns the type of the url and the id. + + Compatible with urls of the form: + https://www.qobuz.com/us-en/{type}/{name}/{id} + https://open.qobuz.com/{type}/{id} + https://play.qobuz.com/{type}/{id} + /us-en/{type}/-/{id} + """ + + r = re.search( + r"(?:https:\/\/(?:w{3}|open|play)\.qobuz\.com)?(?:\/[a-z]{2}-[a-z]{2})" + r"?\/(album|artist|track|playlist|label)(?:\/[-\w\d]+)?\/([\w\d]+)", + url, + ) + return r.groups()