Download

Source code for eqc_direct.utils

  • """
  • Utilities for running server sim and client
  • """
  • from dataclasses import dataclass
  • from typing import List, Tuple
  • import grpc
  • import time
  • import numpy as np
  • # levels for precision calc
  • PREC_MIN_RECOMMENDED_LEVELS = 200
  • PREC_MAX_LEVELS = 10000
  • [docs]
  • @dataclass
  • class SysStatus:
  • """
  • Status codes for system paired with their descriptions.
  • """
  • IDLE = {"status_code": 0, "status_desc": "IDLE"}
  • JOB_RUNNING = {"status_code": 1, "status_desc": "JOB_RUNNING"}
  • CALIBRATION = {"status_code": 2, "status_desc": "CALIBRATION"}
  • HEALTH_CHECK = {"status_code": 3, "status_desc": "HEALTH_CHECK"}
  • HARDWARE_FAILURE = {"status_code": [4, 5, 6, 7], "status_desc": "HARDWARE_FAILURE"}
  • [docs]
  • @dataclass
  • class LockCheckStatus:
  • """
  • Statuses codes for checking lock status paired with their descriptions
  • """
  • AVAILABLE = {"status_code": 0, "status_desc": "Lock available"}
  • USER_LOCKED = {
  • "status_code": 1,
  • "status_desc": "lock_id matches current server lock_id",
  • }
  • UNAVAILABLE = {
  • "status_code": 2,
  • "status_desc": "Execution lock is in use by another user",
  • }
  • [docs]
  • @dataclass
  • class LockManageStatus:
  • """
  • Statuses and descriptions for acquiring and releasing lock
  • """
  • SUCCESS = {"status_code": 0, "status_desc": "Success"}
  • MISMATCH = {
  • "status_code": 1,
  • "status_desc": "lock_id does not match current device lock_id",
  • }
  • BUSY = {
  • "status_code": 2,
  • "status_desc": "Lock currently in use unable to perform operation",
  • }
  • [docs]
  • @dataclass
  • class JobCodes:
  • """
  • Job codes for errors paired with their descriptions
  • """
  • NORMAL = {"err_code": 0, "err_desc": "Success"}
  • INDEX_OUT_OF_RANGE = {
  • "err_code": 1,
  • "err_desc": "Index in submitted data is out of range for specified number of variables",
  • }
  • COEF_INDEX_MISMATCH = {
  • "err_code": 2,
  • "err_desc": "Polynomial indices do not match required length for specified coefficient length",
  • }
  • DEVICE_BUSY = {
  • "err_code": 3,
  • "err_desc": "Device currently processing other request",
  • }
  • LOCK_MISMATCH = {
  • "err_code": 4,
  • "err_desc": "lock_id doesn't match current device lock",
  • }
  • HARDWARE_FAILURE = {
  • "err_code": 5,
  • "err_desc": "Device failed during execution",
  • }
  • INVALID_SUM_CONSTRAINT = {
  • "err_code": 6,
  • "err_desc": "Sum constraint must be greater than or equal to 1 and less than or equal to 10000",
  • }
  • INVALID_RELAXATION_SCHEDULE = {
  • "err_code": 7,
  • "err_desc": "Parameter relaxation_schedule must be in set {1,2,3,4}",
  • }
  • USER_INTERRUPT = {
  • "err_code": 8,
  • "err_desc": "User sent stop signal before result was returned",
  • }
  • EXCEEDS_MAX_SIZE = {
  • "err_code": 9,
  • "err_desc": "Exceeds max problem size for device",
  • }
  • DECREASING_INDEX = {
  • "err_code": 10,
  • "err_desc": "One of specified polynomial indices is not specified in non-decreasing order",
  • }
  • INVALID_PRECISION = {
  • "err_code": 11,
  • "err_desc": "The input precision exceeds maximum allowed precision for device",
  • }
  • DUPLICATE_INDEX = {
  • "err_code": 12,
  • "err_desc": "A duplicate polynomial index set was specified for the input polynomial",
  • }
  • PRECISION_CONSTRAINT_MISMATCH = {
  • "err_code": 13,
  • "err_desc": "Sum constraint must be divisible by solution_precision",
  • }
  • PRECISION_NONNEGATIVE = {
  • "err_code": 14,
  • "err_desc": "Input solution precision cannot be negative",
  • }
  • DEGREE_POSITIVE = {
  • "err_code": 15,
  • "err_desc": "Input degree must be greater than 0"
  • }
  • NUM_VARIABLES_POSITIVE = {
  • "err_code": 16,
  • "err_desc": "Input num_variables must be greater than 0"
  • }
  • [docs]
  • def message_to_dict(grpc_message) -> dict:
  • """Convert a gRPC message to a dictionary."""
  • result = {}
  • for descriptor in grpc_message.DESCRIPTOR.fields:
  • field = getattr(grpc_message, descriptor.name)
  • if descriptor.type == descriptor.TYPE_MESSAGE:
  • if descriptor.label == descriptor.LABEL_REPEATED:
  • if field:
  • result[descriptor.name] = [message_to_dict(item) for item in field]
  • else:
  • result[descriptor.name] = []
  • else:
  • if field:
  • result[descriptor.name] = message_to_dict(field)
  • else:
  • result[descriptor.name] = {}
  • else:
  • result[descriptor.name] = field
  • return result
  • [docs]
  • def convert_hamiltonian_to_poly_format(
  • linear_terms: np.ndarray,
  • quadratic_terms: np.ndarray,
  • ) -> Tuple[List[List[int]], List[float]]:
  • """
  • Converts linear terms and quadratic terms of Hamiltonian to polynomial index formatting for Dirac device
  • :param linear_terms: the linear terms for the Hamiltonian 1D length n array
  • :param quadratic_terms: the quadratic coefficients of the Hamiltonian (n by n)
  • :return: a tuple with the following members:
  • - **poly_indices**: List[List[int]] - polynomial indices in non-decreasing sparse format
  • - **poly_coefficients**: List[float] - polynomial coefficients in sparse format
  • .. Sparse matrices are much slower with this implementation convert to dense first
  • """
  • assert len(linear_terms.shape) == 1, "`linear_terms` input must be 1D"
  • try:
  • m, n = quadratic_terms.shape
  • assert m == n, "`quadratic_terms` must be a square matrix"
  • except ValueError:
  • raise ValueError("`quadratic_terms` must be 2D")
  • assert (
  • len(linear_terms) == m
  • ), "`linear_terms` must match dimension row and column dimension of `quadratic_terms`"
  • poly_coefficients = []
  • poly_indices = []
  • # multiplying by 2 here is faster than doing it at each entry in the upper triangular portion
  • quadratic_terms = quadratic_terms * 2
  • diag_idx = np.diag_indices(m)
  • quadratic_terms[diag_idx] /= 2
  • for i in range(m):
  • # add nonzero linear terms to lists
  • if linear_terms[i] != 0:
  • poly_indices.append([0, i + 1])
  • poly_coefficients.append(linear_terms[i])
  • for j in range(m):
  • # add quad terms to lists
  • if quadratic_terms[i, j] and i <= j:
  • poly_coefficients.append(quadratic_terms[i, j])
  • poly_indices.append([i + 1, j + 1])
  • return poly_indices, poly_coefficients
  • [docs]
  • def get_decimal_places(float_num: float)->int:
  • try:
  • decimal_places = len(str(float_num).split('.')[1])
  • except IndexError:
  • decimal_places = 0
  • return decimal_places