qobuz-dl/qobuz_dl/downloader.py

387 lines
13 KiB
Python
Raw Normal View History

import logging
2020-08-10 04:47:51 +02:00
import os
2021-03-02 00:18:08 +01:00
from typing import Tuple
2020-10-17 18:52:35 +02:00
2020-08-10 04:47:51 +02:00
import requests
2020-10-05 17:02:33 +02:00
from pathvalidate import sanitize_filename
2020-08-10 04:47:51 +02:00
from tqdm import tqdm
import qobuz_dl.metadata as metadata
from qobuz_dl.color import OFF, GREEN, RED, YELLOW, CYAN
2021-01-26 23:42:29 +01:00
from qobuz_dl.exceptions import NonStreamable
2020-10-17 18:52:35 +02:00
QL_DOWNGRADE = "FormatRestrictedByFormatAvailability"
2021-03-01 06:31:43 +01:00
DEFAULT_MP3_FOLDER_FORMAT = '{artist} - {album} [MP3]'
DEFAULT_MP3_TRACK_FORMAT = '{tracknumber}. {tracktitle}'
2021-03-02 00:18:08 +01:00
DEFAULT_UNKNOWN_FOLDER_FORMAT = '{artist} - {album}'
DEFAULT_UNKNOWN_TRACK_FORMAT = '{tracknumber}. {tracktitle}'
2021-03-01 06:31:43 +01:00
logger = logging.getLogger(__name__)
2020-08-10 04:47:51 +02:00
def tqdm_download(url, fname, track_name):
2020-08-10 04:47:51 +02:00
r = requests.get(url, allow_redirects=True, stream=True)
2020-10-17 18:52:35 +02:00
total = int(r.headers.get("content-length", 0))
with open(fname, "wb") as file, tqdm(
2020-08-10 04:47:51 +02:00
total=total,
2020-10-17 18:52:35 +02:00
unit="iB",
2020-08-10 04:47:51 +02:00
unit_scale=True,
unit_divisor=1024,
desc=track_name,
bar_format=CYAN + "{n_fmt}/{total_fmt} /// {desc}",
2020-08-10 04:47:51 +02:00
) as bar:
for data in r.iter_content(chunk_size=1024):
size = file.write(data)
bar.update(size)
def get_description(u: dict, track_title, multiple=None):
2021-03-01 06:31:43 +01:00
downloading_title = f'{track_title} '
f'[{u["bit_depth"]}/{u["sampling_rate"]}]'
if multiple:
downloading_title = f"[Disc {multiple}] {downloading_title}"
return downloading_title
2020-12-07 16:32:37 +01:00
2020-08-10 04:47:51 +02:00
2021-03-02 00:18:08 +01:00
def get_format(client, item_dict,
quality, is_track_id=False,
track_url_dict=None) -> Tuple[str, bool, int, int]:
quality_met = True
if int(quality) == 5:
2021-03-01 06:31:43 +01:00
return ("MP3", quality_met, None, None)
track_dict = item_dict
if not is_track_id:
2020-12-15 03:38:46 +01:00
track_dict = item_dict["tracks"]["items"][0]
try:
new_track_dict = (
client.get_track_url(track_dict["id"], quality)
if not track_url_dict
else track_url_dict
)
restrictions = new_track_dict.get("restrictions")
if isinstance(restrictions, list):
if any(
2021-03-01 06:31:43 +01:00
restriction.get("code") == QL_DOWNGRADE
for restriction in restrictions
):
quality_met = False
2021-03-01 06:31:43 +01:00
return (
2021-03-01 06:31:43 +01:00
"FLAC",
quality_met,
2021-03-01 06:31:43 +01:00
new_track_dict["bit_depth"],
new_track_dict["sampling_rate"],
)
except (KeyError, requests.exceptions.HTTPError):
2021-03-02 00:18:08 +01:00
return ("Unknown", quality_met, None, None)
2020-12-07 16:32:37 +01:00
2020-08-10 04:47:51 +02:00
2020-12-13 19:03:18 +01:00
def get_title(item_dict):
album_title = item_dict["title"]
version = item_dict.get("version")
if version:
2020-12-13 19:03:18 +01:00
album_title = (
f"{album_title} ({version})"
if version.lower() not in album_title.lower()
else album_title
2020-12-13 19:03:18 +01:00
)
return album_title
2020-12-13 19:03:18 +01:00
def get_extra(i, dirn, extra="cover.jpg", og_quality=False):
extra_file = os.path.join(dirn, extra)
if os.path.isfile(extra_file):
logger.info(f"{OFF}{extra} was already downloaded")
return
tqdm_download(
i.replace("_600.", "_org.") if og_quality else i,
extra_file,
extra,
)
2020-08-10 04:47:51 +02:00
# Download and tag a file
def download_and_tag(
root_dir,
tmp_count,
track_url_dict,
track_metadata,
album_or_track_metadata,
is_track,
is_mp3,
embed_art=False,
multiple=None,
2021-03-01 06:31:43 +01:00
track_format='{tracknumber}. {tracktitle}',
):
"""
Download and tag a file
:param str root_dir: Root directory where the track will be stored
:param int tmp_count: Temporal download file number
:param dict track_url_dict: get_track_url dictionary from Qobuz client
:param dict track_metadata: Track item dictionary from Qobuz client
2021-03-01 06:31:43 +01:00
:param dict album_or_track_metadata: Album/track dict from Qobuz client
:param bool is_track
:param bool is_mp3
:param bool embed_art: Embed cover art into file (FLAC-only)
2021-03-02 00:18:08 +01:00
:param str track_format format-string that determines file naming
:param multiple: Multiple disc integer
:type multiple: integer or None
"""
2021-03-01 06:31:43 +01:00
extension = ".mp3" if is_mp3 else ".flac"
try:
url = track_url_dict["url"]
except KeyError:
logger.info(f"{OFF}Track not available for download")
return
if multiple:
root_dir = os.path.join(root_dir, f"Disc {multiple}")
2020-12-15 18:58:25 +01:00
os.makedirs(root_dir, exist_ok=True)
filename = os.path.join(root_dir, f".{tmp_count:02}.tmp")
# Determine the filename
2021-03-01 06:31:43 +01:00
track_title = track_metadata["title"]
2021-03-02 00:18:08 +01:00
print(track_metadata)
2021-03-01 06:31:43 +01:00
filename_attr = {
2021-03-02 00:18:08 +01:00
'artist': track_metadata["performer"]["name"],
'albumartist': track_metadata["album"]["artist"]["name"],
2021-03-01 06:31:43 +01:00
'bit_depth': track_metadata['maximum_bit_depth'],
'sampling_rate': track_metadata['maximum_sampling_rate'],
'tracktitle': track_title,
'version': track_metadata["version"],
'tracknumber': f"{track_metadata['track_number']:02}"
}
# track_format is a format string
# e.g. '{tracknumber}. {artist} - {tracktitle}'
formatted_path = sanitize_filename(track_format.format(**filename_attr))
final_file = os.path.join(root_dir, formatted_path)[:250] + extension
if os.path.isfile(final_file):
2021-03-01 06:31:43 +01:00
logger.info(f"{OFF}{track_title} was already downloaded")
return
2021-03-01 06:31:43 +01:00
desc = get_description(track_url_dict, track_title, multiple)
tqdm_download(url, filename, desc)
tag_function = metadata.tag_mp3 if is_mp3 else metadata.tag_flac
try:
tag_function(
filename,
root_dir,
final_file,
track_metadata,
album_or_track_metadata,
is_track,
embed_art,
)
except Exception as e:
2020-12-19 17:56:59 +01:00
logger.error(f"{RED}Error tagging the file: {e}", exc_info=True)
2020-08-10 04:47:51 +02:00
def download_id_by_type(
client,
item_id,
path,
quality,
album=False,
embed_art=False,
albums_only=False,
downgrade_quality=True,
cover_og_quality=False,
2020-12-18 22:17:20 +01:00
no_cover=False,
2021-03-01 06:31:43 +01:00
folder_format='{artist} - {album} ({year}) '
'[{bit_depth}B-{sampling_rate}kHz]',
track_format='{tracknumber}. {tracktitle}',
):
"""
Download and get metadata by ID and type (album or track)
:param Qopy client: qopy Client
:param int item_id: Qobuz item id
:param str path: The root directory where the item will be downloaded
:param int quality: Audio quality (5, 6, 7, 27)
:param bool album: album type or not
:param embed_art album: Embed cover art into files
:param bool albums_only: Ignore Singles, EPs and VA releases
:param bool downgrade: Skip releases not available in set quality
:param bool cover_og_quality: Download cover in its original quality
2020-12-18 22:17:20 +01:00
:param bool no_cover: Don't download cover art
2021-03-01 06:31:43 +01:00
:param str folder_format: format string that determines folder naming
:param str track_format: format string that determines track naming
"""
2020-08-10 04:47:51 +02:00
count = 0
if album:
meta = client.get_album_meta(item_id)
2021-01-26 23:42:29 +01:00
if not meta.get("streamable"):
raise NonStreamable("This release is not streamable")
if albums_only and (
meta.get("release_type") != "album"
or meta.get("artist").get("name") == "Various Artists"
):
logger.info(f'{OFF}Ignoring Single/EP/VA: {meta.get("title", "")}')
return
2020-12-13 19:03:18 +01:00
album_title = get_title(meta)
2021-03-01 06:31:43 +01:00
format_info = get_format(client, meta, quality)
file_format, quality_met, bit_depth, sampling_rate = format_info
if not downgrade_quality and not quality_met:
logger.info(
2021-03-01 06:31:43 +01:00
f"{OFF}Skipping {album_title} as it doesn't "
"meet quality requirement"
)
return
2021-03-01 06:31:43 +01:00
logger.info(f"\n{YELLOW}Downloading: {album_title}\n"
f"Quality: {file_format}\n")
album_attr = {
'artist': meta["artist"]["name"],
'album': album_title,
'year': meta["release_date_original"].split("-")[0],
'format': file_format,
'bit_depth': bit_depth,
'sampling_rate': sampling_rate
}
folder_format, track_format = clean_format_str(folder_format,
track_format,
2021-03-02 00:18:08 +01:00
file_format)
2021-03-01 06:31:43 +01:00
sanitized_title = sanitize_filename(
folder_format.format(**album_attr)
2020-10-17 18:52:35 +02:00
)
dirn = os.path.join(path, sanitized_title)
2020-12-15 18:58:25 +01:00
os.makedirs(dirn, exist_ok=True)
if no_cover:
logger.info(f"{OFF}Skipping cover")
else:
2021-03-01 06:31:43 +01:00
get_extra(meta["image"]["large"], dirn,
og_quality=cover_og_quality)
2020-12-05 18:56:08 +01:00
if "goodies" in meta:
try:
get_extra(meta["goodies"][0]["url"], dirn, "booklet.pdf")
except: # noqa
pass
2021-03-01 06:31:43 +01:00
media_numbers = [track["media_number"] for track in
meta["tracks"]["items"]]
is_multiple = True if len([*{*media_numbers}]) > 1 else False
2020-10-17 18:52:35 +02:00
for i in meta["tracks"]["items"]:
parse = client.get_track_url(i["id"], quality)
if "sample" not in parse and parse["sampling_rate"]:
2020-10-17 18:52:35 +02:00
is_mp3 = True if int(quality) == 5 else False
download_and_tag(
dirn,
count,
parse,
i,
meta,
False,
is_mp3,
embed_art,
i["media_number"] if is_multiple else None,
2021-03-01 06:31:43 +01:00
track_format=track_format,
)
2020-08-10 04:47:51 +02:00
else:
logger.info(f"{OFF}Demo. Skipping")
2020-08-10 04:47:51 +02:00
count = count + 1
else:
parse = client.get_track_url(item_id, quality)
2020-08-10 04:47:51 +02:00
if "sample" not in parse and parse["sampling_rate"]:
meta = client.get_track_meta(item_id)
2020-12-13 19:03:18 +01:00
track_title = get_title(meta)
logger.info(f"\n{YELLOW}Downloading: {track_title}")
2021-03-01 06:31:43 +01:00
format_info = get_format(client, meta, quality,
is_track_id=True, track_url_dict=parse)
file_format, quality_met, bit_depth, sampling_rate = format_info
folder_format, track_format = clean_format_str(folder_format,
track_format,
bit_depth)
if not downgrade_quality and not quality_met:
logger.info(
2021-03-01 06:31:43 +01:00
f"{OFF}Skipping {track_title} as it doesn't "
"meet quality requirement"
)
return
2021-03-01 06:31:43 +01:00
track_attr = {
'artist': meta["album"]["artist"]["name"],
'tracktitle': track_title,
'year': meta["album"]["release_date_original"].split("-")[0],
'bit_depth': bit_depth,
'sampling_rate': sampling_rate
}
sanitized_title = sanitize_filename(
folder_format.format(**track_attr)
2020-10-17 18:52:35 +02:00
)
dirn = os.path.join(path, sanitized_title)
2020-12-15 18:58:25 +01:00
os.makedirs(dirn, exist_ok=True)
if no_cover:
logger.info(f"{OFF}Skipping cover")
else:
2020-12-18 22:17:20 +01:00
get_extra(
2021-03-01 06:31:43 +01:00
meta["album"]["image"]["large"], dirn,
og_quality=cover_og_quality
2020-12-18 22:17:20 +01:00
)
2020-10-17 18:52:35 +02:00
is_mp3 = True if int(quality) == 5 else False
2021-03-01 06:31:43 +01:00
download_and_tag(dirn, count, parse, meta,
meta, True, is_mp3, embed_art,
track_format=track_format)
2020-08-10 04:47:51 +02:00
else:
logger.info(f"{OFF}Demo. Skipping")
logger.info(f"{GREEN}Completed")
2021-03-02 00:18:08 +01:00
# ----------- Utilities -----------
def _clean_format_gen(s: str) -> str:
'''General clean for format strings. Avoids user errors.
'''
if s.endswith('.mp3'):
s = s[:-4]
elif s.endswith('.flac'):
s = s[:-5]
s = s.strip()
return s
def _not_mp3_valid(s: str) -> bool:
return 'bit_depth' in s or 'sample_rate' in s
def clean_format_str(folder: str, track: str,
file_format: str) -> Tuple[str, str]:
'''Cleans up the format strings to avoid errors
with MP3 files.
'''
folder = _clean_format_gen(folder)
track = _clean_format_gen(track)
if file_format == 'MP3':
if _not_mp3_valid(folder):
logger.error(f'{RED}invalid format string for MP3: "{folder}"'
f'\ndefaulting to "{DEFAULT_MP3_FOLDER_FORMAT}"')
folder = DEFAULT_MP3_FOLDER_FORMAT
if _not_mp3_valid(track):
logger.error(f'{RED}invalid format string for MP3: "{track}"'
f'\ndefaulting to "{DEFAULT_MP3_TRACK_FORMAT}"')
track = DEFAULT_MP3_TRACK_FORMAT
elif file_format == 'Unknown':
if _not_mp3_valid(folder):
logger.error(f'{RED}Error getting format. Defaulting format '
f'string to "{DEFAULT_UNKNOWN_FOLDER_FORMAT}"')
folder = DEFAULT_UNKNOWN_FOLDER_FORMAT
if _not_mp3_valid(track):
logger.error(f'{RED}Error getting format. Defaulting format '
f'string to "{DEFAULT_UNKNOWN_TRACK_FORMAT}"')
track = DEFAULT_UNKNOWN_TRACK_FORMAT
return (folder, track)