mirror of https://github.com/vitiko98/qobuz-dl
started
This commit is contained in:
parent
60394346f0
commit
1e54c875fc
|
@ -195,6 +195,7 @@ class Client:
|
|||
return False
|
||||
|
||||
def cfg_setup(self):
|
||||
logging.debug(self.secrets)
|
||||
for secret in self.secrets:
|
||||
if self.test_secret(secret):
|
||||
self.sec = secret
|
||||
|
|
|
@ -6,6 +6,7 @@ from typing import Union
|
|||
import requests
|
||||
import tidalapi
|
||||
|
||||
from .downloader import Album, Artist, Playlist, Track
|
||||
from .exceptions import (
|
||||
AuthenticationError,
|
||||
IneligibleError,
|
||||
|
@ -95,13 +96,18 @@ class QobuzClient(SecureClientInterface):
|
|||
media_type = media_type[:-1]
|
||||
|
||||
f_map = {
|
||||
"album": self.search_albums,
|
||||
"artist": self.search_artists,
|
||||
"playlist": self.search_playlists,
|
||||
"track": self.search_tracks,
|
||||
"album": (self.search_albums, Album),
|
||||
"artist": (self.search_artists, Artist),
|
||||
"playlist": (self.search_playlists, Playlist),
|
||||
"track": (self.search_tracks, Track),
|
||||
}
|
||||
|
||||
return f_map[media_type](query, limit=limit)
|
||||
media_funcs = f_map[media_type]
|
||||
resp = media_funcs[0](query, limit=limit)
|
||||
return (
|
||||
media_funcs[1].from_api(item, self, "qobuz")
|
||||
for item in resp["albums"]["items"]
|
||||
)
|
||||
|
||||
def get(self, meta_id: Union[str, int], media_type: str = "album"):
|
||||
f_map = {
|
||||
|
@ -353,7 +359,7 @@ class TidalClient(SecureClientInterface):
|
|||
|
||||
logger.info("Ok")
|
||||
|
||||
def search(self, query: str, media_type: str, limit: int = 50):
|
||||
def search(self, query: str, media_type: str = 'album', limit: int = 50):
|
||||
"""
|
||||
:param query:
|
||||
:type query: str
|
||||
|
|
|
@ -8,10 +8,10 @@ from mutagen.flac import FLAC
|
|||
from mutagen.id3 import ID3, ID3NoHeaderError
|
||||
from tqdm import tqdm
|
||||
|
||||
from .clients import ClientInterface
|
||||
from .constants import EXT
|
||||
from .exceptions import InvalidQuality, NonStreamable
|
||||
from .metadata import TrackMetadata
|
||||
from .qopy import Client
|
||||
from .util import safe_get
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -23,7 +23,7 @@ class Track:
|
|||
def __init__(
|
||||
self,
|
||||
track_id: Optional[Union[str, int]] = None,
|
||||
client: Optional[Client] = None,
|
||||
client: Optional[ClientInterface] = None,
|
||||
meta: Optional[TrackMetadata] = None,
|
||||
**kwargs,
|
||||
):
|
||||
|
@ -32,7 +32,7 @@ class Track:
|
|||
:param track_id: track id returned by Qobuz API
|
||||
:type track_id: Optional[Union[str, int]]
|
||||
:param client: qopy client
|
||||
:type client: Optional[Client]
|
||||
:type client: Optional[ClientInterface]
|
||||
:param meta: TrackMetadata object
|
||||
:type meta: Optional[TrackMetadata]
|
||||
:param kwargs:
|
||||
|
@ -115,7 +115,7 @@ class Track:
|
|||
return self.final_path
|
||||
|
||||
@classmethod
|
||||
def from_album_meta(cls, album: dict, pos: int, client: Client):
|
||||
def from_album_meta(cls, album: dict, pos: int, client: ClientInterface):
|
||||
"""Create a new Track object from album metadata.
|
||||
|
||||
:param album: album metadata returned by API
|
||||
|
@ -218,16 +218,26 @@ class Tracklist(list):
|
|||
class Album(Tracklist):
|
||||
"""Represents a downloadable Qobuz album."""
|
||||
|
||||
def __init__(self, client: Client, album_id: Union[str, int], **kwargs):
|
||||
def __init__(self, client: ClientInterface, album_id: Union[str, int], **kwargs):
|
||||
"""Create a new Album object.
|
||||
|
||||
:param client: a qopy client instance
|
||||
:type client: Client
|
||||
:type client: ClientInterface
|
||||
:param album_id: album id returned by qobuz api
|
||||
:type album_id: Union[str, int]
|
||||
:param kwargs:
|
||||
"""
|
||||
self.client = client
|
||||
self.id = album_id
|
||||
|
||||
for k, v in kwargs.items():
|
||||
setattr(self, k, v)
|
||||
|
||||
# to improve from_api method speed
|
||||
if kwargs.get("load_on_init"):
|
||||
self.load_meta()
|
||||
|
||||
def load_meta(self):
|
||||
self.meta = client.get_album_meta(album_id)
|
||||
self.title = self.meta.get("title")
|
||||
self.version = self.meta.get("version")
|
||||
|
@ -236,8 +246,6 @@ class Album(Tracklist):
|
|||
raise NonStreamable(f"This album is not streamable ({album_id} ID)")
|
||||
|
||||
self._load_tracks()
|
||||
for k, v in kwargs.items():
|
||||
setattr(self, k, v)
|
||||
|
||||
def _load_tracks(self):
|
||||
"""Load tracks from the album metadata."""
|
||||
|
@ -247,6 +255,35 @@ class Album(Tracklist):
|
|||
Track.from_album_meta(album=self.meta, pos=i, client=self.client)
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_api(cls, item: dict, client: ClientInterface, source: str = "qobuz"):
|
||||
"""Create an Album object from the api response of Qobuz, Tidal,
|
||||
or Deezer.
|
||||
|
||||
:param resp: response dict
|
||||
:type resp: dict
|
||||
:param source: in ('qobuz', 'deezer', 'tidal')
|
||||
:type source: str
|
||||
"""
|
||||
if source == 'qobuz':
|
||||
# only collect minimal information for identification purposes
|
||||
info = {
|
||||
'title': item['title'],
|
||||
'albumartist': item['artist']['name'],
|
||||
'id': item['id'], # this is the important part
|
||||
'version': item['version'],
|
||||
'url': item['url'],
|
||||
'quality': (item['maximum_bit_depth'], item['maximum_sampling_rate']),
|
||||
'streamable': item['streamable']
|
||||
}
|
||||
elif source == 'tidal':
|
||||
pass
|
||||
elif source == 'deezer':
|
||||
pass
|
||||
else:
|
||||
raise ValueError
|
||||
|
||||
|
||||
@property
|
||||
def title(self) -> str:
|
||||
"""Return the title of the album.
|
||||
|
@ -280,3 +317,11 @@ class Album(Tracklist):
|
|||
for track in self:
|
||||
track.download(quality, folder, progress_bar)
|
||||
track.tag(album_meta=self.meta)
|
||||
|
||||
|
||||
class Playlist(Tracklist):
|
||||
pass
|
||||
|
||||
|
||||
class Artist(Tracklist):
|
||||
pass
|
||||
|
|
|
@ -24,7 +24,8 @@ class Spoofer:
|
|||
self.bundle = bundle_req.text
|
||||
|
||||
def get_app_id(self):
|
||||
return re.search(self.app_id_regex, self.bundle).group("app_id")
|
||||
match = re.search(self.app_id_regex, self.bundle).group("app_id")
|
||||
return str(match)
|
||||
|
||||
def get_secrets(self):
|
||||
seed_matches = re.finditer(self.seed_timezone_regex, self.bundle)
|
||||
|
@ -50,4 +51,4 @@ class Spoofer:
|
|||
secrets[secret_pair] = base64.standard_b64decode(
|
||||
"".join(secrets[secret_pair])[:-44]
|
||||
).decode("utf-8")
|
||||
return secrets
|
||||
return secrets.values()
|
||||
|
|
Loading…
Reference in New Issue