Saturday, September 20, 2025

N-Dim-Compliant Memory Handling


This work adds pointer-based memory operations that complement N-Dim.py from the previous post, for N-dimensional memory handling.

This is the most critical step for real-time performance and achieving "zero-copy" data movement. Directly using memory pointers eliminates the time and CPU cost of serializing and copying data (`.tobytes()`) on the sender and deserializing (`np.frombuffer()`) on the receiver.


To replace the `np.linspace`-generated arrays with simulated real-time memory pointers in Python, we use the **`ctypes`** library together with **`numpy.ctypeslib.as_array`** (or **`np.frombuffer`**) applied to a memory address.


Since we cannot run a true external C process that writes to shared memory here, we *simulate* the external memory allocation and pointer access on the server side.


### Modified Bare-Metal Implicit Push Solution with Pointer-Based Data


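This refactored code moves the server's data handling onto mechanisms that use a memory address (`ctypes.c_void_p`) to access the data, instead of passing `np.linspace` results and `np.frombuffer` copies around. (`np.linspace` is still used to fill the initial model buffers and to generate the client's demo input, and the client still decodes received bytes with `np.frombuffer`.)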


#### Key Changes:


1.  **Server Initialization:** `fx` and `fy` (the constant model data) are now stored in ctypes C arrays and referenced through C-style `c_void_p` pointers.

2.  **`pseudo_interpolate_arcsecant_stream`:** This function now accepts **pointers** and **sizes** instead of raw bytes. It applies `np.ctypeslib.as_array` to each pointer address (via `ptr_to_numpy`) to create a **zero-copy view** of the data, and it returns a pointer to its result buffer.

3.  **Data Pushing:** The calculation now yields the **memory address** of its result, and the server takes the bytes to push directly from that buffer. *(Note: This is a **simulation**; network sockets transmit only raw bytes, never memory addresses usable across processes/machines, so the data is still copied once onto the wire. It models the zero-copy philosophy for in-process calculation.)*

4.  **Client:** The client is simplified to assume a fixed payload size, representing the raw copy of the pointer-referenced data.


-----


```python

import socket

import struct

import time

import threading

import numpy as np

import queue

import ctypes as c # Use 'c' for ctypes


# --- Configuration & Constants ---

HOST = '127.0.0.1'
PORT = 5000

# Human-readable names for the packet_type codes sent in each packet header.
PACKET_TYPES = {0: "PRIMARY", 1: "DELTA"}

# Every I/O chunk carries this many float64 values.
INPUT_CHUNK_SIZE = 5
DOUBLE_SIZE = np.dtype(np.float64).itemsize  # bytes per float64 (8)
PAYLOAD_SIZE = DOUBLE_SIZE * INPUT_CHUNK_SIZE  # bytes per chunk (40)


# --- Memory Allocation & Pointer Simulation ---
# A production system would allocate these buffers in C/C++ or in shared
# memory (shmem); fixed-length ctypes arrays stand in for that storage here.

# ctypes array type holding one I/O chunk of doubles.
C_DOUBLE_ARRAY = c.c_double * INPUT_CHUNK_SIZE
# ctypes array type holding the 10-point fx/fy model data.
C_MODEL_ARRAY = c.c_double * 10


# ----------------------------

# Core Pointer-Access Logic

# ----------------------------

def ptr_to_numpy(data_ptr, size, dtype=np.float64):
    """Return a zero-copy NumPy view of ``size`` elements starting at ``data_ptr``.

    Args:
        data_ptr: A ``ctypes.c_void_p`` holding the address of the first
            element. ``None`` or a NULL pointer is accepted.
        size: Number of elements to expose.
        dtype: Element type stored at the address. The pointer is cast to the
            matching ctypes type, so the memory is interpreted with this dtype
            (previously it was always read as ``c_double``).

    Returns:
        A 1-D array of length ``size`` sharing memory with the pointed-to
        buffer, so writes through the view modify the buffer. For a NULL/None
        pointer, a zero-filled array that owns its memory is returned instead.

    The view does not keep the underlying buffer alive. The caller must hold a
    reference to the owning ctypes object for as long as the view is used.
    """
    if not data_ptr or not data_ptr.value:
        # Zero-filled rather than np.empty: never hand out uninitialised memory.
        return np.zeros(size, dtype=dtype)

    # Honour ``dtype`` instead of unconditionally reinterpreting as doubles.
    element_type = np.ctypeslib.as_ctypes_type(np.dtype(dtype))
    typed_ptr = c.cast(data_ptr, c.POINTER(element_type))
    return np.ctypeslib.as_array(typed_ptr, shape=(size,))


def pseudo_interpolate_arcsecant_stream(fx_ptr, fy_ptr, x_ptr, num_elements, model_size=10):
    """
    Compute a pseudo arcsecant interpolation working through memory pointers.

    Args:
        fx_ptr: ``c_void_p`` address of the fx model array. It is accepted for
            interface compatibility but not used by this pseudo-model.
        fy_ptr: ``c_void_p`` address of the fy model array (``model_size``
            doubles); its mean scales the result (1 if empty).
        x_ptr: ``c_void_p`` address of ``num_elements`` input doubles.
        num_elements: Number of input (and output) elements.
        model_size: Number of doubles in each model array. The default of 10
            matches ``C_MODEL_ARRAY``.

    Returns:
        ``(y_ptr, y_buffer)``: a ``c_void_p`` to the result and the ctypes
        array that owns that memory. Keep ``y_buffer`` referenced while
        ``y_ptr`` (or any view built from it) is in use, because the memory is
        freed with it.
    """
    # 1. Zero-copy views over the caller-owned memory.
    fy = ptr_to_numpy(fy_ptr, model_size)
    x_interp = ptr_to_numpy(x_ptr, num_elements)

    # 2. arcsec(x) = arccos(1/x). Clipping to x >= 1.0001 keeps 1/x inside
    #    arccos's domain.
    y_interp_val = np.arccos(1 / np.clip(x_interp, 1.0001, None)) * (fy.mean() if fy.size else 1)

    # 3. Owned result buffer sized to num_elements. A fixed C_DOUBLE_ARRAY
    #    fails to broadcast whenever num_elements != INPUT_CHUNK_SIZE.
    #    NOTE: in a true zero-copy system this would be a pre-allocated shmem buffer.
    y_buffer = (c.c_double * num_elements)()
    np.ctypeslib.as_array(y_buffer)[:] = y_interp_val

    # Return the pointer to the result plus the buffer that keeps it alive.
    return c.cast(c.addressof(y_buffer), c.c_void_p), y_buffer


# ----------------------------

# Shared bare-metal helpers (Unchanged structure)

# ----------------------------

# Packet header, big-endian: timestamp_ns (Q), sequence_id (I), packet_type (B).
HEADER_FORMAT = '>QIB'
HEADER_SIZE = struct.Struct(HEADER_FORMAT).size  # 8 + 4 + 1 = 13 bytes, no padding


def send_packet(sock, sequence_id, packet_type, payload_bytes):
    """Write one packet to ``sock``: a ``HEADER_FORMAT`` header, then the payload.

    The header holds the current ``time.time_ns()`` timestamp, the sequence id
    and the packet type. ``payload_bytes`` must already be serialized by the
    caller.
    """
    packet = struct.pack(HEADER_FORMAT, time.time_ns(), sequence_id, packet_type) + payload_bytes
    sock.sendall(packet)


def recv_exact(reader, n):
    """Return ``n`` bytes obtained from a single ``reader.read(n)`` call.

    Relies on ``reader`` returning a short read only at end of stream, as a
    buffered binary reader does.

    Raises:
        ConnectionError: if fewer than ``n`` bytes were returned.
    """
    chunk = reader.read(n)
    if len(chunk) >= n:
        return chunk
    raise ConnectionError("Stream ended unexpectedly")


# ----------------------------

# Server (The Processor and Pusher)

# ----------------------------

class ServerState:
    """Holds ctypes buffers that stand in for external, persistent memory.

    Attributes (all created in ``__init__``):
        fx_c_arr, fy_c_arr: ``C_MODEL_ARRAY`` model buffers filled with
            ``linspace(1, 10, 10)`` and ``linspace(10, 20, 10)``.
        fx_ptr, fy_ptr: ``c_void_p`` addresses of the model buffers.
        x_c_arr: ``C_DOUBLE_ARRAY`` scratch buffer for one input chunk.
        x_ptr: ``c_void_p`` address of ``x_c_arr``.
    """

    def __init__(self):
        # Model data: allocate, then fill in place through NumPy views.
        self.fx_c_arr, self.fy_c_arr = C_MODEL_ARRAY(), C_MODEL_ARRAY()
        fx_view = np.ctypeslib.as_array(self.fx_c_arr, shape=(10,))
        fy_view = np.ctypeslib.as_array(self.fy_c_arr, shape=(10,))
        fx_view[:] = np.linspace(1, 10, 10, dtype=np.float64)
        fy_view[:] = np.linspace(10, 20, 10, dtype=np.float64)

        # Raw addresses of the model buffers. The ctypes arrays that own the
        # memory are kept alive as attributes of this object.
        self.fx_ptr = c.c_void_p(c.addressof(self.fx_c_arr))
        self.fy_ptr = c.c_void_p(c.addressof(self.fy_c_arr))

        # Scratch buffer (and its address) for one chunk of input x values.
        self.x_c_arr = C_DOUBLE_ARRAY()
        self.x_ptr = c.c_void_p(c.addressof(self.x_c_arr))


def handle_client_stream(client_socket, server_state):
    """Serve one client connection until it closes.

    Per iteration:
      * read a 4-byte big-endian length prefix, then that many bytes of
        float64 x values (the length must equal PAYLOAD_SIZE);
      * run the pointer-based interpolation;
      * push a PRIMARY packet (type 0) with the y values and, from the second
        chunk on, a DELTA packet (type 1) carrying y minus the previous y.

    ``server_state`` supplies only the read-only model pointers (fx, fy). Each
    connection uses its own input buffer. ``start_server`` runs one handler
    thread per client against the same ``server_state``, so writing into the
    shared ``server_state.x_c_arr`` would let concurrent clients overwrite
    each other's input mid-calculation.
    """
    last_interp_y = None
    sequence_id = 0
    reader = client_socket.makefile('rb', buffering=PAYLOAD_SIZE + HEADER_SIZE + 4)
    x_chunk_len = PAYLOAD_SIZE  # expected byte size of each input chunk

    # Per-connection input buffer; x_c_arr owns the memory x_ptr points to.
    x_c_arr = C_DOUBLE_ARRAY()
    x_ptr = c.c_void_p(c.addressof(x_c_arr))

    try:
        while True:
            # --- Input: length-prefixed chunk of x values ---
            chunk_len = struct.unpack('>I', recv_exact(reader, 4))[0]
            if chunk_len != x_chunk_len:
                raise ValueError(f"Unexpected chunk size: got {chunk_len}")
            x_bytes = recv_exact(reader, chunk_len)
            # The socket read is still a copy; from here on the pipeline works
            # on the pointed-to buffer.
            c.memmove(x_ptr.value, x_bytes, chunk_len)

            sequence_id += 1

            # --- Processing through pointers ---
            y_ptr, y_buffer_handle = pseudo_interpolate_arcsecant_stream(
                server_state.fx_ptr, server_state.fy_ptr, x_ptr, INPUT_CHUNK_SIZE
            )
            # View into y_buffer_handle's memory; valid only while it is alive.
            current_y = ptr_to_numpy(y_ptr, INPUT_CHUNK_SIZE)

            # --- Push: PRIMARY (type 0), then DELTA (type 1) once a previous y exists ---
            send_packet(client_socket, sequence_id, 0, bytes(y_buffer_handle))
            if last_interp_y is not None:
                send_packet(client_socket, sequence_id, 1, (current_y - last_interp_y).tobytes())

            # Must be a copy, not a view: y_buffer_handle is rebound on the next
            # iteration and its memory freed, which would leave a view dangling.
            last_interp_y = current_y.copy()

    except ConnectionError as e:
        print(f"[Server] Client disconnected: {e}")
    except Exception as e:
        print(f"[Server] Stream error: {e}")
    finally:
        reader.close()
        client_socket.close()
        print("[Server] Connection closed.")


def start_server(host='127.0.0.1', port=5000, ready_event=None):
    """Accept clients forever, serving each one on its own daemon thread.

    Args:
        host: Interface address to bind.
        port: TCP port to bind.
        ready_event: Optional ``threading.Event`` that is set once the socket
            is listening, so callers can wait for readiness instead of sleeping.

    All handler threads share a single ``ServerState``.
    """
    server_state = ServerState()  # pointer-based model state shared by handlers

    # ``with`` closes the listening socket if bind/listen/accept raises,
    # instead of leaking the file descriptor.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind((host, port))
        server_socket.listen(1)
        print(f"[Server] Listening (Bare-Metal/Pointer) on {host}:{port}")
        if ready_event is not None:
            ready_event.set()

        while True:
            client_socket, addr = server_socket.accept()
            print(f"[Server] Connection from {addr}")
            threading.Thread(
                target=handle_client_stream,
                args=(client_socket, server_state),
                daemon=True,
            ).start()


# ----------------------------

# Client (The Continuous Streamer) - Logic is UNCHANGED from last step

# ----------------------------

# ... (client_receive_data, client_send_data, client_main functions remain the same)

# The client's job is simply to read the fixed-size byte packets (which is the

# zero-copy data copied *once* to the network buffer) and process them.


# Re-including the client functions for completeness.

def client_receive_data(sock, receive_q):
    """Receive output packets from ``sock`` and queue them until the stream ends.

    Each packet is a ``HEADER_FORMAT`` header followed by a fixed
    ``PAYLOAD_SIZE``-byte payload. Items are queued as
    ``(timestamp_ns, sequence_id, packet_type, payload)``.

    A connection closed between packets is the normal end of the stream and
    returns quietly. EOF in the middle of a packet, or any other error, is
    reported.
    """
    reader = sock.makefile('rb', buffering=PAYLOAD_SIZE + HEADER_SIZE + 4)
    y_chunk_len = PAYLOAD_SIZE

    try:
        while True:
            header = reader.read(HEADER_SIZE)
            if not header:
                # Clean EOF at a packet boundary: the server finished sending.
                break
            if len(header) < HEADER_SIZE:
                raise ConnectionError("Stream ended unexpectedly")
            timestamp_ns, sequence_id, packet_type = struct.unpack(HEADER_FORMAT, header)

            payload = recv_exact(reader, y_chunk_len)
            receive_q.put((timestamp_ns, sequence_id, packet_type, payload))

    except Exception as e:
        print(f"\n[Receiver] Connection lost or error: {e}")
    finally:
        reader.close()


def client_send_data(sock, start_event):
    """Stream 14 length-prefixed float64 input chunks to the server.

    Waits for ``start_event``, then for i = 1..14 sends ``INPUT_CHUNK_SIZE``
    evenly spaced values from i to i + 1, pausing 10 ms between sends.
    Afterwards it attempts to half-close the write side so the server sees EOF.
    """
    start_event.wait()

    try:
        for start in range(1, 15):
            # Demo input is still generated with np.linspace for convenience.
            payload = np.linspace(start, start + 1, INPUT_CHUNK_SIZE).astype(np.float64).tobytes()
            prefix = struct.pack('>I', len(payload))
            sock.sendall(prefix + payload)
            time.sleep(0.01)
    except Exception as e:
        print(f"\n[Sender] Connection lost or error: {e}")
    finally:
        try:
            sock.shutdown(socket.SHUT_WR)
        except OSError:
            pass


def client_main(host='127.0.0.1', port=5000):
    """Connect to the server, stream input chunks and print the pushed results.

    A sender thread writes the input and a receiver thread parses output
    packets into a queue that this thread drains. The loop runs until the
    receiver thread has finished (the server closed the stream) and the queue
    is empty.

    The server sends DELTA packet ``k`` (``y_k - y_{k-1}``) right after PRIMARY
    packet ``k``. The delta is therefore applied to the *previous* primary
    (``y_{k-1} + delta``), which reproduces the latest primary ``y_k``.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.connect((host, port))
    except OSError:
        sock.close()  # don't leak the socket when the connection fails
        raise

    receive_queue = queue.Queue()
    start_event = threading.Event()

    sender_thread = threading.Thread(target=client_send_data, args=(sock, start_event), daemon=True)
    receiver_thread = threading.Thread(target=client_receive_data, args=(sock, receive_queue), daemon=True)

    sender_thread.start()
    receiver_thread.start()

    print("[Client] Started Sender and Receiver threads. Initiating stream...")
    start_event.set()

    prev_full_y = None  # primary y of the previous sequence (y_{k-1})
    last_full_y = None  # most recent primary y (y_k)

    try:
        # Track the *receiver*: the sender can finish before the server has
        # pushed its final packets, so stopping on the sender drops the tail.
        while receiver_thread.is_alive() or not receive_queue.empty():
            try:
                timestamp_ns, sequence_id, packet_type, payload = receive_queue.get(timeout=0.1)
            except queue.Empty:
                continue

            # Bytes from the wire still have to be turned back into an array.
            arr = np.frombuffer(payload, dtype=np.float64)

            if packet_type == 0:
                prev_full_y, last_full_y = last_full_y, arr
                print(f"[Client] Rcvd Primary seq {sequence_id}: {arr[:3]}...")
            elif packet_type == 1 and prev_full_y is not None:
                delta_arr = arr
                current_y_reconstructed = prev_full_y + delta_arr
                print(f"[Client] Rcvd Delta   seq {sequence_id}: {delta_arr[:3]}... (Reconstructed: {current_y_reconstructed[:3]}...)")

            receive_queue.task_done()

    except KeyboardInterrupt:
        print("\n[Client] Interrupted.")
    finally:
        print("[Client] Closing connection.")
        sock.close()



# ----------------------------

# Run demo

# ----------------------------

if __name__ == "__main__":
    print("--- Bare-Metal Stream with Pointer-Based Processing Simulation ---")

    # Run the server in the background; as a daemon thread it exits with the client.
    server_thread = threading.Thread(target=start_server, daemon=True)
    server_thread.start()
    time.sleep(1)  # crude wait for the server to start listening

    client_main()

```

No comments: