Wednesday, August 20, 2025

Distributed FPU Protocol with GPU acceleration - Adi Suite 0.1A

 

# For everyone watching along and enjoying the process of this suite: I
# envision 1D apex volumetric AI video interpretive rendering for object
# and environmental near-field interaction. Photorealism in games and
# movies should now be a simpler task.

# Love to all my pickaxe junker blaster nuclear gut strugglers! 

# gpu_protocol.py
# This module defines a new communication protocol specifically for
# GPU-accelerated operations. (The file is named gpu_protocol.py, without
# hyphens, so the client and server below can import it as a Python module.)

import socket
import struct
import json
import numpy as np

# --- Operation Constants ---
OPERATION_EXECUTE_GPU_KERNEL = 4
OPERATION_PYTORCH_ADDITION = 5

def send_gpu_payload(conn, operation_type, data):
    """
    Sends structured data over a socket with a focus on binary efficiency.
    
    This function packs a JSON header with metadata and a single
    binary payload containing all NumPy array data.
    
    Args:
        conn (socket.socket): The socket to send data through.
        operation_type (int): The type of operation being requested.
        data (dict): A dictionary containing 'kernel_args' and other metadata.
    """
    
    # Pack all NumPy arrays into a single binary payload
    binary_data = b''
    data_pointers = {}
    
    for key, val in data['kernel_args'].items():
        if isinstance(val, np.ndarray):
            array_bytes = val.astype(np.float32).tobytes()
            data_pointers[key] = {
                'offset': len(binary_data),
                'length': len(array_bytes),
                'dtype': 'float32'
            }
            binary_data += array_bytes
    
    # Create the JSON header with the operation type and data pointers
    json_header = {
        "operation": operation_type,
        "data_pointers": data_pointers,
        "metadata": data['metadata']
    }
    
    # Serialize the header to JSON bytes
    json_header_bytes = json.dumps(json_header).encode('utf-8')
    header_size = len(json_header_bytes)
    
    # Send the size of the header, the header itself, and the binary data
    conn.sendall(struct.pack('!I', header_size))
    conn.sendall(json_header_bytes)
    conn.sendall(binary_data)

def receive_gpu_payload(conn):
    """
    Receives structured data from a socket.
    
    Args:
        conn (socket.socket): The socket to receive data from.
        
    Returns:
        tuple: A tuple containing the JSON header (dict) and the binary
               payload (bytes), or (None, None) if the connection closes.
    """
    try:
        # First, receive the 4-byte size of the JSON header. A single recv()
        # may return fewer bytes than requested, so loop until we have all 4.
        header_size_bytes = b''
        while len(header_size_bytes) < 4:
            chunk = conn.recv(4 - len(header_size_bytes))
            if not chunk:
                return None, None
            header_size_bytes += chunk

        header_size = struct.unpack('!I', header_size_bytes)[0]
        
        # Then, receive the JSON header itself
        json_header_bytes = b''
        while len(json_header_bytes) < header_size:
            chunk = conn.recv(header_size - len(json_header_bytes))
            if not chunk:
                return None, None
            json_header_bytes += chunk
            
        json_header = json.loads(json_header_bytes.decode('utf-8'))
        
        # Calculate total binary data length from pointers
        total_data_length = sum(ptr['length'] for ptr in json_header.get('data_pointers', {}).values())
        
        # Finally, receive the binary data
        data_bytes = b''
        while len(data_bytes) < total_data_length:
            chunk = conn.recv(total_data_length - len(data_bytes))
            if not chunk:
                return None, None
            data_bytes += chunk
            
        return json_header, data_bytes
        
    except (socket.error, struct.error, json.JSONDecodeError) as e:
        print(f"Error receiving data: {e}")
        return None, None
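
# A quick way to see the wire format in action is a loopback round trip.
# This is a minimal sketch (my own test snippet, not part of the suite):
# the framing is [4-byte big-endian header size][JSON header][raw float32 bytes].

import socket
import numpy as np
from gpu_protocol import (
    send_gpu_payload,
    receive_gpu_payload,
    OPERATION_PYTORCH_ADDITION
)

a, b = socket.socketpair()
payload = {
    'kernel_args': {'x': np.arange(4, dtype=np.float32)},
    'metadata': {'kernel_name': 'demo'}
}
send_gpu_payload(a, OPERATION_PYTORCH_ADDITION, payload)
header, blob = receive_gpu_payload(b)
ptr = header['data_pointers']['x']
x = np.frombuffer(blob, offset=ptr['offset'], count=ptr['length'] // 4, dtype=np.float32)
print(header['operation'], x)  # -> 5 [0. 1. 2. 3.]
a.close()
b.close()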

# adi-gpu_client.py
# A new client that sends a GPU processing request to the server.

import socket
import numpy as np

# Import the new GPU protocol
from gpu_protocol import (
    send_gpu_payload,
    receive_gpu_payload,
    OPERATION_EXECUTE_GPU_KERNEL,
    OPERATION_PYTORCH_ADDITION
)

def send_pytorch_addition(client_socket):
    """
    Creates a PyTorch-based request and sends it to the server.
    """
    array_size = 1000000
    array_a = np.random.rand(array_size).astype(np.float32)
    array_b = np.random.rand(array_size).astype(np.float32)
    
    print(f"Created two arrays of size {array_size} for PyTorch GPU processing.")
    
    # Prepare the payload for the GPU
    pytorch_payload = {
        'kernel_args': {
            'array_a': array_a,
            'array_b': array_b
        },
        'metadata': {
            'kernel_name': 'pytorch_add'
        }
    }
    
    send_gpu_payload(client_socket, OPERATION_PYTORCH_ADDITION, pytorch_payload)
    print("Sent PyTorch request to server. Waiting for response...")
    
    return array_a + array_b

def send_cupy_addition(client_socket):
    """
    Creates a CuPy-based request and sends it to the server.
    """
    array_size = 1000000
    array_a = np.random.rand(array_size).astype(np.float32)
    array_b = np.random.rand(array_size).astype(np.float32)
    
    print(f"Created two arrays of size {array_size} for CuPy GPU processing.")
    
    cupy_payload = {
        'kernel_args': {
            'array_a': array_a,
            'array_b': array_b
        },
        'metadata': {
            'kernel_name': 'add_arrays'
        }
    }

    send_gpu_payload(client_socket, OPERATION_EXECUTE_GPU_KERNEL, cupy_payload)
    print("Sent CuPy request to server. Waiting for response...")

    return array_a + array_b

def run_client():
    """
    Connects to the server and sends a request for GPU kernel execution.
    """
    SERVER_HOST = '127.0.0.1'
    SERVER_PORT = 12345
    
    # You can choose which operation to run here
    # 1: CuPy Addition
    # 2: PyTorch Addition
    operation_choice = 2 
    
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
        try:
            client_socket.connect((SERVER_HOST, SERVER_PORT))
            print("Connected to server.")
            
            if operation_choice == 1:
                local_result = send_cupy_addition(client_socket)
            elif operation_choice == 2:
                local_result = send_pytorch_addition(client_socket)
            else:
                print("Invalid operation choice.")
                return

            response_header, response_bytes = receive_gpu_payload(client_socket)
            
            if response_header and 'result' in response_header['data_pointers']:
                result_info = response_header['data_pointers']['result']
                result_array = np.frombuffer(
                    response_bytes,
                    offset=result_info['offset'],
                    count=result_info['length'] // 4,
                    dtype=np.float32
                )
                print(f"\nReceived result from server. First 5 elements:")
                print(result_array[:5])
                
                is_correct = np.allclose(result_array, local_result, atol=1e-5)
                print(f"Result verified against local computation: {'Correct' if is_correct else 'Incorrect'}")
            else:
                print("Failed to receive a valid response from the server.")
                
        except ConnectionRefusedError:
            print(f"Connection refused. Is the server running on {SERVER_HOST}:{SERVER_PORT}?")
        except Exception as e:
            print(f"An error occurred: {e}")

if __name__ == "__main__":
    run_client()

# adi-gpu_server.py
# A new server designed to offload numerical computations to a GPU.
# NOTE: This requires an NVIDIA GPU and the 'cupy' and 'torch' libraries installed.
# Install with: pip install cupy-cudaXXX (e.g., cupy-cuda11x for CUDA 11)
# and: pip install torch torchvision torchaudio
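# Before starting the server, a quick sanity check (a sketch, assuming both
# libraries are installed) confirms that each backend actually sees a GPU:
#
#   import torch, cupy as cp
#   print("torch CUDA available:", torch.cuda.is_available())
#   print("cupy devices:", cp.cuda.runtime.getDeviceCount())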

import socket
import struct
import numpy as np
import json
import threading
import multiprocessing
import traceback

import cupy as cp # The GPU-accelerated library
import torch # The PyTorch library

# Import the new GPU protocol
from gpu_protocol import (
    receive_gpu_payload, 
    send_gpu_payload, 
    OPERATION_EXECUTE_GPU_KERNEL,
    OPERATION_PYTORCH_ADDITION
)

# --- GPU Operations (The "kernels" that run on the GPU) ---

def cupy_worker(data_dict):
    """
    Performs a simple element-wise addition on the GPU using CuPy.
    """
    try:
        gpu_array_a = cp.asarray(data_dict['array_a'])
        gpu_array_b = cp.asarray(data_dict['array_b'])
        
        gpu_result = gpu_array_a + gpu_array_b
        
        # Transfer the result from GPU memory back to CPU memory
        result_array = cp.asnumpy(gpu_result)
        return result_array
    except Exception as e:
        print(f"An error occurred during CuPy computation: {e}")
        return None

def pytorch_worker(data_dict):
    """
    Performs a simple element-wise addition on the GPU using PyTorch.
    """
    try:
        # Transfer NumPy arrays from CPU to GPU as PyTorch tensors
        gpu_array_a = torch.from_numpy(data_dict['array_a']).cuda()
        gpu_array_b = torch.from_numpy(data_dict['array_b']).cuda()
        
        # Perform the operation on the GPU
        gpu_result = gpu_array_a + gpu_array_b
        
        # Transfer the result back to CPU as a NumPy array
        result_array = gpu_result.cpu().numpy()
        return result_array
    except Exception as e:
        print(f"An error occurred during PyTorch computation: {e}")
        return None
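
# The .cuda() calls above raise if no CUDA device is present. A minimal
# variation (my own sketch, not part of the suite) picks the device at call
# time and falls back to the CPU:

def pytorch_worker_with_fallback(data_dict):
    """
    Element-wise addition that runs on the GPU when available, else the CPU.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    tensor_a = torch.from_numpy(data_dict['array_a']).to(device)
    tensor_b = torch.from_numpy(data_dict['array_b']).to(device)
    return (tensor_a + tensor_b).cpu().numpy()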

# --- Server and Multiprocessing Logic ---

def worker_process(request_data):
    """
    A worker function that is executed by a process in the pool.
    It dispatches the correct GPU operation based on the client's request.
    """
    op_type = request_data["operation"]
    data_bytes = request_data["data_bytes"]
    data_pointers = request_data["data_pointers"]
    
    # Unpack the binary data into NumPy arrays, regardless of the backend.
    # The .copy() matters: np.frombuffer returns a read-only view, and
    # torch.from_numpy complains about non-writable arrays.
    kernel_args = {}
    for key, ptr in data_pointers.items():
        start = ptr['offset']
        end = start + ptr['length']
        kernel_args[key] = np.frombuffer(data_bytes[start:end], dtype=np.float32).copy()

    # Dispatch to the correct GPU backend
    if op_type == OPERATION_EXECUTE_GPU_KERNEL:
        result = cupy_worker(kernel_args)
    elif op_type == OPERATION_PYTORCH_ADDITION:
        result = pytorch_worker(kernel_args)
    else:
        raise ValueError(f"Unknown operation type: {op_type}")
    
    if result is None:
        raise RuntimeError("GPU kernel execution failed.")
    
    return result
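
# As more kernels accumulate, the if/elif chain in worker_process can be
# replaced with a dispatch table. A sketch (GPU_DISPATCH and dispatch_kernel
# are my own names, not part of the suite):

GPU_DISPATCH = {
    OPERATION_EXECUTE_GPU_KERNEL: cupy_worker,
    OPERATION_PYTORCH_ADDITION: pytorch_worker,
}

def dispatch_kernel(op_type, kernel_args):
    worker = GPU_DISPATCH.get(op_type)
    if worker is None:
        raise ValueError(f"Unknown operation type: {op_type}")
    result = worker(kernel_args)
    if result is None:
        raise RuntimeError("GPU kernel execution failed.")
    return result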

def handle_client(client_socket, addr, pool):
    """
    Handles a single client connection.
    Parses the request and submits the GPU task to the multiprocessing pool.
    """
    print(f"Accepted connection from {addr}")
    try:
        json_header, data_bytes = receive_gpu_payload(client_socket)
        if json_header is None:
            print(f"Client {addr} disconnected unexpectedly.")
            return

        print(f"Received request from {addr} for operation {json_header['operation']}")
        
        request_data = {
            "operation": json_header["operation"],
            "data_bytes": data_bytes,
            "data_pointers": json_header["data_pointers"],
            "metadata": json_header["metadata"]
        }

        # Submit the GPU task to the process pool
        result_async = pool.apply_async(worker_process, (request_data,))
        result = result_async.get() # Block until the result is available
        
        # Send the result back to the client
        send_gpu_payload(client_socket, json_header["operation"], {
            'kernel_args': {'result': result},
            'metadata': {}
        })
        print(f"Sent result back to {addr}.")

    except (ValueError, RuntimeError) as e:
        print(f"Error on server from {addr}: {e}")
        # send_gpu_payload only packs NumPy arrays from kernel_args, so
        # report the error through the metadata field instead.
        send_gpu_payload(client_socket, -1, {'kernel_args': {}, 'metadata': {'error': str(e)}})
    except ConnectionResetError:
        print(f"Client {addr} forcibly closed the connection.")
    except Exception as e:
        print(f"An unexpected error occurred in handle_client for {addr}: {e}")
        traceback.print_exc()
    finally:
        client_socket.close()
        print(f"Connection with client {addr} closed.")

def start_server(host, port):
    """
    Starts the main server, listens for connections, and manages the process pool.
    """
    # Use the 'spawn' start method: CUDA cannot be (re)initialized in a
    # forked child process, and fork is the default on Linux.
    ctx = multiprocessing.get_context('spawn')
    pool = ctx.Pool(processes=multiprocessing.cpu_count())
    
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((host, port))
    server_socket.listen(5)
    print(f"Server listening on {host}:{port}")

    try:
        while True:
            client_socket, addr = server_socket.accept()
            client_thread = threading.Thread(target=handle_client, args=(client_socket, addr, pool))
            client_thread.start()
    except KeyboardInterrupt:
        print("\nServer shutting down...")
    finally:
        server_socket.close()
        pool.close()
        pool.join()
        print("Server shutdown complete.")

if __name__ == "__main__":
    SERVER_HOST = '127.0.0.1'
    SERVER_PORT = 12345
    start_server(SERVER_HOST, SERVER_PORT)
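
# To try the suite, save the three files above, start the server, then run
# the client in a second terminal:
#
#   python adi-gpu_server.py
#   python adi-gpu_client.py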

