# adi_protocol.py
# A unified protocol for the FPU distributed service.
# All clients from the primary blog post's n-math.py (Python, Assembly, RISC-V) and the server must adhere to this.
import struct
import json
import numpy as np
# --- Common Constants ---
# Operations for the FPU service.
# The server processes either a single operation or a sequence of them.
OPERATION_INTERPOLATE_HYPERBOLIC_PARABOLIC = 0
OPERATION_INTERPOLATE_ARCSECANT_STREAM = 1
OPERATION_DIFFERENTIATE = 2
OPERATION_CALCULATE_GRADIENT_1D = 3
OPERATION_HYPERBOLIC_INTERCEPT_HANDLER = 4
OPERATION_INTEGRATE = 5
OPERATION_INTEGRATE_ND = 6
# --- Payload Structures ---
# All communication uses a 4-byte length prefix, a JSON header, and a binary payload.
def send_data(sock, op_type: int, data_dict: dict):
"""
Constructs and sends a structured request to the server.
The request format is:
[4-byte JSON header length]
[JSON header]
[variable-length binary data payload]
Args:
sock (socket): The connected socket.
op_type (int): The operation type.
data_dict (dict): A dictionary containing all data to be sent.
Keys correspond to data_pointers in the JSON header.
"""
json_header = {
"operation": op_type,
"data_pointers": {} # Maps data key to (length_bytes, offset_bytes)
}
binary_payload = b''
offset = 0
for key, data_array in data_dict.items():
# Ensure data is in np.float32 for consistent binary representation
if not isinstance(data_array, np.ndarray):
data_array = np.array(data_array, dtype=np.float32)
data_bytes = data_array.astype(np.float32).tobytes()
length = len(data_bytes)
json_header["data_pointers"][key] = {
"length": length,
"offset": offset
}
binary_payload += data_bytes
offset += length
json_header_bytes = json.dumps(json_header).encode('utf-8')
header_length = len(json_header_bytes)
sock.sendall(struct.pack('!I', header_length))
sock.sendall(json_header_bytes)
sock.sendall(binary_payload)
def receive_data(sock):
"""
Receives and parses a structured response from the server.
Args:
sock (socket): The connected socket.
Returns:
tuple: (json_header, binary_payload)
"""
# Receive header length
header_len_bytes = _recvall(sock, 4)
if not header_len_bytes:
return None, None
header_length = struct.unpack('!I', header_len_bytes)[0]
# Receive JSON header
json_header_bytes = _recvall(sock, header_length)
if not json_header_bytes:
return None, None
json_header = json.loads(json_header_bytes.decode('utf-8'))
# Receive binary payload
payload_length = sum(p['length'] for p in json_header.get('data_pointers', {}).values())
binary_payload = _recvall(sock, payload_length)
return json_header, binary_payload
def _recvall(sock, n):
"""Helper function to reliably receive exactly 'n' bytes from a socket."""
data = b''
while len(data) < n:
packet = sock.recv(n - len(data))
if not packet:
return None
data += packet
return data
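# A minimal loopback sanity check of the framing above (an illustrative
# sketch, not part of the protocol itself): send_data and receive_data
# should round-trip one array over a connected socket pair in-process.
if __name__ == "__main__":
    import socket
    a, b = socket.socketpair()
    send_data(a, OPERATION_INTEGRATE, {"x": [1.0, 2.0, 3.0]})
    header, payload = receive_data(b)
    ptr = header["data_pointers"]["x"]
    x = np.frombuffer(payload[ptr["offset"]:ptr["offset"] + ptr["length"]], dtype=np.float32)
    assert header["operation"] == OPERATION_INTEGRATE and np.allclose(x, [1.0, 2.0, 3.0])
    a.close()
    b.close()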
# adi_client.py
# A consolidated client that multiplexes FPU operations to the workflow server,
# including N-dimensional operations and specialized eigenvalue packing.
import socket
import numpy as np
import json
import time
import struct
import copy
from typing import Dict, Any, List
# --- Constants (must match the workflow server's dispatch table; note that
# these opcodes are a separate set from the adi_protocol constants above) ---
OPERATION_INTERPOLATE = 0
OPERATION_DIFFERENTIATE = 1
OPERATION_CALCULATE_GRADIENT_1D = 2
OPERATION_HYPERBOLIC_INTERCEPT_HANDLER = 3
OPERATION_INTEGRATE = 4
OPERATION_INTEGRATE_ND = 5
OPERATION_WORKFLOW = 6 # For chained compositions of operations
OPERATION_INTERPOLATE_EIGENVALUE = 7 # Hypothetical operation for eigenvalue packing
SERVER_HOST = '127.0.0.1'
SERVER_PORT = 12345
# --- Helper Functions ---
def _sendall_data(sock, data_array):
"""Helper to send a numpy array's bytes."""
data_bytes = data_array.astype(np.float32).tobytes()
sock.sendall(struct.pack('!I', len(data_bytes)))
sock.sendall(data_bytes)
def _recvall(sock, n):
"""
Helper function to reliably receive exactly 'n' bytes from a socket.
"""
data = b''
while len(data) < n:
packet = sock.recv(n - len(data))
if not packet:
return None
data += packet
return data
def _recvall_data(sock):
"""Helper to receive a numpy array's bytes or an error message."""
data_len_bytes = _recvall(sock, 4)
if data_len_bytes is None:
raise ConnectionResetError("Incomplete data (length) received from server.")
data_len = struct.unpack('!I', data_len_bytes)[0]
data_bytes = _recvall(sock, data_len)
if data_bytes is None:
raise ConnectionResetError("Incomplete data (content) received from server.")
    try:
        # Heuristic: the server sends error text as UTF-8; binary float32
        # payloads usually fail to decode and fall through to the except.
        error_message = data_bytes.decode('utf-8')
        if "Error" in error_message or "Server internal error" in error_message:
            raise RuntimeError(f"Server returned an error: {error_message}")
    except UnicodeDecodeError:
        pass
    # The sender serializes with ndarray.tobytes(), which emits native-endian
    # float32, so decode with np.frombuffer rather than a big-endian unpack.
    return np.frombuffer(data_bytes, dtype=np.float32).copy()
def _pack_eigenvalue_data(eigenvalue_array: np.ndarray) -> np.ndarray:
"""
Compresses eigenvalue data using a hybrid packing scheme.
- Values with |x| >= 1 are compressed using the arcsecant mapping.
- Values with |x| < 1 are mapped linearly to a distinct range.
"""
packed_data = np.zeros_like(eigenvalue_array)
# Find indices for two packing methods
arcsec_indices = np.where(np.abs(eigenvalue_array) >= 1)
linear_indices = np.where(np.abs(eigenvalue_array) < 1)
# Pack data for |x| >= 1 using arcsecant
if arcsec_indices[0].size > 0:
packed_data[arcsec_indices] = np.arccos(1 / eigenvalue_array[arcsec_indices])
# Pack data for |x| < 1 using a linear mapping to a distinct range
if linear_indices[0].size > 0:
# Map values from (-1, 1) to (pi, 2*pi).
# Linear function: y = mx + c. Map -1 -> pi and 1 -> 2*pi
# m = (2*pi - pi) / (1 - (-1)) = pi / 2
# c = pi - m*(-1) = pi + pi/2 = 1.5*pi
packed_data[linear_indices] = (np.pi/2) * eigenvalue_array[linear_indices] + (1.5 * np.pi)
return packed_data.astype(np.float32)
def _unpack_eigenvalue_data(packed_array: np.ndarray) -> np.ndarray:
"""
Decompresses packed data back into original eigenvalues.
It uses the value's range to determine the correct inverse operation.
"""
unpacked_data = np.zeros_like(packed_array)
# Identify which unpacking method to use based on the value's range
arcsec_indices = np.where(packed_array <= np.pi)
linear_indices = np.where(packed_array > np.pi)
# Unpack data from the arcsecant range [0, pi]
if arcsec_indices[0].size > 0:
# Inverse operation: 1 / cos(y)
unpacked_data[arcsec_indices] = 1 / np.cos(packed_array[arcsec_indices])
    # Unpack data from the linear range (pi, 2*pi)
if linear_indices[0].size > 0:
# Inverse linear function: x = (y - c) / m
# x = (y - 1.5*pi) / (pi/2)
unpacked_data[linear_indices] = (packed_array[linear_indices] - (1.5 * np.pi)) / (np.pi / 2)
return unpacked_data.astype(np.float32)
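# Local round-trip sanity check for the packing scheme above (an illustrative
# sketch with arbitrary test values, exercised without any server). Packing
# then unpacking should recover the inputs; the tolerance is loose because
# 1/cos(y) is steep near pi/2, so large eigenvalues lose float32 precision.
def _test_eigenvalue_roundtrip():
    values = np.array([2.5, -0.75, 100.0, 0.5, -2.5], dtype=np.float32)
    recovered = _unpack_eigenvalue_data(_pack_eigenvalue_data(values))
    assert np.allclose(values, recovered, rtol=1e-3), (values, recovered)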
def multiplexed_execute_workflow(workflow_steps: List[Dict[str, Any]]):
"""
Executes a sequence of operations on the server as a single workflow,
handling different data types based on the operation.
"""
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
client_socket.connect((SERVER_HOST, SERVER_PORT))
print(f"Connected to server at {SERVER_HOST}:{SERVER_PORT}")
# 1. Send OPERATION_WORKFLOW code
client_socket.sendall(struct.pack('!B', OPERATION_WORKFLOW))
# 2. Check for operations that require special handling (e.g., eigenvalue packing)
requires_packing = any(step.get("operation_type") == "INTERPOLATE_EIGENVALUE" for step in workflow_steps)
modified_workflow = []
if requires_packing:
for step in workflow_steps:
                # Deep-copy so packing does not mutate the caller's nested dicts
                new_step = copy.deepcopy(step)
if new_step.get("operation_type") == "INTERPOLATE_EIGENVALUE" and new_step["input_data"]["type"] == "direct":
input_data = new_step["input_data"]
if "eigenvalue_data" in input_data:
packed_data = _pack_eigenvalue_data(np.array(input_data["eigenvalue_data"], dtype=np.float32))
input_data["eigenvalue_data"] = packed_data.tolist()
modified_workflow.append(new_step)
else:
modified_workflow = workflow_steps
# 3. Serialize workflow steps to JSON
workflow_json = json.dumps(modified_workflow)
workflow_bytes = workflow_json.encode('utf-8')
# 4. Send workflow length and then workflow bytes
client_socket.sendall(struct.pack('!I', len(workflow_bytes)))
client_socket.sendall(workflow_bytes)
print("Workflow sent to server. Waiting for result...")
# 5. Receive the final result from the server
result = _recvall_data(client_socket)
# 6. Unpack the result if it was a packed operation
        if requires_packing and workflow_steps[-1].get("output_id") == "eigenvalue_result":
result = _unpack_eigenvalue_data(result)
return result
except Exception as e:
print(f"Client error during workflow execution: {e}")
return None
finally:
client_socket.close()
print("Connection closed.")
# --- Main program demonstrating usage ---
if __name__ == "__main__":
# --- Example 1: N-dimensional operation (from adi-ndim.py) ---
print("\n--- Testing N-dimensional workflow: Interpolate then Differentiate ---")
initial_fx_data_workflow = [np.array([1.0, 2.0, 3.0, 4.0, 5.0], dtype=np.float32)]
initial_fy_data_workflow = [np.array([10.0, 12.0, 15.0, 19.0, 25.0], dtype=np.float32)]
initial_fz_data_workflow = [np.array([20.0, 21.0, 23.0, 26.0, 30.0], dtype=np.float32)]
common_eval_points_workflow = np.array([1.5, 2.5, 3.5, 4.5], dtype=np.float32)
workflow_interpolate_then_differentiate = [
{
"operation_type": "INTERPOLATE",
"input_data": {
"type": "direct",
"fx_data": [arr.tolist() for arr in initial_fx_data_workflow],
"fy_data": [arr.tolist() for arr in initial_fy_data_workflow],
"fz_data": [arr.tolist() for arr in initial_fz_data_workflow]
},
"parameters": {
"x_interp_points": common_eval_points_workflow.tolist()
},
"output_id": "interpolated_data"
},
{
"operation_type": "DIFFERENTIATE",
"input_data": {
"type": "reference",
"source_id": "interpolated_data"
},
"parameters": {
"x_eval_points": common_eval_points_workflow.tolist()
}
}
]
result_ndim = multiplexed_execute_workflow(workflow_interpolate_then_differentiate)
if result_ndim is not None:
num_results_per_curve_eval = len(common_eval_points_workflow)
num_curves_workflow = len(initial_fx_data_workflow)
y_derivatives_workflow = result_ndim[ : num_results_per_curve_eval * num_curves_workflow]
z_derivatives_workflow = result_ndim[num_results_per_curve_eval * num_curves_workflow : ]
print(f"Workflow result (Y derivatives): {y_derivatives_workflow}")
print(f"Workflow result (Z derivatives): {z_derivatives_workflow}")
time.sleep(1)
# --- Example 2: Eigenvalue packing/unpacking operation (from adi-eigen.py) ---
print("\n--- Testing eigenvalue packing and unpacking ---")
eigenvalues_to_pack = np.array([2.5, 10.0, 100.0, 0.5, -0.75, 500.0, -2.5, -100.0], dtype=np.float32)
workflow_eigenvalue = [
{
"operation_type": "INTERPOLATE_EIGENVALUE",
"input_data": {
"type": "direct",
"eigenvalue_data": eigenvalues_to_pack.tolist()
},
"output_id": "eigenvalue_result"
}
]
result_eigen = multiplexed_execute_workflow(workflow_eigenvalue)
if result_eigen is not None:
print(f"Original Eigenvalues: {eigenvalues_to_pack}")
print(f"Unpacked Result: {result_eigen}")
print(f"Are the results close? {np.allclose(eigenvalues_to_pack, result_eigen)}")
# adi_server.py
# This new, unified server handles all FPU operations from both
# n-math.py and e-stream.py using a single, efficient process pool.
import socket
import struct
import numpy as np
import json
import threading
import multiprocessing
import traceback
# Import the new protocol for interoperability
from adi_protocol import receive_data, send_data, OPERATION_INTERPOLATE_HYPERBOLIC_PARABOLIC, OPERATION_INTERPOLATE_ARCSECANT_STREAM
# --- FPU Operations (Consolidated from n-math.py and e-stream.py) ---
# These functions are now standalone and can be executed by worker processes.
def hyperbolic_parabolic_interpolation_nd_revised(data_bytes, data_pointers):
"""
Performs hyperbolic-parabolic interpolation on n-dimensional data.
"""
all_fy_data = []
all_fx_data = []
x_interp = None
# Unpack the binary data based on the JSON data_pointers
for key, ptr in data_pointers.items():
start = ptr['offset']
end = start + ptr['length']
unpacked_data = np.frombuffer(data_bytes[start:end], dtype=np.float32)
if key.startswith('fx'):
all_fx_data.append(unpacked_data)
elif key.startswith('fy'):
all_fy_data.append(unpacked_data)
elif key == 'x_interp':
x_interp = unpacked_data
    if len(all_fx_data) != len(all_fy_data) or x_interp is None:
        raise ValueError("Invalid data for interpolation.")
    all_interp_y = []
for fx, fy in zip(all_fx_data, all_fy_data):
if len(fx) != len(fy) or len(fx) < 3:
raise ValueError("X and Y data must have equal length and at least three points.")
interp_y = []
for x in x_interp:
distances = np.abs(fx - x)
nearest_indices = np.argsort(distances)[:3]
x1, x2, x3 = fx[nearest_indices]
y1, y2, y3 = fy[nearest_indices]
            # Guard against duplicate x-values, which make the system singular
            if x1 == x2 or x2 == x3 or x1 == x3:
                raise RuntimeError("Duplicate x-values detected, cannot interpolate.")
            # Fit a parabola y = ax^2 + bx + c through the three nearest points
            # by solving the 3x3 Vandermonde system (np.linalg.solve, not inversion)
A = np.array([
[x1**2, x1, 1],
[x2**2, x2, 1],
[x3**2, x3, 1]
])
b = np.array([y1, y2, y3])
try:
a, b_lin, c = np.linalg.solve(A, b)
y_interp = a * x**2 + b_lin * x + c
except np.linalg.LinAlgError:
raise RuntimeError("Could not solve parabolic interpolation matrix.")
interp_y.append(y_interp)
all_interp_y.append(np.array(interp_y))
return np.concatenate(all_interp_y)
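# Illustrative local check for the routine above (a sketch with assumed test
# data, bypassing the socket layer): build the flat byte payload and pointer
# table by hand, exactly as receive_data would deliver them to the worker.
def _test_parabolic_interpolation():
    fx = np.array([1.0, 2.0, 3.0], dtype=np.float32)
    fy = np.array([1.0, 4.0, 9.0], dtype=np.float32)  # samples of y = x^2
    xi = np.array([1.5, 2.5], dtype=np.float32)
    payload, pointers, offset = b'', {}, 0
    for key, arr in (("fx_0", fx), ("fy_0", fy), ("x_interp", xi)):
        raw = arr.tobytes()
        pointers[key] = {"offset": offset, "length": len(raw)}
        payload += raw
        offset += len(raw)
    result = hyperbolic_parabolic_interpolation_nd_revised(payload, pointers)
    assert np.allclose(result, [2.25, 6.25])  # exact on a true parabola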
def pseudo_interpolate_arcsecant_stream(data_bytes, data_pointers):
"""
Pseudo-interpolates a chunk of interpolation points using pre-existing data.
This now works as a single, callable function within the service.
"""
    def _slice(key):
        ptr = data_pointers[key]
        return np.frombuffer(data_bytes[ptr['offset']:ptr['offset'] + ptr['length']], dtype=np.float32)
    x_data = _slice('x_data')
    y_data = _slice('y_data')
    x_interp_chunk = _slice('x_interp_chunk')
def _calculate_arcsecant(val):
if np.abs(val) < 1:
return np.nan
return np.arccos(1 / val)
interpolated_y = []
for x in x_interp_chunk:
# Simplified logic: find nearest point and apply a transformation
nearest_idx = np.argmin(np.abs(x_data - x))
y_val = y_data[nearest_idx]
interpolated_y.append(_calculate_arcsecant(y_val))
return np.array(interpolated_y, dtype=np.float32)
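# A similar hand-built check for the arcsecant stream path (assumed test
# values): each interpolation point snaps to the nearest x, and the matching
# y-value with |y| >= 1 maps through arccos(1/y).
def _test_arcsecant_stream():
    payload, pointers, offset = b'', {}, 0
    for key, arr in (
        ("x_data", np.array([0.0, 1.0, 2.0], dtype=np.float32)),
        ("y_data", np.array([2.0, 4.0, -2.0], dtype=np.float32)),
        ("x_interp_chunk", np.array([0.1, 1.9], dtype=np.float32)),
    ):
        raw = arr.tobytes()
        pointers[key] = {"offset": offset, "length": len(raw)}
        payload += raw
        offset += len(raw)
    result = pseudo_interpolate_arcsecant_stream(payload, pointers)
    assert np.allclose(result, np.arccos([1 / 2.0, 1 / -2.0]))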
# --- Server and Multiprocessing Logic ---
def worker_process(request_data):
"""
A worker function that is executed by a process in the pool.
It takes a request dictionary and dispatches the correct FPU operation.
"""
op_type = request_data["operation"]
data_bytes = request_data["data_bytes"]
data_pointers = request_data["data_pointers"]
if op_type == OPERATION_INTERPOLATE_HYPERBOLIC_PARABOLIC:
result = hyperbolic_parabolic_interpolation_nd_revised(data_bytes, data_pointers)
elif op_type == OPERATION_INTERPOLATE_ARCSECANT_STREAM:
result = pseudo_interpolate_arcsecant_stream(data_bytes, data_pointers)
else:
raise ValueError(f"Unknown operation type: {op_type}")
return result
def handle_client(client_socket, addr, pool):
"""
Handles a single client connection.
Parses the request and submits the FPU task to the multiprocessing pool.
"""
print(f"Accepted connection from {addr}")
try:
json_header, data_bytes = receive_data(client_socket)
if json_header is None:
print(f"Client {addr} disconnected unexpectedly.")
return
print(f"Received request from {addr} for operation {json_header['operation']}")
request_data = {
"operation": json_header["operation"],
"data_bytes": data_bytes,
"data_pointers": json_header["data_pointers"]
}
# Submit the FPU task to the process pool
result_async = pool.apply_async(worker_process, (request_data,))
result = result_async.get() # Block until the result is available
# Send the result back to the client
send_data(client_socket, json_header["operation"], {"result": result})
print(f"Sent result back to {addr}.")
    except (ValueError, RuntimeError) as e:
        print(f"ValueError or RuntimeError on server from {addr}: {e}")
        # Errors go back as a raw length-prefixed UTF-8 string rather than
        # the JSON-framed format used for successful results.
        error_message = str(e).encode('utf-8')
        client_socket.sendall(struct.pack('!I', len(error_message)))
        client_socket.sendall(error_message)
except ConnectionResetError:
print(f"Client {addr} forcibly closed the connection.")
except Exception as e:
print(f"An unexpected error occurred in handle_client for {addr}: {e}")
traceback.print_exc()
finally:
client_socket.close()
print(f"Connection with client {addr} closed.")
def start_server(host, port):
"""
Starts the main server, listens for connections, and manages the process pool.
"""
# Create a process pool with a number of workers equal to CPU cores
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((host, port))
server_socket.listen(5)
print(f"Server listening on {host}:{port}")
try:
while True:
client_socket, addr = server_socket.accept()
# Handle each client in a separate thread to not block the main loop
client_thread = threading.Thread(target=handle_client, args=(client_socket, addr, pool))
client_thread.start()
except KeyboardInterrupt:
print("\nServer shutting down...")
finally:
server_socket.close()
pool.close()
pool.join()
print("Server shutdown complete.")
if __name__ == "__main__":
SERVER_HOST = '127.0.0.1'
SERVER_PORT = 12345
start_server(SERVER_HOST, SERVER_PORT)