Source code for eqc_direct.client
- """
- :class:`.EqcClient` contains all RPC calls to process, get system status,
- and fetch results.
- """
- import logging
- import time
- import os
- import warnings
- from typing import TypedDict, List, Optional, Union
- import grpc
- from grpc._channel import _InactiveRpcError
- import numpy as np
- from . import eqc_pb2, eqc_pb2_grpc
- from .utils import (
- SysStatus,
- message_to_dict,
- PREC_MIN_RECOMMENDED_LEVELS,
- get_decimal_places
- )
- [docs]
class InactiveRpcError(Exception):
    """
    Public exception raised by the client when a gRPC call fails with
    :code:`grpc._channel._InactiveRpcError`.

    Callers can catch this class instead of importing the private grpc
    exception type; the original grpc error is chained as ``__cause__``.
    """
- [docs]
class EqcResult(TypedDict):
    """
    EQC results object. Will not contain an energy or solution if err_code is not 0.

    :param err_code: the error code for a given job. Full list of :code:`err_code`
        values can be found :class:`eqc_direct.utils.JobCodes`
    :param err_desc: the error description for a given job submission. Full list of
        :code:`err_desc` values can be found in :class:`eqc_direct.utils.JobCodes`
    :param preprocessing_time: data validation and time to re-format input data for
        running on the device in seconds
    :param runtime: solving time in seconds for Dirac hardware
    :param energy: energy for best solution found (float32 precision)
    :param solution: vector representing the lowest energy solution (float32 precision)
    :param distilled_runtime: runtime for distillation of solutions in seconds
    :param distilled_energy: energy for distilled solution for input polynomial
        (float32 precision)
    :param distilled_solution: a vector representing the solution after
        the distillation procedure is applied to the original solution
        derived from the hardware. (float32 precision)

    :note:
        * solutions are length n vector of floats
          that sum to the device constraint

    .. Must use native python types to ensure can be dumped to json
    """

    # Always present.
    err_code: int
    err_desc: str
    preprocessing_time: float
    runtime: float
    # Only meaningful when err_code == 0.
    energy: Optional[float]
    solution: Optional[List[float]]
    # Distillation outputs; typed as optional like the solver outputs above.
    distilled_runtime: Optional[float]
    distilled_energy: Optional[float]
    distilled_solution: Optional[List[float]]
- [docs]
class EqcClient:
    """
    Provides calls to process jobs using EQC RPC server

    :param ip_address: The IP address of the RPC server
    :param port: The port that the RPC server is running on
    :param max_data_size: the max send and receive message length for RPC server

    .. note::

       :code:`lock_id` is used by a variety of class functions.
       It is set to an empty string by default since default for device server
       :code:`lock_id` is also an empty string. This allows for single user
       processing without having to acquire a device lock.

    The client can be used as a context manager; the gRPC channel is closed
    on exit. :meth:`close` does the same explicitly.

    .. All GRPC calls follow a specific pattern:
    ..   1. Fill in data to be sent in message stub
    ..   2. Send data using stub service method
    ..   3. Parse response
    """

    # NOTE(review): the os.getenv defaults below are evaluated once, when the
    # class is defined, so later changes to the environment are not picked up.
    def __init__(
        self,
        ip_address: str = os.getenv("DEVICE_IP_ADDRESS", "localhost"),
        port: str = os.getenv("DEVICE_PORT", "50051"),
        max_data_size: int = 512 * 1024 * 1024,
    ):
        self._ip_address = ip_address
        self._max_data_size = max_data_size
        # f-string rather than `+` so an integer port does not raise TypeError.
        self._ip_add_port = f"{ip_address}:{port}"
        self._channel_opt = [
            ("grpc.max_send_message_length", max_data_size),
            ("grpc.max_receive_message_length", max_data_size),
        ]
        self.channel = grpc.insecure_channel(
            self._ip_add_port,
            options=self._channel_opt,
        )
        self.eqc_stub = eqc_pb2_grpc.EqcServiceStub(self.channel)

    def close(self) -> None:
        """Close the underlying gRPC channel."""
        self.channel.close()

    def __enter__(self):
        """Return the client itself for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the gRPC channel; exceptions from the block are not suppressed."""
        self.close()

    @staticmethod
    def _invoke(rpc, request, failure_msg: str):
        """
        Call a stub method and translate grpc's private inactive-channel error.

        :param rpc: bound stub method to call
        :param request: protobuf request message passed to :code:`rpc`
        :param failure_msg: message for the raised :class:`.InactiveRpcError`
        :return: the response message returned by :code:`rpc`
        :raises InactiveRpcError: if the call raises
            :code:`grpc._channel._InactiveRpcError`
        """
        try:
            return rpc(request)
        except _InactiveRpcError as exc:
            raise InactiveRpcError(failure_msg) from exc

    @staticmethod
    def _to_float32_precision(value) -> float:
        """
        Return :code:`value` as a native Python float limited to float32 precision.

        The value is cast to float32, formatted with 7 decimal places and
        parsed back, so the result is a plain :code:`float` without digits
        beyond that format.
        """
        return float(f"{np.float32(value):.7f}")

    def submit_job(
        self,
        poly_coefficients: np.ndarray,
        poly_indices: np.ndarray,
        num_variables: Optional[int] = None,
        lock_id: str = "",
        sum_constraint: Union[int, float] = 10000,
        relaxation_schedule: int = 2,
        solution_precision: Optional[float] = None,
    ) -> dict:
        """
        Submits data to be processed by EQC device

        :param poly_coefficients:
            coefficient values for the polynomial to be minimized
        :param poly_indices:
            list of lists containing polynomial indices associated with
            coefficient values for problem to be optimized.
        :param num_variables: the number of total variables for the submitted
            polynomial must not be less than max index in :code:`poly_indices`.
            If no value is provided then will be set to max value in
            :code:`poly_indices`.
        :param lock_id: a UUID to allow for multi-user processing
        :param sum_constraint: a normalization constraint that is applied to the
            problem space that is used to calculate :code:`energy`. This
            parameter will be rounded if exceeds float32 precision
            (e.g. 7-decimal places). Value must be between 1 and 10000.
        :param relaxation_schedule: four different schedules represented
            in integer parameter. Higher values reduce the variation in
            the analog spin values and therefore, are more probable to lead to
            improved objective function energy for input problem.
            Accepts range of values in set {1, 2, 3, 4}.
        :param solution_precision: the level of precision to apply to the solutions.
            This parameter will be rounded if exceeds float32 precision
            (e.g. 7-decimal places). If specified a distillation method is
            applied to the continuous solutions to map them to the submitted
            :code:`solution_precision`. Input :code:`solution_precision` must
            satisfy :code:`solution_precision` greater than or equal to
            :code:`sum_constraint`/10000 in order to be valid.
            Also :code:`sum_constraint` must be divisible by :code:`solution_precision`.
            If :code:`solution_precision` is not specified no distillation will be
            applied to the solution derived by the device.

        :return: a member of :class:`eqc_direct.utils.JobCodes` as a dict
            with the following keys:

            - **err_code**: `int`- job submission error code
            - **err_desc**: `str`- error code description for submission

        :raises ValueError: if :code:`poly_indices` is not two dimensional
        :raises InactiveRpcError: if the gRPC channel is inactive
        """
        # 0 is sent when no precision is requested (no distillation).
        if solution_precision is None:
            solution_precision = 0

        poly_coefficients = np.array(poly_coefficients)
        poly_indices = np.array(poly_indices)

        # Warn when coefficients are not integers or floats of at most 32 bits.
        coefficient_dtype = poly_coefficients.dtype
        is_integer = np.issubdtype(coefficient_dtype, np.integer)
        fits_float32 = (
            np.issubdtype(coefficient_dtype, np.floating)
            and np.finfo(coefficient_dtype).bits <= 32
        )
        if not (is_integer or fits_float32):
            warn_dtype_msg = (
                f"Max precision for EQC device is float32 input type "
                f"was dtype {np.dtype(coefficient_dtype).name}."
                f" Input matrix will be rounded"
            )
            logging.warning(warn_dtype_msg)
            warnings.warn(warn_dtype_msg, Warning)
        if get_decimal_places(solution_precision) > 7:
            soln_prec_warn = (
                "`solution_precision`precision is greater than 7 "
                "decimal places. Will be modified on submission to "
                "device to float32 precision"
            )
            logging.warning(soln_prec_warn)
            warnings.warn(soln_prec_warn, Warning)
        if get_decimal_places(sum_constraint) > 7:
            sum_constraint_warn = (
                "`sum_constraint` precision is greater than 7 decimal "
                "places. Will be modified on submission to device "
                "to float32"
            )
            logging.warning(sum_constraint_warn)
            warnings.warn(sum_constraint_warn, Warning)

        # Unpacking the shape fails unless the index array is exactly 2-D;
        # the second dimension is sent as the polynomial degree.
        try:
            _, degree_poly = poly_indices.shape
        except ValueError as err:
            err_msg = "`poly_indices` array must be two dimensions"
            logging.error(err_msg, exc_info=True)
            raise ValueError(err_msg) from err
        if not num_variables:
            # Cast the numpy scalar to a native int for the request message.
            num_variables = int(np.max(poly_indices))

        job_input = eqc_pb2.JobInput(
            num_variables=num_variables,
            degree=degree_poly,
            poly_indices=poly_indices.flatten(order="c").tolist(),
            coef_values=poly_coefficients.tolist(),
            sum_constraint=sum_constraint,
            relaxation_schedule=relaxation_schedule,
            soln_precision=solution_precision,
            lock_id=lock_id,
        )
        job_results = self._invoke(
            self.eqc_stub.SubmitJob,
            job_input,
            "EQC submit_job failed due to grpc._channel._InactiveRpcError.",
        )
        return message_to_dict(job_results)

    def fetch_result(self, lock_id: str = "") -> "EqcResult":
        """
        Request last EQC job results. Returns results from the most recent
        run on the device.

        :param lock_id: a valid :code:`lock_id` that matches current device
            :code:`lock_id`
        :return: an :class:`.EqcResult` object
        :raises InactiveRpcError: if the gRPC channel is inactive
        """
        fetch_input = eqc_pb2.LockMessage(lock_id=lock_id)
        eqc_results = self._invoke(
            self.eqc_stub.FetchResults,
            fetch_input,
            "EQC fetch_results failed due to grpc._channel._InactiveRpcError.",
        )
        result = message_to_dict(eqc_results)

        # Limit solver outputs to float32 precision as native Python floats.
        # Keys that are missing or None (documented for failed jobs) are left
        # untouched instead of raising KeyError/TypeError.
        for key in ("solution", "distilled_solution"):
            values = result.get(key)
            if values is not None:
                result[key] = [self._to_float32_precision(val) for val in values]
        for key in ("energy", "distilled_energy"):
            value = result.get(key)
            if value is not None:
                result[key] = self._to_float32_precision(value)
        return result

    def system_status(self) -> dict:
        """
        Client call to obtain EQC system status

        :returns: a member of :class:`eqc_direct.utils.SysStatus` as a dict:

            - **status_code**: `int`- current system status code
            - **status_desc**: `str`- description of current system status

        :raises InactiveRpcError: if the gRPC channel is inactive
        """
        sys_resp = self._invoke(
            self.eqc_stub.SystemStatus,
            eqc_pb2.Empty(),
            "EQC system_status failed due to grpc._channel._InactiveRpcError.",
        )
        return message_to_dict(sys_resp)

    def acquire_lock(self) -> dict:
        """
        Makes a single attempt to acquire exclusive lock on hardware execution.
        Locking can be used to ensure orderly processing in multi-user environments.
        Lock can only be acquired when no other user has acquired the lock or when
        the system has been idle for 60 seconds while another user has the lock.
        This idle timeout prevents one user from blocking other users from using
        the machine even if they are not active.

        :return:
            a member of :class:`eqc_direct.utils.LockManageStatus` as a dict along
            with an additional key :code:`lock_id`:

            - **lock_id**: `str`- if acquired the current device `lock_id`
              else empty string
            - **status_code**: `int`- status code for lock id acquisition
            - **status_desc**: `str`- a description for the associated status code

        :raises InactiveRpcError: if the gRPC channel is inactive
        """
        acquire_lock_resp = self._invoke(
            self.eqc_stub.AcquireLock,
            eqc_pb2.Empty(),
            "EQC acquire_lock failed due to grpc._channel._InactiveRpcError.",
        )
        # The status fields are nested in the response; flatten them here.
        return {
            "lock_id": acquire_lock_resp.lock_id,
            "status_code": acquire_lock_resp.lock_status.status_code,
            "status_desc": acquire_lock_resp.lock_status.status_desc,
        }

    def release_lock(self, lock_id: str = "") -> dict:
        """
        Releases exclusive lock for running health check or submitting job

        :param lock_id: a UUID with currently acquired exclusive device lock
        :return: a member of :class:`eqc_direct.utils.LockManageStatus` as a dict:

            - **status_code**: `int`- status code for lock id acquisition
            - **status_desc**: `str`- a description for the associated status code

        :raises InactiveRpcError: if the gRPC channel is inactive
        """
        release_input = eqc_pb2.LockMessage(lock_id=lock_id)
        release_lock_resp = self._invoke(
            self.eqc_stub.ReleaseLock,
            release_input,
            "EQC release_lock failed due to grpc._channel._InactiveRpcError.",
        )
        return message_to_dict(release_lock_resp)

    def check_lock(self, lock_id: str = "") -> dict:
        """
        Checks if submitted :code:`lock_id` has execution lock on the device

        :param lock_id: a UUID which will be checked to determine if has exclusive
            device execution lock
        :return: a member of :class:`eqc_direct.utils.LockCheckStatus` as a dict:

            - **status_code**: `int`- status code for lock check
            - **status_desc**: `str`- a description for the associated status code

        :raises InactiveRpcError: if the gRPC channel is inactive
        """
        check_input = eqc_pb2.LockMessage(lock_id=lock_id)
        # Wrapped like every other RPC so callers only need to catch
        # InactiveRpcError.
        check_output = self._invoke(
            self.eqc_stub.CheckLock,
            check_input,
            "EQC check_lock failed due to grpc._channel._InactiveRpcError.",
        )
        return message_to_dict(check_output)

    def stop_running_process(self, lock_id: str = "") -> dict:
        """
        Stops a running process either a health check or a Eqc job.
        Process locks will release automatically based on a timeout
        which is maintained in the server code if they are
        not released using this.

        :param lock_id: the :code:`lock_id` sent to the server with the
            stop request
        :return:
            a member of :class:`eqc_direct.utils.SysStatus`
            as dict with following keys:

            - **status_code**: `int`- the system code after stopping
            - **status_desc**: `str`- the associated system status description

        :raises InactiveRpcError: if the gRPC channel is inactive
        """
        stop_input = eqc_pb2.LockMessage(lock_id=lock_id)
        stop_resp = self._invoke(
            self.eqc_stub.StopRunning,
            stop_input,
            "EQC stop_running_process failed due to "
            "grpc._channel._InactiveRpcError.",
        )
        return message_to_dict(stop_resp)

    def wait_for_lock(self) -> tuple:
        """
        Waits for lock indefinitely calling :func:`acquire_lock`

        :return: a tuple of the following items:

            - **lock_id**: `str`- exclusive lock for device execution with a timeout
            - **start_queue_ts**: `int`- time in ns at which waiting for the
              lock started
            - **end_queue_ts**: `int`- time in ns at which the lock was
              acquired and waiting ended

        :raises RuntimeError: if the system reports a status code of 3 or higher
        """
        lock_id = ""
        start_queue_ts = time.time_ns()
        while lock_id == "":
            sys_code = self.system_status()["status_code"]
            # NOTE(review): process_job treats only codes > 3 as unavailable,
            # this method uses >= 3 -- confirm against SysStatus.
            if sys_code >= 3:
                raise RuntimeError(f"System unavailable status_code: {sys_code}")
            lock_id = self.acquire_lock()["lock_id"]
            # Retry once per second until a lock id is returned.
            if lock_id == "":
                time.sleep(1)
        end_queue_ts = time.time_ns()
        return lock_id, start_queue_ts, end_queue_ts

    def system_version(self) -> dict:
        """
        Provides information regarding Dirac server

        :return: a dict with a single item:

            - **server_version**: `str` - the current gRPC server version

        :raises InactiveRpcError: if the gRPC channel is inactive
        """
        sys_ver_resp = self._invoke(
            self.eqc_stub.ServerVersion,
            eqc_pb2.Empty(),
            "EQC system_version call failed due to inactive grpc channel",
        )
        return message_to_dict(sys_ver_resp)

    def process_job(
        self,
        poly_coefficients: np.ndarray,
        poly_indices: np.ndarray,
        num_variables: Optional[int] = None,
        lock_id: str = "",
        sum_constraint: Union[int, float] = 10000,
        relaxation_schedule: int = 2,
        solution_precision: Optional[float] = None,
    ) -> dict:
        """
        Processes a job by:

        1. Submitting job
        2. Checks for status, until completes or fails
        3. Returns results

        :param poly_coefficients: coefficient values for the polynomial to be minimized
        :param poly_indices:
            list of lists containing polynomial indices associated with
            coefficient values for problem to be optimized.
        :param num_variables: the number of total variables for the submitted
            polynomial must not be less than max index in :code:`poly_indices`.
            If no value is provided then will be set to max value in
            :code:`poly_indices`.
        :param lock_id: a UUID to allow for multi-user processing
        :param sum_constraint: a normalization constraint that is applied to the
            problem space that is used to calculate :code:`energy`. This
            parameter will be rounded if exceeds float32 precision
            (e.g. 7-decimal places). Value must be between 1 and 10000.
        :param relaxation_schedule: four different schedules represented
            in integer parameter. Higher values reduce the variation in
            the analog spin values and therefore, are more probable to lead to
            improved objective function energy for input problem.
            Accepts range of values in set {1, 2, 3, 4}.
        :param solution_precision: the level of precision to apply to the solutions.
            This parameter will be rounded if exceeds float32 precision
            (e.g. 7-decimal places). If specified a distillation method is
            applied to the continuous solutions to map them to the submitted
            :code:`solution_precision`. Input :code:`solution_precision` must
            satisfy :code:`solution_precision` greater than or equal to
            :code:`sum_constraint`/10000 in order to be valid.
            Also :code:`sum_constraint` must be divisible by :code:`solution_precision`.
            If :code:`solution_precision` is not specified no distillation will be
            applied to the solution derived by the device.

        :return: the :class:`.EqcResult` dict returned by :func:`fetch_result`
            with two additional keys:

            - start_job_ts: time in ns marking start of job_submission
            - end_job_ts: time in ns marking end of job submission complete

        :raises RuntimeError: if submission is rejected, the system reports a
            status code above 3 while waiting, or the fetched result has a
            non-zero :code:`err_code`
        """
        if not solution_precision:
            solution_precision = 0
        start_job = time.time_ns()
        submit_job_resp = self.submit_job(
            poly_coefficients=poly_coefficients,
            poly_indices=poly_indices,
            num_variables=num_variables,
            lock_id=lock_id,
            sum_constraint=sum_constraint,
            relaxation_schedule=relaxation_schedule,
            solution_precision=solution_precision,
        )
        logging.info("Job submitted")
        if submit_job_resp["err_code"] != 0:
            err_msg = f"Job submission failed with response: {submit_job_resp}"
            # No exception is active here, so exc_info is not requested.
            logging.error(err_msg)
            raise RuntimeError(err_msg)

        # Poll once per second until the device reports idle again.
        idle_code = SysStatus.IDLE["status_code"]
        sys_code = self.system_status()["status_code"]
        while sys_code != idle_code:
            sys_code = self.system_status()["status_code"]
            # NOTE(review): wait_for_lock uses >= 3 for the same check --
            # confirm which threshold matches SysStatus.
            if sys_code > 3:
                err_msg = f"System unavailable status_code: {sys_code}"
                logging.error(err_msg)
                raise RuntimeError(err_msg)
            if sys_code != idle_code:
                time.sleep(1)
        end_job = time.time_ns()

        logging.info("Fetching results")
        job_result = self.fetch_result(lock_id=lock_id)
        if job_result["err_code"] != 0:
            raise RuntimeError(
                f"Job execution error\n"
                f"err_code: {job_result['err_code']}\n"
                f"err_desc: {job_result['err_desc']}"
            )
        # Timings are added to the same flat dict as the result fields.
        job_result["start_job_ts"] = start_job
        job_result["end_job_ts"] = end_job
        return job_result