Wednesday, August 20, 2025

Distributed FPU Protocol - Adi FPU Suite 0.11A

# adi_protocol.py  (underscored so the server can import it as a Python module)
# A unified protocol for the FPU distributed service.
# All clients from the primary n-math.py blog post (Python, Assembly, RISC-V) and the server must adhere to it.
import json
import struct
import numpy as np

# --- Common Constants ---
# Operations for the FPU service.
# The server will process a single operation or a sequence of them.
OPERATION_INTERPOLATE_HYPERBOLIC_PARABOLIC = 0
OPERATION_INTERPOLATE_ARCSECANT_STREAM = 1
OPERATION_DIFFERENTIATE = 2
OPERATION_CALCULATE_GRADIENT_1D = 3
OPERATION_HYPERBOLIC_INTERCEPT_HANDLER = 4
OPERATION_INTEGRATE = 5
OPERATION_INTEGRATE_ND = 6

# --- Payload Structures ---
# All communication uses a JSON header followed by a binary payload.

def send_data(sock, op_type: int, data_dict: dict):
    """
    Constructs and sends a structured request to the server.
    
    The request format is:
    [4-byte JSON header length]
    [JSON header]
    [variable-length binary data payload]

    Args:
        sock (socket): The connected socket.
        op_type (int): The operation type.
        data_dict (dict): A dictionary containing all data to be sent.
                          Keys correspond to data_pointers in the JSON header.
    """
    json_header = {
        "operation": op_type,
        "data_pointers": {} # Maps data key to (length_bytes, offset_bytes)
    }

    binary_payload = b''
    offset = 0

    for key, data_array in data_dict.items():
        # Ensure data is in np.float32 for consistent binary representation
        if not isinstance(data_array, np.ndarray):
            data_array = np.array(data_array, dtype=np.float32)

        data_bytes = data_array.astype(np.float32).tobytes()
        length = len(data_bytes)
        
        json_header["data_pointers"][key] = {
            "length": length,
            "offset": offset
        }
        
        binary_payload += data_bytes
        offset += length

    json_header_bytes = json.dumps(json_header).encode('utf-8')
    header_length = len(json_header_bytes)
    
    sock.sendall(struct.pack('!I', header_length))
    sock.sendall(json_header_bytes)
    sock.sendall(binary_payload)


def receive_data(sock):
    """
    Receives and parses a structured response from the server.
    
    Args:
        sock (socket): The connected socket.

    Returns:
        tuple: (json_header, binary_payload)
    """
    # Receive header length
    header_len_bytes = _recvall(sock, 4)
    if not header_len_bytes:
        return None, None
    header_length = struct.unpack('!I', header_len_bytes)[0]
    
    # Receive JSON header
    json_header_bytes = _recvall(sock, header_length)
    if not json_header_bytes:
        return None, None
    json_header = json.loads(json_header_bytes.decode('utf-8'))
    
    # Receive binary payload
    payload_length = sum(p['length'] for p in json_header.get('data_pointers', {}).values())
    binary_payload = _recvall(sock, payload_length)

    return json_header, binary_payload


def _recvall(sock, n):
    """Helper function to reliably receive exactly 'n' bytes from a socket."""
    data = b''
    while len(data) < n:
        packet = sock.recv(n - len(data))
        if not packet:
            return None
        data += packet
    return data
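
# --- Self-test ---
# A minimal, server-free sketch of the wire format: one request is pushed
# through a local socket pair and parsed back with receive_data(). The
# 'fx_0'/'x_interp' keys are illustrative; the protocol allows any keys.
if __name__ == "__main__":
    import socket
    a, b = socket.socketpair()
    send_data(a, OPERATION_INTERPOLATE_HYPERBOLIC_PARABOLIC, {
        "fx_0": [1.0, 2.0, 3.0],
        "x_interp": [1.5, 2.5],
    })
    header, payload = receive_data(b)
    ptr = header["data_pointers"]["x_interp"]
    chunk = np.frombuffer(payload[ptr["offset"]:ptr["offset"] + ptr["length"]], dtype=np.float32)
    print(header["operation"], chunk)  # -> 0 [1.5 2.5]
    a.close()
    b.close()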

# adi_client.py
# A consolidated client that multiplexes FPU operations, including
# N-dimensional workflows and specialized eigenvalue packing.

import socket
import numpy as np
import json
import time
import struct
from typing import Dict, Any, List

# --- Constants (must match the server; note these dispatch codes differ from the ones in adi_protocol.py) ---
OPERATION_INTERPOLATE = 0
OPERATION_DIFFERENTIATE = 1
OPERATION_CALCULATE_GRADIENT_1D = 2
OPERATION_HYPERBOLIC_INTERCEPT_HANDLER = 3
OPERATION_INTEGRATE = 4
OPERATION_INTEGRATE_ND = 5
OPERATION_WORKFLOW = 6 # For chained (composed) operation sequences
OPERATION_INTERPOLATE_EIGENVALUE = 7 # Hypothetical operation for eigenvalue packing

SERVER_HOST = '127.0.0.1'
SERVER_PORT = 12345
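
# Client wire format (as implemented below; note it differs from the
# JSON-header framing in adi_protocol.py):
#   request:  [1-byte op code][4-byte workflow JSON length][workflow JSON]
#   response: [4-byte payload length][float32 payload, or UTF-8 error text]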

# --- Helper Functions ---
def _sendall_data(sock, data_array):
    """Helper to send a numpy array's bytes."""
    data_bytes = data_array.astype(np.float32).tobytes()
    sock.sendall(struct.pack('!I', len(data_bytes)))
    sock.sendall(data_bytes)

def _recvall(sock, n):
    """
    Helper function to reliably receive exactly 'n' bytes from a socket.
    """
    data = b''
    while len(data) < n:
        packet = sock.recv(n - len(data))
        if not packet:
            return None
        data += packet
    return data

def _recvall_data(sock):
    """Helper to receive a numpy array's bytes or an error message."""
    data_len_bytes = _recvall(sock, 4)
    if data_len_bytes is None:
        raise ConnectionResetError("Incomplete data (length) received from server.")
    data_len = struct.unpack('!I', data_len_bytes)[0]
    data_bytes = _recvall(sock, data_len)
    if data_bytes is None:
        raise ConnectionResetError("Incomplete data (content) received from server.")
    
    try:
        error_message = data_bytes.decode('utf-8')
        if "Error" in error_message or "Server internal error" in error_message:
            raise RuntimeError(f"Server returned an error: {error_message}")
    except UnicodeDecodeError:
        pass
    
    # Payload floats are written with numpy's tobytes() (native byte order),
    # so parse them back the same way rather than as big-endian '!f'.
    return np.frombuffer(data_bytes, dtype=np.float32)

def _pack_eigenvalue_data(eigenvalue_array: np.ndarray) -> np.ndarray:
    """
    Compresses eigenvalue data using a hybrid packing scheme.
    - Values with |x| >= 1 are compressed using the arcsecant mapping.
    - Values with |x| < 1 are mapped linearly to a distinct range.
    """
    packed_data = np.zeros_like(eigenvalue_array)

    # Find indices for two packing methods
    arcsec_indices = np.where(np.abs(eigenvalue_array) >= 1)
    linear_indices = np.where(np.abs(eigenvalue_array) < 1)

    # Pack data for |x| >= 1 using arcsecant
    if arcsec_indices[0].size > 0:
        packed_data[arcsec_indices] = np.arccos(1 / eigenvalue_array[arcsec_indices])

    # Pack data for |x| < 1 using a linear mapping to a distinct range
    if linear_indices[0].size > 0:
        # Map values from (-1, 1) to (pi, 2*pi).
        # Linear function: y = mx + c. Map -1 -> pi and 1 -> 2*pi
        # m = (2*pi - pi) / (1 - (-1)) = pi / 2
        # c = pi - m*(-1) = pi + pi/2 = 1.5*pi
        packed_data[linear_indices] = (np.pi/2) * eigenvalue_array[linear_indices] + (1.5 * np.pi)

    return packed_data.astype(np.float32)

def _unpack_eigenvalue_data(packed_array: np.ndarray) -> np.ndarray:
    """
    Decompresses packed data back into original eigenvalues.
    It uses the value's range to determine the correct inverse operation.
    """
    unpacked_data = np.zeros_like(packed_array)
    
    # Identify which unpacking method to use based on the value's range
    arcsec_indices = np.where(packed_array <= np.pi)
    linear_indices = np.where(packed_array > np.pi)

    # Unpack data from the arcsecant range [0, pi]
    if arcsec_indices[0].size > 0:
        # Inverse operation: 1 / cos(y)
        unpacked_data[arcsec_indices] = 1 / np.cos(packed_array[arcsec_indices])

    # Unpack data from the linear range (pi, 2*pi)
    if linear_indices[0].size > 0:
        # Inverse linear function: x = (y - c) / m
        # x = (y - 1.5*pi) / (pi/2)
        unpacked_data[linear_indices] = (packed_array[linear_indices] - (1.5 * np.pi)) / (np.pi / 2)
    
    return unpacked_data.astype(np.float32)
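
# A quick, server-free round-trip check of the hybrid packing scheme; a
# sketch, not part of the wire protocol. The sample values are arbitrary,
# and the tolerance is loose because the arcsecant branch loses float32
# precision as |x| grows.
def _verify_eigenvalue_roundtrip():
    vals = np.array([2.5, -0.75, 10.0, -10.0], dtype=np.float32)
    roundtrip = _unpack_eigenvalue_data(_pack_eigenvalue_data(vals))
    assert np.allclose(vals, roundtrip, atol=1e-3), (vals, roundtrip)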

def multiplexed_execute_workflow(workflow_steps: List[Dict[str, Any]]):
    """
    Executes a sequence of operations on the server as a single workflow,
    handling different data types based on the operation.
    """
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        client_socket.connect((SERVER_HOST, SERVER_PORT))
        print(f"Connected to server at {SERVER_HOST}:{SERVER_PORT}")

        # 1. Send OPERATION_WORKFLOW code
        client_socket.sendall(struct.pack('!B', OPERATION_WORKFLOW))

        # 2. Check for operations that require special handling (e.g., eigenvalue packing)
        requires_packing = any(step.get("operation_type") == "INTERPOLATE_EIGENVALUE" for step in workflow_steps)

        modified_workflow = []
        if requires_packing:
            for step in workflow_steps:
                # Copy the step AND its input_data: step.copy() is shallow, so
                # repacking in place would silently mutate the caller's workflow.
                new_step = dict(step)
                if new_step.get("operation_type") == "INTERPOLATE_EIGENVALUE" and new_step.get("input_data", {}).get("type") == "direct":
                    input_data = dict(new_step["input_data"])
                    if "eigenvalue_data" in input_data:
                        packed_data = _pack_eigenvalue_data(np.array(input_data["eigenvalue_data"], dtype=np.float32))
                        input_data["eigenvalue_data"] = packed_data.tolist()
                    new_step["input_data"] = input_data
                modified_workflow.append(new_step)
        else:
            modified_workflow = workflow_steps
        
        # 3. Serialize workflow steps to JSON
        workflow_json = json.dumps(modified_workflow)
        workflow_bytes = workflow_json.encode('utf-8')

        # 4. Send workflow length and then workflow bytes
        client_socket.sendall(struct.pack('!I', len(workflow_bytes)))
        client_socket.sendall(workflow_bytes)

        print("Workflow sent to server. Waiting for result...")

        # 5. Receive the final result from the server
        result = _recvall_data(client_socket)

        # 6. Unpack the result if it was a packed operation
        if requires_packing and workflow_steps[-1].get("output_id") == "eigenvalue_result":
            result = _unpack_eigenvalue_data(result)

        return result

    except Exception as e:
        print(f"Client error during workflow execution: {e}")
        return None
    finally:
        client_socket.close()
        print("Connection closed.")

# --- Main program demonstrating usage ---
if __name__ == "__main__":
    # --- Example 1: N-dimensional operation (from adi-ndim.py) ---
    print("\n--- Testing N-dimensional workflow: Interpolate then Differentiate ---")
    initial_fx_data_workflow = [np.array([1.0, 2.0, 3.0, 4.0, 5.0], dtype=np.float32)]
    initial_fy_data_workflow = [np.array([10.0, 12.0, 15.0, 19.0, 25.0], dtype=np.float32)]
    initial_fz_data_workflow = [np.array([20.0, 21.0, 23.0, 26.0, 30.0], dtype=np.float32)]
    common_eval_points_workflow = np.array([1.5, 2.5, 3.5, 4.5], dtype=np.float32)

    workflow_interpolate_then_differentiate = [
        {
            "operation_type": "INTERPOLATE",
            "input_data": {
                "type": "direct", 
                "fx_data": [arr.tolist() for arr in initial_fx_data_workflow],
                "fy_data": [arr.tolist() for arr in initial_fy_data_workflow],
                "fz_data": [arr.tolist() for arr in initial_fz_data_workflow]
            },
            "parameters": {
                "x_interp_points": common_eval_points_workflow.tolist()
            },
            "output_id": "interpolated_data"
        },
        {
            "operation_type": "DIFFERENTIATE",
            "input_data": {
                "type": "reference",
                "source_id": "interpolated_data"
            },
            "parameters": {
                "x_eval_points": common_eval_points_workflow.tolist()
            }
        }
    ]
    result_ndim = multiplexed_execute_workflow(workflow_interpolate_then_differentiate)
    if result_ndim is not None:
        num_results_per_curve_eval = len(common_eval_points_workflow)
        num_curves_workflow = len(initial_fx_data_workflow)
        y_derivatives_workflow = result_ndim[ : num_results_per_curve_eval * num_curves_workflow]
        z_derivatives_workflow = result_ndim[num_results_per_curve_eval * num_curves_workflow : ]
        print(f"Workflow result (Y derivatives): {y_derivatives_workflow}")
        print(f"Workflow result (Z derivatives): {z_derivatives_workflow}")

    time.sleep(1)

    # --- Example 2: Eigenvalue packing/unpacking operation (from adi-eigen.py) ---
    print("\n--- Testing eigenvalue packing and unpacking ---")
    eigenvalues_to_pack = np.array([2.5, 10.0, 100.0, 0.5, -0.75, 500.0, -2.5, -100.0], dtype=np.float32)
    workflow_eigenvalue = [
        {
            "operation_type": "INTERPOLATE_EIGENVALUE",
            "input_data": {
                "type": "direct", 
                "eigenvalue_data": eigenvalues_to_pack.tolist()
            },
            "output_id": "eigenvalue_result"
        }
    ]
    result_eigen = multiplexed_execute_workflow(workflow_eigenvalue)
    if result_eigen is not None:
        print(f"Original Eigenvalues: {eigenvalues_to_pack}")
        print(f"Unpacked Result: {result_eigen}")
        print(f"Are the results close? {np.allclose(eigenvalues_to_pack, result_eigen)}")

# adi_server.py
# This new, unified server handles all FPU operations from both
# n-math.py and e-stream.py using a single, efficient process pool.

import socket
import struct
import numpy as np
import json
import threading
import multiprocessing
import traceback

# Import the shared protocol module for interoperability
from adi_protocol import receive_data, send_data, OPERATION_INTERPOLATE_HYPERBOLIC_PARABOLIC, OPERATION_INTERPOLATE_ARCSECANT_STREAM

# --- FPU Operations (Consolidated from n-math.py and e-stream.py) ---
# These functions are now standalone and can be executed by worker processes.

def hyperbolic_parabolic_interpolation_nd_revised(data_bytes, data_pointers):
    """
    Performs hyperbolic-parabolic interpolation on n-dimensional data.
    """
    all_fy_data = []
    all_fx_data = []
    x_interp = None

    # Unpack the binary data based on the JSON data_pointers
    for key, ptr in data_pointers.items():
        start = ptr['offset']
        end = start + ptr['length']
        unpacked_data = np.frombuffer(data_bytes[start:end], dtype=np.float32)

        if key.startswith('fx'):
            all_fx_data.append(unpacked_data)
        elif key.startswith('fy'):
            all_fy_data.append(unpacked_data)
        elif key == 'x_interp':
            x_interp = unpacked_data

    if len(all_fx_data) != len(all_fy_data) or x_interp is None:
        raise ValueError("Invalid data for interpolation.")

    all_interp_y = []

    for fx, fy in zip(all_fx_data, all_fy_data):
        if len(fx) != len(fy) or len(fx) < 3:
            raise ValueError("X and Y data must have equal length and at least three points.")

        interp_y = []
        for x in x_interp:
            distances = np.abs(fx - x)
            nearest_indices = np.argsort(distances)[:3]
            x1, x2, x3 = fx[nearest_indices]
            y1, y2, y3 = fy[nearest_indices]

            # Simplified interpolation logic for demonstration.
            # The three x-values must be distinct or the linear system below is singular.
            if x1 == x2 or x2 == x3 or x1 == x3:
                raise RuntimeError("Duplicate x-values detected, cannot interpolate.")

            # Parabolic interpolation (y = ax^2 + bx + c):
            # solve the 3x3 Vandermonde system for the coefficients.
            A = np.array([
                [x1**2, x1, 1],
                [x2**2, x2, 1],
                [x3**2, x3, 1]
            ])
            b = np.array([y1, y2, y3])
            try:
                a, b_lin, c = np.linalg.solve(A, b)
                y_interp = a * x**2 + b_lin * x + c
            except np.linalg.LinAlgError:
                raise RuntimeError("Could not solve parabolic interpolation matrix.")

            interp_y.append(y_interp)
        all_interp_y.append(np.array(interp_y))

    return np.concatenate(all_interp_y)
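
# Illustrative direct call (a sketch, no sockets): hand-pack one curve plus
# evaluation points in the protocol's {"length", "offset"} pointer layout.
# The 'fx_0'/'fy_0' key names follow the prefix convention the loop above
# relies on; the expected output is approximate.
def _demo_parabolic_interpolation():
    fx = np.array([1.0, 2.0, 3.0], dtype=np.float32).tobytes()
    fy = np.array([10.0, 12.0, 15.0], dtype=np.float32).tobytes()
    xi = np.array([1.5, 2.5], dtype=np.float32).tobytes()
    data_bytes = fx + fy + xi
    data_pointers = {
        "fx_0":     {"length": len(fx), "offset": 0},
        "fy_0":     {"length": len(fy), "offset": len(fx)},
        "x_interp": {"length": len(xi), "offset": len(fx) + len(fy)},
    }
    # Fits y = 0.5x^2 + 0.5x + 9 through the three points -> [10.875, 13.375]
    return hyperbolic_parabolic_interpolation_nd_revised(data_bytes, data_pointers)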


def pseudo_interpolate_arcsecant_stream(data_bytes, data_pointers):
    """
    Pseudo-interpolates a chunk of interpolation points using pre-existing data.
    This now works as a single, callable function within the service.
    """
    def _slice(key):
        """Extract one float32 array from the payload using its {offset, length} pointer."""
        ptr = data_pointers[key]
        return np.frombuffer(data_bytes[ptr['offset'] : ptr['offset'] + ptr['length']], dtype=np.float32)

    x_data = _slice('x_data')
    y_data = _slice('y_data')
    x_interp_chunk = _slice('x_interp_chunk')

    def _calculate_arcsecant(val):
        if np.abs(val) < 1:
            return np.nan
        return np.arccos(1 / val)

    interpolated_y = []
    for x in x_interp_chunk:
        # Simplified logic: find nearest point and apply a transformation
        nearest_idx = np.argmin(np.abs(x_data - x))
        y_val = y_data[nearest_idx]
        interpolated_y.append(_calculate_arcsecant(y_val))

    return np.array(interpolated_y, dtype=np.float32)
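
# Illustrative direct call (a sketch): nearest-neighbour lookup followed by
# the arcsecant mapping. The '_pack' helper below is hypothetical glue for
# this demo only; note that y-values with |y| < 1 map to NaN by design.
def _demo_arcsecant_stream():
    def _pack(arrays):
        data, ptrs, offset = b'', {}, 0
        for key, arr in arrays.items():
            raw = np.asarray(arr, dtype=np.float32).tobytes()
            ptrs[key] = {"offset": offset, "length": len(raw)}
            data += raw
            offset += len(raw)
        return data, ptrs

    data_bytes, ptrs = _pack({
        "x_data": [0.0, 1.0, 2.0],
        "y_data": [2.0, 0.5, -3.0],
        "x_interp_chunk": [0.1, 0.9, 2.2],
    })
    # -> [arccos(0.5), nan, arccos(-1/3)] ~= [1.047, nan, 1.911]
    return pseudo_interpolate_arcsecant_stream(data_bytes, ptrs)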


# --- Server and Multiprocessing Logic ---

def worker_process(request_data):
    """
    A worker function that is executed by a process in the pool.
    It takes a request dictionary and dispatches the correct FPU operation.
    """
    op_type = request_data["operation"]
    data_bytes = request_data["data_bytes"]
    data_pointers = request_data["data_pointers"]

    if op_type == OPERATION_INTERPOLATE_HYPERBOLIC_PARABOLIC:
        result = hyperbolic_parabolic_interpolation_nd_revised(data_bytes, data_pointers)
    elif op_type == OPERATION_INTERPOLATE_ARCSECANT_STREAM:
        result = pseudo_interpolate_arcsecant_stream(data_bytes, data_pointers)
    else:
        raise ValueError(f"Unknown operation type: {op_type}")
    
    return result


def handle_client(client_socket, addr, pool):
    """
    Handles a single client connection.
    Parses the request and submits the FPU task to the multiprocessing pool.
    """
    print(f"Accepted connection from {addr}")
    try:
        json_header, data_bytes = receive_data(client_socket)
        if json_header is None:
            print(f"Client {addr} disconnected unexpectedly.")
            return

        print(f"Received request from {addr} for operation {json_header['operation']}")
        
        request_data = {
            "operation": json_header["operation"],
            "data_bytes": data_bytes,
            "data_pointers": json_header["data_pointers"]
        }

        # Submit the FPU task to the process pool
        result_async = pool.apply_async(worker_process, (request_data,))
        result = result_async.get()  # Block until the result is available
        
        # Send the result back to the client
        send_data(client_socket, json_header["operation"], {"result": result})
        print(f"Sent result back to {addr}.")

    except (ValueError, RuntimeError) as e:
        print(f"ValueError or RuntimeError on server from {addr}: {e}")
        error_message = str(e).encode('utf-8')
        client_socket.sendall(struct.pack('!I', len(error_message)))
        client_socket.sendall(error_message)
    except ConnectionResetError:
        print(f"Client {addr} forcibly closed the connection.")
    except Exception as e:
        print(f"An unexpected error occurred in handle_client for {addr}: {e}")
        traceback.print_exc()
    finally:
        client_socket.close()
        print(f"Connection with client {addr} closed.")


def start_server(host, port):
    """
    Starts the main server, listens for connections, and manages the process pool.
    """
    # Create a process pool with a number of workers equal to CPU cores
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((host, port))
    server_socket.listen(5)
    print(f"Server listening on {host}:{port}")

    try:
        while True:
            client_socket, addr = server_socket.accept()
            # Handle each client in a separate thread to not block the main loop
            client_thread = threading.Thread(target=handle_client, args=(client_socket, addr, pool))
            client_thread.start()
    except KeyboardInterrupt:
        print("\nServer shutting down...")
    finally:
        server_socket.close()
        pool.close()
        pool.join()
        print("Server shutdown complete.")


if __name__ == "__main__":
    SERVER_HOST = '127.0.0.1'
    SERVER_PORT = 12345
    start_server(SERVER_HOST, SERVER_PORT)
