This code is meant to be used together with N-Dim.py from the previous post: it adds the memory operations needed for N-dimensional memory handling.
This is the most critical step for real-time performance and achieving "zero-copy" data movement. Directly using memory pointers eliminates the time and CPU cost of serializing and copying data (`.tobytes()`) on the sender and deserializing (`np.frombuffer()`) on the receiver.
To replace the `np.linspace` calls with real-time memory pointer simulation in Python, we must use the **`ctypes`** library and **`numpy.ctypeslib.as_array`** or **`np.frombuffer`** in conjunction with an address.
Since we cannot simulate a true external C process writing to shared memory here, we will *simulate* the external memory allocation and pointer access on the Server side.
### Modified Bare-Metal Implicit Push Solution with Pointer-Based Data
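This refactored code replaces the `np.linspace` and `np.frombuffer` calls in the server's processing path with mechanisms that use a memory address (`ctypes.c_void_p`) to access the data. (`np.linspace` is still used to seed the model arrays and to generate the client's demo input, and the client still calls `np.frombuffer` on the bytes it receives from the socket.)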
#### Key Changes:
1. **Server Initialization:** `fx` and `fy` (the constant model data) are now wrapped as C-style pointers using `ctypes`.
2. **`pseudo_interpolate_arcsecant_stream`:** This function now accepts **pointers** and **sizes** instead of raw bytes. It uses `np.ctypeslib.as_array` on the pointer (via the `ptr_to_numpy` helper) to create a **zero-copy view** of the data.
3. **Data Pushing:** The calculated result is written to a buffer that is tracked by its **memory address**, and the payload pushed to the client is read from that address instead of from a serialized NumPy array. *(Note: This is a **simulation**; network sockets only transmit raw bytes, not memory addresses across processes/machines, so the bytes at that address are still copied once into the socket. It models the zero-copy philosophy for in-process calculation.)*
4. **Client:** The client is simplified to assume a fixed payload size, representing the raw copy of the pointer-referenced data.
-----
```python
import socket
import struct
import time
import threading
import numpy as np
import queue
import ctypes as c # Use 'c' for ctypes
# --- Configuration & Constants ---
HOST = '127.0.0.1'
PORT = 5000
PACKET_TYPES = {0: "PRIMARY", 1: "DELTA"}

# Chunk geometry: every I/O chunk is a fixed-length run of float64 values.
INPUT_CHUNK_SIZE = 5                          # doubles per chunk
DOUBLE_SIZE = np.dtype(np.float64).itemsize   # bytes per double (8)
PAYLOAD_SIZE = DOUBLE_SIZE * INPUT_CHUNK_SIZE  # bytes per chunk (40)

# --- Memory Allocation & Pointer Simulation ---
# A real deployment would get this memory from C/C++ or a shared-memory
# segment; here ctypes arrays act as the stand-in for such persistent buffers.
C_MODEL_ARRAY = c.c_double * 10                  # fx / fy model tables
C_DOUBLE_ARRAY = c.c_double * INPUT_CHUNK_SIZE   # one input or output chunk
# ----------------------------
# Core Pointer-Access Logic
# ----------------------------
def ptr_to_numpy(data_ptr, size, dtype=np.float64):
    """Return a NumPy array of ``size`` elements backed by ``data_ptr``.

    Args:
        data_ptr: A ctypes pointer object exposing ``.value`` (typically a
            ``ctypes.c_void_p``) that holds the address of the first element.
        size: Number of elements (not bytes) to expose.
        dtype: NumPy element type of the memory. It selects both the ctypes
            element type used for the pointer cast and the dtype of the
            fallback array.

    Returns:
        A 1-D array of length ``size``. For a non-null pointer the array is a
        zero-copy view: writes through it modify the pointed-to memory, and
        the caller must keep that memory alive for as long as the view is in
        use. For a null pointer a new zero-filled array is returned instead,
        so callers never see uninitialised memory.
    """
    if data_ptr and data_ptr.value:
        # Cast to a pointer of the element type matching ``dtype`` so the
        # view has the right item size (previously this was always c_double).
        element_type = np.ctypeslib.as_ctypes_type(dtype)
        typed_ptr = c.cast(data_ptr, c.POINTER(element_type))
        return np.ctypeslib.as_array(typed_ptr, shape=(size,))
    # Null pointer: deterministic zeros rather than np.empty() garbage.
    return np.zeros(size, dtype=dtype)
def pseudo_interpolate_arcsecant_stream(fx_ptr, fy_ptr, x_ptr, num_elements,
                                        model_elements=10):
    """Compute the placeholder arcsecant "interpolation" from memory pointers.

    The inputs are read through views created by ``ptr_to_numpy`` and the
    result is written into a newly allocated ctypes buffer.

    Args:
        fx_ptr: Pointer to the model x-values. Accepted for interface
            compatibility; the placeholder formula below does not read it.
        fy_ptr: Pointer to ``model_elements`` doubles of model y-values; only
            their mean is used, as a scale factor.
        x_ptr: Pointer to ``num_elements`` doubles of input samples.
        num_elements: Number of input samples, and of output values.
        model_elements: Number of doubles behind ``fy_ptr`` (default 10).

    Returns:
        A ``(pointer, buffer)`` tuple: a ``ctypes.c_void_p`` holding the
        address of the result, and the ctypes array that owns that memory.
        The caller must keep ``buffer`` referenced for as long as ``pointer``
        (or any view made from it) is used.
    """
    # 1. Views over the input memory.
    fy = ptr_to_numpy(fy_ptr, model_elements)
    x_interp = ptr_to_numpy(x_ptr, num_elements)

    # 2. arcsec(x) == arccos(1/x); inputs are clipped to >= 1.0001 before the
    #    reciprocal so the arccos argument stays inside its domain.
    scale = fy.mean() if fy.size else 1
    y_interp_val = np.arccos(1 / np.clip(x_interp, 1.0001, None)) * scale

    # 3. Size the output buffer from num_elements. A fixed INPUT_CHUNK_SIZE
    #    buffer made the assignment below fail for any other chunk length.
    # NOTE: In a *true* zero-copy system this would be a pre-allocated shmem buffer.
    y_buffer = (c.c_double * num_elements)()
    np.ctypeslib.as_array(y_buffer)[:] = y_interp_val

    # Hand back the address plus the owning object that keeps it valid.
    return c.cast(c.addressof(y_buffer), c.c_void_p), y_buffer
# ----------------------------
# Shared bare-metal helpers (Unchanged structure)
# ----------------------------
# Big-endian packet header: uint64 timestamp (ns), uint32 sequence id, uint8 packet type.
HEADER_FORMAT = '>QIB'
HEADER_SIZE = struct.Struct(HEADER_FORMAT).size  # 8 + 4 + 1 = 13 bytes
def send_packet(sock, sequence_id, packet_type, payload_bytes):
    """Frame ``payload_bytes`` with a header and write the packet to ``sock``.

    The header is packed with ``HEADER_FORMAT`` and carries the current
    ``time.time_ns()`` value, ``sequence_id`` and ``packet_type``. Header and
    payload go out in a single ``sendall`` call.
    """
    header = struct.pack(HEADER_FORMAT, time.time_ns(), sequence_id, packet_type)
    packet = b"".join((header, payload_bytes))
    sock.sendall(packet)
def recv_exact(reader, n):
    """Read exactly ``n`` bytes from ``reader``.

    Args:
        reader: Any object with a ``read(size)`` method returning bytes.
        n: Number of bytes required.

    Returns:
        A ``bytes`` object of length ``n`` (``b""`` when ``n`` is 0).

    Raises:
        ConnectionError: If the stream yields no more data before ``n`` bytes
            have been collected.
    """
    chunks = []
    remaining = n
    # A single read() may legally return fewer bytes than requested (raw or
    # interactive streams), so keep reading until the request is satisfied.
    while remaining > 0:
        chunk = reader.read(remaining)
        if not chunk:  # b"" means EOF; None means a raw stream had no data
            raise ConnectionError("Stream ended unexpectedly")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)
# ----------------------------
# Server (The Processor and Pusher)
# ----------------------------
class ServerState:
    """Owns the ctypes buffers that stand in for external, persistent memory.

    For each buffer the instance keeps both the ctypes array (which owns the
    memory) and a ``c_void_p`` holding its address.
    """

    def __init__(self):
        def address_of(buffer):
            # Wrap the buffer's address in a c_void_p.
            return c.cast(c.addressof(buffer), c.c_void_p)

        # Model tables (fx, fy): allocate, then fill in place via NumPy views.
        self.fx_c_arr = C_MODEL_ARRAY()
        self.fy_c_arr = C_MODEL_ARRAY()
        for buffer, (first, last) in (
            (self.fx_c_arr, (1, 10)),
            (self.fy_c_arr, (10, 20)),
        ):
            view = np.ctypeslib.as_array(buffer)
            view[:] = np.linspace(first, last, 10, dtype=np.float64)
        self.fx_ptr = address_of(self.fx_c_arr)
        self.fy_ptr = address_of(self.fy_c_arr)

        # Scratch buffer for one incoming x chunk.
        self.x_c_arr = C_DOUBLE_ARRAY()
        self.x_ptr = address_of(self.x_c_arr)
def handle_client_stream(client_socket, server_state):
    """Serve one client connection until it closes or an error occurs.

    Per iteration: read a 4-byte big-endian length prefix and an x chunk of
    exactly ``PAYLOAD_SIZE`` bytes, run the pointer-based calculation, push a
    PRIMARY packet (type 0) with the full result and, from the second chunk
    on, a DELTA packet (type 1) holding ``current - previous``.

    Args:
        client_socket: Connected socket for this client; closed on exit.
        server_state: Shared ``ServerState``; only its read-only model
            pointers (``fx_ptr``, ``fy_ptr``) are used here.
    """
    last_interp_y = None
    sequence_id = 0
    reader = client_socket.makefile('rb', buffering=PAYLOAD_SIZE + HEADER_SIZE + 4)

    # Per-connection input buffer. start_server runs one handler thread per
    # client and hands the same server_state to all of them, so staging input
    # in server_state.x_c_arr would let concurrent clients overwrite each
    # other's chunk between the memmove and the calculation.
    x_c_arr = (c.c_double * INPUT_CHUNK_SIZE)()
    x_ptr = c.c_void_p(c.addressof(x_c_arr))

    try:
        while True:
            # --- Implicit input stream: length prefix, then the raw chunk ---
            length_bytes = recv_exact(reader, 4)
            (chunk_len,) = struct.unpack('>I', length_bytes)
            if chunk_len != PAYLOAD_SIZE:
                raise ValueError(f"Unexpected chunk size: got {chunk_len}")
            chunk_bytes = recv_exact(reader, chunk_len)

            # The socket read is a copy; from here on the calculation reads
            # the chunk through a pointer-backed view.
            c.memmove(x_c_arr, chunk_bytes, chunk_len)
            sequence_id += 1

            # --- Processing (pointers in, pointer + owning buffer out) ---
            y_ptr, y_buffer_handle = pseudo_interpolate_arcsecant_stream(
                server_state.fx_ptr, server_state.fy_ptr, x_ptr, INPUT_CHUNK_SIZE
            )

            # --- Implicit push ---
            # bytes() copies the buffer once; sockets can only send bytes.
            send_packet(client_socket, sequence_id, 0, bytes(y_buffer_handle))

            # Copy the result out of the ctypes buffer: y_buffer_handle is
            # rebound on the next iteration, so a bare view would be left
            # pointing at released memory.
            current_y = ptr_to_numpy(y_ptr, INPUT_CHUNK_SIZE).copy()
            if last_interp_y is not None:
                delta_y = current_y - last_interp_y
                send_packet(client_socket, sequence_id, 1, delta_y.tobytes())
            last_interp_y = current_y
    except ConnectionError as e:
        # recv_exact raises this when the client stops sending.
        print(f"[Server] Client disconnected: {e}")
    except Exception as e:
        # Top-level boundary of the connection thread: report and clean up.
        print(f"[Server] Stream error: {e}")
    finally:
        reader.close()
        client_socket.close()
        print(f"[Server] Connection closed.")
def start_server(host='127.0.0.1', port=5000):
    """Listen on ``host:port`` and serve each client on its own daemon thread.

    This call does not return under normal operation: it loops on
    ``accept()`` forever. One ``ServerState`` is created up front and passed
    to every ``handle_client_stream`` thread.

    Args:
        host: Interface address to bind.
        port: TCP port to bind.
    """
    server_state = ServerState()  # Initialize pointer-based state

    # The context manager closes the listening socket if bind/listen/accept
    # raises or the loop is interrupted; previously it was never closed.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind((host, port))
        server_socket.listen(1)
        print(f"[Server] Listening (Bare-Metal/Pointer) on {host}:{port}")
        while True:
            client_socket, addr = server_socket.accept()
            print(f"[Server] Connection from {addr}")
            threading.Thread(
                target=handle_client_stream,
                args=(client_socket, server_state),
                daemon=True,
            ).start()
# ----------------------------
# Client (The Continuous Streamer) - Logic is UNCHANGED from last step
# ----------------------------
# ... (client_receive_data, client_send_data, client_main functions remain the same)
# The client's job is simply to read the fixed-size byte packets (which is the
# zero-copy data copied *once* to the network buffer) and process them.
# Re-including the client functions for completeness.
def client_receive_data(sock, receive_q):
    """Receiver thread body: parse incoming packets and queue them.

    Each packet is a ``HEADER_SIZE``-byte header followed by a
    ``PAYLOAD_SIZE``-byte payload. Every parsed packet is put on
    ``receive_q`` as ``(timestamp_ns, sequence_id, packet_type, payload)``.
    The loop ends when reading fails; the error is printed and the reader is
    closed.
    """
    buffer_size = PAYLOAD_SIZE + HEADER_SIZE + 4
    with sock.makefile('rb', buffering=buffer_size) as reader:
        try:
            while True:
                raw_header = recv_exact(reader, HEADER_SIZE)
                timestamp_ns, sequence_id, packet_type = struct.unpack(HEADER_FORMAT, raw_header)
                body = recv_exact(reader, PAYLOAD_SIZE)
                receive_q.put((timestamp_ns, sequence_id, packet_type, body))
        except Exception as e:
            print(f"\n[Receiver] Connection lost or error: {e}")
def client_send_data(sock, start_event):
    """Sender thread body: stream 14 demo input chunks to the server.

    Waits for ``start_event``, then sends one length-prefixed float64 chunk
    about every 10 ms. On exit, whether normal or after an error, the write
    side of the socket is shut down.
    """
    start_event.wait()
    try:
        for step in range(1, 15):
            # Demo input: INPUT_CHUNK_SIZE evenly spaced samples in [step, step + 1].
            samples = np.linspace(step, step + 1, INPUT_CHUNK_SIZE)
            body = samples.astype(np.float64).tobytes()
            prefix = struct.pack('>I', len(body))
            sock.sendall(prefix + body)
            time.sleep(0.01)
    except Exception as e:
        print(f"\n[Sender] Connection lost or error: {e}")
    finally:
        # Best effort: the socket may already be closed or disconnected.
        try:
            sock.shutdown(socket.SHUT_WR)
        except OSError:
            pass
def client_main(host='127.0.0.1', port=5000):
    """Connect to the server, stream input chunks, and print what comes back.

    Starts a sender thread (``client_send_data``) and a receiver thread
    (``client_receive_data``), then consumes parsed packets from the receive
    queue until the sender has finished and the queue is drained.

    Args:
        host: Server address.
        port: Server TCP port.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.connect((host, port))
    except OSError:
        # Don't leak the descriptor when the server is unreachable.
        sock.close()
        raise

    receive_queue = queue.Queue()
    start_event = threading.Event()
    sender_thread = threading.Thread(target=client_send_data, args=(sock, start_event), daemon=True)
    receiver_thread = threading.Thread(target=client_receive_data, args=(sock, receive_queue), daemon=True)
    sender_thread.start()
    receiver_thread.start()
    print(f"[Client] Started Sender and Receiver threads. Initiating stream...")
    start_event.set()

    # The server sends PRIMARY(y_n) first and then DELTA(y_n - y_{n-1}) for
    # the same sequence id, so the delta must be applied to the primary
    # received *before* the latest one. Adding it to the latest primary gives
    # y_n + delta, which is not a value the server ever produced.
    previous_full_y = None
    last_full_y = None
    try:
        while sender_thread.is_alive() or not receive_queue.empty():
            try:
                timestamp_ns, sequence_id, packet_type, payload = receive_queue.get(timeout=0.1)
            except queue.Empty:
                if not sender_thread.is_alive():
                    break
                continue

            # Bytes off the wire still have to be reinterpreted as float64.
            arr = np.frombuffer(payload, dtype=np.float64)
            if packet_type == 0:
                previous_full_y, last_full_y = last_full_y, arr
                print(f"[Client] Rcvd Primary seq {sequence_id}: {arr[:3]}...")
            elif packet_type == 1:
                if previous_full_y is not None:
                    delta_arr = arr
                    current_y_reconstructed = previous_full_y + delta_arr
                    print(f"[Client] Rcvd Delta seq {sequence_id}: {delta_arr[:3]}... (Reconstructed: {current_y_reconstructed[:3]}...)")
            receive_queue.task_done()
    except KeyboardInterrupt:
        print("\n[Client] Interrupted.")
    finally:
        print("[Client] Closing connection.")
        sock.close()
# ----------------------------
# Run demo
# ----------------------------
if __name__ == "__main__":
    print("--- Bare-Metal Stream with Pointer-Based Processing Simulation ---")
    # Run the server in the background as a daemon thread.
    server_thread = threading.Thread(target=start_server, daemon=True)
    server_thread.start()
    # Pause before connecting so the server thread has time to start listening.
    time.sleep(1)
    client_main()
```
No comments:
Post a Comment