Download

Source code for eqc_direct.client

  • """
  • :class:`.EqcClient` contains all RPC calls to process, get system status,
  • and fetch results.
  • """
  • import logging
  • import time
  • import os
  • import warnings
  • from typing import TypedDict, List, Optional, Union
  • import grpc
  • from grpc._channel import _InactiveRpcError
  • import numpy as np
  • from . import eqc_pb2, eqc_pb2_grpc
  • from .utils import (
  • SysStatus,
  • message_to_dict,
  • PREC_MIN_RECOMMENDED_LEVELS,
  • get_decimal_places
  • )
  • [docs]
  • class InactiveRpcError(Exception):
  • """Custom exception wrapper around grpc._channel._InactiveRpcError."""
  • [docs]
  • class EqcResult(TypedDict):
  • """
  • EQC results object. Will not contain a energy or solution if err_code is not 0.
  • :param err_code: the error code for a given job. Full list of :code:`err_code`
  • values can be found :class:`eqc_direct.utils.JobCodes`
  • :param err_desc: the error description for a given job submission. Full list of
  • :code:`err_desc` values can be found in :class:`eqc_direct.utils.JobCodes`
  • :param preprocessing_time: data validation and time to re-format input data for
  • running on the device in seconds
  • :param runtime: solving time in seconds for Dirac hardware
  • :param energy: energy for best solution found (float32 precision)
  • :param solution: vector representing the lowest energy solution (float32 precision)
  • :param distilled_runtime: runtime for distillation of solutions in seconds
  • :param distilled_energy: energy for distilled solution for input polynomial
  • (float32 precision)
  • :param distilled_solution: a vector representing the solution after
  • the distillation procedure is applied to the original solution
  • derived from the hardware. (float32 precision)
  • :note:
  • * solutions are length n vector of floats \
  • that sum to the device constraint
  • .. Must use native python types to ensure can be dumped to json
  • """
  • err_code: int
  • err_desc: str
  • preprocessing_time: float
  • runtime: float
  • energy: Optional[float]
  • solution: Optional[List[float]]
  • distilled_runtime: Optional[float]
  • distilled_energy: Optional[float]
  • distilled_solution: Optional[List[float]]
  • [docs]
  • class EqcClient:
  • """
  • Provides calls to process jobs using EQC RPC server
  • :param ip_address: The IP address of the RPC server
  • :param port: The port that the RPC server is running on
  • :param max_data_size: the max send and recieve message length for RPC server
  • .. note::
  • :code:`lock_id` is used by a variety of class functions.
  • It is set to an empty string by default since default for device server
  • :code:`lock_id` is also an empty string. This allows for single user
  • processing without having to acquire a device lock.
  • .. All GRPC calls follow a specific pattern:
  • .. 1. Fill in data to be sent in message stub
  • .. 2. Send data using stub service method
  • .. 3. Parse response
  • """
  • def __init__(
  • self,
  • ip_address: str = os.getenv("DEVICE_IP_ADDRESS", "localhost"),
  • port: str = os.getenv("DEVICE_PORT", "50051"),
  • max_data_size: int = 512 * 1024 * 1024,
  • ):
  • self._ip_address = ip_address
  • self._max_data_size = max_data_size
  • self._ip_add_port = ip_address + ":" + port
  • self._channel_opt = [
  • ("grpc.max_send_message_length", max_data_size),
  • ("grpc.max_receive_message_length", max_data_size),
  • ]
  • self.channel = grpc.insecure_channel(
  • self._ip_add_port,
  • options=self._channel_opt,
  • )
  • self.eqc_stub = eqc_pb2_grpc.EqcServiceStub(self.channel)
  • [docs]
  • def submit_job( # pylint: disable=R0913, R0914
  • self,
  • poly_coefficients: np.ndarray,
  • poly_indices: np.ndarray,
  • num_variables: Optional[int] = None,
  • lock_id: str = "",
  • sum_constraint: Union[int, float] = 10000,
  • relaxation_schedule: int = 2,
  • solution_precision: Optional[float] = None,
  • ) -> dict:
  • """
  • Submits data to be processed by EQC device
  • :param poly_coefficients:
  • coefficient values for the polynomial to be minimized
  • :param poly_indices:
  • list of lists containing polynomial indices associated with
  • coefficient values for problem to be optimized.
  • :param num_variables: the number of total variables for the submitted
  • polynomial must not be less than max index in :code:`poly_indices`.
  • If no value is provided then will be set to max value in
  • :code:`poly_indices`.
  • :param lock_id: a UUID to allow for multi-user processing
  • :param sum_constraint: a normalization constraint that is applied to the
  • problem space that is used to calculate :code:`energy`. This
  • parameter will be rounded if exceeds float32 precision
  • (e.g. 7-decimal places). Value must be between 1 and 10000.
  • :param relaxation_schedule: four different schedules represented
  • in integer parameter. Higher values reduce the variation in
  • the analog spin values and therefore, are more probable to lead to
  • improved objective function energy for input problem.
  • Accepts range of values in set {1, 2, 3, 4}.
  • :param solution_precision: the level of precision to apply to the solutions.
  • This parameter will be rounded if exceeds float32 precision
  • (e.g. 7-decimal places). If specified a distillation method is
  • applied to the continuous solutions to map them to the submitted
  • :code:`solution_precision`. Input :code:`solution_precision` must
  • satisfy :code:`solution_precision` greater than or equal to
  • :code:`sum_constraint`/10000 in order to be valid.
  • Also :code:`sum_constraint` must be divisible by :code:`solution_precision`.
  • If :code:`solution_precision` is not specified no distillation will be
  • applied to the solution derived by the device.
  • :return: a member of :class:`eqc_direct.utils.JobCodes` as a dict
  • with the following keys:
  • - **err_code**: `int`- job submission error code
  • - **err_desc**: `str`- error code description for submission
  • """
  • # set here and in process_job
  • # need to set default as
  • if solution_precision is None:
  • solution_precision = 0
  • poly_coefficients = np.array(poly_coefficients)
  • poly_indices = np.array(poly_indices)
  • coefficient_dtype = poly_coefficients.dtype
  • if not (
  • np.issubdtype(coefficient_dtype, np.integer)
  • or (
  • np.issubdtype(coefficient_dtype, np.floating)
  • and np.finfo(coefficient_dtype).bits <= 32
  • )
  • ):
  • warn_dtype_msg = (
  • f"Max precision for EQC device is float32 input type "
  • f"was dtype {np.dtype(coefficient_dtype).name}."
  • f" Input matrix will be rounded"
  • )
  • logging.warning(warn_dtype_msg)
  • warnings.warn(warn_dtype_msg, Warning)
  • if get_decimal_places(solution_precision)>7:
  • soln_prec_warn = (
  • f"`solution_precision`precision is greater than 7 "
  • f"decimal places. Will be modified on submission to "
  • f"device to float32 precision"
  • )
  • logging.warning(soln_prec_warn)
  • warnings.warn(soln_prec_warn, Warning)
  • if get_decimal_places(sum_constraint)>7:
  • sum_constraint_warn = (
  • f"`sum_constraint` precision is greater than 7 decimal "
  • f"places. Will be modified on submission to device "
  • f"to float32"
  • )
  • logging.warning(sum_constraint_warn)
  • warnings.warn(sum_constraint_warn, Warning)
  • try:
  • _, degree_poly = poly_indices.shape
  • except ValueError as err:
  • err_msg = "`poly_indices` array must be two dimensions"
  • logging.error(err_msg, exc_info=True)
  • raise ValueError(err_msg) from err
  • if not num_variables:
  • num_variables = np.max(poly_indices)
  • # flatten rowwise for matrix
  • poly_indices = poly_indices.flatten(order="c").tolist()
  • job_input = eqc_pb2.JobInput(
  • num_variables=num_variables,
  • degree=degree_poly,
  • poly_indices=poly_indices,
  • coef_values=poly_coefficients.tolist(),
  • sum_constraint=sum_constraint,
  • relaxation_schedule=relaxation_schedule,
  • soln_precision=solution_precision,
  • lock_id=lock_id,
  • )
  • try:
  • job_results = self.eqc_stub.SubmitJob(job_input)
  • except _InactiveRpcError as exc:
  • # Make error easier to read/detect by consumers.
  • raise InactiveRpcError(
  • "EQC submit_job failed due to grpc._channel._InactiveRpcError."
  • ) from exc
  • return message_to_dict(job_results)
  • [docs]
  • def fetch_result(self, lock_id: str = "") -> EqcResult:
  • """
  • Request last EQC job results. Returns results from the most recent
  • run on the device.
  • :param lock_id: a valid :code:`lock_id` that matches current device
  • :code:`lock_id`
  • :return: an :class:`.EqcResult` object
  • """
  • fetch_input = eqc_pb2.LockMessage(lock_id=lock_id)
  • try:
  • eqc_results = self.eqc_stub.FetchResults(fetch_input)
  • except _InactiveRpcError as exc:
  • # Make error easier to read/detect by consumers.
  • raise InactiveRpcError(
  • "EQC fetch_results failed due to grpc._channel._InactiveRpcError."
  • ) from exc
  • result = message_to_dict(eqc_results)
  • # need to interpret result with correct precision
  • # grpc serialization deserialization causes a change in the values
  • # in order to ensure that results can be dumped to json
  • # must use native python types if use float(np.float32(num))
  • # then will get corrupted bits so must cast to str first
  • result["solution"] = [
  • float(f"{np.float32(val):.7f}") for val in result["solution"]
  • ]
  • result["distilled_solution"] = [
  • float(f"{np.float32(val):.7f}") for val in result["distilled_solution"]
  • ]
  • result["energy"] = float(f"{np.float32(result['energy']):.7f}")
  • result["distilled_energy"] = float(
  • f"{np.float32(result['distilled_energy']):.7f}"
  • )
  • return result
  • [docs]
  • def system_status(self) -> dict:
  • """
  • Client call to obtain EQC system status
  • :returns: a member of :class:`eqc_direct.utils.SysStatus` as a dict:
  • - **status_code**: `int`- current system status code
  • - **status_desc**: `str`- description of current system status
  • """
  • try:
  • sys_resp = self.eqc_stub.SystemStatus(eqc_pb2.Empty())
  • except _InactiveRpcError as exc:
  • raise InactiveRpcError(
  • "EQC system_status failed due to grpc._channel._InactiveRpcError."
  • ) from exc
  • return message_to_dict(sys_resp)
  • [docs]
  • def acquire_lock(self) -> dict:
  • """
  • Makes a single attempt to acquire exclusive lock on hardware execution.
  • Locking can be used to ensure orderly processing in multi-user environments.
  • Lock can only be acquired when no other user has acquired the lock or when
  • the system has been idle for 60 seconds while another user has the lock.
  • This idle timeout prevents one user from blocking other users from using
  • the machine even if they are not active.
  • :return:
  • a member of :class:`eqc_direct.utils.LockManageStatus` as a dict along
  • with an additional key :code:`lock_id`:
  • - **lock_id**: `str`- if acquired the current device `lock_id`
  • else empty string
  • - **status_code**: `int`- status code for lock id acquisition
  • - **status_desc**: `str`- a description for the associated status code
  • """
  • try:
  • acquire_lock_resp = self.eqc_stub.AcquireLock(eqc_pb2.Empty())
  • except _InactiveRpcError as exc:
  • raise InactiveRpcError(
  • "EQC acquire_lock failed due to grpc._channel._InactiveRpcError."
  • ) from exc
  • return {
  • "lock_id": acquire_lock_resp.lock_id,
  • "status_code": acquire_lock_resp.lock_status.status_code,
  • "status_desc": acquire_lock_resp.lock_status.status_desc,
  • }
  • [docs]
  • def release_lock(self, lock_id: str = "") -> dict:
  • """
  • Releases exclusive lock for running health check or submitting job
  • :param lock_id: a UUID with currently acquired exclusive device lock
  • :return: a member of :class:`eqc_direct.utils.LockManageStatus` as a dict:
  • - **status_code**: `int`- status code for lock id acquisition
  • - **status_desc**: `str`- a description for the associated status code
  • """
  • release_input = eqc_pb2.LockMessage(lock_id=lock_id)
  • try:
  • release_lock_resp = self.eqc_stub.ReleaseLock(release_input)
  • except _InactiveRpcError as exc:
  • raise InactiveRpcError(
  • "EQC release_lock failed due to grpc._channel._InactiveRpcError."
  • ) from exc
  • return message_to_dict(release_lock_resp)
  • [docs]
  • def check_lock(self, lock_id: str = "") -> dict:
  • """
  • Checks if submitted :code:`lock_id` has execution lock on the device
  • :param lock_id: a UUID which will be checked to determine if has exclusive
  • device execution lock
  • :return: a member of :class:`eqc_direct.utils.LockCheckStatus` as a dict:
  • - **status_code**: `int`- status code for lock check
  • - **status_desc**: `str`- a description for the associated status code
  • """
  • check_input = eqc_pb2.LockMessage(lock_id=lock_id)
  • check_output = self.eqc_stub.CheckLock(check_input)
  • return message_to_dict(check_output)
  • [docs]
  • def stop_running_process(self, lock_id: str = "") -> dict:
  • """
  • Stops a running process either a health check or a Eqc job.
  • Process locks will release automatically based on a timeout
  • which is maintained in the server code if they are
  • not released using this.
  • :param lock_id: requires a lock_id that was acquired by
  • :return:
  • a member of :class:`eqc_direct.utils.SysStatus`
  • as dict with following keys:
  • - **status_code**: `int`- the system code after stopping
  • - **status_desc**: `str`- the associated system status description
  • """
  • stop_input = eqc_pb2.LockMessage(lock_id=lock_id)
  • try:
  • stop_resp = self.eqc_stub.StopRunning(stop_input)
  • except _InactiveRpcError as exc:
  • raise InactiveRpcError(
  • "EQC stop_running_process failed due to "
  • "grpc._channel._InactiveRpcError."
  • ) from exc
  • return message_to_dict(stop_resp)
  • [docs]
  • def wait_for_lock(self) -> tuple:
  • """
  • Waits for lock indefinitely calling :func:`acquire_lock`
  • :return: a tuple of the following items:
  • - **lock_id**: `str`- exclusive lock for device execution with a timeout
  • - **start_queue_ts**: `int`- time in ns on which lock was acquired is an int
  • - **end_queue_ts**: `int`- time in ns on which queue for
  • lock ended is an int.
  • """
  • lock_id = ""
  • start_queue_ts = time.time_ns()
  • while lock_id == "":
  • sys_code = self.system_status()["status_code"]
  • # this is based on the error statuses are 3 and above
  • if sys_code >= 3:
  • raise RuntimeError(f"System unavailable status_code: {sys_code}")
  • lock_id = self.acquire_lock()["lock_id"]
  • # only sleep if didn't get lock on device
  • if lock_id == "":
  • time.sleep(1)
  • end_queue_ts = time.time_ns()
  • return lock_id, start_queue_ts, end_queue_ts
  • [docs]
  • def system_version(self) -> dict:
  • """
  • Provides information regarding Dirac server
  • :return: a dict with a single item:
  • - **server_version**: `str` - the current gRPC server version
  • """
  • try:
  • sys_ver_resp = self.eqc_stub.ServerVersion(eqc_pb2.Empty())
  • except _InactiveRpcError as exc:
  • raise InactiveRpcError(
  • "EQC system_version call failed due to inactive grpc channel"
  • ) from exc
  • return message_to_dict(sys_ver_resp)
  • [docs]
  • def process_job( # pylint: disable=R0913
  • self,
  • poly_coefficients: np.ndarray,
  • poly_indices: np.ndarray,
  • num_variables: Optional[int] = None,
  • lock_id: str = "",
  • sum_constraint: Union[int, float] = 10000,
  • relaxation_schedule: int = 2,
  • solution_precision: Optional[float] = None,
  • ) -> dict:
  • """
  • Processes a job by:
  • 1. Submitting job
  • 2. Checks for status, until completes or fails
  • 3. Returns results
  • :param poly_coefficients: coefficient values for the polynomial to be minimized
  • :param poly_indices:
  • list of lists containing polynomial indices associated with
  • coefficient values for problem to be optimized.
  • :param lock_id: a UUID to allow for multi-user processing
  • :param sum_constraint: a normalization constraint that is applied to the
  • problem space that is used to calculate :code:`energy`. This
  • parameter will be rounded if exceeds float32 precision
  • (e.g. 7-decimal places). Value must be between 1 and 10000.
  • :param relaxation_schedule: four different schedules represented
  • in integer parameter. Higher values reduce the variation in
  • the analog spin values and therefore, are more probable to lead to
  • improved objective function energy for input problem.
  • Accepts range of values in set {1, 2, 3, 4}.
  • :param solution_precision: the level of precision to apply to the solutions.
  • This parameter will be rounded if exceeds float32 precision
  • (e.g. 7-decimal places). If specified a distillation method is
  • applied to the continuous solutions to map them to the submitted
  • :code:`solution_precision`. Input :code:`solution_precision` must
  • satisfy :code:`solution_precision` greater than or equal to
  • :code:`sum_constraint`/10000 in order to be valid.
  • Also :code:`sum_constraint` must be divisible by :code:`solution_precision`.
  • If :code:`solution_precision` is not specified no distillation will be
  • applied to the solution derived by the device.
  • :return: dict of results and timings with the following keys:
  • - results: :class:`.EqcResult` dict
  • - start_job_ts: time in ns marking start of job_submission
  • - end_job_ts: time in ns marking end of job submission complete
  • """
  • # Must also change submit_job default set
  • if not solution_precision:
  • solution_precision = 0
  • start_job = time.time_ns()
  • submit_job_resp = self.submit_job(
  • poly_coefficients=poly_coefficients,
  • poly_indices=poly_indices,
  • num_variables=num_variables,
  • lock_id=lock_id,
  • sum_constraint=sum_constraint,
  • relaxation_schedule=relaxation_schedule,
  • solution_precision=solution_precision,
  • )
  • logging.info("Job submitted")
  • if submit_job_resp["err_code"] != 0:
  • err_msg = f"Job submission failed with response: {submit_job_resp}"
  • logging.error(err_msg, exc_info=True)
  • raise RuntimeError(err_msg)
  • sys_code = self.system_status()["status_code"]
  • while sys_code != SysStatus.IDLE["status_code"]:
  • sys_code = self.system_status()["status_code"]
  • # this is based on the error statuses are 3 and above
  • if sys_code > 3:
  • err_msg = f"System unavailable status_code: {sys_code}"
  • logging.error(err_msg, exc_info=True)
  • raise RuntimeError(err_msg)
  • # only sleep if not idle
  • if sys_code != SysStatus.IDLE["status_code"]:
  • time.sleep(1)
  • end_job = time.time_ns()
  • # pull in results after is idle
  • logging.info("Fetching results")
  • job_result = self.fetch_result(lock_id=lock_id)
  • if job_result["err_code"] != 0:
  • raise RuntimeError(
  • f"Job execution error\n"
  • f"err_code: {job_result['err_code']}\n"
  • f"err_desc: {job_result['err_desc']}"
  • )
  • job_result["start_job_ts"] = start_job
  • job_result["end_job_ts"] = end_job
  • return job_result