Source code for qci_client.base
- """Base class for API clients."""
- from dataclasses import dataclass, field
- from datetime import datetime
- import functools
- import os
- from posixpath import join
- from typing import Callable, Optional
- import requests
- from requests.adapters import HTTPAdapter, Retry
- RETRY_TOTAL = 7
- BACKOFF_FACTOR = 2
- STATUS_FORCELIST = [502, 503, 504]
- [docs]
- @dataclass
- class BaseApi:
- """
- Base class for clients to QCi APIs, especially the authorization layer.
- :param url: url basepath to API endpoint, including scheme, if None, then falls back
- to QCI_API_URL environment variable
- :param api_token: refresh token for authenticating to API, if None, then falls back
- to QCI_TOKEN environment variable
- :param access_tokens: url path fragment to specify access-tokens API endpoint
- :param set_bearer_token_on_init: flag to turn on/off access token retrieval on
- object initialization
- :param timeout: number of seconds before timing out requests, None waits indefinitely
- :param debug: flag to turn on/off debug prints
- """
- url: Optional[str] = None
-
- api_token: Optional[str] = field(default=None, repr=False)
- access_tokens: str = "auth/v1/access-tokens"
- set_bearer_token_on_init: bool = True
-
- timeout: Optional[float] = None
- debug: bool = False
- def __post_init__(self):
- self.url = self.url if self.url else os.getenv("QCI_API_URL")
- if self.url is None:
- raise AssertionError(
- "QCI_API_URL environment variable is empty. Specify url or add the "
- "necessary environment variable"
- )
-
- self.url.rstrip("/")
- self.api_token = self.api_token if self.api_token else os.getenv("QCI_TOKEN")
- if self.api_token is None:
- raise AssertionError(
- "QCI_TOKEN environment variable is empty. Specify api_token or add the "
- "necessary environment variable"
- )
- self.session = requests.Session()
- retries = Retry(
- total=RETRY_TOTAL,
- backoff_factor=BACKOFF_FACTOR,
- status_forcelist=STATUS_FORCELIST,
- )
- self.session.mount("https://", HTTPAdapter(max_retries=retries))
- self._bearer_info = {}
- if self.set_bearer_token_on_init:
- self.set_bearer_token()
- @property
- def auth_url(self) -> str:
- """Return the URL used for authorization."""
- return join(self.url, self.access_tokens)
- @property
- def headers_without_token(self):
- """Headers without cached bearer token."""
- headers = {
- "Content-Type": "application/json",
- "Connection": "close",
- }
- if self.timeout is not None:
-
- headers.update({"X-Request-Timeout-Nano": str(int(10**9 * self.timeout))})
- return headers
- @property
- def headers(self):
- """Headers with cached bearer token."""
- headers = self.headers_without_token
- headers["Authorization"] = f"Bearer {self._bearer_info.get('access_token', '')}"
- return headers
- @property
- def headers_without_connection_close(self):
- """Headers with cached bearer token, but without connection closing."""
- headers = self.headers
- headers.pop("Connection", None)
- return headers
- @classmethod
- def _check_response_error(cls, response: requests.Response) -> None:
- """
- Single place to update error check and message for API calls
- :param response: a response from any API call using the requests package
- """
- try:
-
- response.raise_for_status()
- except requests.HTTPError as err:
-
- raise requests.HTTPError(
- str(err) + f" with response body: {response.text}"
- ) from err
- [docs]
- def get_bearer_token(self) -> requests.Response:
- """Request new bearer token. (Not cached here, see set_bearer_token.)"""
- payload = {"refresh_token": self.api_token}
- response = self.session.request(
- "POST",
- self.auth_url,
- json=payload,
- headers=self.headers_without_token,
- timeout=self.timeout,
- )
- self._check_response_error(response)
- return response
- [docs]
- def set_bearer_token(self) -> None:
- """Set bearer token from request."""
- resp = self.get_bearer_token()
- self._bearer_info = resp.json()
- [docs]
- def is_bearer_token_expired(self) -> bool:
- """Is current time > 'expires' time."""
- if "expires_at_rfc3339" not in self._bearer_info:
- return True
- expiration = datetime.strptime(
- self._bearer_info["expires_at_rfc3339"], "%Y-%m-%dT%H:%M:%SZ"
- )
- seconds_to_expiration = (expiration - datetime.utcnow()).total_seconds()
-
- return seconds_to_expiration < 10
- [docs]
- @staticmethod
- def refresh_token(func) -> Callable:
- """Return a wrapper function that can check an auth token."""
- @functools.wraps(func)
- def check_token(api, *args, **kwargs):
-
-
- is_expired = api.is_bearer_token_expired()
-
- if is_expired:
- api.set_bearer_token()
- return func(api, *args, **kwargs)
-
- return func(api, *args, **kwargs)
- return check_token