- """
- Client gRPC services for running on FPGA reservoir computer EmuCore developed by QCi.
- """
- import logging
- import json
- import time
- from typing import List, TypedDict
- import os
- import grpc
- from grpc._channel import _InactiveRpcError
- import numpy as np
- from . import emucore_pb2, emucore_pb2_grpc
- from .utils import set_filter_coefficients, \
- bytes_to_array, \
- prep_input_data, \
- message_to_dict
- grpc_service_config = json.dumps(
- {
- "methodConfig": [
- {
- "name": [{"service": "EmuCore.EmuCoreService"}],
- "retryPolicy": {
- "maxAttempts": 5,
- "initialBackoff": "0.2s",
- "maxBackoff": "10s",
- "backoffMultiplier": 2.5,
- "retryableStatusCodes": ["UNAVAILABLE"],
- },
- }
- ]
- }
- )
- MAX_INPUT_SIZE = 20*1024*1024
- class StatusMessage(TypedDict):
- """
- Structure of responses for configuration requests to EmuCore device.
-
- :param status: the status of the request
- :param message: a description for the recieved status
- """
- status: int
- message: str
- class InactiveRpcError(Exception):
- """Custom exception wrapper around grpc._channel._InactiveRpcError."""
- class EmuCoreClient:
- """
- Provides services for accessing EmuCore server
- :param ip_addr: the IP address of the gRPC server
- :param port:
- The port that the RPC server is running on
- :param max_data_size: int
- The max send and recieve message length for RPC server
- .. note::
- :code:`lock_id` is used by a variety of class functions.
- It is set to an empty string by default since default for device server
- :code:`lock_id` is also an empty string. This allows for single user
- processing without having to acquire a device lock.
- .. All GRPC calls follow a specific pattern:
- .. 1. Fill in data to be sent in message stub
- .. 2. Send data using stub service method
- .. 3. Parse response
- """
- def __init__(
- self,
- ip_addr: str = os.getenv("DEVICE_IP_ADDRESS", "localhost"),
- port: str = os.getenv("DEVICE_PORT", "50051"),
- max_data_size: int =512 * 1024 * 1024,
- ):
- self._ip_add_port = ip_addr + ":" + port
- self._channel_opt = [
- ("grpc.max_send_message_length", max_data_size),
- ("grpc.max_receive_message_length", max_data_size),
- ("grpc.service_config", grpc_service_config),
- ]
- self.channel = grpc.insecure_channel(
- self._ip_add_port, options=self._channel_opt
- )
- self.stub = emucore_pb2_grpc.EmuCoreServiceStub(self.channel)
- def check_lock(self, lock_id: str = "") -> dict:
- """
- Checks if submitted :code:`lock_id` has execution lock on the device
- :param lock_id: a UUID which will be checked to determine if has exclusive
- device execution lock
- :return: a member of :class:`eqc_direct.utils.LockCheckStatus` as a dict:
- - **status_code**: `int`- status code for lock check
- - **status_desc**: `str`- a description for the associated status code
- """
- check_input = emucore_pb2.lock_message(lock_id=lock_id)
- check_output = self.eqc_stub.check_lock(check_input)
- return message_to_dict(check_output)
- def reservoir_reset(self,
- lock_id) -> StatusMessage:
- """
- Resets a reservoir instance by clearing RAM on the server
- :param lock_id: a lock_id which has an active reserve on the device
-
- :return:
- dictionary with with values from members of
- :class:`emucore_direct.types.StatusResponses`
- """
- reset_message = emucore_pb2.lock_message(lock_id=lock_id)
- return message_to_dict(self.stub.reservoir_reset(reset_message))
- def system_info(self) -> dict:
- """
- Provides system info on call
-
- :return:
- dict with following keys:
- - **system_name**: `str`- product name
- - **system_version**: `str`- server version
-
- """
- sys_info_resp = self.stub.system_info(emucore_pb2.empty_message())
- return message_to_dict(sys_info_resp)
- def acquire_lock(self) -> dict:
- """
- Attempts to acquire exclusive lock for submitting jobs
- :return:
- a member of :class:`emucore_direct.types.LockManageStatus` as a dict along
- with an additional key :code:`lock_id`:
- - **lock_id**: `str`- if acquired the current device `lock_id`
- else empty string
- - **status**: `int`- status code for lock id acquisition
- - **message**: `str`- a description for the associated status code
- """
- try:
- acquire_lock_resp = self.stub.acquire_lock(emucore_pb2.empty_message())
- except _InactiveRpcError as exc:
- raise InactiveRpcError(
- "acquire_lock failed due to grpc._channel._InactiveRpcError."
- ) from exc
- return message_to_dict(acquire_lock_resp)
- def release_lock(self, lock_id):
- """
- Releases exclusive lock for submitting data to reservoir
- :param lock_id: a UUID with currently acquired exclusive device lock
- :return:
- a dict with the following keys:
- - **lock_released**: `bool`- if released is True else False
- - **message**: `str`- a description of release operation result
- """
- release_input = emucore_pb2.lock_message(lock_id = lock_id)
- try:
- release_lock_resp = self.stub.release_lock(release_input)
- except _InactiveRpcError as exc:
- raise InactiveRpcError(
- "release_lock failed due to grpc._channel._InactiveRpcError."
- ) from exc
- return message_to_dict(release_lock_resp)
- def rc_config(self,
- lock_id: str,
- vbias: float,
- gain: float,
- num_nodes: int,
- num_taps: int) -> StatusMessage:
- """
- Configures reservoir model and how data will be processed by the reservoir.
- :param lock_id: a lock_id which has an active reserve on the device
- :param vbias: bias to apply to each node in reservoir. Range for parameter [0,1].
- :param gain: memory setting for system how long should inputs effect reservoir similar
- to beta in adaptive gradient descent range for parameter [0,1]
- :param num_nodes: the total number of hidden nodes to instantiate within the reservoir,
- a single hidden layer
- :param num_taps: number of connections in reservoir, generally should be set to less
- than the number of nodes in reservoir. Defines interconnection between nodes.
- :return: dictionary with with values from one of the members of
- :class:`emucore_direct.types.StatusResponses`
- """
- filter_coefs = set_filter_coefficients(num_taps=num_taps)
- config_message = emucore_pb2.rc_config_message(lock_id=lock_id,
- vbias = vbias,
- gain = gain,
- num_nodes = num_nodes,
- num_taps = num_taps)
- rc_status = self.stub.rc_config(config_message)
- return message_to_dict(rc_status)
- def rc_run(self,
- lock_id: str,
- reservoir_input: List[int],):
- """
- Runs a series of data through the reservoir and returns response from device
- based on current reservoir configuration.
- :param lock_id: a lock_id which has an active reserve on the device
- :param reservoir_input: a list of digitized values to input to the reservoir must be
- less than MAX_INPUT_SIZE
- :return: a dictionary with the folowing keys:
- - **status**: `int`- the status for the reservoir submission
- - **message**: `str`- a description of the status for the submission
- - **states**: `bytes`- response from reservoir as bytes.
- """
- assert MAX_INPUT_SIZE>=len(reservoir_input), \
- f"Input to reservoir must be of less than or equal to {MAX_INPUT_SIZE} was {len(reservoir_input)}"
- run_message = emucore_pb2.rc_run_message(lock_id=lock_id,
- input = reservoir_input,)
- rc_response = self.stub.rc_run(run_message)
- return message_to_dict(rc_response)
- def wait_for_lock(self):
- """
- Waits for lock indefinitely calling :func:`acquire_lock`
- :return: a tuple of the following items:
- - **lock_id**: `str`- exclusive lock for device execution with a timeout
- - **start_queue_ts**: `int`- time in ns when began lock acquisition.
- - **end_queue_ts**: `int`- time in ns when lock was acquired.
- """
- lock_id = ""
- start_queue_ts = time.time_ns()
- while lock_id == "":
- lock_id = self.acquire_lock()["lock_id"]
-
- if lock_id == "":
- time.sleep(1)
- end_queue_ts = time.time_ns()
- return lock_id, start_queue_ts, end_queue_ts
- def process_all_data(self,
- lock_id: str,
- input_data: np.ndarray,
- num_nodes: int,
- density: float,
- feature_scaling: float,
- max_scale_val: float=None,
- weights: np.ndarray=None,
- seed_val_weights: int=13):
- """
- Run dataset through reservoir:
- 1. Get lock
- 2. Apply scaling and random weights mask to input data
- 3. Run data through reservoir
- 4. Combine data from reservoir responses and reshape based on number of nodes
- 5. Release lock
- :param lock_id: a UUID that currently has lock on the device
- :param input_data: data or series to process via reservoir
- :param num_nodes: the total number of hidden nodes to instantiate within the reservoir,
- a single hidden layer (this is also used to apply random weights to to the data
- as well as reshape data recieved from reservoir back to correct output dimension)
- :param feature_scaling: after applying max abs scalar feature scaling factor applied
- :param max_scale_val: max absolute value used to scale data if provided
- :param seed_val_weights: seeds randomness for weigths to allow for reproducibility
- :note: if doing multiple runs without reset the max value mustn't exceed original
- data max value in order for results to be processed properly.
- :return:
- a tuple of the following elements:
-
- - **reservoir_response**: `np.ndarray`- reservoir response represented as an array dimension of array will be nrows of input matrix by num nodes.
- - **max_scale_value**: `np.ndarray`- the scaling value that was applied to the input data before it was processed by the reservoir.
- - **weights**: `np.ndarray`- the weights that were used to apply the random mask to the data prior to being processed by the reservoir.
- """
- n_rows, n_cols = input_data.shape
- input_packets, max_scale_val, weights = prep_input_data(
- input_data=input_data,
- num_nodes=num_nodes,
- density=density,
- feature_scaling=feature_scaling,
- max_scale_val=max_scale_val,
- weights=weights,
- seed_val_weights=seed_val_weights)
- try:
- reservoir_resp = np.array([])
- for i in input_packets:
- packet_resp = self.rc_run(
- lock_id=lock_id,
- reservoir_input=i)
- packet_bytes = bytearray(np.array(packet_resp["states"]))
- packet_arr = bytes_to_array(input_bytes=packet_bytes)
- reservoir_resp = np.concatenate((reservoir_resp, packet_arr),axis=0)
- except Exception as err:
- print("ERROR OCCURRED", err)
- raise RuntimeError("Error while processing data") from err
- return np.array(reservoir_resp).reshape(n_rows,num_nodes), max_scale_val, weights