# For everyone watching along and enjoying the process of this suite: I envision 1D apex
# volumetric AI video interpretive rendering for object and environmental near-field
# interaction, so photorealism in games and movies should now be a simpler task.
# Love to all my pickaxe junker blaster nuclear gut strugglers!
# adi_gpu_protocol.py
# (Underscore instead of hyphen: Python cannot import a module whose filename
# contains a hyphen, and the client and server below import this file.)
# This module defines a new communication protocol specifically for
# GPU-accelerated operations.
import socket
import struct
import json
import numpy as np
# --- Operation Constants ---
OPERATION_EXECUTE_GPU_KERNEL = 4
OPERATION_PYTORCH_ADDITION = 5
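# Wire format of every message, as implemented by send_gpu_payload below:
#   4 bytes : big-endian unsigned int giving the size of the JSON header
#   N bytes : UTF-8 JSON header {"operation", "data_pointers", "metadata"}
#   M bytes : raw binary payload of concatenated float32 arrays, each located
#             via data_pointers[key] = {"offset", "length", "dtype"}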
def send_gpu_payload(conn, operation_type, data):
    """
    Sends structured data over a socket with a focus on binary efficiency.

    This function packs a JSON header with metadata and a single
    binary payload containing all NumPy array data.

    Args:
        conn (socket.socket): The socket to send data through.
        operation_type (int): The type of operation being requested.
        data (dict): A dictionary containing 'kernel_args' and other metadata.
    """
    # Pack all NumPy arrays into a single binary payload
    binary_data = b''
    data_pointers = {}
    for key, val in data['kernel_args'].items():
        if isinstance(val, np.ndarray):
            array_bytes = val.astype(np.float32).tobytes()
            data_pointers[key] = {
                'offset': len(binary_data),
                'length': len(array_bytes),
                'dtype': 'float32'
            }
            binary_data += array_bytes

    # Create the JSON header with the operation type and data pointers
    json_header = {
        "operation": operation_type,
        "data_pointers": data_pointers,
        "metadata": data['metadata']
    }

    # Serialize the header to JSON bytes
    json_header_bytes = json.dumps(json_header).encode('utf-8')
    header_size = len(json_header_bytes)

    # Send the size of the header, the header itself, and the binary data
    conn.sendall(struct.pack('!I', header_size))
    conn.sendall(json_header_bytes)
    conn.sendall(binary_data)
def receive_gpu_payload(conn):
    """
    Receives structured data from a socket.

    Args:
        conn (socket.socket): The socket to receive data from.

    Returns:
        tuple: A tuple containing the JSON header (dict) and the binary
               payload (bytes), or (None, None) if the connection closes.
    """
    try:
        # First, receive the 4-byte size of the JSON header. recv() may
        # return fewer bytes than requested, so loop until all 4 arrive.
        header_size_bytes = b''
        while len(header_size_bytes) < 4:
            chunk = conn.recv(4 - len(header_size_bytes))
            if not chunk:
                return None, None
            header_size_bytes += chunk
        header_size = struct.unpack('!I', header_size_bytes)[0]

        # Then, receive the JSON header itself
        json_header_bytes = b''
        while len(json_header_bytes) < header_size:
            chunk = conn.recv(header_size - len(json_header_bytes))
            if not chunk:
                return None, None
            json_header_bytes += chunk
        json_header = json.loads(json_header_bytes.decode('utf-8'))

        # Calculate total binary data length from pointers
        total_data_length = sum(
            ptr['length'] for ptr in json_header.get('data_pointers', {}).values()
        )

        # Finally, receive the binary data
        data_bytes = b''
        while len(data_bytes) < total_data_length:
            chunk = conn.recv(total_data_length - len(data_bytes))
            if not chunk:
                return None, None
            data_bytes += chunk

        return json_header, data_bytes
    except (socket.error, struct.error, json.JSONDecodeError) as e:
        print(f"Error receiving data: {e}")
        return None, None
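# --- Optional round-trip self-test (a minimal sketch added for illustration;
# it is not part of the original module). Running this file directly packs a
# small array, sends it across a local socket pair, and decodes it again,
# exercising the wire format documented above.
if __name__ == "__main__":
    left, right = socket.socketpair()
    demo_payload = {
        'kernel_args': {'x': np.arange(4, dtype=np.float32)},
        'metadata': {'kernel_name': 'self_test'}
    }
    send_gpu_payload(left, OPERATION_EXECUTE_GPU_KERNEL, demo_payload)
    header, blob = receive_gpu_payload(right)
    ptr = header['data_pointers']['x']
    echoed = np.frombuffer(blob, dtype=np.float32,
                           count=ptr['length'] // 4, offset=ptr['offset'])
    assert np.array_equal(echoed, demo_payload['kernel_args']['x'])
    print("Protocol round trip OK:", echoed)
    left.close()
    right.close()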
# adi_gpu_client.py
# A new client that sends a GPU processing request to the server.
import socket
import numpy as np

# Import the new GPU protocol
from adi_gpu_protocol import (
    send_gpu_payload,
    receive_gpu_payload,
    OPERATION_EXECUTE_GPU_KERNEL,
    OPERATION_PYTORCH_ADDITION
)
def send_pytorch_addition(client_socket):
    """
    Creates a PyTorch-based request, sends it to the server, and returns
    the locally computed sum so the caller can verify the server's result.
    """
    array_size = 1000000
    array_a = np.random.rand(array_size).astype(np.float32)
    array_b = np.random.rand(array_size).astype(np.float32)
    print(f"Created two arrays of size {array_size} for PyTorch GPU processing.")

    # Prepare the payload for the GPU
    pytorch_payload = {
        'kernel_args': {
            'array_a': array_a,
            'array_b': array_b
        },
        'metadata': {
            'kernel_name': 'pytorch_add'
        }
    }
    send_gpu_payload(client_socket, OPERATION_PYTORCH_ADDITION, pytorch_payload)
    print("Sent PyTorch request to server. Waiting for response...")
    return array_a + array_b
def send_cupy_addition(client_socket):
    """
    Creates a CuPy-based request, sends it to the server, and returns
    the locally computed sum so the caller can verify the server's result.
    """
    array_size = 1000000
    array_a = np.random.rand(array_size).astype(np.float32)
    array_b = np.random.rand(array_size).astype(np.float32)
    print(f"Created two arrays of size {array_size} for CuPy GPU processing.")

    cupy_payload = {
        'kernel_args': {
            'array_a': array_a,
            'array_b': array_b
        },
        'metadata': {
            'kernel_name': 'add_arrays'
        }
    }
    send_gpu_payload(client_socket, OPERATION_EXECUTE_GPU_KERNEL, cupy_payload)
    print("Sent CuPy request to server. Waiting for response...")
    return array_a + array_b
def run_client():
    """
    Connects to the server and sends a request for GPU kernel execution.
    """
    SERVER_HOST = '127.0.0.1'
    SERVER_PORT = 12345

    # You can choose which operation to run here:
    #   1: CuPy addition
    #   2: PyTorch addition
    operation_choice = 2

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
        try:
            client_socket.connect((SERVER_HOST, SERVER_PORT))
            print("Connected to server.")

            if operation_choice == 1:
                local_result = send_cupy_addition(client_socket)
            elif operation_choice == 2:
                local_result = send_pytorch_addition(client_socket)
            else:
                print("Invalid operation choice.")
                return

            response_header, response_bytes = receive_gpu_payload(client_socket)
            if response_header and 'result' in response_header['data_pointers']:
                result_info = response_header['data_pointers']['result']
                # Each element is a 4-byte float32, so length // 4 is the count
                result_array = np.frombuffer(
                    response_bytes,
                    offset=result_info['offset'],
                    count=result_info['length'] // 4,
                    dtype=np.float32
                )
                print("\nReceived result from server. First 5 elements:")
                print(result_array[:5])

                is_correct = np.allclose(result_array, local_result, atol=1e-5)
                print(f"Result verified against local computation: "
                      f"{'Correct' if is_correct else 'Incorrect'}")
            else:
                print("Failed to receive a valid response from the server.")
        except ConnectionRefusedError:
            print(f"Connection refused. Is the server running on {SERVER_HOST}:{SERVER_PORT}?")
        except Exception as e:
            print(f"An error occurred: {e}")


if __name__ == "__main__":
    run_client()
# adi_gpu_server.py
# A new server designed to offload numerical computations to a GPU.
# NOTE: This requires an NVIDIA GPU and the 'cupy' and 'torch' libraries installed.
# Install with: pip install cupy-cudaXXX (e.g., cupy-cuda11x for CUDA 11)
# and: pip install torch torchvision torchaudio
import socket
import numpy as np
import threading
import multiprocessing
import traceback

import cupy as cp  # The GPU-accelerated array library
import torch       # The PyTorch library

# Import the new GPU protocol
from adi_gpu_protocol import (
    receive_gpu_payload,
    send_gpu_payload,
    OPERATION_EXECUTE_GPU_KERNEL,
    OPERATION_PYTORCH_ADDITION
)
# --- GPU Operations (The "kernels" that run on the GPU) ---
def cupy_worker(data_dict):
    """
    Performs a simple element-wise addition on the GPU using CuPy.
    """
    try:
        # Transfer the input arrays from CPU memory to GPU memory
        gpu_array_a = cp.asarray(data_dict['array_a'])
        gpu_array_b = cp.asarray(data_dict['array_b'])
        gpu_result = gpu_array_a + gpu_array_b
        # Transfer the result from GPU memory back to CPU memory
        result_array = cp.asnumpy(gpu_result)
        return result_array
    except Exception as e:
        print(f"An error occurred during CuPy computation: {e}")
        return None


def pytorch_worker(data_dict):
    """
    Performs a simple element-wise addition on the GPU using PyTorch.
    """
    try:
        # Transfer NumPy arrays from CPU to GPU as PyTorch tensors
        gpu_array_a = torch.from_numpy(data_dict['array_a']).cuda()
        gpu_array_b = torch.from_numpy(data_dict['array_b']).cuda()
        # Perform the operation on the GPU
        gpu_result = gpu_array_a + gpu_array_b
        # Transfer the result back to CPU as a NumPy array
        result_array = gpu_result.cpu().numpy()
        return result_array
    except Exception as e:
        print(f"An error occurred during PyTorch computation: {e}")
        return None
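# --- Optional CPU fallback (a hypothetical addition, not part of the original
# design): on a machine without a CUDA device the same element-wise addition
# can be served with plain NumPy, so the protocol can still be exercised end
# to end. Wire it into worker_process in place of the GPU workers if needed.
def numpy_fallback_worker(data_dict):
    """
    Performs the same element-wise addition on the CPU with NumPy,
    mirroring the return contract of the GPU workers above.
    """
    try:
        return data_dict['array_a'] + data_dict['array_b']
    except Exception as e:
        print(f"An error occurred during CPU fallback computation: {e}")
        return None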
# --- Server and Multiprocessing Logic ---
def worker_process(request_data):
    """
    A worker function that is executed by a process in the pool.
    It dispatches the correct GPU operation based on the client's request.
    """
    op_type = request_data["operation"]
    data_bytes = request_data["data_bytes"]
    data_pointers = request_data["data_pointers"]

    # Unpack the binary data into NumPy arrays, regardless of the backend.
    # The .copy() yields a writable array; np.frombuffer alone returns a
    # read-only view, which torch.from_numpy complains about.
    kernel_args = {}
    for key, ptr in data_pointers.items():
        start = ptr['offset']
        end = start + ptr['length']
        kernel_args[key] = np.frombuffer(data_bytes[start:end], dtype=np.float32).copy()

    # Dispatch to the correct GPU backend
    if op_type == OPERATION_EXECUTE_GPU_KERNEL:
        result = cupy_worker(kernel_args)
    elif op_type == OPERATION_PYTORCH_ADDITION:
        result = pytorch_worker(kernel_args)
    else:
        raise ValueError(f"Unknown operation type: {op_type}")

    if result is None:
        raise RuntimeError("GPU kernel execution failed.")
    return result
def handle_client(client_socket, addr, pool):
    """
    Handles a single client connection.
    Parses the request and submits the GPU task to the multiprocessing pool.
    """
    print(f"Accepted connection from {addr}")
    try:
        json_header, data_bytes = receive_gpu_payload(client_socket)
        if json_header is None:
            print(f"Client {addr} disconnected unexpectedly.")
            return
        print(f"Received request from {addr} for operation {json_header['operation']}")

        request_data = {
            "operation": json_header["operation"],
            "data_bytes": data_bytes,
            "data_pointers": json_header["data_pointers"],
            "metadata": json_header["metadata"]
        }

        # Submit the GPU task to the process pool
        result_async = pool.apply_async(worker_process, (request_data,))
        result = result_async.get()  # Block until the result is available

        # Send the result back to the client
        send_gpu_payload(client_socket, json_header["operation"], {
            'kernel_args': {'result': result},
            'metadata': {}
        })
        print(f"Sent result back to {addr}.")
    except (ValueError, RuntimeError) as e:
        print(f"Error on server from {addr}: {e}")
        # Simplified error response for demonstration: the error text travels in
        # the metadata, since the binary payload only carries NumPy arrays.
        send_gpu_payload(client_socket, -1, {
            'kernel_args': {},
            'metadata': {'error': str(e)}
        })
    except ConnectionResetError:
        print(f"Client {addr} forcibly closed the connection.")
    except Exception as e:
        print(f"An unexpected error occurred in handle_client for {addr}: {e}")
        traceback.print_exc()
    finally:
        client_socket.close()
        print(f"Connection with client {addr} closed.")
def start_server(host, port):
    """
    Starts the main server, listens for connections, and manages the process pool.
    """
    # Create the worker pool before touching the GPU: CUDA contexts and
    # fork-based multiprocessing interact badly once a context exists, and on
    # some platforms multiprocessing.set_start_method('spawn') may be required.
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())

    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((host, port))
    server_socket.listen(5)
    print(f"Server listening on {host}:{port}")

    try:
        while True:
            client_socket, addr = server_socket.accept()
            client_thread = threading.Thread(
                target=handle_client, args=(client_socket, addr, pool)
            )
            client_thread.start()
    except KeyboardInterrupt:
        print("\nServer shutting down...")
    finally:
        server_socket.close()
        pool.close()
        pool.join()
        print("Server shutdown complete.")


if __name__ == "__main__":
    SERVER_HOST = '127.0.0.1'
    SERVER_PORT = 12345
    start_server(SERVER_HOST, SERVER_PORT)
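To try the pair end to end, save the three listings above as adi_gpu_protocol.py, adi_gpu_client.py, and adi_gpu_server.py, start the server first (python adi_gpu_server.py), then run the client (python adi_gpu_client.py) in a second terminal. The client prints the first five elements of the server's result and verifies them against a local NumPy computation.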