Wednesday, August 20, 2025

Distributed FPU Protocol with GPU acceleration - Adi Suite 0.1A

 

# For everyone watching along and enjoying the process of this suite: I envision 1D apex volumetric AI video interpretive rendering for object and environmental near-field interaction; photorealism in games and movies should now be a simpler task.

# Love to all my pickaxe junker blaster nuclear gut strugglers! 

# gpu_protocol.py (called adi-gpu_protocol.py in this suite; save it under the
# importable name gpu_protocol.py so the client and server below can load it)
# This module defines a new communication protocol specifically for
# GPU-accelerated operations.

import socket
import struct
import json
import numpy as np

# --- Operation Constants ---
OPERATION_EXECUTE_GPU_KERNEL = 4
OPERATION_PYTORCH_ADDITION = 5

def send_gpu_payload(conn, operation_type, data):
    """
    Sends structured data over a socket with a focus on binary efficiency.
    
    This function packs a JSON header with metadata and a single
    binary payload containing all NumPy array data.
    
    Args:
        conn (socket.socket): The socket to send data through.
        operation_type (int): The type of operation being requested.
        data (dict): A dictionary containing 'kernel_args' and other metadata.
    """
    
    # Pack all NumPy arrays into a single binary payload
    binary_data = b''
    data_pointers = {}
    
    for key, val in data['kernel_args'].items():
        if isinstance(val, np.ndarray):
            array_bytes = val.astype(np.float32).tobytes()
            data_pointers[key] = {
                'offset': len(binary_data),
                'length': len(array_bytes),
                'dtype': 'float32'
            }
            binary_data += array_bytes
    
    # Create the JSON header with the operation type and data pointers
    json_header = {
        "operation": operation_type,
        "data_pointers": data_pointers,
        "metadata": data['metadata']
    }
    
    # Serialize the header to JSON bytes
    json_header_bytes = json.dumps(json_header).encode('utf-8')
    header_size = len(json_header_bytes)
    
    # Send the size of the header, the header itself, and the binary data
    conn.sendall(struct.pack('!I', header_size))
    conn.sendall(json_header_bytes)
    conn.sendall(binary_data)

def receive_gpu_payload(conn):
    """
    Receives structured data from a socket.
    
    Args:
        conn (socket.socket): The socket to receive data from.
        
    Returns:
        tuple: A tuple containing the JSON header (dict) and the binary
               payload (bytes), or (None, None) if the connection closes.
    """
    try:
        # First, receive the size of the JSON header (loop, since recv may
        # return fewer than 4 bytes)
        header_size_bytes = b''
        while len(header_size_bytes) < 4:
            chunk = conn.recv(4 - len(header_size_bytes))
            if not chunk:
                return None, None
            header_size_bytes += chunk

        header_size = struct.unpack('!I', header_size_bytes)[0]
        
        # Then, receive the JSON header itself
        json_header_bytes = b''
        while len(json_header_bytes) < header_size:
            chunk = conn.recv(header_size - len(json_header_bytes))
            if not chunk:
                return None, None
            json_header_bytes += chunk
            
        json_header = json.loads(json_header_bytes.decode('utf-8'))
        
        # Calculate total binary data length from pointers
        total_data_length = sum(ptr['length'] for ptr in json_header.get('data_pointers', {}).values())
        
        # Finally, receive the binary data
        data_bytes = b''
        while len(data_bytes) < total_data_length:
            chunk = conn.recv(total_data_length - len(data_bytes))
            if not chunk:
                return None, None
            data_bytes += chunk
            
        return json_header, data_bytes
        
    except (socket.error, struct.error, json.JSONDecodeError) as e:
        print(f"Error receiving data: {e}")
        return None, None
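
A quick way to sanity-check this framing without a GPU or a running server is to round-trip a payload through socket.socketpair(). This is a minimal sketch, assuming the module above is saved as gpu_protocol.py; the file and function names here are hypothetical.

# test_gpu_protocol.py (hypothetical round-trip check)
import socket
import numpy as np
from gpu_protocol import send_gpu_payload, receive_gpu_payload, OPERATION_PYTORCH_ADDITION

def roundtrip_demo():
    left, right = socket.socketpair()  # two connected in-process sockets
    payload = {
        'kernel_args': {'array_a': np.arange(4, dtype=np.float32)},
        'metadata': {'kernel_name': 'demo'}
    }
    send_gpu_payload(left, OPERATION_PYTORCH_ADDITION, payload)
    header, blob = receive_gpu_payload(right)
    ptr = header['data_pointers']['array_a']
    restored = np.frombuffer(blob, dtype=np.float32,
                             offset=ptr['offset'], count=ptr['length'] // 4)
    assert header['operation'] == OPERATION_PYTORCH_ADDITION
    assert np.array_equal(restored, np.arange(4, dtype=np.float32))
    print("Round-trip OK:", restored)

if __name__ == "__main__":
    roundtrip_demo()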

# adi-gpu_client.py
# A new client that sends a GPU processing request to the server.

import socket
import numpy as np
import json
import struct

# Import the new GPU protocol
from gpu_protocol import (
    send_gpu_payload,
    receive_gpu_payload,
    OPERATION_EXECUTE_GPU_KERNEL,
    OPERATION_PYTORCH_ADDITION
)

def send_pytorch_addition(client_socket):
    """
    Creates a PyTorch-based request and sends it to the server.
    """
    array_size = 1000000
    array_a = np.random.rand(array_size).astype(np.float32)
    array_b = np.random.rand(array_size).astype(np.float32)
    
    print(f"Created two arrays of size {array_size} for PyTorch GPU processing.")
    
    # Prepare the payload for the GPU
    pytorch_payload = {
        'kernel_args': {
            'array_a': array_a,
            'array_b': array_b
        },
        'metadata': {
            'kernel_name': 'pytorch_add'
        }
    }
    
    send_gpu_payload(client_socket, OPERATION_PYTORCH_ADDITION, pytorch_payload)
    print("Sent PyTorch request to server. Waiting for response...")
    
    return array_a + array_b

def send_cupy_addition(client_socket):
    """
    Creates a CuPy-based request and sends it to the server.
    """
    array_size = 1000000
    array_a = np.random.rand(array_size).astype(np.float32)
    array_b = np.random.rand(array_size).astype(np.float32)
    
    print(f"Created two arrays of size {array_size} for CuPy GPU processing.")
    
    cupy_payload = {
        'kernel_args': {
            'array_a': array_a,
            'array_b': array_b
        },
        'metadata': {
            'kernel_name': 'add_arrays'
        }
    }

    send_gpu_payload(client_socket, OPERATION_EXECUTE_GPU_KERNEL, cupy_payload)
    print("Sent CuPy request to server. Waiting for response...")

    return array_a + array_b

def run_client():
    """
    Connects to the server and sends a request for GPU kernel execution.
    """
    SERVER_HOST = '127.0.0.1'
    SERVER_PORT = 12345
    
    # You can choose which operation to run here
    # 1: CuPy Addition
    # 2: PyTorch Addition
    operation_choice = 2 
    
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
        try:
            client_socket.connect((SERVER_HOST, SERVER_PORT))
            print("Connected to server.")
            
            if operation_choice == 1:
                local_result = send_cupy_addition(client_socket)
            elif operation_choice == 2:
                local_result = send_pytorch_addition(client_socket)
            else:
                print("Invalid operation choice.")
                return

            response_header, response_bytes = receive_gpu_payload(client_socket)
            
            if response_header and 'result' in response_header['data_pointers']:
                result_info = response_header['data_pointers']['result']
                result_array = np.frombuffer(
                    response_bytes,
                    offset=result_info['offset'],
                    count=result_info['length'] // 4,
                    dtype=np.float32
                )
                print(f"\nReceived result from server. First 5 elements:")
                print(result_array[:5])
                
                is_correct = np.allclose(result_array, local_result, atol=1e-5)
                print(f"Result verified against local computation: {'Correct' if is_correct else 'Incorrect'}")
            else:
                print("Failed to receive a valid response from the server.")
                
        except ConnectionRefusedError:
            print(f"Connection refused. Is the server running on {SERVER_HOST}:{SERVER_PORT}?")
        except Exception as e:
            print(f"An error occurred: {e}")

if __name__ == "__main__":
    run_client()

# adi-gpu_server.py
# A new server designed to offload numerical computations to a GPU.
# NOTE: This requires an NVIDIA GPU and the 'cupy' and 'torch' libraries installed.
# Install with: pip install cupy-cudaXXX (e.g., cupy-cuda11x for CUDA 11)
# and: pip install torch torchvision torchaudio
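# A quick way to confirm both backends can actually see a GPU before starting
# the server (assumes the packages above are installed):
#   python -c "import cupy, torch; print(cupy.cuda.runtime.getDeviceCount(), torch.cuda.is_available())"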

import socket
import struct
import numpy as np
import json
import threading
import multiprocessing
import traceback

import cupy as cp # The GPU-accelerated library
import torch # The PyTorch library

# Import the new GPU protocol
from gpu_protocol import (
    receive_gpu_payload, 
    send_gpu_payload, 
    OPERATION_EXECUTE_GPU_KERNEL,
    OPERATION_PYTORCH_ADDITION
)

# --- GPU Operations (The "kernels" that run on the GPU) ---

def cupy_worker(data_dict):
    """
    Performs a simple element-wise addition on the GPU using CuPy.
    """
    try:
        gpu_array_a = cp.asarray(data_dict['array_a'])
        gpu_array_b = cp.asarray(data_dict['array_b'])
        
        gpu_result = gpu_array_a + gpu_array_b
        
        # Transfer the result from GPU memory back to CPU memory
        result_array = cp.asnumpy(gpu_result)
        return result_array
    except Exception as e:
        print(f"An error occurred during CuPy computation: {e}")
        return None

def pytorch_worker(data_dict):
    """
    Performs a simple element-wise addition on the GPU using PyTorch.
    """
    try:
        # Transfer NumPy arrays from CPU to GPU as PyTorch tensors
        gpu_array_a = torch.from_numpy(data_dict['array_a']).cuda()
        gpu_array_b = torch.from_numpy(data_dict['array_b']).cuda()
        
        # Perform the operation on the GPU
        gpu_result = gpu_array_a + gpu_array_b
        
        # Transfer the result back to CPU as a NumPy array
        result_array = gpu_result.cpu().numpy()
        return result_array
    except Exception as e:
        print(f"An error occurred during PyTorch computation: {e}")
        return None

# --- Server and Multiprocessing Logic ---

def worker_process(request_data):
    """
    A worker function that is executed by a process in the pool.
    It dispatches the correct GPU operation based on the client's request.
    """
    op_type = request_data["operation"]
    data_bytes = request_data["data_bytes"]
    data_pointers = request_data["data_pointers"]
    
    # Unpack the binary data into NumPy arrays, regardless of the backend
    kernel_args = {}
    for key, ptr in data_pointers.items():
        start = ptr['offset']
        end = start + ptr['length']
        kernel_args[key] = np.frombuffer(data_bytes[start:end], dtype=np.float32)

    # Dispatch to the correct GPU backend
    if op_type == OPERATION_EXECUTE_GPU_KERNEL:
        result = cupy_worker(kernel_args)
    elif op_type == OPERATION_PYTORCH_ADDITION:
        result = pytorch_worker(kernel_args)
    else:
        raise ValueError(f"Unknown operation type: {op_type}")
    
    if result is None:
        raise RuntimeError("GPU kernel execution failed.")
    
    return result

def handle_client(client_socket, addr, pool):
    """
    Handles a single client connection.
    Parses the request and submits the GPU task to the multiprocessing pool.
    """
    print(f"Accepted connection from {addr}")
    try:
        json_header, data_bytes = receive_gpu_payload(client_socket)
        if json_header is None:
            print(f"Client {addr} disconnected unexpectedly.")
            return

        print(f"Received request from {addr} for operation {json_header['operation']}")
        
        request_data = {
            "operation": json_header["operation"],
            "data_bytes": data_bytes,
            "data_pointers": json_header["data_pointers"],
            "metadata": json_header["metadata"]
        }

        # Submit the GPU task to the process pool
        result_async = pool.apply_async(worker_process, (request_data,))
        result = result_async.get() # Block until the result is available
        
        # Send the result back to the client
        send_gpu_payload(client_socket, json_header["operation"], {
            'kernel_args': {'result': result},
            'metadata': {}
        })
        print(f"Sent result back to {addr}.")

    except (ValueError, RuntimeError) as e:
        print(f"Error on server from {addr}: {e}")
        # Report the failure through the same protocol; the message rides in the
        # metadata because send_gpu_payload only packs NumPy arrays as binary.
        send_gpu_payload(client_socket, -1, {'kernel_args': {}, 'metadata': {'error': str(e)}})
    except ConnectionResetError:
        print(f"Client {addr} forcibly closed the connection.")
    except Exception as e:
        print(f"An unexpected error occurred in handle_client for {addr}: {e}")
        traceback.print_exc()
    finally:
        client_socket.close()
        print(f"Connection with client {addr} closed.")

def start_server(host, port):
    """
    Starts the main server, listens for connections, and manages the process pool.
    """
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((host, port))
    server_socket.listen(5)
    print(f"Server listening on {host}:{port}")

    try:
        while True:
            client_socket, addr = server_socket.accept()
            client_thread = threading.Thread(target=handle_client, args=(client_socket, addr, pool))
            client_thread.start()
    except KeyboardInterrupt:
        print("\nServer shutting down...")
    finally:
        server_socket.close()
        pool.close()
        pool.join()
        print("Server shutdown complete.")

if __name__ == "__main__":
    SERVER_HOST = '127.0.0.1'
    SERVER_PORT = 12345
    start_server(SERVER_HOST, SERVER_PORT)


Distributed FPU Protocol Communication Tools - Adi FPU Suite 0.4A - With VHDL Architecture Print!

# - Eigenvalue Packing/Unpacking for her comfort ;)

# I expect some will ask: ehhh, what's going on, clock? No ^ just logic. Check out the VHDL Print for a possibly near-infinite virtual memory range register :-P

# protocol.py (called adi-comm-protocol.py in this suite; save it under the
# importable name protocol.py so the client and server below can load it)
# This module defines the communication protocol and data handling functions
# for the Adi Server and its clients.

import socket
import struct
import json
import numpy as np

# --- Operation Constants (for interoperability) ---
OPERATION_INTERPOLATE_HYPERBOLIC_PARABOLIC = 1
OPERATION_INTERPOLATE_ARCSECANT_STREAM = 2
OPERATION_EIGENVALUE_PACKING = 3

# --- Helper functions for data transmission ---

def send_data(conn, operation_type, data):
    """
    Sends structured data over a socket.
    
    The function first packs the data and a header into a JSON object,
    then sends the JSON size and the JSON itself. Following the JSON, it
    sends any associated binary data (e.g., from numpy arrays).
    
    Args:
        conn (socket.socket): The socket to send data through.
        operation_type (int): The type of operation being requested.
        data (dict): A dictionary containing 'result' or other data to send.
    """
    
    # Check if a numpy array is present and needs to be serialized
    binary_data = b''
    data_pointers = {}
    
    for key, val in data.items():
        if isinstance(val, np.ndarray):
            # Convert the numpy array to bytes
            array_bytes = val.astype(np.float32).tobytes()
            data_pointers[key] = {
                'offset': len(binary_data),
                'length': len(array_bytes),
                'dtype': 'float32'
            }
            binary_data += array_bytes
            data[key] = None # Remove the array from the JSON data to prevent serialization issues
    
    # Create the JSON header with the operation type and data pointers.
    # A plain 'metadata' dict (if supplied in data) is promoted into the header
    # so the server can read it without touching the binary payload.
    json_header = {
        "operation": operation_type,
        "data": data,
        "data_pointers": data_pointers,
        "metadata": data.get('metadata', {})
    }
    
    # Serialize the header to JSON bytes
    json_header_bytes = json.dumps(json_header).encode('utf-8')
    header_size = len(json_header_bytes)
    
    # Send the size of the header, the header itself, and the binary data
    conn.sendall(struct.pack('!I', header_size))
    conn.sendall(json_header_bytes)
    conn.sendall(binary_data)

def receive_data(conn):
    """
    Receives structured data from a socket.
    
    Args:
        conn (socket.socket): The socket to receive data from.
        
    Returns:
        tuple: A tuple containing the JSON header (dict) and the binary
               payload (bytes), or (None, None) if the connection closes.
    """
    try:
        # First, receive the size of the JSON header (loop, since recv may
        # return fewer than 4 bytes)
        header_size_bytes = b''
        while len(header_size_bytes) < 4:
            chunk = conn.recv(4 - len(header_size_bytes))
            if not chunk:
                return None, None
            header_size_bytes += chunk

        header_size = struct.unpack('!I', header_size_bytes)[0]
        
        # Then, receive the JSON header itself
        json_header_bytes = b''
        while len(json_header_bytes) < header_size:
            chunk = conn.recv(header_size - len(json_header_bytes))
            if not chunk:
                return None, None
            json_header_bytes += chunk
            
        json_header = json.loads(json_header_bytes.decode('utf-8'))
        
        # Calculate total binary data length from pointers
        total_data_length = sum(ptr['length'] for ptr in json_header.get('data_pointers', {}).values())
        
        # Finally, receive the binary data
        data_bytes = b''
        while len(data_bytes) < total_data_length:
            chunk = conn.recv(total_data_length - len(data_bytes))
            if not chunk:
                return None, None
            data_bytes += chunk
            
        return json_header, data_bytes
        
    except (socket.error, struct.error, json.JSONDecodeError) as e:
        print(f"Error receiving data: {e}")
        return None, None

# --- New Functions for Eigenvalue Packing ---

def pack_matrix_to_eigen(matrix: np.ndarray):
    """
    Performs eigendecomposition and packs the eigenvalues and eigenvectors.
    
    Args:
        matrix (np.ndarray): The square matrix to be packed.
        
    Returns:
        tuple: A tuple containing the JSON header and a binary payload.
    """
    if matrix.shape[0] != matrix.shape[1]:
        raise ValueError("Matrix must be square for eigendecomposition.")
        
    # Perform eigendecomposition. np.linalg.eig can return complex results; this
    # float32 protocol keeps only the real parts, so packing is lossless only for
    # matrices with a real eigendecomposition.
    eigenvalues, eigenvectors = np.linalg.eig(matrix)
    eigenvalues = np.real(eigenvalues)
    eigenvectors = np.real(eigenvectors)
    
    # The client will use this to reconstruct the matrix
    json_header = {
        'operation': OPERATION_EIGENVALUE_PACKING,
        'data_pointers': {
            'eigenvalues': {'offset': 0, 'length': len(eigenvalues) * 4, 'dtype': 'float32'},
            'eigenvectors': {'offset': len(eigenvalues) * 4, 'length': len(eigenvectors.flatten()) * 4, 'dtype': 'float32'},
        },
        'metadata': {
            'matrix_shape': matrix.shape
        }
    }
    
    # Concatenate the binary data
    binary_payload = eigenvalues.astype(np.float32).tobytes() + eigenvectors.astype(np.float32).tobytes()
    
    return json_header, binary_payload
    
def reconstruct_matrix(data_bytes, data_pointers, matrix_shape):
    """
    Reconstructs the original matrix from packed eigenvalues and eigenvectors.
    
    Args:
        data_bytes (bytes): The binary payload.
        data_pointers (dict): Dictionary with offsets and lengths.
        matrix_shape (tuple): The original shape of the matrix.
        
    Returns:
        np.ndarray: The reconstructed matrix.
    """
    eigenvalues_info = data_pointers['eigenvalues']
    eigenvectors_info = data_pointers['eigenvectors']
    
    # Unpack the binary data
    eigenvalues = np.frombuffer(
        data_bytes,
        offset=eigenvalues_info['offset'],
        count=eigenvalues_info['length'] // 4,
        dtype=np.float32
    )
    eigenvectors = np.frombuffer(
        data_bytes,
        offset=eigenvectors_info['offset'],
        count=eigenvectors_info['length'] // 4,
        dtype=np.float32
    ).reshape(matrix_shape)
    
    # Reconstruct the matrix: A = P * D * P_inverse
    # Where P is the matrix of eigenvectors and D is the diagonal matrix of eigenvalues
    D = np.diag(eigenvalues)
    P = eigenvectors
    P_inv = np.linalg.inv(P)
    
    reconstructed_matrix = P @ D @ P_inv
    
    # The float32 transport already discarded any imaginary parts, so the product
    # is real; np.real() is kept as a harmless safeguard.
    return np.real(reconstructed_matrix)
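
The pack/reconstruct pair can be exercised locally before any socket is involved. A minimal sketch, assuming a symmetric input so the eigendecomposition stays real and well-conditioned (the float32 transport still adds a little rounding); the file name is hypothetical.

# eigen_roundtrip_demo.py (hypothetical local check)
import numpy as np
from protocol import pack_matrix_to_eigen, reconstruct_matrix

demo = np.array([[2.0, 1.0], [1.0, 3.0]], dtype=np.float32)  # symmetric 2x2
header, payload = pack_matrix_to_eigen(demo)
rebuilt = reconstruct_matrix(payload, header['data_pointers'],
                             tuple(header['metadata']['matrix_shape']))
print(rebuilt)
print("Close to original:", np.allclose(demo, rebuilt, atol=1e-4))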

# adi_comm-client.py
# A sample client that sends a matrix to the server using eigenvalue packing.

import socket
import numpy as np
import json
import struct

# Import the new protocol for interoperability
from protocol import (
    send_data, 
    receive_data,
    pack_matrix_to_eigen,
    OPERATION_EIGENVALUE_PACKING
)

def run_client():
    """
    Connects to the server, packs a sample matrix, sends it, and receives the result.
    """
    SERVER_HOST = '127.0.0.1'
    SERVER_PORT = 12345
    
    # Create a sample square matrix to be packed
    # Make it a bit large to show the value of packing
    sample_matrix = np.array([
        [1.0, 2.0, 3.0, 4.0],
        [5.0, 6.0, 7.0, 8.0],
        [9.0, 10.0, 11.0, 12.0],
        [13.0, 14.0, 15.0, 16.0]
    ], dtype=np.float32)
    
    print("Original Matrix:")
    print(sample_matrix)
    
    # Pack the matrix using the new protocol function
    packed_header, packed_payload = pack_matrix_to_eigen(sample_matrix)
    
    # Connect to the server
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
        try:
            client_socket.connect((SERVER_HOST, SERVER_PORT))
            print("Connected to server.")
            
            # Send the packed data to the server. The pointer dicts give byte
            # offsets/lengths into the packed payload; matrix_shape travels as
            # metadata so the server can reconstruct the matrix.
            ev_info = packed_header['data_pointers']['eigenvalues']
            evec_info = packed_header['data_pointers']['eigenvectors']
            send_data(client_socket, packed_header['operation'], {
                'eigenvalues': np.frombuffer(packed_payload, dtype=np.float32,
                                             offset=ev_info['offset'],
                                             count=ev_info['length'] // 4),
                'eigenvectors': np.frombuffer(packed_payload, dtype=np.float32,
                                              offset=evec_info['offset'],
                                              count=evec_info['length'] // 4),
                'metadata': {'matrix_shape': list(sample_matrix.shape)}
            })
            
            # Receive the response from the server
            response_header, response_bytes = receive_data(client_socket)
            
            if response_header and 'result' in response_header['data']:
                result_from_server = np.frombuffer(response_bytes, dtype=np.float32)
                print(f"\nReceived result from server: {result_from_server}")
            else:
                print("Failed to receive a valid response.")
                
        except ConnectionRefusedError:
            print(f"Connection refused. Is the server running on {SERVER_HOST}:{SERVER_PORT}?")
        except Exception as e:
            print(f"An error occurred: {e}")

if __name__ == "__main__":
    run_client()

# adi_comm-server.py
# This new, unified server handles all FPU operations from both
# n-math.py and e-stream.py using a single, efficient process pool.

import socket
import struct
import numpy as np
import json
import threading
import multiprocessing
import traceback

# Import the new protocol for interoperability
from protocol import (
    receive_data, 
    send_data, 
    OPERATION_INTERPOLATE_HYPERBOLIC_PARABOLIC, 
    OPERATION_INTERPOLATE_ARCSECANT_STREAM,
    OPERATION_EIGENVALUE_PACKING,
    reconstruct_matrix # The new function to unpack the data
)

# --- FPU Operations (Consolidated from n-math.py and e-stream.py) ---
# These functions are now standalone and can be executed by worker processes.

def hyperbolic_parabolic_interpolation_nd_revised(data_bytes, data_pointers):
    """
    Performs hyperbolic-parabolic interpolation on n-dimensional data.
    """
    all_fy_data = []
    all_fx_data = []
    x_interp = None

    # Unpack the binary data based on the JSON data_pointers
    for key, ptr in data_pointers.items():
        start = ptr['offset']
        end = start + ptr['length']
        unpacked_data = np.frombuffer(data_bytes[start:end], dtype=np.float32)

        if key.startswith('fx'):
            all_fx_data.append(unpacked_data)
        elif key.startswith('fy'):
            all_fy_data.append(unpacked_data)
        elif key == 'x_interp':
            x_interp = unpacked_data

    if len(all_fx_data) != len(all_fy_data) or x_interp is None:
        raise ValueError("Invalid data for interpolation.")

    all_interp_y = []
    num_dimensions = len(all_fy_data)

    for fx, fy in zip(all_fx_data, all_fy_data):
        if len(fx) != len(fy) or len(fx) < 3:
            raise ValueError("X and Y data must have equal length and at least three points.")

        interp_y = []
        for x in x_interp:
            distances = np.abs(fx - x)
            nearest_indices = np.argsort(distances)[:3]
            x1, x2, x3 = fx[nearest_indices]
            y1, y2, y3 = fy[nearest_indices]

            # Duplicate x values make the parabolic system singular.
            if x1 == x2 or x2 == x3 or x1 == x3:
                raise RuntimeError("Duplicate x values detected, cannot interpolate.")

            # Parabolic interpolation (y = ax^2 + bx + c)
            # Using matrix inversion for a more robust solution
            A = np.array([
                [x1**2, x1, 1],
                [x2**2, x2, 1],
                [x3**2, x3, 1]
            ])
            b = np.array([y1, y2, y3])
            try:
                a, b_lin, c = np.linalg.solve(A, b)
                y_interp = a * x**2 + b_lin * x + c
            except np.linalg.LinAlgError:
                raise RuntimeError("Could not solve parabolic interpolation matrix.")

            interp_y.append(y_interp)
        all_interp_y.append(np.array(interp_y))

    return np.concatenate(all_interp_y)
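
# Local sanity check for the parabolic fit (hypothetical, no socket involved):
# pack fx/fy/x_interp the way send_data lays them out; fitting y = x**2 through
# the three samples nearest x = 2.5 reproduces the parabola exactly.
#   fx = np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float32)
#   fy = (fx ** 2).astype(np.float32)
#   x_interp = np.array([2.5], dtype=np.float32)
#   blob, pointers, offset = b'', {}, 0
#   for key, arr in (('fx_0', fx), ('fy_0', fy), ('x_interp', x_interp)):
#       raw = arr.tobytes()
#       pointers[key] = {'offset': offset, 'length': len(raw)}
#       blob += raw
#       offset += len(raw)
#   hyperbolic_parabolic_interpolation_nd_revised(blob, pointers)  # -> ~[6.25]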


def pseudo_interpolate_arcsecant_stream(data_bytes, data_pointers):
    """
    Pseudo-interpolates a chunk of interpolation points using pre-existing data.
    This now works as a single, callable function within the service.
    """
    def _slice(name):
        ptr = data_pointers[name]
        return np.frombuffer(
            data_bytes[ptr['offset'] : ptr['offset'] + ptr['length']],
            dtype=np.float32)

    x_data = _slice('x_data')
    y_data = _slice('y_data')
    x_interp_chunk = _slice('x_interp_chunk')

    def _calculate_arcsecant(val):
        if np.abs(val) < 1:
            return np.nan
        return np.arccos(1 / val)

    interpolated_y = []
    for x in x_interp_chunk:
        # Simplified logic: find nearest point and apply a transformation
        nearest_idx = np.argmin(np.abs(x_data - x))
        y_val = y_data[nearest_idx]
        interpolated_y.append(_calculate_arcsecant(y_val))

    return np.array(interpolated_y, dtype=np.float32)


# --- Server and Multiprocessing Logic ---

def worker_process(request_data):
    """
    A worker function that is executed by a process in the pool.
    It takes a request dictionary and dispatches the correct FPU operation.
    """
    op_type = request_data["operation"]
    data_bytes = request_data["data_bytes"]
    data_pointers = request_data["data_pointers"]
    
    if op_type == OPERATION_INTERPOLATE_HYPERBOLIC_PARABOLIC:
        result = hyperbolic_parabolic_interpolation_nd_revised(data_bytes, data_pointers)
    elif op_type == OPERATION_INTERPOLATE_ARCSECANT_STREAM:
        result = pseudo_interpolate_arcsecant_stream(data_bytes, data_pointers)
    elif op_type == OPERATION_EIGENVALUE_PACKING:
        # Reconstruct the matrix from the packed data
        matrix_shape = request_data["metadata"]["matrix_shape"]
        reconstructed_matrix = reconstruct_matrix(data_bytes, data_pointers, matrix_shape)
        
        # Perform a sample calculation on the reconstructed matrix
        trace_value = np.trace(reconstructed_matrix)
        print(f"Reconstructed Matrix from packed data:\n{reconstructed_matrix}")
        print(f"Calculated Trace: {trace_value}")
        result = np.array([trace_value], dtype=np.float32)
    else:
        raise ValueError(f"Unknown operation type: {op_type}")
    
    return result


def handle_client(client_socket, addr, pool):
    """
    Handles a single client connection.
    Parses the request and submits the FPU task to the multiprocessing pool.
    """
    print(f"Accepted connection from {addr}")
    try:
        json_header, data_bytes = receive_data(client_socket)
        if json_header is None:
            print(f"Client {addr} disconnected unexpectedly.")
            return

        print(f"Received request from {addr} for operation {json_header['operation']}")
        
        request_data = {
            "operation": json_header["operation"],
            "data_bytes": data_bytes,
            "data_pointers": json_header["data_pointers"]
        }
        # Add metadata if present
        if 'metadata' in json_header:
            request_data['metadata'] = json_header['metadata']

        # Submit the FPU task to the process pool
        result_async = pool.apply_async(worker_process, (request_data,))
        result = result_async.get()  # Block until the result is available
        
        # Send the result back to the client
        send_data(client_socket, json_header["operation"], {"result": result})
        print(f"Sent result back to {addr}.")

    except (ValueError, RuntimeError) as e:
        print(f"ValueError or RuntimeError on server from {addr}: {e}")
        # Reply through the same protocol so the client's receive_data can parse it.
        send_data(client_socket, -1, {"error": str(e)})
    except ConnectionResetError:
        print(f"Client {addr} forcibly closed the connection.")
    except Exception as e:
        print(f"An unexpected error occurred in handle_client for {addr}: {e}")
        traceback.print_exc()
    finally:
        client_socket.close()
        print(f"Connection with client {addr} closed.")


def start_server(host, port):
    """
    Starts the main server, listens for connections, and manages the process pool.
    """
    # Create a process pool with a number of workers equal to CPU cores
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((host, port))
    server_socket.listen(5)
    print(f"Server listening on {host}:{port}")

    try:
        while True:
            client_socket, addr = server_socket.accept()
            # Handle each client in a separate thread to not block the main loop
            client_thread = threading.Thread(target=handle_client, args=(client_socket, addr, pool))
            client_thread.start()
    except KeyboardInterrupt:
        print("\nServer shutting down...")
    finally:
        server_socket.close()
        pool.close()
        pool.join()
        print("Server shutdown complete.")


if __name__ == "__main__":
    SERVER_HOST = '127.0.0.1'
    SERVER_PORT = 12345
    start_server(SERVER_HOST, SERVER_PORT)

-- VHDL Architecture for a custom Eigenvalue Packing Bridge
-- This design is refit to the Python client's JSON-based protocol.
-- This version includes a conceptual framework for virtual memory addressing.

library ieee;
use ieee.std_logic_1164.all;
use ieee.numeric_std.all;

----------------------------------------------------------------------
-- Eigen_Bridge Entity Declaration
-- This defines the external ports of our FPGA module.
----------------------------------------------------------------------
entity Eigen_Bridge is
    port (
        -- Clock and Reset signals
        clk_i           : in    std_logic;                                -- Main clock input
        rst_i           : in    std_logic;                                -- Synchronous reset input

        -- External Parallel Bus Interface (Data, Write Enable, Ready)
        data_in_i       : in    std_logic_vector(31 downto 0);            -- 32-bit data input from host
        write_enable_i  : in    std_logic;                                -- Write enable strobe from host
        read_enable_i   : in    std_logic;                                -- Read enable strobe from host
        ready_o         : out   std_logic;                                -- Ready signal to host

        -- Data output back to host
        data_out_o      : out   std_logic_vector(31 downto 0);            -- 32-bit data output to host
        
        -- Status and Control
        error_o         : out   std_logic;                                 -- Error flag to host
        
        -- Conceptual Virtual Memory Interface
        virtual_address_i : in    std_logic_vector(31 downto 0);          -- Conceptual virtual address
        physical_address_o: out   std_logic_vector(31 downto 0);          -- Conceptual physical address
        page_fault_o      : out   std_logic                               -- Conceptual page fault signal
    );
end Eigen_Bridge;

----------------------------------------------------------------------
-- Eigen_Bridge Architecture
-- This is the behavioral description of the bridge's logic, refit
-- to the Python client's protocol for eigenvalue packing.
----------------------------------------------------------------------
architecture Behavioral of Eigen_Bridge is

    -- Internal component for the eigenvalue calculation core
    component Eigenvalue_Core is
        port (
            -- Clock and Reset
            clk_i           : in  std_logic;
            rst_i           : in  std_logic;

            -- Control signals for data handshake
            start_i         : in  std_logic;                      -- Signal to start a new calculation
            done_o          : out std_logic;                      -- Indicates when the calculation is complete
            busy_o          : out std_logic;                      -- Indicates the core is busy
            
            -- Pointers to input data buffers
            eigenvalues_ptr_i: in std_logic_vector(31 downto 0);
            eigenvectors_ptr_i: in std_logic_vector(31 downto 0);
            matrix_size_i    : in std_logic_vector(31 downto 0);
            
            -- Output result
            result_o        : out std_logic_vector(31 downto 0);  -- Result of the calculation
            result_len_o    : out std_logic_vector(31 downto 0)
        );
    end component;

    -- State machine for controlling the data flow
    type state_type is (IDLE, 
                        RECEIVE_JSON_HEADER_SIZE, 
                        RECEIVE_JSON_HEADER, 
                        RECEIVE_EIGENVALUES, 
                        RECEIVE_EIGENVECTORS, 
                        VIRTUAL_TO_PHYSICAL_TRANSLATION, -- New state for conceptual MMU
                        CALCULATE, 
                        SEND_RESULT_LEN, 
                        SEND_RESULT_DATA, 
                        ERROR);
    signal state : state_type := IDLE;

    -- Internal signals for communication between bridge and core
    signal start_calc_s     : std_logic := '0';
    signal core_done_s      : std_logic := '0';
    signal core_busy_s      : std_logic := '0';
    signal core_result_s    : std_logic_vector(31 downto 0);
    signal core_result_len_s: std_logic_vector(31 downto 0);
    
    -- Conceptual Dual-Port RAM for data buffering
    type ram_type is array (0 to 65535) of std_logic_vector(31 downto 0);
    signal data_ram : ram_type;
    signal write_address: integer range 0 to 65535 := 0;
    signal read_address: integer range 0 to 65535 := 0;

    -- Signals to hold data from the JSON header
    signal json_header_size_s : std_logic_vector(31 downto 0);
    signal eigenvalues_len_s  : std_logic_vector(31 downto 0);
    signal eigenvectors_len_s : std_logic_vector(31 downto 0);
    signal matrix_size_s      : std_logic_vector(31 downto 0);
    signal data_offset_s      : std_logic_vector(31 downto 0);

    -- Operation code from the Python protocol
    signal op_code_expected : std_logic_vector(31 downto 0) := x"00000003";

    -- Counters for data reception and transmission
    signal write_counter    : integer range 0 to 65536 := 0;
    signal read_counter     : integer range 0 to 65536 := 0;
    
    -- Conceptual virtual memory signals
    signal virtual_address_s  : std_logic_vector(31 downto 0);
    signal physical_address_s : std_logic_vector(31 downto 0);
    signal page_fault_s       : std_logic;
    
begin

    -- Instantiate the Eigenvalue_Core component
    Core_Instance: Eigenvalue_Core
        port map (
            clk_i           => clk_i,
            rst_i           => rst_i,
            start_i         => start_calc_s,
            done_o          => core_done_s,
            busy_o          => core_busy_s,
            eigenvalues_ptr_i=> (others => '0'), -- Conceptual pointers
            eigenvectors_ptr_i=> (others => '0'), -- to the data_ram
            matrix_size_i   => matrix_size_s,
            result_o        => core_result_s,
            result_len_o    => core_result_len_s
        );
        
    -- Conceptual Memory Management Unit (MMU) process
    -- This process conceptually translates virtual addresses to physical addresses.
    -- In a real design, this would be a much more complex block with page tables.
    process(clk_i, rst_i)
    begin
        if rst_i = '1' then
            physical_address_s <= (others => '0');
            page_fault_s <= '0';
        elsif rising_edge(clk_i) then
            -- Check if a virtual address needs translation (e.g., in a new state)
            if state = VIRTUAL_TO_PHYSICAL_TRANSLATION then
                -- For this conceptual model, we will simply use the lower bits
                -- of the virtual address as the physical address.
                -- This represents a direct mapping without a page table.
                physical_address_s <= virtual_address_s;
                
                -- Check for a conceptual page fault.
                -- A real page fault would be if the address is not in memory.
                -- Here, we'll just check if the address is out of our physical range.
                if unsigned(virtual_address_s) > 65535 then
                    page_fault_s <= '1';
                else
                    page_fault_s <= '0';
                end if;
            else
                page_fault_s <= '0';
            end if;
        end if;
    end process;
    
    -- Connect conceptual signals to ports
    virtual_address_s <= virtual_address_i;  -- drive the internal copy of the virtual address
    physical_address_o <= physical_address_s;
    page_fault_o <= page_fault_s;

    -- Main State Machine Process
    process(clk_i, rst_i)
    begin
        if rst_i = '1' then
            state <= IDLE;
            write_counter <= 0;
            read_counter <= 0;
            write_address <= 0;
            read_address <= 0;
            start_calc_s <= '0';
            ready_o <= '0';
            data_out_o <= (others => '0');
            error_o <= '0';
        elsif rising_edge(clk_i) then
            -- Default assignments
            start_calc_s <= '0';
            ready_o <= '0';
            
            case state is
                -- State 1: Awaiting a new request
                when IDLE =>
                    ready_o <= '1';
                    error_o <= '0';
                    write_address <= 0;
                    
                    if write_enable_i = '1' then
                        state <= RECEIVE_JSON_HEADER_SIZE;
                    end if;
                
                -- State 2: Receiving the JSON header size (4 bytes)
                when RECEIVE_JSON_HEADER_SIZE =>
                    ready_o <= '1';
                    if write_enable_i = '1' then
                        json_header_size_s <= data_in_i;
                        write_counter <= 0;
                        state <= RECEIVE_JSON_HEADER;
                    end if;
                
                -- State 3: Receiving the fixed-size JSON header (conceptualized)
                when RECEIVE_JSON_HEADER =>
                    ready_o <= '1';
                    if write_enable_i = '1' then
                        -- Conceptual parsing of a fixed-size header
                        if write_counter = 0 then
                            -- First word: check for operation code
                            if data_in_i = op_code_expected then
                                -- Do nothing, proceed to next word
                            else
                                state <= ERROR;
                            end if;
                        elsif write_counter = 1 then
                            eigenvalues_len_s <= data_in_i;
                        elsif write_counter = 2 then
                            eigenvectors_len_s <= data_in_i;
                        elsif write_counter = 3 then
                            matrix_size_s <= data_in_i;
                        end if;
                        
                        write_counter <= write_counter + 1;
                        -- Assuming a fixed header size of 4 words (16 bytes)
                        if write_counter = 3 then
                            data_offset_s <= x"00000004"; -- The binary payload starts after the header
                            write_address <= 4;
                            write_counter <= 0;
                            state <= RECEIVE_EIGENVALUES;
                        end if;
                    end if;
                    
                -- State 4: Receiving the eigenvalues binary data
                when RECEIVE_EIGENVALUES =>
                    ready_o <= '1';
                    if write_enable_i = '1' then
                        data_ram(write_address) <= data_in_i;
                        write_address <= write_address + 1;
                        write_counter <= write_counter + 1;
                        if write_counter = to_integer(unsigned(eigenvalues_len_s)) / 4 then
                            write_counter <= 0;
                            state <= RECEIVE_EIGENVECTORS;
                        end if;
                    end if;
                
                -- State 5: Receiving the eigenvectors binary data
                when RECEIVE_EIGENVECTORS =>
                    ready_o <= '1';
                    if write_enable_i = '1' then
                        data_ram(write_address) <= data_in_i;
                        write_address <= write_address + 1;
                        write_counter <= write_counter + 1;
                        if write_counter = to_integer(unsigned(eigenvectors_len_s)) / 4 then
                            state <= CALCULATE;
                        end if;
                    end if;
                    
                -- State 6: Triggering the calculation core
                when CALCULATE =>
                    -- In a real system, the core would read from the physical addresses
                    -- mapped by the conceptual MMU.
                    start_calc_s <= '1'; -- Signal to the core to start
                    ready_o <= '0';
                    read_address <= 0; -- Reset read address for output
                    
                    if core_done_s = '1' then
                        state <= SEND_RESULT_LEN;
                    end if;

                -- State 7: Sending the result length back to the host
                when SEND_RESULT_LEN =>
                    ready_o <= '1';
                    data_out_o <= core_result_len_s;
                    
                    if read_enable_i = '1' then
                        state <= SEND_RESULT_DATA;
                        read_counter <= 0;
                    end if;
                
                -- State 8: Sending the result data back to the host
                when SEND_RESULT_DATA =>
                    ready_o <= '1';
                    data_out_o <= core_result_s;
                    
                    if read_enable_i = '1' then
                        read_counter <= read_counter + 1;
                        if read_counter = to_integer(unsigned(core_result_len_s)) - 1 then
                            state <= IDLE;
                        end if;
                    end if;
                
                -- State 9: Error state. The asynchronous reset branch above is
                -- the only exit; the FSM holds here until rst_i is asserted
                -- (rst_i cannot be '1' inside this clocked branch).
                when ERROR =>
                    error_o <= '1';
                    ready_o <= '0';
            end case;
        end if;
    end process;

end Behavioral;
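
For a feel of how the Python protocol maps onto the bridge's fixed four-word header, here is a minimal host-side sketch that lays out the 32-bit words the FSM expects. This is a hypothetical helper; a real driver would also honor the ready_o/write_enable_i handshake rather than just building the word stream.

# bridge_words_demo.py (hypothetical)
import struct
import numpy as np

def build_bridge_words(eigenvalues: np.ndarray, eigenvectors: np.ndarray, n: int):
    """Word stream for the Eigen_Bridge FSM: [op=3][eigenvalues length in bytes]
    [eigenvectors length in bytes][matrix size n], then the float32 payload."""
    ev = eigenvalues.astype(np.float32).tobytes()
    evec = eigenvectors.astype(np.float32).tobytes()
    words = [0x00000003, len(ev), len(evec), n]  # the conceptual 4-word header
    payload = ev + evec
    words += list(struct.unpack(f'<{len(payload) // 4}I', payload))
    return words

vals = np.array([3.0, 1.0], dtype=np.float32)
vecs = np.eye(2, dtype=np.float32)
print([hex(w) for w in build_bridge_words(vals, vecs, 2)])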

Distributed FPU Protocol - Adi FPU Suite 0.11A

# adi-protocol.py (save it under the importable name protocol.py so the server
# below can load it)
# A unified protocol for the FPU distributed service.
# All clients from the primary blog post's n-math.py (Python, Assembly, RISC-V)
# and the server must adhere to this.
import struct
import json
import numpy as np

# --- Common Constants ---
# Operations for the FPU service.
# The server will process a single or a sequence of these.
OPERATION_INTERPOLATE_HYPERBOLIC_PARABOLIC = 0
OPERATION_INTERPOLATE_ARCSECANT_STREAM = 1
OPERATION_DIFFERENTIATE = 2
OPERATION_CALCULATE_GRADIENT_1D = 3
OPERATION_HYPERBOLIC_INTERCEPT_HANDLER = 4
OPERATION_INTEGRATE = 5
OPERATION_INTEGRATE_ND = 6

# --- Payload Structures ---
# All communication uses a JSON header followed by a binary payload.

def send_data(sock, op_type: int, data_dict: dict):
    """
    Constructs and sends a structured request to the server.
    
    The request format is:
    [4-byte JSON header length]
    [JSON header]
    [variable-length binary data payload]

    Args:
        sock (socket): The connected socket.
        op_type (int): The operation type.
        data_dict (dict): A dictionary containing all data to be sent.
                          Keys correspond to data_pointers in the JSON header.
    """
    json_header = {
        "operation": op_type,
        "data_pointers": {} # Maps each data key to its {'length', 'offset'} in the payload
    }

    binary_payload = b''
    offset = 0

    for key, data_array in data_dict.items():
        # Ensure data is in np.float32 for consistent binary representation
        if not isinstance(data_array, np.ndarray):
            data_array = np.array(data_array, dtype=np.float32)

        data_bytes = data_array.astype(np.float32).tobytes()
        length = len(data_bytes)
        
        json_header["data_pointers"][key] = {
            "length": length,
            "offset": offset
        }
        
        binary_payload += data_bytes
        offset += length

    json_header_bytes = json.dumps(json_header).encode('utf-8')
    header_length = len(json_header_bytes)
    
    sock.sendall(struct.pack('!I', header_length))
    sock.sendall(json_header_bytes)
    sock.sendall(binary_payload)


def receive_data(sock):
    """
    Receives and parses a structured response from the server.
    
    Args:
        sock (socket): The connected socket.

    Returns:
        tuple: (json_header, binary_payload)
    """
    # Receive header length
    header_len_bytes = _recvall(sock, 4)
    if not header_len_bytes:
        return None, None
    header_length = struct.unpack('!I', header_len_bytes)[0]
    
    # Receive JSON header
    json_header_bytes = _recvall(sock, header_length)
    if not json_header_bytes:
        return None, None
    json_header = json.loads(json_header_bytes.decode('utf-8'))
    
    # Receive binary payload
    payload_length = sum(p['length'] for p in json_header.get('data_pointers', {}).values())
    binary_payload = _recvall(sock, payload_length)

    return json_header, binary_payload


def _recvall(sock, n):
    """Helper function to reliably receive exactly 'n' bytes from a socket."""
    data = b''
    while len(data) < n:
        packet = sock.recv(n - len(data))
        if not packet:
            return None
        data += packet
    return data
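
On the wire, a request is therefore a 4-byte big-endian header length, the JSON header, and then the concatenated float32 buffers. A minimal sketch of the exact frame send_data would emit for one small array (file name hypothetical):

# frame_demo.py (hypothetical wire-format illustration)
import json
import struct
import numpy as np

x = np.array([1.0, 2.0], dtype=np.float32)
header = {"operation": 0,
          "data_pointers": {"x_data": {"length": x.nbytes, "offset": 0}}}
header_bytes = json.dumps(header).encode('utf-8')
frame = struct.pack('!I', len(header_bytes)) + header_bytes + x.tobytes()
print(len(frame), "bytes:", frame[:4].hex(), "+ JSON +", x.tobytes().hex())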

# adi_client.py
# A consolidated client that can operate and multiplex FPU operands,
# including N-dimensional operations and specialized eigenvalue packing.

import socket
import numpy as np
import json
import time
import struct
from typing import Dict, Any, List

# --- Constants (Must match the server; note this client speaks its own raw
# framing below rather than the JSON-header protocol above) ---
OPERATION_INTERPOLATE = 0
OPERATION_DIFFERENTIATE = 1
OPERATION_CALCULATE_GRADIENT_1D = 2
OPERATION_HYPERBOLIC_INTERCEPT_HANDLER = 3
OPERATION_INTEGRATE = 4
OPERATION_INTEGRATE_ND = 5
OPERATION_WORKFLOW = 6 # For relational compositions
OPERATION_INTERPOLATE_EIGENVALUE = 7 # Hypothetical operation for eigenvalue packing

SERVER_HOST = '127.0.0.1'
SERVER_PORT = 12345

# --- Helper Functions ---
def _sendall_data(sock, data_array):
    """Helper to send a numpy array's bytes."""
    data_bytes = data_array.astype(np.float32).tobytes()
    sock.sendall(struct.pack('!I', len(data_bytes)))
    sock.sendall(data_bytes)

def _recvall(sock, n):
    """
    Helper function to reliably receive exactly 'n' bytes from a socket.
    """
    data = b''
    while len(data) < n:
        packet = sock.recv(n - len(data))
        if not packet:
            return None
        data += packet
    return data

def _recvall_data(sock):
    """Helper to receive a numpy array's bytes or an error message."""
    data_len_bytes = _recvall(sock, 4)
    if data_len_bytes is None:
        raise ConnectionResetError("Incomplete data (length) received from server.")
    data_len = struct.unpack('!I', data_len_bytes)[0]
    data_bytes = _recvall(sock, data_len)
    if data_bytes is None:
        raise ConnectionResetError("Incomplete data (content) received from server.")
    
    try:
        error_message = data_bytes.decode('utf-8')
        if "Error" in error_message or "Server internal error" in error_message:
            raise RuntimeError(f"Server returned an error: {error_message}")
    except UnicodeDecodeError:
        pass
    
    # Interpret the payload as native-order float32, matching tobytes() on the
    # server side (a big-endian '!...f' unpack would byte-swap the values).
    return np.frombuffer(data_bytes, dtype=np.float32)

def _pack_eigenvalue_data(eigenvalue_array: np.ndarray) -> np.ndarray:
    """
    Compresses eigenvalue data using a hybrid packing scheme.
    - Values with |x| >= 1 are compressed using the arcsecant mapping.
    - Values with |x| < 1 are mapped linearly to a distinct range.
    """
    packed_data = np.zeros_like(eigenvalue_array)

    # Find indices for two packing methods
    arcsec_indices = np.where(np.abs(eigenvalue_array) >= 1)
    linear_indices = np.where(np.abs(eigenvalue_array) < 1)

    # Pack data for |x| >= 1 using arcsecant
    if arcsec_indices[0].size > 0:
        packed_data[arcsec_indices] = np.arccos(1 / eigenvalue_array[arcsec_indices])

    # Pack data for |x| < 1 using a linear mapping to a distinct range
    if linear_indices[0].size > 0:
        # Map values from (-1, 1) to (pi, 2*pi).
        # Linear function: y = mx + c. Map -1 -> pi and 1 -> 2*pi
        # m = (2*pi - pi) / (1 - (-1)) = pi / 2
        # c = pi - m*(-1) = pi + pi/2 = 1.5*pi
        packed_data[linear_indices] = (np.pi/2) * eigenvalue_array[linear_indices] + (1.5 * np.pi)

    return packed_data.astype(np.float32)

def _unpack_eigenvalue_data(packed_array: np.ndarray) -> np.ndarray:
    """
    Decompresses packed data back into original eigenvalues.
    It uses the value's range to determine the correct inverse operation.
    """
    unpacked_data = np.zeros_like(packed_array)
    
    # Identify which unpacking method to use based on the value's range
    arcsec_indices = np.where(packed_array <= np.pi)
    linear_indices = np.where(packed_array > np.pi)

    # Unpack data from the arcsecant range [0, pi]
    if arcsec_indices[0].size > 0:
        # Inverse operation: 1 / cos(y)
        unpacked_data[arcsec_indices] = 1 / np.cos(packed_array[arcsec_indices])

    # Unpack data from the linear range (pi, 2*pi]
    if linear_indices[0].size > 0:
        # Inverse linear function: x = (y - c) / m
        # x = (y - 1.5*pi) / (pi/2)
        unpacked_data[linear_indices] = (packed_array[linear_indices] - (1.5 * np.pi)) / (np.pi / 2)
    
    return unpacked_data.astype(np.float32)
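
# A quick local round trip exercises both branches (hypothetical numbers, up to
# float32 rounding): |x| >= 1 lands in [0, pi] via arccos(1/x), |x| < 1 lands
# in (pi, 2*pi) via the linear map, and unpacking inverts each by range.
#   vals = np.array([2.5, -3.0, 0.5, -0.75], dtype=np.float32)
#   _pack_eigenvalue_data(vals)                            # ~[1.159, 1.911, 5.498, 3.534]
#   _unpack_eigenvalue_data(_pack_eigenvalue_data(vals))   # ~vals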

def multiplexed_execute_workflow(workflow_steps: List[Dict[str, Any]]):
    """
    Executes a sequence of operations on the server as a single workflow,
    handling different data types based on the operation.
    """
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        client_socket.connect((SERVER_HOST, SERVER_PORT))
        print(f"Connected to server at {SERVER_HOST}:{SERVER_PORT}")

        # 1. Send OPERATION_WORKFLOW code
        client_socket.sendall(struct.pack('!B', OPERATION_WORKFLOW))

        # 2. Check for operations that require special handling (e.g., eigenvalue packing)
        requires_packing = any(step.get("operation_type") == "INTERPOLATE_EIGENVALUE" for step in workflow_steps)

        modified_workflow = []
        if requires_packing:
            for step in workflow_steps:
                new_step = step.copy()
                if new_step.get("operation_type") == "INTERPOLATE_EIGENVALUE" and new_step["input_data"]["type"] == "direct":
                    input_data = new_step["input_data"]
                    if "eigenvalue_data" in input_data:
                        packed_data = _pack_eigenvalue_data(np.array(input_data["eigenvalue_data"], dtype=np.float32))
                        input_data["eigenvalue_data"] = packed_data.tolist()
                modified_workflow.append(new_step)
        else:
            modified_workflow = workflow_steps
        
        # 3. Serialize workflow steps to JSON
        workflow_json = json.dumps(modified_workflow)
        workflow_bytes = workflow_json.encode('utf-8')

        # 4. Send workflow length and then workflow bytes
        client_socket.sendall(struct.pack('!I', len(workflow_bytes)))
        client_socket.sendall(workflow_bytes)

        print("Workflow sent to server. Waiting for result...")

        # 5. Receive the final result from the server
        result = _recvall_data(client_socket)

        # 6. Unpack the result if it was a packed operation
        if requires_packing and "output_id" in workflow_steps[-1] and workflow_steps[-1]["output_id"] == "eigenvalue_result":
             result = _unpack_eigenvalue_data(result)

        return result

    except Exception as e:
        print(f"Client error during workflow execution: {e}")
        return None
    finally:
        client_socket.close()
        print("Connection closed.")

# --- Main program demonstrating usage ---
if __name__ == "__main__":
    # --- Example 1: N-dimensional operation (from adi-ndim.py) ---
    print("\n--- Testing N-dimensional workflow: Interpolate then Differentiate ---")
    initial_fx_data_workflow = [np.array([1.0, 2.0, 3.0, 4.0, 5.0], dtype=np.float32)]
    initial_fy_data_workflow = [np.array([10.0, 12.0, 15.0, 19.0, 25.0], dtype=np.float32)]
    initial_fz_data_workflow = [np.array([20.0, 21.0, 23.0, 26.0, 30.0], dtype=np.float32)]
    common_eval_points_workflow = np.array([1.5, 2.5, 3.5, 4.5], dtype=np.float32)

    workflow_interpolate_then_differentiate = [
        {
            "operation_type": "INTERPOLATE",
            "input_data": {
                "type": "direct", 
                "fx_data": [arr.tolist() for arr in initial_fx_data_workflow],
                "fy_data": [arr.tolist() for arr in initial_fy_data_workflow],
                "fz_data": [arr.tolist() for arr in initial_fz_data_workflow]
            },
            "parameters": {
                "x_interp_points": common_eval_points_workflow.tolist()
            },
            "output_id": "interpolated_data"
        },
        {
            "operation_type": "DIFFERENTIATE",
            "input_data": {
                "type": "reference",
                "source_id": "interpolated_data"
            },
            "parameters": {
                "x_eval_points": common_eval_points_workflow.tolist()
            }
        }
    ]
    result_ndim = multiplexed_execute_workflow(workflow_interpolate_then_differentiate)
    if result_ndim is not None:
        num_results_per_curve_eval = len(common_eval_points_workflow)
        num_curves_workflow = len(initial_fx_data_workflow)
        y_derivatives_workflow = result_ndim[ : num_results_per_curve_eval * num_curves_workflow]
        z_derivatives_workflow = result_ndim[num_results_per_curve_eval * num_curves_workflow : ]
        print(f"Workflow result (Y derivatives): {y_derivatives_workflow}")
        print(f"Workflow result (Z derivatives): {z_derivatives_workflow}")

    time.sleep(1)

    # --- Example 2: Eigenvalue packing/unpacking operation (from adi-eigen.py) ---
    print("\n--- Testing eigenvalue packing and unpacking ---")
    eigenvalues_to_pack = np.array([2.5, 10.0, 100.0, 0.5, -0.75, 500.0, -2.5, -100.0], dtype=np.float32)
    workflow_eigenvalue = [
        {
            "operation_type": "INTERPOLATE_EIGENVALUE",
            "input_data": {
                "type": "direct", 
                "eigenvalue_data": eigenvalues_to_pack.tolist()
            },
            "output_id": "eigenvalue_result"
        }
    ]
    result_eigen = multiplexed_execute_workflow(workflow_eigenvalue)
    if result_eigen is not None:
        print(f"Original Eigenvalues: {eigenvalues_to_pack}")
        print(f"Unpacked Result: {result_eigen}")
        print(f"Are the results close? {np.allclose(eigenvalues_to_pack, result_eigen)}")

# adi_server.py
# This new, unified server handles all FPU operations from both
# n-math.py and e-stream.py using a single, efficient process pool.

import socket
import struct
import numpy as np
import json
import threading
import multiprocessing
import traceback

# Import the new protocol for interoperability
from protocol import receive_data, send_data, OPERATION_INTERPOLATE_HYPERBOLIC_PARABOLIC, OPERATION_INTERPOLATE_ARCSECANT_STREAM

# --- FPU Operations (Consolidated from n-math.py and e-stream.py) ---
# These functions are now standalone and can be executed by worker processes.

def hyperbolic_parabolic_interpolation_nd_revised(data_bytes, data_pointers):
    """
    Performs hyperbolic-parabolic interpolation on n-dimensional data by
    fitting a parabola through the three sample points nearest each
    evaluation point, one curve at a time.
    """
    all_fy_data = []
    all_fx_data = []
    x_interp = None

    # Unpack the binary data based on the JSON data_pointers
    for key, ptr in data_pointers.items():
        start = ptr['offset']
        end = start + ptr['length']
        unpacked_data = np.frombuffer(data_bytes[start:end], dtype=np.float32)

        if key.startswith('fx'):
            all_fx_data.append(unpacked_data)
        elif key.startswith('fy'):
            all_fy_data.append(unpacked_data)
        elif key == 'x_interp':
            x_interp = unpacked_data

    # Compare against None explicitly: `not x_interp` on a NumPy array raises
    # an ambiguous-truth-value error.
    if len(all_fx_data) != len(all_fy_data) or x_interp is None:
        raise ValueError("Invalid data for interpolation.")

    all_interp_y = []

    for fx, fy in zip(all_fx_data, all_fy_data):
        if len(fx) != len(fy) or len(fx) < 3:
            raise ValueError("X and Y data must have equal length and at least three points.")

        interp_y = []
        for x in x_interp:
            distances = np.abs(fx - x)
            nearest_indices = np.argsort(distances)[:3]
            x1, x2, x3 = fx[nearest_indices]
            y1, y2, y3 = fy[nearest_indices]

            # Simplified, purely parabolic interpolation logic for demonstration.
            # The three nearest points must have distinct x-coordinates, or the
            # fit matrix below is singular.
            if x1 == x2 or x2 == x3 or x1 == x3:
                raise RuntimeError("Duplicate x-coordinates detected, cannot interpolate.")

            # Parabolic interpolation (y = ax^2 + bx + c)
            # Using matrix inversion for a more robust solution
            A = np.array([
                [x1**2, x1, 1],
                [x2**2, x2, 1],
                [x3**2, x3, 1]
            ])
            b = np.array([y1, y2, y3])
            try:
                a, b_lin, c = np.linalg.solve(A, b)
                y_interp = a * x**2 + b_lin * x + c
            except np.linalg.LinAlgError:
                raise RuntimeError("Could not solve parabolic interpolation matrix.")

            interp_y.append(y_interp)
        all_interp_y.append(np.array(interp_y))

    return np.concatenate(all_interp_y)
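
# A minimal standalone check of the parabolic fit used above (my own sketch,
# and the helper name is mine; it is not part of the protocol): fitting
# y = a*x^2 + b*x + c through three points sampled from a known quadratic
# should recover its coefficients exactly.
def _check_parabolic_fit():
    def f(x):
        return 2.0 * x**2 - 3.0 * x + 1.0  # known quadratic
    x1, x2, x3 = 1.0, 2.0, 3.0
    A = np.array([[x1**2, x1, 1], [x2**2, x2, 1], [x3**2, x3, 1]])
    b = np.array([f(x1), f(x2), f(x3)])
    a, b_lin, c = np.linalg.solve(A, b)
    assert np.allclose([a, b_lin, c], [2.0, -3.0, 1.0])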


def pseudo_interpolate_arcsecant_stream(data_bytes, data_pointers):
    """
    Pseudo-interpolates a chunk of interpolation points using pre-existing data.
    This now works as a single, callable function within the service.
    """
    # Slice each named array out of the shared binary payload.
    def _view(name):
        ptr = data_pointers[name]
        return np.frombuffer(data_bytes[ptr['offset'] : ptr['offset'] + ptr['length']], dtype=np.float32)

    x_data = _view('x_data')
    y_data = _view('y_data')
    x_interp_chunk = _view('x_interp_chunk')

    def _calculate_arcsecant(val):
        # arcsec(v) = arccos(1/v), which is real-valued only for |v| >= 1.
        if np.abs(val) < 1:
            return np.nan
        return np.arccos(1 / val)

    interpolated_y = []
    for x in x_interp_chunk:
        # Simplified logic: find nearest point and apply a transformation
        nearest_idx = np.argmin(np.abs(x_data - x))
        y_val = y_data[nearest_idx]
        interpolated_y.append(_calculate_arcsecant(y_val))

    return np.array(interpolated_y, dtype=np.float32)
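
# A quick sanity check of the arcsecant identity used above (a sketch of mine,
# not part of the service): arcsec(v) = arccos(1/v), so arcsec(2) = pi/3 and
# arcsec(-1) = pi, while |v| < 1 has no real arcsecant.
def _check_arcsecant():
    assert np.isclose(np.arccos(1.0 / 2.0), np.pi / 3.0)
    assert np.isclose(np.arccos(1.0 / -1.0), np.pi)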


# --- Server and Multiprocessing Logic ---

def worker_process(request_data):
    """
    A worker function that is executed by a process in the pool.
    It takes a request dictionary and dispatches the correct FPU operation.
    """
    op_type = request_data["operation"]
    data_bytes = request_data["data_bytes"]
    data_pointers = request_data["data_pointers"]

    if op_type == OPERATION_INTERPOLATE_HYPERBOLIC_PARABOLIC:
        result = hyperbolic_parabolic_interpolation_nd_revised(data_bytes, data_pointers)
    elif op_type == OPERATION_INTERPOLATE_ARCSECANT_STREAM:
        result = pseudo_interpolate_arcsecant_stream(data_bytes, data_pointers)
    else:
        raise ValueError(f"Unknown operation type: {op_type}")
    
    return result
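
# A minimal in-process smoke test for the dispatcher above (my own sketch, and
# the helper name is mine; it is not part of the server): hand-pack data_bytes
# and data_pointers in the layout the protocol uses and call worker_process
# directly, with no socket or pool involved. When the real pool is used, this
# same request dict is pickled and shipped to a worker process.
def _smoke_test_worker_process():
    arrays = {
        'x_data': np.array([1.0, 2.0, 3.0], dtype=np.float32),
        'y_data': np.array([2.0, -1.5, 4.0], dtype=np.float32),
        'x_interp_chunk': np.array([1.1, 2.9], dtype=np.float32),
    }
    blob, pointers = b'', {}
    for name, arr in arrays.items():
        raw = arr.tobytes()
        pointers[name] = {'offset': len(blob), 'length': len(raw), 'dtype': 'float32'}
        blob += raw
    request = {
        "operation": OPERATION_INTERPOLATE_ARCSECANT_STREAM,
        "data_bytes": blob,
        "data_pointers": pointers,
    }
    print("Smoke test result:", worker_process(request))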


def handle_client(client_socket, addr, pool):
    """
    Handles a single client connection.
    Parses the request and submits the FPU task to the multiprocessing pool.
    """
    print(f"Accepted connection from {addr}")
    try:
        json_header, data_bytes = receive_data(client_socket)
        if json_header is None:
            print(f"Client {addr} disconnected unexpectedly.")
            return

        print(f"Received request from {addr} for operation {json_header['operation']}")
        
        request_data = {
            "operation": json_header["operation"],
            "data_bytes": data_bytes,
            "data_pointers": json_header["data_pointers"]
        }

        # Submit the FPU task to the process pool
        result_async = pool.apply_async(worker_process, (request_data,))
        result = result_async.get()  # Block until the result is available
        
        # Send the result back to the client
        send_data(client_socket, json_header["operation"], {"result": result})
        print(f"Sent result back to {addr}.")

    except (ValueError, RuntimeError) as e:
        # Operation-level failures are reported back as a bare length-prefixed
        # UTF-8 string rather than a full protocol header, so clients must be
        # prepared to receive a non-JSON error payload.
        print(f"Operation error on server for {addr}: {e}")
        error_message = str(e).encode('utf-8')
        client_socket.sendall(struct.pack('!I', len(error_message)))
        client_socket.sendall(error_message)
    except ConnectionResetError:
        print(f"Client {addr} forcibly closed the connection.")
    except Exception as e:
        print(f"An unexpected error occurred in handle_client for {addr}: {e}")
        traceback.print_exc()
    finally:
        client_socket.close()
        print(f"Connection with client {addr} closed.")


def start_server(host, port):
    """
    Starts the main server, listens for connections, and manages the process pool.
    """
    # Create a process pool with a number of workers equal to CPU cores
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((host, port))
    server_socket.listen(5)
    print(f"Server listening on {host}:{port}")

    try:
        while True:
            client_socket, addr = server_socket.accept()
            # Handle each client in a separate thread so the accept loop never blocks
            client_thread = threading.Thread(target=handle_client, args=(client_socket, addr, pool))
            client_thread.start()
    except KeyboardInterrupt:
        print("\nServer shutting down...")
    finally:
        server_socket.close()
        pool.close()
        pool.join()
        print("Server shutdown complete.")


if __name__ == "__main__":
    SERVER_HOST = '127.0.0.1'
    SERVER_PORT = 12345
    start_server(SERVER_HOST, SERVER_PORT)
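
# To run this server: `python adi_server.py` creates the worker pool and
# listens on 127.0.0.1:12345. This assumes a protocol.py module on the import
# path providing send_data, receive_data, and the two OPERATION_* constants
# imported at the top of the file.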