Thursday, August 21, 2025

Atomtronic Adi Protocol Suite Implementation 0.01A

# I thought a translation might help a few scientists who are looking to advance the prospects of their research. Good sallet chaps!

 # adi-protocol_Atomtronic-RingLaser.py
# This module defines the communication protocol and data handling functions
# for the simulated atomtronic system. It also includes a simplified
# mixed-signal bridge to model the VHDL-AMS logic.

import socket
import struct
import json
import numpy as np
import time

# --- Operation Constants (for interoperability) ---
# Operation code carried in the "operation" field of the message header;
# the server compares the received value against this constant to select
# the ring-laser-gyro simulation.
OPERATION_RING_LASER_GYRO = 1

# --- Mixed-Signal Bridge (Simulating VHDL-AMS) ---

def simulate_dac_adc_bridge(digital_data, noisy_physics_model_func, *args, **kwargs):
    """
    Run a digital command through a DAC -> physics model -> ADC round trip.

    This is a simplified stand-in for the VHDL-AMS mixed-signal bridge: the
    digital samples are widened to float64 ("analog"), handed to the supplied
    physics model, and the model's response is rounded back to integers.

    Args:
        digital_data (np.ndarray): Digital input samples; must provide
            ``astype``.
        noisy_physics_model_func (callable): Physics model. It is called as
            ``noisy_physics_model_func(analog_signal, *args, **kwargs)``.
        *args, **kwargs: Forwarded unchanged to the physics model.

    Returns:
        np.ndarray: The model output rounded to the nearest integer and cast
        to int32.
    """
    # DAC stage: a plain cast to float64, no scaling is applied.
    print("Bridge: Converting digital command to analog signal (DAC)...")
    dac_output = digital_data.astype(np.float64)

    # Physical stage: the caller-supplied model stands in for the hardware.
    print("Bridge: Running physical simulation with analog signal...")
    physics_response = noisy_physics_model_func(dac_output, *args, **kwargs)

    # ADC stage: quantisation is modelled by rounding to whole numbers.
    print("Bridge: Converting analog result back to digital (ADC)...")
    return np.round(physics_response).astype(np.int32)

# --- Helper functions for data transmission (Reused from original suite) ---

def send_data(conn, operation_type, data):
    """
    Send one framed message (JSON header + binary payload) over a socket.

    Wire layout, in order:
      1. 4-byte unsigned big-endian length of the JSON header,
      2. the UTF-8 encoded JSON header,
      3. the concatenated binary payload of every NumPy array in ``data``.

    Every array is serialised as float32 (native byte order). The header
    entry for an array records its byte ``offset`` and ``length`` inside the
    payload, its ``shape``, and the ``dtype`` of the bytes actually sent
    (always ``"float32"``), so a receiver can decode the payload from the
    header alone. Non-array values are embedded directly in the header.

    Args:
        conn (socket.socket): The socket to send data through; only
            ``sendall`` is used.
        operation_type (int): The type of operation being requested.
        data (dict): Values to send. ``np.ndarray`` values go into the binary
            payload; everything else must be JSON-serialisable (NumPy scalars
            are converted to plain Python numbers first).

    Raises:
        TypeError: If a non-array value cannot be serialised to JSON.
    """
    wire_dtype = np.dtype(np.float32)
    payload_chunks = []
    data_pointers = {}
    offset = 0

    for key, val in data.items():
        if isinstance(val, np.ndarray):
            array_bytes = val.astype(wire_dtype).tobytes()
            data_pointers[key] = {
                "offset": offset,
                "length": len(array_bytes),
                # Describe the bytes on the wire, not the source array:
                # advertising the source dtype (e.g. int32) would make a
                # header-driven receiver misinterpret the float32 payload.
                "dtype": wire_dtype.name,
                "shape": val.shape,
            }
            payload_chunks.append(array_bytes)
            offset += len(array_bytes)
        elif isinstance(val, np.generic):
            # NumPy scalars such as np.int64 are not JSON-serialisable.
            data_pointers[key] = val.item()
        else:
            data_pointers[key] = val

    # Join once instead of growing a bytes object inside the loop.
    binary_data = b"".join(payload_chunks)

    json_header = {
        "operation": operation_type,
        "data_pointers": data_pointers,
        "total_binary_size": len(binary_data)
    }
    json_header_bytes = json.dumps(json_header).encode('utf-8')
    header_size = struct.pack('!I', len(json_header_bytes))

    conn.sendall(header_size)
    conn.sendall(json_header_bytes)
    conn.sendall(binary_data)

def _recv_exact(conn, num_bytes):
    """Read exactly ``num_bytes`` from ``conn``; return None if the peer closes first."""
    buffer = bytearray()
    while len(buffer) < num_bytes:
        chunk = conn.recv(num_bytes - len(buffer))
        if not chunk:
            return None
        buffer += chunk
    return bytes(buffer)


def receive_data(conn):
    """
    Receive one framed message (JSON header + binary payload) from a socket.

    Reads the 4-byte big-endian header length, then the UTF-8 JSON header,
    then ``total_binary_size`` bytes of payload (0 if the header omits that
    field). Each part is read in a loop because a stream socket may deliver
    fewer bytes per ``recv`` call than requested. The payload is returned
    raw; it is not split up according to the header's data pointers.

    Args:
        conn (socket.socket): The socket to receive data from; only ``recv``
            is used.

    Returns:
        tuple: ``(json_header, binary_payload)`` where ``json_header`` is the
        decoded JSON object and ``binary_payload`` is ``bytes``. Returns
        ``(None, None)`` if the peer closes the connection before a full
        message arrives, if the header cannot be decoded, or if the
        connection is reset.
    """
    try:
        # A single recv(4) may return 1-3 bytes, so read the prefix in a loop.
        header_size_bytes = _recv_exact(conn, 4)
        if header_size_bytes is None:
            return None, None
        header_size = struct.unpack('!I', header_size_bytes)[0]

        json_header_bytes = _recv_exact(conn, header_size)
        if json_header_bytes is None:
            return None, None

        json_header = json.loads(json_header_bytes.decode('utf-8'))

        total_binary_size = json_header.get("total_binary_size", 0)
        binary_payload = _recv_exact(conn, total_binary_size)
        if binary_payload is None:
            return None, None

        return json_header, binary_payload

    except (json.JSONDecodeError, UnicodeDecodeError, struct.error, ConnectionResetError) as e:
        print(f"Error receiving data: {e}")
        return None, None

# atomtronic-server.py
# This program simulates the atomtronic hardware. It acts as a server that
# receives digital commands, runs a physics simulation, and sends back
# the measurement data.

import socket
import numpy as np
import time
import struct
import random

# Import the protocol and mixed-signal bridge
from adi_protocol import receive_data, send_data, simulate_dac_adc_bridge, OPERATION_RING_LASER_GYRO

# --- Simulated Atomtronic Physics ---
def simulate_ring_laser_gyro(analog_dither_signal, rotation_rate, noise_level=0.1):
    """
    Simulates the physics of a ring laser gyroscope.

    The model has three additive parts:
      * a constant Sagnac term, ``rotation_rate * 1000.0`` (arbitrary
        simulation scale), which is replaced by zero when its magnitude is
        below the lock-in threshold of 0.05;
      * half of the dither signal, sample by sample;
      * zero-mean Gaussian noise with standard deviation ``noise_level``.

    Args:
        analog_dither_signal (array-like): The analog dither signal from the
            DAC. Floating-point (and complex) arrays keep their dtype;
            anything else (integers, booleans, lists of ints) is promoted to
            float64 so the fractional dither and noise terms can be added.
        rotation_rate (float): The simulated physical rotation rate in rad/s.
        noise_level (float): Standard deviation of the added noise; 0
            disables the noise.

    Returns:
        np.ndarray: The simulated raw measurement, same shape as the dither
        signal. The input array is not modified.
    """
    print(f"Simulating ring laser gyro with rotation rate: {rotation_rate} rad/s")

    # Work on an inexact (float/complex) array. With an integer input,
    # zeros_like/full_like would allocate an integer buffer, truncating the
    # Sagnac term and making the in-place float additions below fail.
    dither = np.asarray(analog_dither_signal)
    if not np.issubdtype(dither.dtype, np.inexact):
        dither = dither.astype(np.float64)

    # Sagnac effect: frequency shift is proportional to rotation.
    sagnac_freq_shift = rotation_rate * 1000.0  # Arbitrary scale factor for simulation

    # Lock-in: at low rotation rates the two beams lock to the same
    # frequency, so the un-dithered output is zero.
    lock_in_threshold = 0.05
    if abs(sagnac_freq_shift) < lock_in_threshold:
        print("Warning: Simulating lock-in. Dither is required.")
        measured_signal = np.zeros_like(dither)
    else:
        measured_signal = np.full_like(dither, sagnac_freq_shift)

    # Dithering: conceptual model in which the dither adds directly to the
    # measured signal at half amplitude.
    measured_signal += dither * 0.5

    # Add random measurement noise
    measured_signal += np.random.normal(0, noise_level, measured_signal.shape)

    return measured_signal

# --- Main Server Logic ---
def start_server(host, port):
    """
    Starts the server to listen for and handle client requests.

    Clients are served one at a time and each connection carries a single
    request: the server reads one framed message, and if its operation is
    OPERATION_RING_LASER_GYRO it treats the whole binary payload as a float32
    dither signal, runs it through the mixed-signal bridge and the gyro
    model, and sends the digital measurement back. The connection is then
    closed. The loop runs until the process is interrupted.

    Args:
        host (str): Address to bind the listening socket to.
        port (int): TCP port to listen on.
    """
    # The context manager closes the listening socket even when the accept
    # loop is left through an exception such as KeyboardInterrupt.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        # Allow an immediate restart on the same port instead of failing with
        # "address already in use" while old connections are in TIME_WAIT.
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind((host, port))
        server_socket.listen(1)
        print(f"Server simulating atomtronic system, listening on {host}:{port}")

        while True:
            client_socket, addr = server_socket.accept()
            print(f"Connection from {addr}")

            try:
                # Step 1: Receive the digital command from the client
                header, binary_data = receive_data(client_socket)
                if header is None:
                    # Nothing usable arrived; the finally clause still closes
                    # this client socket before the next accept.
                    continue

                operation = header.get("operation")

                if operation == OPERATION_RING_LASER_GYRO:
                    print("Received command for Ring-Laser Gyro.")

                    # Unpack the digital command: the entire payload is read
                    # as one float32 array.
                    dither_signal = np.frombuffer(binary_data, dtype=np.float32)

                    # The rotation rate is a parameter sent in the header
                    rotation_rate = header["data_pointers"]["rotation_rate"]

                    # Step 2: Simulate the mixed-signal bridge and the physical process
                    # The server is the "physical" side, so it calls the bridge
                    digital_measurement = simulate_dac_adc_bridge(
                        dither_signal,
                        simulate_ring_laser_gyro,
                        rotation_rate=rotation_rate
                    )

                    # Step 3: Send the digital measurement back to the client
                    print("Sending simulated digital measurement back to client.")
                    send_data(client_socket, operation, {"measurement": digital_measurement})

                else:
                    print(f"Unknown operation: {operation}")

            except (ConnectionResetError, BrokenPipeError):
                print(f"Client {addr} forcibly closed the connection.")
            except Exception as e:
                # Top-level boundary: one bad request (e.g. a header without
                # "rotation_rate") must not take the server down.
                print(f"An unexpected error occurred: {e}")
            finally:
                client_socket.close()
                print(f"Connection with client {addr} closed.")

# Script entry point: start the simulated atomtronic server when this file is
# executed directly (not when it is imported).
if __name__ == "__main__":
    # Loopback address, so the server is reachable only from this machine.
    SERVER_HOST = '127.0.0.1'
    # TCP port the server listens on.
    SERVER_PORT = 12345
    start_server(SERVER_HOST, SERVER_PORT)


No comments: