Organize functions

This commit is contained in:
vitiko98 2021-03-22 18:57:15 -04:00
parent 10040396ad
commit 9c54988a2d
6 changed files with 621 additions and 615 deletions

View File

@ -9,6 +9,7 @@ import qobuz_dl.spoofbuz as spoofbuz
from qobuz_dl.color import GREEN, RED, YELLOW from qobuz_dl.color import GREEN, RED, YELLOW
from qobuz_dl.commands import qobuz_dl_args from qobuz_dl.commands import qobuz_dl_args
from qobuz_dl.core import QobuzDL from qobuz_dl.core import QobuzDL
from qobuz_dl.downloader import DEFAULT_FOLDER, DEFAULT_TRACK
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
@ -25,7 +26,7 @@ CONFIG_FILE = os.path.join(CONFIG_PATH, "config.ini")
QOBUZ_DB = os.path.join(CONFIG_PATH, "qobuz_dl.db") QOBUZ_DB = os.path.join(CONFIG_PATH, "qobuz_dl.db")
def reset_config(config_file): def _reset_config(config_file):
logging.info(f"{YELLOW}Creating config file: {config_file}") logging.info(f"{YELLOW}Creating config file: {config_file}")
config = configparser.ConfigParser() config = configparser.ConfigParser()
config["DEFAULT"]["email"] = input("Enter your email:\n- ") config["DEFAULT"]["email"] = input("Enter your email:\n- ")
@ -55,9 +56,8 @@ def reset_config(config_file):
spoofer = spoofbuz.Spoofer() spoofer = spoofbuz.Spoofer()
config["DEFAULT"]["app_id"] = str(spoofer.getAppId()) config["DEFAULT"]["app_id"] = str(spoofer.getAppId())
config["DEFAULT"]["secrets"] = ",".join(spoofer.getSecrets().values()) config["DEFAULT"]["secrets"] = ",".join(spoofer.getSecrets().values())
config["DEFAULT"]["folder_format"] = "{artist} - {album} ({year}) " config["DEFAULT"]["folder_format"] = DEFAULT_FOLDER
"[{bit_depth}B-{sampling_rate}kHz]" config["DEFAULT"]["track_format"] = DEFAULT_TRACK
config["DEFAULT"]["track_format"] = "{tracknumber}. {tracktitle}"
config["DEFAULT"]["smart_discography"] = "false" config["DEFAULT"]["smart_discography"] = "false"
with open(config_file, "w") as configfile: with open(config_file, "w") as configfile:
config.write(configfile) config.write(configfile)
@ -68,7 +68,7 @@ def reset_config(config_file):
) )
def remove_leftovers(directory): def _remove_leftovers(directory):
directory = os.path.join(directory, "**", ".*.tmp") directory = os.path.join(directory, "**", ".*.tmp")
for i in glob.glob(directory, recursive=True): for i in glob.glob(directory, recursive=True):
try: try:
@ -77,14 +77,41 @@ def remove_leftovers(directory):
pass pass
def main(): def _handle_commands(qobuz, arguments):
try:
if arguments.command == "dl":
qobuz.download_list_of_urls(arguments.SOURCE)
elif arguments.command == "lucky":
query = " ".join(arguments.QUERY)
qobuz.lucky_type = arguments.type
qobuz.lucky_limit = arguments.number
qobuz.lucky_mode(query)
else:
qobuz.interactive_limit = arguments.limit
qobuz.interactive()
except KeyboardInterrupt:
logging.info(
f"{RED}Interrupted by user\n{YELLOW}Already downloaded items will "
"be skipped if you try to download the same releases again."
)
finally:
_remove_leftovers(qobuz.directory)
def _initial_checks():
if not os.path.isdir(CONFIG_PATH) or not os.path.isfile(CONFIG_FILE): if not os.path.isdir(CONFIG_PATH) or not os.path.isfile(CONFIG_FILE):
os.makedirs(CONFIG_PATH, exist_ok=True) os.makedirs(CONFIG_PATH, exist_ok=True)
reset_config(CONFIG_FILE) _reset_config(CONFIG_FILE)
if len(sys.argv) < 2: if len(sys.argv) < 2:
sys.exit(qobuz_dl_args().print_help()) sys.exit(qobuz_dl_args().print_help())
def main():
_initial_checks()
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.read(CONFIG_FILE) config.read(CONFIG_FILE)
@ -102,22 +129,6 @@ def main():
no_cover = config.getboolean("DEFAULT", "no_cover") no_cover = config.getboolean("DEFAULT", "no_cover")
no_database = config.getboolean("DEFAULT", "no_database") no_database = config.getboolean("DEFAULT", "no_database")
app_id = config["DEFAULT"]["app_id"] app_id = config["DEFAULT"]["app_id"]
if (
"folder_format" not in config["DEFAULT"]
or "track_format" not in config["DEFAULT"]
or "smart_discography" not in config["DEFAULT"]
):
logging.info(
f"{YELLOW}Config file does not include some settings, updating..."
)
config["DEFAULT"]["folder_format"] = "{artist} - {album} ({year}) "
"[{bit_depth}B-{sampling_rate}kHz]"
config["DEFAULT"]["track_format"] = "{tracknumber}. {tracktitle}"
config["DEFAULT"]["smart_discography"] = "false"
with open(CONFIG_FILE, "w") as cf:
config.write(cf)
smart_discography = config.getboolean("DEFAULT", "smart_discography") smart_discography = config.getboolean("DEFAULT", "smart_discography")
folder_format = config["DEFAULT"]["folder_format"] folder_format = config["DEFAULT"]["folder_format"]
track_format = config["DEFAULT"]["track_format"] track_format = config["DEFAULT"]["track_format"]
@ -128,15 +139,16 @@ def main():
arguments = qobuz_dl_args( arguments = qobuz_dl_args(
default_quality, default_limit, default_folder default_quality, default_limit, default_folder
).parse_args() ).parse_args()
except (KeyError, UnicodeDecodeError, configparser.Error): except (KeyError, UnicodeDecodeError, configparser.Error) as error:
arguments = qobuz_dl_args().parse_args() arguments = qobuz_dl_args().parse_args()
if not arguments.reset: if not arguments.reset:
sys.exit( sys.exit(
f"{RED}Your config file is corrupted! Run 'qobuz-dl -r' to fix this." f"{RED}Your config file is corrupted: {error}! "
"Run 'qobuz-dl -r' to fix this."
) )
if arguments.reset: if arguments.reset:
sys.exit(reset_config(CONFIG_FILE)) sys.exit(_reset_config(CONFIG_FILE))
if arguments.purge: if arguments.purge:
try: try:
@ -161,26 +173,7 @@ def main():
) )
qobuz.initialize_client(email, password, app_id, secrets) qobuz.initialize_client(email, password, app_id, secrets)
try: _handle_commands(qobuz, arguments)
if arguments.command == "dl":
qobuz.download_list_of_urls(arguments.SOURCE)
elif arguments.command == "lucky":
query = " ".join(arguments.QUERY)
qobuz.lucky_type = arguments.type
qobuz.lucky_limit = arguments.number
qobuz.lucky_mode(query)
else:
qobuz.interactive_limit = arguments.limit
qobuz.interactive()
except KeyboardInterrupt:
logging.info(
f"{RED}Interrupted by user\n{YELLOW}Already downloaded items will "
"be skipped if you try to download the same releases again."
)
finally:
remove_leftovers(qobuz.directory)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -1,15 +1,9 @@
import logging import logging
import os import os
import re
import string
import sys import sys
import time
from typing import Tuple
import requests import requests
from bs4 import BeautifulSoup as bso from bs4 import BeautifulSoup as bso
from mutagen.flac import FLAC
from mutagen.mp3 import EasyMP3
from pathvalidate import sanitize_filename from pathvalidate import sanitize_filename
import qobuz_dl.spoofbuz as spoofbuz import qobuz_dl.spoofbuz as spoofbuz
@ -17,11 +11,18 @@ from qobuz_dl import downloader, qopy
from qobuz_dl.color import CYAN, OFF, RED, YELLOW, DF, RESET from qobuz_dl.color import CYAN, OFF, RED, YELLOW, DF, RESET
from qobuz_dl.exceptions import NonStreamable from qobuz_dl.exceptions import NonStreamable
from qobuz_dl.db import create_db, handle_download_id from qobuz_dl.db import create_db, handle_download_id
from qobuz_dl.utils import (
get_url_info,
make_m3u,
smart_discography_filter,
format_duration,
create_and_return_dir,
PartialFormatter,
)
WEB_URL = "https://play.qobuz.com/" WEB_URL = "https://play.qobuz.com/"
ARTISTS_SELECTOR = "td.chartlist-artist > a" ARTISTS_SELECTOR = "td.chartlist-artist > a"
TITLE_SELECTOR = "td.chartlist-name > a" TITLE_SELECTOR = "td.chartlist-name > a"
EXTENSIONS = (".mp3", ".flac")
QUALITIES = { QUALITIES = {
5: "5 - MP3", 5: "5 - MP3",
6: "6 - 16 bit, 44.1kHz", 6: "6 - 16 bit, 44.1kHz",
@ -32,28 +33,6 @@ QUALITIES = {
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class PartialFormatter(string.Formatter):
def __init__(self, missing="n/a", bad_fmt="n/a"):
self.missing, self.bad_fmt = missing, bad_fmt
def get_field(self, field_name, args, kwargs):
try:
val = super(PartialFormatter, self).get_field(field_name, args, kwargs)
except (KeyError, AttributeError):
val = None, field_name
return val
def format_field(self, value, spec):
if not value:
return self.missing
try:
return super(PartialFormatter, self).format_field(value, spec)
except ValueError:
if self.bad_fmt:
return self.bad_fmt
raise
class QobuzDL: class QobuzDL:
def __init__( def __init__(
self, self,
@ -74,7 +53,7 @@ class QobuzDL:
track_format="{tracknumber}. {tracktitle}", track_format="{tracknumber}. {tracktitle}",
smart_discography=False, smart_discography=False,
): ):
self.directory = self.create_dir(directory) self.directory = create_and_return_dir(directory)
self.quality = quality self.quality = quality
self.embed_art = embed_art self.embed_art = embed_art
self.lucky_limit = lucky_limit self.lucky_limit = lucky_limit
@ -101,28 +80,6 @@ class QobuzDL:
secret for secret in spoofer.getSecrets().values() if secret secret for secret in spoofer.getSecrets().values() if secret
] # avoid empty fields ] # avoid empty fields
def create_dir(self, directory=None):
fix = os.path.normpath(directory)
os.makedirs(fix, exist_ok=True)
return fix
def get_url_info(self, url: str) -> Tuple[str, str]:
"""Returns the type of the url and the id.
Compatible with urls of the form:
https://www.qobuz.com/us-en/{type}/{name}/{id}
https://open.qobuz.com/{type}/{id}
https://play.qobuz.com/{type}/{id}
/us-en/{type}/-/{id}
"""
r = re.search(
r"(?:https:\/\/(?:w{3}|open|play)\.qobuz\.com)?(?:\/[a-z]{2}-[a-z]{2})"
r"?\/(album|artist|track|playlist|label)(?:\/[-\w\d]+)?\/([\w\d]+)",
url,
)
return r.groups()
def download_from_id(self, item_id, album=True, alt_path=None): def download_from_id(self, item_id, album=True, alt_path=None):
if handle_download_id(self.downloads_db, item_id, add_id=False): if handle_download_id(self.downloads_db, item_id, add_id=False):
logger.info( logger.info(
@ -132,20 +89,20 @@ class QobuzDL:
) )
return return
try: try:
downloader.download_id_by_type( dloader = downloader.Download(
self.client, self.client,
item_id, item_id,
alt_path or self.directory, alt_path or self.directory,
str(self.quality), int(self.quality),
album,
self.embed_art, self.embed_art,
self.ignore_singles_eps, self.ignore_singles_eps,
self.quality_fallback, self.quality_fallback,
self.cover_og_quality, self.cover_og_quality,
self.no_cover, self.no_cover,
folder_format=self.folder_format, self.folder_format,
track_format=self.track_format, self.track_format,
) )
dloader.download_id_by_type(not album)
handle_download_id(self.downloads_db, item_id, add_id=True) handle_download_id(self.downloads_db, item_id, add_id=True)
except (requests.exceptions.RequestException, NonStreamable) as e: except (requests.exceptions.RequestException, NonStreamable) as e:
logger.error(f"{RED}Error getting release: {e}. Skipping...") logger.error(f"{RED}Error getting release: {e}. Skipping...")
@ -168,7 +125,7 @@ class QobuzDL:
"track": {"album": False, "func": None, "iterable_key": None}, "track": {"album": False, "func": None, "iterable_key": None},
} }
try: try:
url_type, item_id = self.get_url_info(url) url_type, item_id = get_url_info(url)
type_dict = possibles[url_type] type_dict = possibles[url_type]
except (KeyError, IndexError): except (KeyError, IndexError):
logger.info( logger.info(
@ -182,13 +139,13 @@ class QobuzDL:
f"{YELLOW}Downloading all the music from {content_name} " f"{YELLOW}Downloading all the music from {content_name} "
f"({url_type})!" f"({url_type})!"
) )
new_path = self.create_dir( new_path = create_and_return_dir(
os.path.join(self.directory, sanitize_filename(content_name)) os.path.join(self.directory, sanitize_filename(content_name))
) )
if self.smart_discography and url_type == "artist": if self.smart_discography and url_type == "artist":
# change `save_space` and `skip_extras` for customization # change `save_space` and `skip_extras` for customization
items = self._smart_discography_filter( items = smart_discography_filter(
content, content,
save_space=True, save_space=True,
skip_extras=True, skip_extras=True,
@ -205,8 +162,8 @@ class QobuzDL:
True if type_dict["iterable_key"] == "albums" else False, True if type_dict["iterable_key"] == "albums" else False,
new_path, new_path,
) )
if url_type == "playlist": if url_type == "playlist" and not self.no_m3u_for_playlists:
self.make_m3u(new_path) make_m3u(new_path)
else: else:
self.download_from_id(item_id, type_dict["album"]) self.download_from_id(item_id, type_dict["album"])
@ -256,9 +213,6 @@ class QobuzDL:
return results return results
def format_duration(self, duration):
return time.strftime("%H:%M:%S", time.gmtime(duration))
def search_by_type(self, query, item_type, limit=10, lucky=False): def search_by_type(self, query, item_type, limit=10, lucky=False):
if len(query) < 3: if len(query) < 3:
logger.info("{RED}Your search query is too short or invalid") logger.info("{RED}Your search query is too short or invalid")
@ -307,7 +261,7 @@ class QobuzDL:
text = "{} - {} [{}]".format( text = "{} - {} [{}]".format(
text, text,
self.format_duration(i["duration"]), format_duration(i["duration"]),
"HI-RES" if i["hires_streamable"] else "LOSSLESS", "HI-RES" if i["hires_streamable"] else "LOSSLESS",
) )
@ -435,143 +389,11 @@ class QobuzDL:
) )
for i in track_list: for i in track_list:
track_id = self.get_url_info( track_id = get_url_info(self.search_by_type(i, "track", 1, lucky=True)[0])[
self.search_by_type(i, "track", 1, lucky=True)[0] 1
)[1] ]
if track_id: if track_id:
self.download_from_id(track_id, False, pl_directory) self.download_from_id(track_id, False, pl_directory)
self.make_m3u(pl_directory) if not self.no_m3u_for_playlists:
make_m3u(pl_directory)
def make_m3u(self, pl_directory):
if self.no_m3u_for_playlists:
return
track_list = ["#EXTM3U"]
rel_folder = os.path.basename(os.path.normpath(pl_directory))
pl_name = rel_folder + ".m3u"
for local, dirs, files in os.walk(pl_directory):
dirs.sort()
audio_rel_files = [
# os.path.abspath(os.path.join(local, file_))
# os.path.join(rel_folder,
# os.path.basename(os.path.normpath(local)),
# file_)
os.path.join(os.path.basename(os.path.normpath(local)), file_)
for file_ in files
if os.path.splitext(file_)[-1] in EXTENSIONS
]
audio_files = [
os.path.abspath(os.path.join(local, file_))
for file_ in files
if os.path.splitext(file_)[-1] in EXTENSIONS
]
if not audio_files or len(audio_files) != len(audio_rel_files):
continue
for audio_rel_file, audio_file in zip(audio_rel_files, audio_files):
try:
pl_item = (
EasyMP3(audio_file)
if ".mp3" in audio_file
else FLAC(audio_file)
)
title = pl_item["TITLE"][0]
artist = pl_item["ARTIST"][0]
length = int(pl_item.info.length)
index = "#EXTINF:{}, {} - {}\n{}".format(
length, artist, title, audio_rel_file
)
except: # noqa
continue
track_list.append(index)
if len(track_list) > 1:
with open(os.path.join(pl_directory, pl_name), "w") as pl:
pl.write("\n\n".join(track_list))
def _smart_discography_filter(
self, contents: list, save_space: bool = False, skip_extras: bool = False
) -> list:
"""When downloading some artists' discography, many random and spam-like
albums can get downloaded. This helps filter those out to just get the good stuff.
This function removes:
* albums by other artists, which may contain a feature from the requested artist
* duplicate albums in different qualities
* (optionally) removes collector's, deluxe, live albums
:param list contents: contents returned by qobuz API
:param bool save_space: choose highest bit depth, lowest sampling rate
:param bool remove_extras: remove albums with extra material (i.e. live, deluxe,...)
:returns: filtered items list
"""
# for debugging
def print_album(album: dict) -> None:
logger.debug(
f"{album['title']} - {album.get('version', '~~')} ({album['maximum_bit_depth']}/{album['maximum_sampling_rate']} by {album['artist']['name']}) {album['id']}"
)
TYPE_REGEXES = {
"remaster": r"(?i)(re)?master(ed)?",
"extra": r"(?i)(anniversary|deluxe|live|collector|demo|expanded)",
}
def is_type(album_t: str, album: dict) -> bool:
"""Check if album is of type `album_t`"""
version = album.get("version", "")
title = album.get("title", "")
regex = TYPE_REGEXES[album_t]
return re.search(regex, f"{title} {version}") is not None
def essence(album: dict) -> str:
"""Ignore text in parens/brackets, return all lowercase.
Used to group two albums that may be named similarly, but not exactly
the same.
"""
r = re.match(r"([^\(]+)(?:\s*[\(\[][^\)][\)\]])*", album)
return r.group(1).strip().lower()
requested_artist = contents[0]["name"]
items = [item["albums"]["items"] for item in contents][0]
# use dicts to group duplicate albums together by title
title_grouped = dict()
for item in items:
title_ = essence(item["title"])
if title_ not in title_grouped: # ?
# if (t := essence(item["title"])) not in title_grouped:
title_grouped[title_] = []
title_grouped[title_].append(item)
items = []
for albums in title_grouped.values():
best_bit_depth = max(a["maximum_bit_depth"] for a in albums)
get_best = min if save_space else max
best_sampling_rate = get_best(
a["maximum_sampling_rate"]
for a in albums
if a["maximum_bit_depth"] == best_bit_depth
)
remaster_exists = any(is_type("remaster", a) for a in albums)
def is_valid(album: dict) -> bool:
return (
album["maximum_bit_depth"] == best_bit_depth
and album["maximum_sampling_rate"] == best_sampling_rate
and album["artist"]["name"] == requested_artist
and not ( # states that are not allowed
(remaster_exists and not is_type("remaster", album))
or (skip_extras and is_type("extra", album))
)
)
filtered = tuple(filter(is_valid, albums))
# most of the time, len is 0 or 1.
# if greater, it is a complete duplicate,
# so it doesn't matter which is chosen
if len(filtered) >= 1:
items.append(filtered[0])
return items

View File

@ -23,9 +23,288 @@ DEFAULT_FORMATS = {
], ],
} }
DEFAULT_FOLDER = "{artist} - {album} ({year}) [{bit_depth}B-{sampling_rate}kHz]"
DEFAULT_TRACK = "{tracknumber}. {tracktitle}"
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Download:
def __init__(
self,
client,
item_id: str,
path: str,
quality: int,
embed_art: bool = False,
albums_only: bool = False,
downgrade_quality: bool = False,
cover_og_quality: bool = False,
no_cover: bool = False,
folder_format=None,
track_format=None,
):
self.client = client
self.item_id = item_id
self.path = path
self.quality = quality
self.albums_only = albums_only
self.embed_art = embed_art
self.downgrade_quality = downgrade_quality
self.cover_og_quality = cover_og_quality
self.no_cover = no_cover
self.folder_format = folder_format or DEFAULT_FOLDER
self.track_format = track_format or DEFAULT_TRACK
def download_id_by_type(self, track=True):
if not track:
self.download_release()
else:
self.download_track()
def download_release(self):
count = 0
meta = self.client.get_album_meta(self.item_id)
if not meta.get("streamable"):
raise NonStreamable("This release is not streamable")
if self.albums_only and (
meta.get("release_type") != "album"
or meta.get("artist").get("name") == "Various Artists"
):
logger.info(f'{OFF}Ignoring Single/EP/VA: {meta.get("title", "n/a")}')
return
album_title = _get_title(meta)
format_info = self._get_format(meta)
file_format, quality_met, bit_depth, sampling_rate = format_info
if not self.downgrade_quality and not quality_met:
logger.info(
f"{OFF}Skipping {album_title} as it doesn't meet quality requirement"
)
return
logger.info(
f"\n{YELLOW}Downloading: {album_title}\nQuality: {file_format}"
f" ({bit_depth}/{sampling_rate})\n"
)
album_attr = self._get_album_attr(
meta, album_title, file_format, bit_depth, sampling_rate
)
folder_format, track_format = _clean_format_str(
self.folder_format, self.track_format, file_format
)
sanitized_title = sanitize_filename(folder_format.format(**album_attr))
dirn = os.path.join(self.path, sanitized_title)
os.makedirs(dirn, exist_ok=True)
if self.no_cover:
logger.info(f"{OFF}Skipping cover")
else:
_get_extra(meta["image"]["large"], dirn, og_quality=self.cover_og_quality)
if "goodies" in meta:
try:
_get_extra(meta["goodies"][0]["url"], dirn, "booklet.pdf")
except: # noqa
pass
media_numbers = [track["media_number"] for track in meta["tracks"]["items"]]
is_multiple = True if len([*{*media_numbers}]) > 1 else False
for i in meta["tracks"]["items"]:
parse = self.client.get_track_url(i["id"], fmt_id=self.quality)
if "sample" not in parse and parse["sampling_rate"]:
is_mp3 = True if int(self.quality) == 5 else False
self._download_and_tag(
dirn,
count,
parse,
i,
meta,
False,
is_mp3,
i["media_number"] if is_multiple else None,
)
else:
logger.info(f"{OFF}Demo. Skipping")
count = count + 1
logger.info(f"{GREEN}Completed")
def download_track(self):
parse = self.client.get_track_url(self.item_id, self.quality)
if "sample" not in parse and parse["sampling_rate"]:
meta = self.client.get_track_meta(self.item_id)
track_title = _get_title(meta)
logger.info(f"\n{YELLOW}Downloading: {track_title}")
format_info = self._get_format(meta, is_track_id=True, track_url_dict=parse)
file_format, quality_met, bit_depth, sampling_rate = format_info
folder_format, track_format = _clean_format_str(
self.folder_format, self.track_format, str(bit_depth)
)
if not self.downgrade_quality and not quality_met:
logger.info(
f"{OFF}Skipping {track_title} as it doesn't "
"meet quality requirement"
)
return
track_attr = self._get_track_attr(
meta, track_title, bit_depth, sampling_rate
)
sanitized_title = sanitize_filename(folder_format.format(**track_attr))
dirn = os.path.join(self.path, sanitized_title)
os.makedirs(dirn, exist_ok=True)
if self.no_cover:
logger.info(f"{OFF}Skipping cover")
else:
_get_extra(
meta["album"]["image"]["large"],
dirn,
og_quality=self.cover_og_quality,
)
is_mp3 = True if int(self.quality) == 5 else False
self._download_and_tag(
dirn,
1,
parse,
meta,
meta,
True,
is_mp3,
self.embed_art,
)
else:
logger.info(f"{OFF}Demo. Skipping")
logger.info(f"{GREEN}Completed")
def _download_and_tag(
self,
root_dir,
tmp_count,
track_url_dict,
track_metadata,
album_or_track_metadata,
is_track,
is_mp3,
multiple=None,
):
extension = ".mp3" if is_mp3 else ".flac"
try:
url = track_url_dict["url"]
except KeyError:
logger.info(f"{OFF}Track not available for download")
return
if multiple:
root_dir = os.path.join(root_dir, f"Disc {multiple}")
os.makedirs(root_dir, exist_ok=True)
filename = os.path.join(root_dir, f".{tmp_count:02}.tmp")
# Determine the filename
track_title = track_metadata.get("title")
artist = _safe_get(track_metadata, "performer", "name")
filename_attr = self._get_filename_attr(artist, track_metadata, track_title)
# track_format is a format string
# e.g. '{tracknumber}. {artist} - {tracktitle}'
formatted_path = sanitize_filename(self.track_format.format(**filename_attr))
final_file = os.path.join(root_dir, formatted_path)[:250] + extension
if os.path.isfile(final_file):
logger.info(f"{OFF}{track_title} was already downloaded")
return
desc = _get_description(track_url_dict, track_title, multiple)
tqdm_download(url, filename, desc)
tag_function = metadata.tag_mp3 if is_mp3 else metadata.tag_flac
try:
tag_function(
filename,
root_dir,
final_file,
track_metadata,
album_or_track_metadata,
is_track,
self.embed_art,
)
except Exception as e:
logger.error(f"{RED}Error tagging the file: {e}", exc_info=True)
@staticmethod
def _get_filename_attr(artist, track_metadata, track_title):
return {
"artist": artist,
"albumartist": _safe_get(
track_metadata, "album", "artist", "name", default=artist
),
"bit_depth": track_metadata["maximum_bit_depth"],
"sampling_rate": track_metadata["maximum_sampling_rate"],
"tracktitle": track_title,
"version": track_metadata.get("version"),
"tracknumber": f"{track_metadata['track_number']:02}",
}
@staticmethod
def _get_track_attr(meta, track_title, bit_depth, sampling_rate):
return {
"album": meta["album"]["title"],
"artist": meta["album"]["artist"]["name"],
"tracktitle": track_title,
"year": meta["album"]["release_date_original"].split("-")[0],
"bit_depth": bit_depth,
"sampling_rate": sampling_rate,
}
@staticmethod
def _get_album_attr(meta, album_title, file_format, bit_depth, sampling_rate):
return {
"artist": meta["artist"]["name"],
"album": album_title,
"year": meta["release_date_original"].split("-")[0],
"format": file_format,
"bit_depth": bit_depth,
"sampling_rate": sampling_rate,
}
def _get_format(self, item_dict, is_track_id=False, track_url_dict=None):
quality_met = True
if int(self.quality) == 5:
return ("MP3", quality_met, None, None)
track_dict = item_dict
if not is_track_id:
track_dict = item_dict["tracks"]["items"][0]
try:
new_track_dict = (
self.client.get_track_url(track_dict["id"], fmt_id=self.quality)
if not track_url_dict
else track_url_dict
)
restrictions = new_track_dict.get("restrictions")
if isinstance(restrictions, list):
if any(
restriction.get("code") == QL_DOWNGRADE
for restriction in restrictions
):
quality_met = False
return (
"FLAC",
quality_met,
new_track_dict["bit_depth"],
new_track_dict["sampling_rate"],
)
except (KeyError, requests.exceptions.HTTPError):
return ("Unknown", quality_met, None, None)
def tqdm_download(url, fname, track_name): def tqdm_download(url, fname, track_name):
r = requests.get(url, allow_redirects=True, stream=True) r = requests.get(url, allow_redirects=True, stream=True)
total = int(r.headers.get("content-length", 0)) total = int(r.headers.get("content-length", 0))
@ -42,48 +321,15 @@ def tqdm_download(url, fname, track_name):
bar.update(size) bar.update(size)
def get_description(u: dict, track_title, multiple=None): def _get_description(item: dict, track_title, multiple=None):
downloading_title = f"{track_title} " downloading_title = f"{track_title} "
f'[{u["bit_depth"]}/{u["sampling_rate"]}]' f'[{item["bit_depth"]}/{item["sampling_rate"]}]'
if multiple: if multiple:
downloading_title = f"[Disc {multiple}] {downloading_title}" downloading_title = f"[Disc {multiple}] {downloading_title}"
return downloading_title return downloading_title
def get_format( def _get_title(item_dict):
client, item_dict, quality, is_track_id=False, track_url_dict=None
) -> Tuple[str, bool, int, int]:
quality_met = True
if int(quality) == 5:
return ("MP3", quality_met, None, None)
track_dict = item_dict
if not is_track_id:
track_dict = item_dict["tracks"]["items"][0]
try:
new_track_dict = (
client.get_track_url(track_dict["id"], quality)
if not track_url_dict
else track_url_dict
)
restrictions = new_track_dict.get("restrictions")
if isinstance(restrictions, list):
if any(
restriction.get("code") == QL_DOWNGRADE for restriction in restrictions
):
quality_met = False
return (
"FLAC",
quality_met,
new_track_dict["bit_depth"],
new_track_dict["sampling_rate"],
)
except (KeyError, requests.exceptions.HTTPError):
return ("Unknown", quality_met, None, None)
def get_title(item_dict):
album_title = item_dict["title"] album_title = item_dict["title"]
version = item_dict.get("version") version = item_dict.get("version")
if version: if version:
@ -95,266 +341,18 @@ def get_title(item_dict):
return album_title return album_title
def get_extra(i, dirn, extra="cover.jpg", og_quality=False): def _get_extra(item, dirn, extra="cover.jpg", og_quality=False):
extra_file = os.path.join(dirn, extra) extra_file = os.path.join(dirn, extra)
if os.path.isfile(extra_file): if os.path.isfile(extra_file):
logger.info(f"{OFF}{extra} was already downloaded") logger.info(f"{OFF}{extra} was already downloaded")
return return
tqdm_download( tqdm_download(
i.replace("_600.", "_org.") if og_quality else i, item.replace("_600.", "_org.") if og_quality else item,
extra_file, extra_file,
extra, extra,
) )
# Download and tag a file
def download_and_tag(
root_dir,
tmp_count,
track_url_dict,
track_metadata,
album_or_track_metadata,
is_track,
is_mp3,
embed_art=False,
multiple=None,
track_format="{tracknumber}. {tracktitle}",
):
"""
Download and tag a file
:param str root_dir: Root directory where the track will be stored
:param int tmp_count: Temporal download file number
:param dict track_url_dict: get_track_url dictionary from Qobuz client
:param dict track_metadata: Track item dictionary from Qobuz client
:param dict album_or_track_metadata: Album/track dict from Qobuz client
:param bool is_track
:param bool is_mp3
:param bool embed_art: Embed cover art into file (FLAC-only)
:param str track_format format-string that determines file naming
:param multiple: Multiple disc integer
:type multiple: integer or None
"""
extension = ".mp3" if is_mp3 else ".flac"
try:
url = track_url_dict["url"]
except KeyError:
logger.info(f"{OFF}Track not available for download")
return
if multiple:
root_dir = os.path.join(root_dir, f"Disc {multiple}")
os.makedirs(root_dir, exist_ok=True)
filename = os.path.join(root_dir, f".{tmp_count:02}.tmp")
# Determine the filename
track_title = track_metadata.get("title")
artist = _safe_get(track_metadata, "performer", "name")
filename_attr = {
"artist": artist,
"albumartist": _safe_get(
track_metadata, "album", "artist", "name", default=artist
),
"bit_depth": track_metadata["maximum_bit_depth"],
"sampling_rate": track_metadata["maximum_sampling_rate"],
"tracktitle": track_title,
"version": track_metadata.get("version"),
"tracknumber": f"{track_metadata['track_number']:02}",
}
# track_format is a format string
# e.g. '{tracknumber}. {artist} - {tracktitle}'
formatted_path = sanitize_filename(track_format.format(**filename_attr))
final_file = os.path.join(root_dir, formatted_path)[:250] + extension
if os.path.isfile(final_file):
logger.info(f"{OFF}{track_title} was already downloaded")
return
desc = get_description(track_url_dict, track_title, multiple)
tqdm_download(url, filename, desc)
tag_function = metadata.tag_mp3 if is_mp3 else metadata.tag_flac
try:
tag_function(
filename,
root_dir,
final_file,
track_metadata,
album_or_track_metadata,
is_track,
embed_art,
)
except Exception as e:
logger.error(f"{RED}Error tagging the file: {e}", exc_info=True)
def download_id_by_type(
client,
item_id,
path,
quality,
album=False,
embed_art=False,
albums_only=False,
downgrade_quality=True,
cover_og_quality=False,
no_cover=False,
folder_format="{artist} - {album} ({year}) " "[{bit_depth}B-{sampling_rate}kHz]",
track_format="{tracknumber}. {tracktitle}",
):
"""
Download and get metadata by ID and type (album or track)
:param Qopy client: qopy Client
:param int item_id: Qobuz item id
:param str path: The root directory where the item will be downloaded
:param int quality: Audio quality (5, 6, 7, 27)
:param bool album: album type or not
:param embed_art album: Embed cover art into files
:param bool albums_only: Ignore Singles, EPs and VA releases
:param bool downgrade: Skip releases not available in set quality
:param bool cover_og_quality: Download cover in its original quality
:param bool no_cover: Don't download cover art
:param str folder_format: format string that determines folder naming
:param str track_format: format string that determines track naming
"""
count = 0
if album:
meta = client.get_album_meta(item_id)
if not meta.get("streamable"):
raise NonStreamable("This release is not streamable")
if albums_only and (
meta.get("release_type") != "album"
or meta.get("artist").get("name") == "Various Artists"
):
logger.info(f'{OFF}Ignoring Single/EP/VA: {meta.get("title", "")}')
return
album_title = get_title(meta)
format_info = get_format(client, meta, quality)
file_format, quality_met, bit_depth, sampling_rate = format_info
if not downgrade_quality and not quality_met:
logger.info(
f"{OFF}Skipping {album_title} as it doesn't " "meet quality requirement"
)
return
logger.info(
f"\n{YELLOW}Downloading: {album_title}\nQuality: {file_format} ({bit_depth}/{sampling_rate})\n"
)
album_attr = {
"artist": meta["artist"]["name"],
"album": album_title,
"year": meta["release_date_original"].split("-")[0],
"format": file_format,
"bit_depth": bit_depth,
"sampling_rate": sampling_rate,
}
folder_format, track_format = _clean_format_str(
folder_format, track_format, file_format
)
sanitized_title = sanitize_filename(folder_format.format(**album_attr))
dirn = os.path.join(path, sanitized_title)
os.makedirs(dirn, exist_ok=True)
if no_cover:
logger.info(f"{OFF}Skipping cover")
else:
get_extra(meta["image"]["large"], dirn, og_quality=cover_og_quality)
if "goodies" in meta:
try:
get_extra(meta["goodies"][0]["url"], dirn, "booklet.pdf")
except: # noqa
pass
media_numbers = [track["media_number"] for track in meta["tracks"]["items"]]
is_multiple = True if len([*{*media_numbers}]) > 1 else False
for i in meta["tracks"]["items"]:
parse = client.get_track_url(i["id"], quality)
if "sample" not in parse and parse["sampling_rate"]:
is_mp3 = True if int(quality) == 5 else False
download_and_tag(
dirn,
count,
parse,
i,
meta,
False,
is_mp3,
embed_art,
i["media_number"] if is_multiple else None,
track_format=track_format,
)
else:
logger.info(f"{OFF}Demo. Skipping")
count = count + 1
else:
parse = client.get_track_url(item_id, quality)
if "sample" not in parse and parse["sampling_rate"]:
meta = client.get_track_meta(item_id)
track_title = get_title(meta)
logger.info(f"\n{YELLOW}Downloading: {track_title}")
format_info = get_format(
client, meta, quality, is_track_id=True, track_url_dict=parse
)
file_format, quality_met, bit_depth, sampling_rate = format_info
folder_format, track_format = _clean_format_str(
folder_format, track_format, bit_depth
)
if not downgrade_quality and not quality_met:
logger.info(
f"{OFF}Skipping {track_title} as it doesn't "
"meet quality requirement"
)
return
track_attr = {
"album": meta["album"]["title"],
"artist": meta["album"]["artist"]["name"],
"tracktitle": track_title,
"year": meta["album"]["release_date_original"].split("-")[0],
"bit_depth": bit_depth,
"sampling_rate": sampling_rate,
}
sanitized_title = sanitize_filename(folder_format.format(**track_attr))
dirn = os.path.join(path, sanitized_title)
os.makedirs(dirn, exist_ok=True)
if no_cover:
logger.info(f"{OFF}Skipping cover")
else:
get_extra(
meta["album"]["image"]["large"], dirn, og_quality=cover_og_quality
)
is_mp3 = True if int(quality) == 5 else False
download_and_tag(
dirn,
count,
parse,
meta,
meta,
True,
is_mp3,
embed_art,
track_format=track_format,
)
else:
logger.info(f"{OFF}Demo. Skipping")
logger.info(f"{GREEN}Completed")
# ----------- Utilities -----------
def _clean_format_str(folder: str, track: str, file_format: str) -> Tuple[str, str]: def _clean_format_str(folder: str, track: str, file_format: str) -> Tuple[str, str]:
"""Cleans up the format strings, avoids errors """Cleans up the format strings, avoids errors
with MP3 files. with MP3 files.

View File

@ -15,15 +15,31 @@ COPYRIGHT, PHON_COPYRIGHT = "\u2117", "\u00a9"
# and the file won't be tagged # and the file won't be tagged
FLAC_MAX_BLOCKSIZE = 16777215 FLAC_MAX_BLOCKSIZE = 16777215
ID3_LEGEND = {
"album": id3.TALB,
"albumartist": id3.TPE2,
"artist": id3.TPE1,
"comment": id3.COMM,
"composer": id3.TCOM,
"copyright": id3.TCOP,
"date": id3.TDAT,
"genre": id3.TCON,
"isrc": id3.TSRC,
"label": id3.TPUB,
"performer": id3.TOPE,
"title": id3.TIT2,
"year": id3.TYER,
}
def get_title(track_dict):
def _get_title(track_dict):
title = track_dict["title"] title = track_dict["title"]
version = track_dict.get("version") version = track_dict.get("version")
if version: if version:
title = f"{title} ({version})" title = f"{title} ({version})"
# for classical works # for classical works
if track_dict.get("work"): if track_dict.get("work"):
title = "{}: {}".format(track_dict["work"], title) title = f"{track_dict['work']}: {title}"
return title return title
@ -46,6 +62,50 @@ def _format_genres(genres: list) -> str:
return ", ".join(no_repeats) return ", ".join(no_repeats)
def _embed_flac_img(root_dir, audio: FLAC):
emb_image = os.path.join(root_dir, "cover.jpg")
multi_emb_image = os.path.join(
os.path.abspath(os.path.join(root_dir, os.pardir)), "cover.jpg"
)
if os.path.isfile(emb_image):
cover_image = emb_image
else:
cover_image = multi_emb_image
try:
# rest of the metadata still gets embedded
# when the image size is too big
if os.path.getsize(cover_image) > FLAC_MAX_BLOCKSIZE:
raise Exception(
"downloaded cover size too large to embed. "
"turn off `og_cover` to avoid error"
)
image = Picture()
image.type = 3
image.mime = "image/jpeg"
image.desc = "cover"
with open(cover_image, "rb") as img:
image.data = img.read()
audio.add_picture(image)
except Exception as e:
logger.error(f"Error embedding image: {e}", exc_info=True)
def _embed_id3_img(root_dir, audio: id3.ID3):
emb_image = os.path.join(root_dir, "cover.jpg")
multi_emb_image = os.path.join(
os.path.abspath(os.path.join(root_dir, os.pardir)), "cover.jpg"
)
if os.path.isfile(emb_image):
cover_image = emb_image
else:
cover_image = multi_emb_image
with open(cover_image, "rb") as cover:
audio.add(id3.APIC(3, "image/jpeg", 3, "", cover.read()))
# Use KeyError catching instead of dict.get to avoid empty tags # Use KeyError catching instead of dict.get to avoid empty tags
def tag_flac(filename, root_dir, final_name, d, album, istrack=True, em_image=False): def tag_flac(filename, root_dir, final_name, d, album, istrack=True, em_image=False):
""" """
@ -61,7 +121,7 @@ def tag_flac(filename, root_dir, final_name, d, album, istrack=True, em_image=Fa
""" """
audio = FLAC(filename) audio = FLAC(filename)
audio["TITLE"] = get_title(d) audio["TITLE"] = _get_title(d)
audio["TRACKNUMBER"] = str(d["track_number"]) # TRACK NUMBER audio["TRACKNUMBER"] = str(d["track_number"]) # TRACK NUMBER
@ -73,18 +133,13 @@ def tag_flac(filename, root_dir, final_name, d, album, istrack=True, em_image=Fa
except KeyError: except KeyError:
pass pass
try: artist_ = d.get("performer", {}).get("name") # TRACK ARTIST
audio["ARTIST"] = d["performer"]["name"] # TRACK ARTIST if istrack:
except KeyError: audio["ARTIST"] = artist_ or d["album"]["artist"]["name"] # TRACK ARTIST
if istrack: else:
audio["ARTIST"] = d["album"]["artist"]["name"] # TRACK ARTIST audio["ARTIST"] = artist_ or album["artist"]["name"]
else:
audio["ARTIST"] = album["artist"]["name"]
try: audio["LABEL"] = album.get("label", {}).get("name", "n/a")
audio["LABEL"] = album["label"]["name"]
except KeyError:
pass
if istrack: if istrack:
audio["GENRE"] = _format_genres(d["album"]["genres_list"]) audio["GENRE"] = _format_genres(d["album"]["genres_list"])
@ -102,33 +157,7 @@ def tag_flac(filename, root_dir, final_name, d, album, istrack=True, em_image=Fa
audio["COPYRIGHT"] = _format_copyright(album.get("copyright", "n/a")) audio["COPYRIGHT"] = _format_copyright(album.get("copyright", "n/a"))
if em_image: if em_image:
emb_image = os.path.join(root_dir, "cover.jpg") _embed_flac_img(root_dir, audio)
multi_emb_image = os.path.join(
os.path.abspath(os.path.join(root_dir, os.pardir)), "cover.jpg"
)
if os.path.isfile(emb_image):
cover_image = emb_image
else:
cover_image = multi_emb_image
try:
# rest of the metadata still gets embedded
# when the image size is too big
if os.path.getsize(cover_image) > FLAC_MAX_BLOCKSIZE:
raise Exception(
"downloaded cover size too large to embed. "
"turn off `og_cover` to avoid error"
)
image = Picture()
image.type = 3
image.mime = "image/jpeg"
image.desc = "cover"
with open(cover_image, "rb") as img:
image.data = img.read()
audio.add_picture(image)
except Exception as e:
logger.error(f"Error embedding image: {e}", exc_info=True)
audio.save() audio.save()
os.rename(filename, final_name) os.rename(filename, final_name)
@ -146,21 +175,6 @@ def tag_mp3(filename, root_dir, final_name, d, album, istrack=True, em_image=Fal
:param bool em_image: Embed cover art into file :param bool em_image: Embed cover art into file
""" """
id3_legend = {
"album": id3.TALB,
"albumartist": id3.TPE2,
"artist": id3.TPE1,
"comment": id3.COMM,
"composer": id3.TCOM,
"copyright": id3.TCOP,
"date": id3.TDAT,
"genre": id3.TCON,
"isrc": id3.TSRC,
"label": id3.TPUB,
"performer": id3.TOPE,
"title": id3.TIT2,
"year": id3.TYER,
}
try: try:
audio = id3.ID3(filename) audio = id3.ID3(filename)
except ID3NoHeaderError: except ID3NoHeaderError:
@ -168,19 +182,17 @@ def tag_mp3(filename, root_dir, final_name, d, album, istrack=True, em_image=Fal
# temporarily holds metadata # temporarily holds metadata
tags = dict() tags = dict()
tags["title"] = get_title(d) tags["title"] = _get_title(d)
try: try:
tags["label"] = album["label"]["name"] tags["label"] = album["label"]["name"]
except KeyError: except KeyError:
pass pass
try: artist_ = d.get("performer", {}).get("name") # TRACK ARTIST
tags["artist"] = d["performer"]["name"] if istrack:
except KeyError: audio["artist"] = artist_ or d["album"]["artist"]["name"] # TRACK ARTIST
if istrack: else:
tags["artist"] = d["album"]["artist"]["name"] audio["artist"] = artist_ or album["artist"]["name"]
else:
tags["artist"] = album["artist"]["name"]
if istrack: if istrack:
tags["genre"] = _format_genres(d["album"]["genres_list"]) tags["genre"] = _format_genres(d["album"]["genres_list"])
@ -204,21 +216,11 @@ def tag_mp3(filename, root_dir, final_name, d, album, istrack=True, em_image=Fal
# write metadata in `tags` to file # write metadata in `tags` to file
for k, v in tags.items(): for k, v in tags.items():
id3tag = id3_legend[k] id3tag = ID3_LEGEND[k]
audio[id3tag.__name__] = id3tag(encoding=3, text=v) audio[id3tag.__name__] = id3tag(encoding=3, text=v)
if em_image: if em_image:
emb_image = os.path.join(root_dir, "cover.jpg") _embed_id3_img(root_dir, audio)
multi_emb_image = os.path.join(
os.path.abspath(os.path.join(root_dir, os.pardir)), "cover.jpg"
)
if os.path.isfile(emb_image):
cover_image = emb_image
else:
cover_image = multi_emb_image
with open(cover_image, "rb") as cover:
audio.add(id3.APIC(3, "image/jpeg", 3, "", cover.read()))
audio.save(filename, "v2_version=3") audio.save(filename, "v2_version=3")
os.rename(filename, final_name) os.rename(filename, final_name)

View File

@ -26,7 +26,7 @@ class Client:
def __init__(self, email, pwd, app_id, secrets): def __init__(self, email, pwd, app_id, secrets):
logger.info(f"{YELLOW}Logging...") logger.info(f"{YELLOW}Logging...")
self.secrets = secrets self.secrets = secrets
self.id = app_id self.id = str(app_id)
self.session = requests.Session() self.session = requests.Session()
self.session.headers.update( self.session.headers.update(
{ {
@ -196,8 +196,9 @@ class Client:
def cfg_setup(self): def cfg_setup(self):
for secret in self.secrets: for secret in self.secrets:
if self.test_secret(secret): if secret:
self.sec = secret if self.test_secret(secret):
break self.sec = secret
break
if not hasattr(self, "sec"): if not hasattr(self, "sec"):
raise InvalidAppSecretError("Invalid app secret.\n" + RESET) raise InvalidAppSecretError("Invalid app secret.\n" + RESET)

190
qobuz_dl/utils.py Normal file
View File

@ -0,0 +1,190 @@
import re
import string
import os
import logging
import time
from mutagen.mp3 import EasyMP3
from mutagen.flac import FLAC
logger = logging.getLogger(__name__)
EXTENSIONS = (".mp3", ".flac")
class PartialFormatter(string.Formatter):
def __init__(self, missing="n/a", bad_fmt="n/a"):
self.missing, self.bad_fmt = missing, bad_fmt
def get_field(self, field_name, args, kwargs):
try:
val = super(PartialFormatter, self).get_field(field_name, args, kwargs)
except (KeyError, AttributeError):
val = None, field_name
return val
def format_field(self, value, spec):
if not value:
return self.missing
try:
return super(PartialFormatter, self).format_field(value, spec)
except ValueError:
if self.bad_fmt:
return self.bad_fmt
raise
def make_m3u(pl_directory):
track_list = ["#EXTM3U"]
rel_folder = os.path.basename(os.path.normpath(pl_directory))
pl_name = rel_folder + ".m3u"
for local, dirs, files in os.walk(pl_directory):
dirs.sort()
audio_rel_files = [
os.path.join(os.path.basename(os.path.normpath(local)), file_)
for file_ in files
if os.path.splitext(file_)[-1] in EXTENSIONS
]
audio_files = [
os.path.abspath(os.path.join(local, file_))
for file_ in files
if os.path.splitext(file_)[-1] in EXTENSIONS
]
if not audio_files or len(audio_files) != len(audio_rel_files):
continue
for audio_rel_file, audio_file in zip(audio_rel_files, audio_files):
try:
pl_item = (
EasyMP3(audio_file) if ".mp3" in audio_file else FLAC(audio_file)
)
title = pl_item["TITLE"][0]
artist = pl_item["ARTIST"][0]
length = int(pl_item.info.length)
index = "#EXTINF:{}, {} - {}\n{}".format(
length, artist, title, audio_rel_file
)
except: # noqa
continue
track_list.append(index)
if len(track_list) > 1:
with open(os.path.join(pl_directory, pl_name), "w") as pl:
pl.write("\n\n".join(track_list))
def smart_discography_filter(
contents: list, save_space: bool = False, skip_extras: bool = False
) -> list:
"""When downloading some artists' discography, many random and spam-like
albums can get downloaded. This helps filter those out to just get the good stuff.
This function removes:
* albums by other artists, which may contain a feature from the requested artist
* duplicate albums in different qualities
* (optionally) removes collector's, deluxe, live albums
:param list contents: contents returned by qobuz API
:param bool save_space: choose highest bit depth, lowest sampling rate
:param bool remove_extras: remove albums with extra material (i.e. live, deluxe,...)
:returns: filtered items list
"""
# for debugging
def print_album(album: dict) -> None:
logger.debug(
f"{album['title']} - {album.get('version', '~~')} "
"({album['maximum_bit_depth']}/{album['maximum_sampling_rate']}"
" by {album['artist']['name']}) {album['id']}"
)
TYPE_REGEXES = {
"remaster": r"(?i)(re)?master(ed)?",
"extra": r"(?i)(anniversary|deluxe|live|collector|demo|expanded)",
}
def is_type(album_t: str, album: dict) -> bool:
"""Check if album is of type `album_t`"""
version = album.get("version", "")
title = album.get("title", "")
regex = TYPE_REGEXES[album_t]
return re.search(regex, f"{title} {version}") is not None
def essence(album: dict) -> str:
"""Ignore text in parens/brackets, return all lowercase.
Used to group two albums that may be named similarly, but not exactly
the same.
"""
r = re.match(r"([^\(]+)(?:\s*[\(\[][^\)][\)\]])*", album)
return r.group(1).strip().lower()
requested_artist = contents[0]["name"]
items = [item["albums"]["items"] for item in contents][0]
# use dicts to group duplicate albums together by title
title_grouped = dict()
for item in items:
title_ = essence(item["title"])
if title_ not in title_grouped: # ?
# if (t := essence(item["title"])) not in title_grouped:
title_grouped[title_] = []
title_grouped[title_].append(item)
items = []
for albums in title_grouped.values():
best_bit_depth = max(a["maximum_bit_depth"] for a in albums)
get_best = min if save_space else max
best_sampling_rate = get_best(
a["maximum_sampling_rate"]
for a in albums
if a["maximum_bit_depth"] == best_bit_depth
)
remaster_exists = any(is_type("remaster", a) for a in albums)
def is_valid(album: dict) -> bool:
return (
album["maximum_bit_depth"] == best_bit_depth
and album["maximum_sampling_rate"] == best_sampling_rate
and album["artist"]["name"] == requested_artist
and not ( # states that are not allowed
(remaster_exists and not is_type("remaster", album))
or (skip_extras and is_type("extra", album))
)
)
filtered = tuple(filter(is_valid, albums))
# most of the time, len is 0 or 1.
# if greater, it is a complete duplicate,
# so it doesn't matter which is chosen
if len(filtered) >= 1:
items.append(filtered[0])
return items
def format_duration(duration):
return time.strftime("%H:%M:%S", time.gmtime(duration))
def create_and_return_dir(directory):
fix = os.path.normpath(directory)
os.makedirs(fix, exist_ok=True)
return fix
def get_url_info(url):
"""Returns the type of the url and the id.
Compatible with urls of the form:
https://www.qobuz.com/us-en/{type}/{name}/{id}
https://open.qobuz.com/{type}/{id}
https://play.qobuz.com/{type}/{id}
/us-en/{type}/-/{id}
"""
r = re.search(
r"(?:https:\/\/(?:w{3}|open|play)\.qobuz\.com)?(?:\/[a-z]{2}-[a-z]{2})"
r"?\/(album|artist|track|playlist|label)(?:\/[-\w\d]+)?\/([\w\d]+)",
url,
)
return r.groups()