from __future__ import annotations
from datetime import datetime, timedelta
import json
import os
import secrets
import time
from typing import TYPE_CHECKING
from urllib.parse import parse_qsl, urlencode, urlparse
import warnings
import httpx
from .._shared import TokenDatabase, TTLCache, APIClient, OAuth2APIClient
from ._api.albums import AlbumsAPI
from ._api.artists import ArtistsAPI
from ._api.charts import ChartsAPI
from ._api.editorial import EditorialAPI
from ._api.episodes import EpisodesAPI
from ._api.genres import GenresAPI
from ._api.playlists import PlaylistsAPI
from ._api.podcasts import PodcastsAPI
from ._api.radios import RadiosAPI
from ._api.search import SearchAPI
from ._api.tracks import TracksAPI
from ._api.users import UsersAPI
if TYPE_CHECKING:
from typing import Any
from ..._types import Collection
[docs]
class DeezerAPIClient(OAuth2APIClient):
"""
Deezer API client.
"""
AUTH_URL = "https://connect.deezer.com/oauth/auth.php"
BASE_URL = "https://api.deezer.com"
TOKEN_URL = "https://connect.deezer.com/oauth/access_token.php"
_ALLOWED_AUTH_FLOWS = {None, "auth_code", "implicit"}
_ALLOWED_PERMISSIONS = {
"basic_access",
"email",
"offline_access",
"manage_library",
"manage_community",
"delete_library",
"listening_history",
}
_ENV_VAR_PREFIX = "DEEZER_API"
_OPTIONAL_AUTH = True
_PROVIDER = "Deezer"
_QUAL_NAME = f"minim.api.{_PROVIDER.lower()}.{__qualname__}"
_rate_limit_per_second = 10.0
__slots__ = (
"_access_token",
"_app_id",
"_app_secret",
"_permissions",
"albums",
"artists",
"charts",
"editorial",
"episodes",
"genres",
"playlists",
"podcasts",
"radios",
"search",
"tracks",
"users",
)
def __init__(
self,
*,
auth_flow: str | None = None,
app_id: str | None = None,
app_secret: str | None = None,
user_identifier: str | None = None,
redirect_uri: str | None = None,
permissions: str | Collection[str] = "",
access_token: str | None = None,
expires_at: str | datetime | None = None,
redirect_handler: str | None = None,
open_browser: bool = False,
enable_cache: bool = True,
limit_rate: bool = True,
store_tokens: bool = True,
user_agent: str | None = None,
) -> None:
"""
Parameters
----------
auth_flow : str; keyword-only
Authorization flow.
**Valid values**:
* :code:`None` – No authentication.
* :code:`"auth_code"` – Authorization Code Flow.
* :code:`"implicit"` – Implicit Grant Flow.
app_id : str; keyword-only; optional
Application ID. Required unless set as system environment
variable :code:`DEEZER_API_APP_ID` or stored in the local
token storage.
app_secret : str; keyword-only; optional
Application secret. Required for the Authorization Code flow
unless set as system environment variable
:code:`DEEZER_API_APP_SECRET` or stored in the local token
storage.
user_identifier : str; keyword-only; optional
Identifier for the user account. Used when
:code:`store_tokens=True` to distinguish between multiple
accounts for the same client ID and authorization flow.
If specified, it is used with the client ID and authorization
flow to locate a matching stored token. If none is found, a
new token is obtained and stored under this identifier.
If not specified, the most recently accessed token for the
client ID and authorization flow is used. If none exists, a
new token is obtained and stored using the Deezer user ID
acquired from a successful authorization.
Prefixing the identifier with a tilde (:code:`~`) bypasses
token retrieval, forces reauthorization, and stores the new
token under the suffix.
redirect_uri : str; keyword-only; optional
Redirect URI. Required for the Authorization Code and
Authorization Code with PKCE flows.
permissions : str or Collection[str]; keyword-only; optional
Permissions requested by the client to access user
resources.
.. seealso::
:meth:`resolve_permissions` – Resolve substrings into a
set of permissions.
access_token : str; keyword-only; optional
Access token. If provided, the authorization process is
bypassed.
expires_at : str or datetime.datetime; keyword-only; optional
Expiration time of the access token. If a string, it must be
in ISO 8601 format (:code:`%Y-%m-%dT%H:%M:%SZ`).
redirect_handler : str or None; keyword-only; optional
Backend for handling redirects during the authorization
flow. Redirect handling is only available for hosts
:code:`localhost`, :code:`127.0.0.1`, or :code:`::1`.
**Valid values**:
* :code:`None` – Show authorization URL in and have the user
manually paste the redirect URL into the terminal.
* :code:`"http.server"` – Run a HTTP server to intercept the
redirect after user authorization in any local browser.
* :code:`"playwright"` – Use a Playwright Firefox browser to
complete the user authorization.
open_browser : bool; keyword-only; default: :code:`False`
Whether to automatically open the authorization URL in the
default web browser for the Authorization Code flow. If
:code:`False`, the URL is printed to the terminal.
enable_cache : bool; keyword-only; default: :code:`True`
Whether to enable an in-memory time-to-live (TTL) cache with
a least recently used (LRU) eviction policy for this client.
If :code:`True`, responses from semi-static endpoints are
cached for one minute to one day, depending on their
expected update frequency.
.. seealso::
:meth:`clear_cache` – Clear specific or all cache entries
for this client.
limit_rate : bool; keyword-only; default: :code:`True`
Whether to enable a token bucket rate limiter for this
client.
store_tokens : bool; keyword-only; default: :code:`True`
Whether to enable the local token storage for this client.
If :code:`True`, existing access tokens are retrieved when
found in local storage, and newly acquired tokens and their
metadata are stored for future retrieval. If :code:`False`,
the client neither retrieves nor stores access tokens.
.. seealso::
:meth:`get_tokens` – Retrieve specific or all stored
access tokens for this client.
:meth:`remove_tokens` – Remove specific or all stored
access tokens for this client.
user_agent : str; keyword-only; optional
:code:`User-Agent` value to include in the headers of HTTP
requests.
"""
APIClient.__init__(
self,
enable_cache=enable_cache,
limit_rate=limit_rate,
user_agent=user_agent,
)
# Initialize subclasses for endpoint groups
#: Albums API endpoints for the Deezer API.
self.albums: AlbumsAPI = AlbumsAPI(self)
#: Artists API endpoints for the Deezer API.
self.artists: ArtistsAPI = ArtistsAPI(self)
#: Charts API endpoints for the Deezer API.
self.charts: ChartsAPI = ChartsAPI(self)
#: Editorial API endpoints for the Deezer API.
self.editorial: EditorialAPI = EditorialAPI(self)
#: Episodes API endpoints for the Deezer API.
self.episodes: EpisodesAPI = EpisodesAPI(self)
#: Genres API endpoints for the Deezer API.
self.genres: GenresAPI = GenresAPI(self)
#: Playlists API endpoints for the Deezer API.
self.playlists: PlaylistsAPI = PlaylistsAPI(self)
#: Podcasts API endpoints for the Deezer API.
self.podcasts: PodcastsAPI = PodcastsAPI(self)
#: Radios API endpoints for the Deezer API.
self.radios: RadiosAPI = RadiosAPI(self)
#: Search API endpoints for the Deezer API.
self.search: SearchAPI = SearchAPI(self)
#: Tracks API endpoints for the Deezer API.
self.tracks: TracksAPI = TracksAPI(self)
#: Users API endpoints for the Deezer API.
self.users: UsersAPI = UsersAPI(self)
# If an app ID is not provided, try to retrieve it and its
# corresponding app secret from environment variables
if app_id is None or app_secret is None:
app_id = os.environ.get(f"{self._ENV_VAR_PREFIX}_APP_ID")
app_secret = os.environ.get(f"{self._ENV_VAR_PREFIX}_APP_SECRET")
if auth_flow is not None:
if user_identifier and user_identifier[0] == "~":
user_identifier = user_identifier[1:]
elif store_tokens and (
account := TokenDatabase._get_token(
self.__class__.__name__,
auth_flow=auth_flow,
client_id=app_id,
user_identifier=user_identifier,
)
):
# If an access token is not provided, try to retrieve it
# from local token storage
access_token = account["access_token"]
app_secret = account["client_secret"]
permissions = account["scopes"] or {}
redirect_uri = account["redirect_uri"]
expires_at = account["expires_at"]
self._token_extras = (
json.loads(token_extras)
if isinstance(token_extras := account["extras"], str)
else token_extras
)
self.set_auth_flow(
auth_flow,
app_id=app_id,
app_secret=app_secret,
user_identifier=user_identifier,
redirect_uri=redirect_uri,
permissions=permissions,
redirect_handler=redirect_handler,
open_browser=open_browser,
store_tokens=store_tokens,
authenticate=False,
)
if auth_flow is None:
self._access_token = self._refresh_token = self._expires_at = None
elif access_token:
self.set_access_token(access_token, expires_at=expires_at)
else:
self._obtain_access_token()
[docs]
@classmethod
def get_tokens(
cls,
*,
auth_flows: str | Collection[str] | None = None,
app_ids: str | Collection[str] | None = None,
user_identifiers: str | Collection[str] | None = None,
) -> list[dict[str, Any]] | None:
"""
Retrieve specific or all access tokens and their metadata for
this client from local storage.
Parameters
----------
auth_flows : str or Collection[str]; keyword-only; optional
Authorization flows.
app_ids : str or Collection[str]; keyword-only; optional
Application IDs.
user_identifiers : str or Collection[str]; keyword-only; optional
Identifiers for the user accounts.
"""
TokenDatabase.get_tokens(
client_names=cls.__name__,
auth_flows=auth_flows,
client_ids=app_ids,
user_identifiers=user_identifiers,
)
[docs]
@classmethod
def remove_tokens(
cls,
*,
auth_flows: str | Collection[str] | None = None,
app_ids: str | Collection[str] | None = None,
user_identifiers: str | Collection[str] | None = None,
) -> None:
"""
Remove specific or all access tokens and their metadata for this
client from local storage.
.. warning::
If none of `auth_flows`, `app_ids`, or
`user_identifiers` are provided, all tokens for this client
will be removed from local storage.
Parameters
----------
auth_flows : str or Collection[str]; keyword-only; optional
Authorization flows.
app_ids : str or Collection[str]; keyword-only; optional
Application IDs.
user_identifiers : str or Collection[str]; keyword-only; optional
Identifiers for the user accounts.
"""
TokenDatabase.remove_tokens(
client_names=cls.__name__,
auth_flows=auth_flows,
client_ids=app_ids,
user_identifiers=user_identifiers,
)
[docs]
@classmethod
def resolve_permissions(
cls, matches: str | Collection[str] | None = None
) -> set[str]:
"""
Resolve one or more substrings into a set of permissions.
Parameters
----------
matches : str or Collection[str]; optional
Substrings to match in the available permissions. If not
provided, all available permissions are returned.
Returns
-------
permissions : set[str]
Permissions.
"""
# Return all permissions if no matches are provided
if matches is None:
return cls._ALLOWED_PERMISSIONS.copy()
# Return permissions containing a substring
if isinstance(matches, str):
return {
permission
for permission in cls._ALLOWED_PERMISSIONS
if matches in permission
}
# Recursively gather permissions for multiple
# categories/substrings
return {
permission
for match in matches
for permission in cls.resolve_permissions(match)
}
def _get_auth_code(self) -> str:
"""
Get the authorization code for the Authorization Code flow.
Returns
-------
auth_code : str
Authorization code.
"""
params = {
"app_id": self._app_id,
"perms": ",".join(self._permissions),
"redirect_uri": self._redirect_uri,
"state": secrets.token_urlsafe(),
}
queries = self._handle_redirect(
f"{self.AUTH_URL}?{urlencode(params)}", url_component="query"
)
if error := queries.get("error_reason"):
raise RuntimeError(f"Authorization failed. Error: {error}")
if params["state"] != queries["state"]:
raise RuntimeError("Authorization failed due to state mismatch.")
return queries["code"]
def _obtain_access_token(self, auth_flow: str | None = None) -> None:
"""
Get and set a new access token via the specified or current
authorization flow.
Parameters
----------
auth_flow : str; optional
Authorization flow. If not specified, the current
authorization flow in :attr:`_auth_flow` is used.
**Valid values**:
* :code:`"auth_code"` – Authorization Code Flow.
* :code:`"implicit"` – Implicit Grant Flow.
"""
if not auth_flow:
auth_flow = self._auth_flow
if auth_flow is None:
self.set_access_token(None)
if auth_flow == "implicit":
params = {
"app_id": self._app_id,
"redirect_uri": self._redirect_uri,
"response_type": "token",
"perms": ",".join(self._permissions),
}
resp_json = self._handle_redirect(
f"{self.AUTH_URL}?{urlencode(params)}",
url_component="fragment",
)
if error := resp_json.get("error"):
raise RuntimeError(
f"Authorization failed. "
f"Error: {error} ({resp_json.get('error_reason')})"
)
else: # auth_flow == "auth_code"
resp_json = dict(
parse_qsl(
httpx.post(
self.TOKEN_URL,
data={
"app_id": self._app_id,
"secret": self._app_secret,
"code": self._get_auth_code(),
},
).text
)
)
self.set_access_token(
resp_json.pop("access_token"),
expires_at=(datetime.now() + timedelta(seconds=expires))
if (expires := int(resp_json.pop("expires")))
else None,
)
self._token_extras = resp_json
if not self._user_identifier and self._auth_flow is not None:
self._user_identifier = self._resolve_user_identifier()
if self._store_tokens:
TokenDatabase.add_token(
self.__class__.__name__,
auth_flow=self._auth_flow,
client_id=self._app_id,
client_secret=self._app_secret,
user_identifier=self._user_identifier or "",
redirect_uri=self._redirect_uri,
scopes=self._permissions or "basic_access",
token_type=None,
access_token=self._access_token,
refresh_token=None,
expires_at=self._expires_at,
extras=resp_json,
)
def _request(
self,
method: str,
endpoint: str,
/,
*,
retry: bool = True,
params: dict[str, Any] | None = None,
**kwargs: dict[str, Any],
) -> "httpx.Response":
"""
Make an HTTP request to a Deezer API endpoint.
Parameters
----------
method : str; positional-only
HTTP method.
endpoint : str; positional-only
Deezer API endpoint.
retry : bool; keyword-only; default: :code:`True`
Whether to retry the request if the first attempt returns a
:code:`300 TOKEN_INVALID`.
**kwargs : dict[str, Any]
Keyword parameters to pass to :meth:`httpx.Client.request`.
Returns
-------
response : httpx.Response
HTTP response.
"""
if (rate_limiter := self._rate_limiter) is not None:
rate_limiter.throttle()
if self._expires_at and datetime.now() > self._expires_at:
self._obtain_access_token()
if self._access_token:
if params is None:
params = {}
params["access_token"] = self._access_token
resp = self._client.request(method, endpoint, params=params, **kwargs)
if "error" not in resp.text or not (error := resp.json().get("error")):
return resp
error_code = error.get("code")
if error_code is None:
raise RuntimeError(f"{error['type']} {error['message']}")
if error_code == 4 and retry:
retry_after = 2 / self._rate_limit_per_second
warnings.warn(
"Rate limit exceeded. Retrying after "
f"{retry_after:.3f} second(s)."
)
time.sleep(retry_after)
return self._request(method, endpoint, retry=False, **kwargs)
if error_code == 300 and retry:
self._obtain_access_token()
return self._request(
method, endpoint, retry=False, params=params, **kwargs
)
raise RuntimeError(
f"{error_code} {error['type']} – {error['message']}"
)
def _require_permissions(
self, endpoint_method: str, permissions: str | Collection[str], /
) -> None:
"""
Ensure that the required permissions for an endpoint method have
been granted.
Parameters
----------
endpoint_method : str; positional-only
Name of the endpoint method.
scopes : str or Collection[str]; positional-only
Required authorization scopes.
"""
if not self._permissions and self._auth_flow is not None:
self._permissions = {
perm
for perm, bool_ in self.users.get_user_permissions()[
"permissions"
].items()
if bool_
}
if isinstance(permissions, str):
if permissions not in self._permissions:
raise RuntimeError(
f"{self._QUAL_NAME}.{endpoint_method}() requires "
f"the '{permissions}' permission."
)
else:
for permission in permissions:
self._require_permissions(endpoint_method, permission)
def _resolve_user_identifier(self) -> str:
"""
Return the Deezer user ID as the user identifier for the
current account.
.. note::
Invoking this method may call
:meth:`~minim.api.deezer.UsersAPI.get_user` and
make a request to the Deezer API.
"""
return self.users.get_user()["id"]
[docs]
@TTLCache.cached_method(ttl="static")
def get_country_info(self) -> dict[str, Any]:
"""
Get configuration and availability information for the Deezer
API in the country associated with the current connection.
Returns
-------
info : dict[str, Any]
Configuration and availability information.
.. admonition:: Sample response
:class: response dropdown
.. code-block::
{
"ads": {
"audio": {
"default": {
"interval": <int>,
"start": <int>,
"unit": <str>
}
},
"big_native_ads_home": {
"android": {
"enabled": <bool>
},
"android_tablet": {
"enabled": <bool>
},
"ipad": {
"enabled": <bool>
},
"iphone": {
"enabled": <bool>
}
},
"display": {
"interstitial": {
"interval": <int>,
"start": <int>,
"unit": <str>
}
}
},
"country": <str>,
"country_iso": <str>,
"has_podcasts": <bool>,
"hosts": {
"images": <str>,
"stream": <str>
},
"lyrics_available": <bool>,
"offers": [],
"open": <bool>,
"pop": <str>,
"upload_token": <str>,
"upload_token_lifetime": <int>,
"user_token": <str>
}
"""
return self._request("GET", "infos").json()
[docs]
def set_access_token(
self,
access_token: str | None,
/,
*,
expires_at: str | datetime | None = None,
) -> None:
"""
Set or update the access token and its related metadata.
.. warning::
Calling this method replaces all existing values with the
provided parameters. Parameters not provided explicitly
will be overwritten by their default values.
Parameters
----------
access_token : str or None; positional-only
Access token.
.. important::
If the access token was acquired via a different
authorization flow or client, call
:meth:`set_auth_flow` first to ensure that all
other relevant authorization parameters are set
correctly.
expires_at : str or datetime.datetime; keyword-only; optional
Expiration time of the access token. If a string, it must be
in ISO 8601 format (:code:`%Y-%m-%dT%H:%M:%SZ`).
"""
if access_token is None and self._auth_flow is not None:
raise ValueError(
"`access_token` cannot be None when using the "
f"{self._AUTH_FLOWS[self._auth_flow]}."
)
self._access_token = access_token
self._refresh_token = None
self._expires_at = (
datetime.strptime(expires_at, "%Y-%m-%dT%H:%M:%SZ")
if expires_at and isinstance(expires_at, str)
else expires_at
)
[docs]
def set_auth_flow(
self,
auth_flow: str | None,
/,
*,
app_id: str | None = None,
app_secret: str | None = None,
user_identifier: str | None = None,
redirect_uri: str | None = None,
permissions: str | Collection[str] = "",
redirect_handler: str | None = None,
open_browser: bool = False,
store_tokens: bool = True,
authenticate: bool = True,
) -> None:
"""
Set or update the authorization flow and related parameters.
.. warning::
Calling this method replaces all existing values with the
provided parameters. Parameters not provided explicitly
will be overwritten by their default values.
Parameters
----------
auth_flow : str or None; keyword-only
Authorization flow.
**Valid values**:
* :code:`None` – No authentication.
* :code:`"auth_code"` – Authorization Code Flow.
* :code:`"implicit"` – Implicit Grant Flow.
app_id : str; keyword-only; optional
Application ID. Required unless set as system environment
variable :code:`DEEZER_API_APP_ID`.
app_secret : str; keyword-only; optional
Application secret. Required for the Authorization Code flow
unless set as system environment variable
:code:`DEEZER_API_APP_SECRET`.
user_identifier : str; keyword-only; optional
Identifier for the user account. Used when
:code:`store_tokens=True` to distinguish between multiple
accounts for the same client ID and authorization flow.
If specified, it is used with the client ID and authorization
flow to locate a matching stored token. If none is found, a
new token is obtained and stored under this identifier.
If not specified, the most recently accessed token for the
client ID and authorization flow is used. If none exists, a
new token is obtained and stored using a user identifier
(e.g., user ID) acquired from a successful authorization.
Prefixing the identifier with a tilde (:code:`~`) bypasses
token retrieval, forces reauthorization, and stores the new
token under the suffix.
redirect_uri : str; keyword-only; optional
Redirect URI. Required for the Authorization Code and
Implicit Grant flows.
permissions : str or Collection[str]; keyword-only; optional
Permissions requested by the client to access user
resources.
redirect_handler : str or None; keyword-only; optional
Backend for handling redirects during the authorization
flow. Redirect handling is only available for hosts
:code:`localhost`, :code:`127.0.0.1`, or :code:`::1`.
**Valid values**:
* :code:`None` – Show authorization URL in and have the user
manually paste the redirect URL into the terminal.
* :code:`"http.server"` – Run a HTTP server to intercept the
redirect after user authorization in any local browser.
* :code:`"playwright"` – Use a Playwright Firefox browser to
complete the user authorization.
open_browser : bool; keyword-only; default: :code:`False`
Whether to automatically open the authorization URL in the
default web browser for the Authorization Code and Implicit
Grant flows. If :code:`False`, the URL is printed to the
terminal.
store_tokens : bool; keyword-only; default: :code:`True`
Whether to enable the local token storage for this client.
If :code:`True`, existing access tokens are retrieved when
found in local storage, and newly acquired tokens and their
metadata are stored for future retrieval. If :code:`False`,
the client neither retrieves nor stores access tokens.
.. seealso::
:meth:`get_tokens` – Retrieve specific or all stored
access tokens for this client.
:meth:`remove_tokens` – Remove specific or all stored
access tokens for this client.
authenticate : bool; keyword-only; default: :code:`True`
Whether to immediately initiate the authorization
flow to acquire an access token.
.. important::
Unless :meth:`set_access_token` is called immediately
after, this should be left as :code:`True` to ensure the
client's existing token is compatible with the new
authorization flow and/or scopes.
"""
if auth_flow not in self._ALLOWED_AUTH_FLOWS:
raise ValueError(
f"Invalid authorization flow {auth_flow!r}. "
f"Valid values: {self._join_values(self._ALLOWED_AUTH_FLOWS)}."
)
self._auth_flow = auth_flow
if auth_flow is None and permissions:
warnings.warn(
"Permissions were specified in the `permissions` "
"parameter, but the unauthenticated client does not"
"support permissions."
)
permissions = ""
self._permissions = (
permissions
if isinstance(permissions, set)
else set(
permissions.split()
if isinstance(permissions, str)
else permissions
)
)
if app_id is None or app_secret is None:
app_id = os.environ.get(f"{self._ENV_VAR_PREFIX}_APP_ID")
app_secret = os.environ.get(f"{self._ENV_VAR_PREFIX}_APP_SECRET")
if app_id is None and auth_flow is not None:
raise ValueError(
"An application ID must be provided via the `app_id` "
f"parameter for the {self._AUTH_FLOWS[auth_flow]}."
)
self._app_id = app_id
if (
auth_flow in {"auth_code", "client_credentials"}
) and not app_secret:
raise ValueError(
f"The {self._AUTH_FLOWS[auth_flow]} requires an "
"application secret to be provided via the "
"`app_secret` parameter."
)
self._app_secret = app_secret
self._user_identifier = user_identifier
has_redirect = redirect_uri is not None
if auth_flow is not None:
if not has_redirect:
raise ValueError(
f"The {self._AUTH_FLOWS[auth_flow]} requires a "
"redirect URI to be provided via the "
"`redirect_uri` parameter."
)
parsed = urlparse(redirect_uri)
self._port = (
port
if (port := parsed.port)
else 80
if parsed.scheme == "http"
else 443
if parsed.scheme == "https"
else None
)
self._redirect_uri = redirect_uri
if redirect_handler is not None:
if redirect_handler not in self._REDIRECT_HANDLERS:
handlers_str = self._join_values(self._REDIRECT_HANDLERS)
if handlers_str:
handlers_str = f", '{handlers_str}'"
raise ValueError(
f"Invalid redirect handler {redirect_handler!r}. "
f"Valid value(s): None{handlers_str}."
)
if (hostname := parsed.hostname) not in {
"localhost",
"127.0.0.1",
"::1",
}:
warnings.warn(
"Redirect handling is not available for host "
f"{hostname!r}."
)
redirect_handler = None
self._redirect_handler = redirect_handler
else:
if has_redirect:
warnings.warn(
"A redirect URI was provided via the "
"`redirect_uri` parameter, but the unauthenticated "
"client does not use redirects."
)
self._redirect_uri = None
self._redirect_handler = None
self._open_browser = open_browser
self._store_tokens = store_tokens
if authenticate and auth_flow is not None:
self._obtain_access_token()