Interplanetary Mission Planner (Energy vs Resource Allocation)
Sunday, September 28, 2025
Interplanetary Transport Network Cost 4 pregbonding states
Interplanetary Mission Planner (Energy vs Resource Allocation)
Monday, September 22, 2025
Compozzit my L0ve quadruple team *:%# + ARM11æþ ௐ - Baremetal skullbone
Universal Compositor Engine - C Stack Implementation
Who needs to !bang old squ[elchsw]itschez anyways?
See program #2 so my slim blim thicc priss switch witches quadruple tag my racks no? Step off with that hash :#
Sunday, September 21, 2025
Unified Metamaterial-IO server+client in one Python script.
Metamaterial SLS/Laser Sintering Unified Python Script, so that our nuclear reactor assemblies and homebrew chemists can create safer hotboxes and technicians can endure to enjoy new material science assays. ♥ A.
#!/usr/bin/env python3
"""
Unified Metamaterial-IO server+client in one Python script.
Implements the io_dist_full.c wire protocol (big-endian, len-prefixed)
with the same op-codes:
- 0: interpolate triples (fx, fy, fz) -> [Y][Z] over shared interpolation x
- 1: differentiate triples (fx, fy, fz) -> [dY/dx][dZ/dx] on original per-curve x
- 2: interpolate Y-only (fx, fy) -> [Y] over shared interpolation x
- 4: integrate triples (fx, fy, fz) -> cumulative trapezoidal [Y_int][Z_int] on original x
Design goals:
- Fully self-contained: one file, no external services required.
- Numpy-accelerated math paths for good performance.
- Protocol compatible with the C reference: big-endian floats, u8 op, u32 sizes, len-prefixed outputs.
- Threaded server with graceful shutdown on Ctrl+C.
Usage:
- Start server:
python metamaterial_io.py --server 0.0.0.0 5000
- Run client demo (op=2 interpolation of a single curve):
python metamaterial_io.py --client 127.0.0.1 5000
"""
import argparse
import math
import os
import socket
import struct
import sys
import threading
import time
from typing import List, Optional, Tuple
try:
    import numpy as np
except ImportError:
    print("This script requires numpy (pip install numpy).", file=sys.stderr)
    sys.exit(1)
# ========== Wire helpers (big-endian) ==========
def be_u8_recv(sock: socket.socket) -> int:
    b = sock.recv(1)
    if len(b) != 1:
        raise ConnectionError("read u8 failed")
    return b[0]

def be_u8_send(sock: socket.socket, v: int) -> None:
    sock.sendall(bytes([v & 0xFF]))

def be_u32_recv(sock: socket.socket) -> int:
    b = recv_all(sock, 4)
    return struct.unpack("!I", b)[0]

def be_u32_send(sock: socket.socket, v: int) -> None:
    sock.sendall(struct.pack("!I", v & 0xFFFFFFFF))

def recv_all(sock: socket.socket, n: int) -> bytes:
    # Loop until exactly n bytes have arrived; recv() may return fewer per call
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("recv timeout/closed")
        buf.extend(chunk)
    return bytes(buf)

def be_f32_array_recv(sock: socket.socket, count: int) -> np.ndarray:
    if count == 0:
        return np.empty((0,), dtype=np.float32)
    raw = recv_all(sock, count * 4)
    # Big-endian float32 to native
    arr = np.frombuffer(raw, dtype=">f4").astype(np.float32, copy=True)
    return arr

def len_prefixed_bytes_send(sock: socket.socket, payload: bytes) -> None:
    be_u32_send(sock, len(payload))
    if payload:
        sock.sendall(payload)

def len_prefixed_error(sock: socket.socket, msg: str) -> None:
    data = msg.encode("utf-8", errors="replace")
    len_prefixed_bytes_send(sock, data)

def len_prefixed_f32_array_send(sock: socket.socket, arr: np.ndarray) -> None:
    # Convert to big-endian f32 bytes and send with u32 length prefix
    if arr is None:
        be_u32_send(sock, 0)
        return
    a = np.asarray(arr, dtype=np.float32)
    be = a.astype(">f4", copy=False).tobytes(order="C")
    len_prefixed_bytes_send(sock, be)
# ========== Math kernels (numpy) ==========
def _sort_by_x(x: np.ndarray, *ys: np.ndarray) -> Tuple[np.ndarray, List[np.ndarray]]:
    idx = np.argsort(x, kind="stable")
    xs = x[idx]
    youts = []
    for y in ys:
        youts.append(y[idx])
    return xs, youts

def _interp_shared(xs: np.ndarray, ys: np.ndarray, xq: np.ndarray) -> np.ndarray:
    # linear interpolation with edge handling (hold edges)
    # assumes xs strictly increasing; if duplicates exist, consolidate by stable unique
    xsu, uniq_idx = np.unique(xs, return_index=True)
    if xsu.shape[0] < 2:
        # not enough unique points
        return np.full_like(xq, ys[uniq_idx[0]] if xsu.shape[0] == 1 else 0.0, dtype=np.float32)
    ysu = ys[uniq_idx]
    yq = np.interp(xq, xsu, ysu).astype(np.float32)
    return yq

def _differentiate_curve(xs: np.ndarray, ys: np.ndarray) -> np.ndarray:
    n = xs.shape[0]
    if n < 2:
        return np.zeros((n,), dtype=np.float32)
    dy = np.empty((n,), dtype=np.float32)
    # forward/backward for edges, central for interior
    dy[0] = (ys[1] - ys[0]) / (xs[1] - xs[0])
    dy[-1] = (ys[-1] - ys[-2]) / (xs[-1] - xs[-2])
    if n > 2:
        # central differences with nonuniform spacing: (y[i+1]-y[i-1])/(x[i+1]-x[i-1])
        dy[1:-1] = (ys[2:] - ys[:-2]) / (xs[2:] - xs[:-2])
    return dy

def _integrate_trap(xs: np.ndarray, ys: np.ndarray) -> np.ndarray:
    n = xs.shape[0]
    out = np.zeros((n,), dtype=np.float32)
    if n < 2:
        return out
    dx = xs[1:] - xs[:-1]
    # cumulative trapezoid
    acc = np.cumsum(0.5 * dx * (ys[1:] + ys[:-1]), dtype=np.float64).astype(np.float32)
    out[1:] = acc
    return out
# ========== Request handling (server) ==========
class OpInfo:
    def __init__(self, code: int, needs_triple: bool, needs_interp: bool):
        self.code = code
        self.needs_triple = needs_triple
        self.needs_interp = needs_interp

OP_TABLE = {
    0: OpInfo(0, needs_triple=True, needs_interp=True),    # interp_triple
    1: OpInfo(1, needs_triple=True, needs_interp=False),   # differentiate
    2: OpInfo(2, needs_triple=False, needs_interp=True),   # interp_yonly
    4: OpInfo(4, needs_triple=True, needs_interp=False),   # integrate
}
def handle_connection(conn: socket.socket, addr: Tuple[str, int]) -> None:
    conn.settimeout(60.0)
    try:
        op = be_u8_recv(conn)
        if op not in OP_TABLE:
            len_prefixed_error(conn, f"Error: Unsupported op {op}")
            return
        info = OP_TABLE[op]
        N = be_u32_recv(conn)
        if N == 0 or N > 1_000_000:
            len_prefixed_error(conn, "Error: Invalid N")
            return
        fx_list: List[np.ndarray] = []
        fy_list: List[np.ndarray] = []
        fz_list: List[np.ndarray] = []
        nx: List[int] = []
        for i in range(N):
            nfx = be_u32_recv(conn)
            fx = be_f32_array_recv(conn, nfx)
            nfy = be_u32_recv(conn)
            fy = be_f32_array_recv(conn, nfy)
            if info.needs_triple:
                nfz = be_u32_recv(conn)
                fz = be_f32_array_recv(conn, nfz)
                if not (nfx == nfy == nfz) or nfx < 3:
                    len_prefixed_error(conn, "Error: length mismatch or <3")
                    return
                fz_list.append(fz)
            else:
                if nfx != nfy or nfx < 3:
                    len_prefixed_error(conn, "Error: length mismatch or <3")
                    return
            fx_list.append(fx)
            fy_list.append(fy)
            nx.append(nfx)
        xinterp: Optional[np.ndarray] = None
        M = 0
        if info.needs_interp:
            M = be_u32_recv(conn)
            # Validate M before reading, so a bogus count cannot force a huge read
            if M == 0 or M > 10_000_000:
                len_prefixed_error(conn, "Error: Invalid M")
                return
            xinterp = be_f32_array_recv(conn, M)
        # Compute outputs
        if info.needs_interp:
            # output: N * M * (1 or 2)
            outY = np.empty((N, M), dtype=np.float32)
            outZ = np.empty((N, M), dtype=np.float32) if info.needs_triple else None
            for i in range(N):
                xs, (yY,) = _sort_by_x(fx_list[i], fy_list[i])
                outY[i, :] = _interp_shared(xs, yY, xinterp)
                if info.needs_triple:
                    xs2, (yZ,) = _sort_by_x(fx_list[i], fz_list[i])
                    outZ[i, :] = _interp_shared(xs2, yZ, xinterp)
            # serialize row-major [all Y curves][all Z curves]
            if info.needs_triple:
                payload = np.concatenate([outY.reshape(-1), outZ.reshape(-1)])
            else:
                payload = outY.reshape(-1)
            len_prefixed_f32_array_send(conn, payload)
        else:
            # op 1 (differentiate) and op 4 (cumulative trapezoid) both run per curve
            # on the original x and always take triples; output: sum(nx) * 2
            kernel = _differentiate_curve if op == 1 else _integrate_trap
            partsY = []
            partsZ = []
            for i in range(N):
                xs, (yY, yZ) = _sort_by_x(fx_list[i], fy_list[i], fz_list[i])
                partsY.append(kernel(xs, yY))
                partsZ.append(kernel(xs, yZ))
            payload = np.concatenate(partsY + partsZ)
            len_prefixed_f32_array_send(conn, payload)
    except Exception as e:
        try:
            len_prefixed_error(conn, f"Error: {e}")
        except Exception:
            pass
    finally:
        try:
            conn.shutdown(socket.SHUT_RDWR)
        except Exception:
            pass
        conn.close()
# ========== Server bootstrap ==========
def run_server(host: str, port: int, accept_threads: int = 2) -> None:
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind((host, port))
    srv.listen(512)
    print(f"[server] listening on {host}:{port}", flush=True)
    stop_evt = threading.Event()

    def accept_loop():
        while not stop_evt.is_set():
            try:
                conn, addr = srv.accept()
                conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                t = threading.Thread(target=handle_connection, args=(conn, addr), daemon=True)
                t.start()
            except OSError:
                break

    threads = [threading.Thread(target=accept_loop, daemon=True) for _ in range(accept_threads)]
    for t in threads:
        t.start()
    try:
        while True:
            time.sleep(0.5)
    except KeyboardInterrupt:
        print("\n[server] shutting down...", flush=True)
    finally:
        stop_evt.set()
        try:
            srv.close()
        except Exception:
            pass
        for t in threads:
            t.join(timeout=1.0)
        print("[server] stopped.", flush=True)
# ========== Integrated client demo (op=2) ==========
def client_demo(host: str, port: int) -> int:
    # One curve: fx=[1..5], fy ~ increasing, xi=[1.5, 3.5]
    fx = np.array([1, 2, 3, 4, 5], dtype=np.float32)
    fy = np.array([10, 12, 15, 19, 25], dtype=np.float32)
    xi = np.array([1.5, 3.5], dtype=np.float32)
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    s.settimeout(10.0)
    s.connect((host, port))
    # op=2, N=1
    be_u8_send(s, 2)
    be_u32_send(s, 1)
    # curve 0: fx
    be_u32_send(s, fx.shape[0])
    s.sendall(fx.astype(">f4").tobytes(order="C"))
    # fy
    be_u32_send(s, fy.shape[0])
    s.sendall(fy.astype(">f4").tobytes(order="C"))
    # xinterp M
    be_u32_send(s, xi.shape[0])
    s.sendall(xi.astype(">f4").tobytes(order="C"))
    # receive len-prefixed payload
    nbytes = be_u32_recv(s)
    payload = recv_all(s, nbytes) if nbytes > 0 else b""
    s.close()
    # A successful reply for N=1 holds exactly M float32 values; anything else is
    # treated as a UTF-8 error string. (Testing only nbytes % 4 would misread an
    # error message whose length happens to be a multiple of 4.)
    if nbytes != xi.shape[0] * 4:
        sys.stderr.write(payload.decode("utf-8", errors="replace") + "\n")
        return 1
    out = np.frombuffer(payload, dtype=">f4").astype(np.float32)
    print("Y_interp:", out.tolist())
    return 0
# ========== CLI ==========
def main():
    ap = argparse.ArgumentParser(description="Unified Metamaterial-IO server+client (Python)")
    # argparse treats tokens that start with "-" as options, so "--server" and
    # "--client" cannot be sub-command names; they are mutually exclusive options here.
    mode = ap.add_mutually_exclusive_group(required=True)
    mode.add_argument("--server", nargs=2, metavar=("HOST", "PORT"),
                      help="Run server on bind host/port, e.g. 0.0.0.0 5000")
    mode.add_argument("--client", nargs=2, metavar=("HOST", "PORT"),
                      help="Run client demo (op=2) against server host/port")
    ap.add_argument("--accept", type=int, default=2, help="Accept threads for --server (default 2)")
    args = ap.parse_args()
    if args.server:
        host, port = args.server
        run_server(host, int(port), args.accept)
    else:
        host, port = args.client
        sys.exit(client_demo(host, int(port)))

if __name__ == "__main__":
    main()
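The request framing that the server reads (u8 op, u32 N, then a u32 count plus big-endian float32 data for each array of each curve, and finally the shared interpolation x for ops 0 and 2) can be sketched as a small encoder. This is only an illustration of that layout; `encode_request` is a hypothetical helper name and is not part of the script above.

```python
import struct
import numpy as np

def encode_request(op, curves, xinterp=None):
    """Pack one request: u8 op, u32 N, then (u32 count + f32 data) per array."""
    out = bytearray(struct.pack("!BI", op, len(curves)))
    for arrays in curves:              # arrays is (fx, fy) or (fx, fy, fz)
        for arr in arrays:
            a = np.asarray(arr, dtype=">f4")   # big-endian float32
            out += struct.pack("!I", a.size) + a.tobytes()
    if xinterp is not None:            # only ops 0 and 2 carry a shared x grid
        xq = np.asarray(xinterp, dtype=">f4")
        out += struct.pack("!I", xq.size) + xq.tobytes()
    return bytes(out)

# One triple curve with three samples, sent to op=1 (differentiate)
curve = ([0.0, 1.0, 2.0], [0.0, 1.0, 4.0], [1.0, 1.0, 1.0])
request = encode_request(1, [curve])
print(len(request), "bytes")
```

The header is 5 bytes and each three-sample array costs 4 + 12 bytes, so the size of a request can be checked by hand before it goes on the wire.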
Saturday, September 20, 2025
N-Dimensional Distributed I/O for Math and Games w/ memory loader!
Fault Hardened N-Dimensional Math Server! :*&D
Below is a memory and data mounting service that works alongside it as a suite, with similar functionality:
N-Dim compliant Memory handling
This body of work should allow memory operations alongside N-Dim.py from the previous post, for N-dimensional memory handling.
Pointer-based access is the most critical step for real-time performance and for achieving "zero-copy" data movement. Within a process, using memory pointers directly avoids the time and CPU cost of serializing and copying data (`.tobytes()`) on the sender and deserializing (`np.frombuffer()`) on the receiver.
To replace the `np.linspace` calls with real-time memory pointer simulation in Python, we must use the **`ctypes`** library and **`numpy.ctypeslib.as_array`** or **`np.frombuffer`** in conjunction with an address.
Since we cannot simulate a true external C process writing to shared memory here, we will *simulate* the external memory allocation and pointer access on the Server side.
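As a minimal sketch of what "zero-copy view" means here, the snippet below wraps a `ctypes` array (standing in for externally owned memory, which is an assumption of the demo) in a NumPy view and shows that writes on either side are visible on the other, while an explicit `.copy()` is not.

```python
import ctypes
import numpy as np

# Stand-in for memory owned by an external C process (assumption for the demo)
buf = (ctypes.c_double * 4)(1.0, 2.0, 3.0, 4.0)
ptr = ctypes.cast(buf, ctypes.POINTER(ctypes.c_double))

# as_array on a pointer needs an explicit shape; the result aliases buf
view = np.ctypeslib.as_array(ptr, shape=(4,))

view[0] = 42.0           # write through the NumPy view
copied = view.copy()     # a real copy, detached from buf
buf[1] = -1.0            # write through ctypes

print(buf[0], view[1], copied[1])
```

Because the view does not own the memory, `buf` has to stay alive for as long as `view` is in use.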
### Modified Bare-Metal Implicit Push Solution with Pointer-Based Data
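This refactored code replaces the `np.linspace` and `np.frombuffer` calls on the server's processing path with mechanisms that use a memory address (`ctypes.c_void_p`) to access the data. Placeholder data generation and the client's deserialization still use them.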
#### Key Changes:
1. **Server Initialization:** `fx` and `fy` (the constant model data) are now wrapped as C-style pointers using `ctypes`.
2. **`pseudo_interpolate_arcsecant_stream`:** This function now accepts **pointers** and **sizes** instead of raw bytes. It uses `np.ctypeslib.as_array` on the pointer to create a **zero-copy view** of the data.
3. **Data Pushing:** The result is written into a buffer at a known **memory address**, and the bytes pushed to the client are read from that address. *(Note: This is a **simulation**; network sockets only transmit raw bytes, not memory addresses across processes/machines, so the payload is still copied once for the wire. But it models the zero-copy philosophy for in-process calculation.)*
4. **Client:** The client is simplified to assume a fixed payload size, representing the raw copy of the pointer-referenced data.
-----
```python
import socket
import struct
import time
import threading
import numpy as np
import queue
import ctypes as c # Use 'c' for ctypes
# --- Configuration & Constants ---
HOST = '127.0.0.1'
PORT = 5000
PACKET_TYPES = {0: "PRIMARY", 1: "DELTA"}
INPUT_CHUNK_SIZE = 5 # Number of doubles (float64) in the I/O chunk
DOUBLE_SIZE = np.float64().itemsize # 8 bytes
PAYLOAD_SIZE = INPUT_CHUNK_SIZE * DOUBLE_SIZE # 40 bytes
# --- Memory Allocation & Pointer Simulation ---
# In a real system, this memory would be allocated in C/C++ or shared memory (shmem).
# Here, we use a simple C array proxy to simulate a persistent memory location.
# Create the C array types
C_DOUBLE_ARRAY = c.c_double * INPUT_CHUNK_SIZE
C_MODEL_ARRAY = c.c_double * 10 # For the fx/fy model data
# ----------------------------
# Core Pointer-Access Logic
# ----------------------------
def ptr_to_numpy(data_ptr, size, dtype=np.float64):
    """Creates a zero-copy numpy view from a ctypes pointer and size."""
    # Use np.ctypeslib.as_array for the most direct pointer-to-numpy conversion
    if data_ptr and data_ptr.value:
        return np.ctypeslib.as_array(c.cast(data_ptr, c.POINTER(c.c_double)), shape=(size,))
    return np.empty(size, dtype=dtype)  # Return empty array if ptr is null/invalid

def pseudo_interpolate_arcsecant_stream(fx_ptr, fy_ptr, x_ptr, num_elements):
    """
    Interpolation function now accepts memory pointers and returns a pointer
    to the result, simulating zero-copy processing.
    """
    # 1. Create zero-copy views from the input pointers
    fx = ptr_to_numpy(fx_ptr, 10)  # 10 elements for the model data
    fy = ptr_to_numpy(fy_ptr, 10)
    x_interp = ptr_to_numpy(x_ptr, num_elements)  # INPUT_CHUNK_SIZE elements
    # 2. Perform calculation on the view
    y_interp_val = np.arccos(1 / np.clip(x_interp, 1.0001, None)) * (fy.mean() if fy.size else 1)
    # 3. Store result in an *owned* C buffer for the network push simulation
    # NOTE: In a *true* zero-copy system, y_buffer would be a pre-allocated shmem buffer.
    y_buffer = C_DOUBLE_ARRAY()
    # Copy the result back into the memory buffer
    np.ctypeslib.as_array(y_buffer, shape=(num_elements,))[:] = y_interp_val
    # The return is the *pointer* to the calculated result, not the result itself
    return c.cast(c.addressof(y_buffer), c.c_void_p), y_buffer  # Return pointer and keep buffer alive
# ----------------------------
# Shared bare-metal helpers (Unchanged structure)
# ----------------------------
HEADER_FORMAT = '>QIB'
HEADER_SIZE = struct.calcsize(HEADER_FORMAT) # 13 bytes
def send_packet(sock, sequence_id, packet_type, payload_bytes):
    """Sends the header + the *pre-prepared* payload bytes."""
    timestamp_ns = time.time_ns()
    header = struct.pack(HEADER_FORMAT, timestamp_ns, sequence_id, packet_type)
    sock.sendall(header + payload_bytes)

def recv_exact(reader, n):
    data = reader.read(n)
    if len(data) < n:
        raise ConnectionError("Stream ended unexpectedly")
    return data
# ----------------------------
# Server (The Processor and Pusher)
# ----------------------------
class ServerState:
    """Uses ctypes objects to simulate external, persistent memory."""
    def __init__(self):
        # 1. Allocate and initialize C arrays for Model Data (fx, fy)
        self.fx_c_arr = C_MODEL_ARRAY()
        self.fy_c_arr = C_MODEL_ARRAY()
        # Initialize the arrays with placeholder values using numpy views
        np.ctypeslib.as_array(self.fx_c_arr, shape=(10,))[:] = np.linspace(1, 10, 10, dtype=np.float64)
        np.ctypeslib.as_array(self.fy_c_arr, shape=(10,))[:] = np.linspace(10, 20, 10, dtype=np.float64)
        # Get the persistent memory pointers
        self.fx_ptr = c.cast(c.addressof(self.fx_c_arr), c.c_void_p)
        self.fy_ptr = c.cast(c.addressof(self.fy_c_arr), c.c_void_p)
        # 2. Allocate C array for Input Data (x_interp) - temporary storage
        self.x_c_arr = C_DOUBLE_ARRAY()
        self.x_ptr = c.cast(c.addressof(self.x_c_arr), c.c_void_p)
def handle_client_stream(client_socket, server_state):
    last_interp_y = None
    sequence_id = 0
    reader = client_socket.makefile('rb', buffering=PAYLOAD_SIZE + HEADER_SIZE + 4)  # Adjust buffer
    X_CHUNK_LEN = PAYLOAD_SIZE  # The expected byte size of the input chunk
    try:
        while True:
            # --- Implicit Input Stream (Client continuously sends prefixed X-chunks) ---
            # 1. Read length prefix (4 bytes)
            length_bytes = recv_exact(reader, 4)
            chunk_len = struct.unpack('>I', length_bytes)[0]
            if chunk_len != X_CHUNK_LEN:
                raise ValueError(f"Unexpected chunk size: got {chunk_len}")
            # 2. Read raw data from the stream (this read is still a copy)
            interp_x_chunk_data_bytes = recv_exact(reader, chunk_len)
            # 3. Simulate placing the raw received bytes into the shared memory pointer
            # Note: The network read is still a copy, but the *processing* pipeline is now zero-copy.
            c.memmove(server_state.x_ptr.value, interp_x_chunk_data_bytes, chunk_len)
            sequence_id += 1
            # --- Processing (Uses Pointers for Input/Output) ---
            # The function receives pointers and returns a pointer/buffer handle
            y_ptr, y_buffer_handle = pseudo_interpolate_arcsecant_stream(
                server_state.fx_ptr, server_state.fy_ptr, server_state.x_ptr, INPUT_CHUNK_SIZE
            )
            # Get the raw bytes from the result buffer; bytes() makes the one copy needed for the wire
            interp_y_binary_chunk = bytes(y_buffer_handle)
            # --- Implicit Push (Server continuously pushes Y-chunks) ---
            # 1. Send Primary packet (Type 0)
            send_packet(client_socket, sequence_id, 0, interp_y_binary_chunk)
            # 2. Send Differential packet (Type 1): current result minus previous result
            if last_interp_y is not None:
                current_y = ptr_to_numpy(y_ptr, INPUT_CHUNK_SIZE)  # Zero-copy view
                delta_y = current_y - last_interp_y
                delta_binary = delta_y.tobytes()  # Need to copy to bytes for network transmit
                send_packet(client_socket, sequence_id, 1, delta_binary)
            # Store a copy of the last full Y result for the next delta; a view would
            # dangle once y_buffer_handle is rebound on the next iteration
            last_interp_y = ptr_to_numpy(y_ptr, INPUT_CHUNK_SIZE).copy()
    except ConnectionError as e:
        print(f"[Server] Client disconnected: {e}")
    except Exception as e:
        print(f"[Server] Stream error: {e}")
    finally:
        reader.close()
        client_socket.close()
        print("[Server] Connection closed.")
def start_server(host='127.0.0.1', port=5000):
    # Note: the input buffer inside server_state is shared by every client thread,
    # so this demo is only safe with one connected client at a time.
    server_state = ServerState()  # Initialize pointer-based state
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((host, port))
    server_socket.listen(1)
    print(f"[Server] Listening (Bare-Metal/Pointer) on {host}:{port}")
    while True:
        client_socket, addr = server_socket.accept()
        print(f"[Server] Connection from {addr}")
        threading.Thread(target=handle_client_stream, args=(client_socket, server_state), daemon=True).start()
# ----------------------------
# Client (The Continuous Streamer)
# ----------------------------
# The client's job is simply to read the fixed-size byte packets (the
# pointer-referenced data copied *once* to the network buffer) and process them.
def client_receive_data(sock, receive_q):
    """Dedicated thread for continuously receiving and parsing output packets."""
    reader = sock.makefile('rb', buffering=PAYLOAD_SIZE + HEADER_SIZE + 4)
    Y_CHUNK_LEN = PAYLOAD_SIZE
    try:
        while True:
            header = recv_exact(reader, HEADER_SIZE)
            timestamp_ns, sequence_id, packet_type = struct.unpack(HEADER_FORMAT, header)
            payload = recv_exact(reader, Y_CHUNK_LEN)
            receive_q.put((timestamp_ns, sequence_id, packet_type, payload))
    except Exception as e:
        print(f"\n[Receiver] Connection lost or error: {e}")
    finally:
        reader.close()

def client_send_data(sock, start_event):
    """Dedicated thread for continuously sending input data chunks."""
    start_event.wait()
    try:
        for i in range(1, 15):
            # Placeholder data generation (still uses np.linspace for ease of demo)
            chunk = np.linspace(i, i + 1, INPUT_CHUNK_SIZE)
            chunk_bytes = chunk.astype(np.float64).tobytes()
            # Send length prefix + data
            sock.sendall(struct.pack('>I', len(chunk_bytes)) + chunk_bytes)
            time.sleep(0.01)
    except Exception as e:
        print(f"\n[Sender] Connection lost or error: {e}")
    finally:
        try:
            sock.shutdown(socket.SHUT_WR)
        except OSError:
            pass
def client_main(host='127.0.0.1', port=5000):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((host, port))
    receive_queue = queue.Queue()
    start_event = threading.Event()
    sender_thread = threading.Thread(target=client_send_data, args=(sock, start_event), daemon=True)
    receiver_thread = threading.Thread(target=client_receive_data, args=(sock, receive_queue), daemon=True)
    sender_thread.start()
    receiver_thread.start()
    print("[Client] Started Sender and Receiver threads. Initiating stream...")
    start_event.set()
    prev_full_y = None  # primary payload of the previous sequence
    last_full_y = None  # primary payload of the current sequence
    try:
        # Drain until the receiver has seen the server close the stream, so the
        # final packets are not dropped when the sender finishes first
        while receiver_thread.is_alive() or not receive_queue.empty():
            try:
                timestamp_ns, sequence_id, packet_type, payload = receive_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            # Client processing still needs to deserialize bytes received from the wire
            arr = np.frombuffer(payload, dtype=np.float64)
            if packet_type == 0:
                prev_full_y, last_full_y = last_full_y, arr
                print(f"[Client] Rcvd Primary seq {sequence_id}: {arr[:3]}...")
            elif packet_type == 1 and prev_full_y is not None:
                # The server sends delta = current - previous, so the current chunk
                # is rebuilt from the *previous* primary payload
                delta_arr = arr
                current_y_reconstructed = prev_full_y + delta_arr
                print(f"[Client] Rcvd Delta seq {sequence_id}: {delta_arr[:3]}... (Reconstructed: {current_y_reconstructed[:3]}...)")
            receive_queue.task_done()
    except KeyboardInterrupt:
        print("\n[Client] Interrupted.")
    finally:
        print("[Client] Closing connection.")
        sock.close()
# ----------------------------
# Run demo
# ----------------------------
if __name__ == "__main__":
    print("--- Bare-Metal Stream with Pointer-Based Processing Simulation ---")
    threading.Thread(target=start_server, daemon=True).start()
    time.sleep(1)
    client_main()
```
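The packet layout used above (a 13-byte big-endian header followed by a float64 payload) and the delta scheme can be checked in isolation, without sockets. The sketch below is an illustration only; `build_packet` and `parse_packet` are hypothetical helper names. Since the server computes each delta as current minus previous, the current chunk is recovered by adding the delta to the previous primary payload.

```python
import struct
import numpy as np

HEADER_FORMAT = ">QIB"   # u64 timestamp_ns, u32 sequence_id, u8 packet_type
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)   # 13 bytes: ">" disables padding

def build_packet(timestamp_ns, sequence_id, packet_type, values):
    """Header followed by the float64 payload in native byte order."""
    payload = np.asarray(values, dtype=np.float64).tobytes()
    return struct.pack(HEADER_FORMAT, timestamp_ns, sequence_id, packet_type) + payload

def parse_packet(packet):
    ts, seq, ptype = struct.unpack(HEADER_FORMAT, packet[:HEADER_SIZE])
    return ts, seq, ptype, np.frombuffer(packet[HEADER_SIZE:], dtype=np.float64)

prev_y = np.array([1.0, 2.0, 3.0])   # primary payload of sequence 1
curr_y = np.array([1.5, 2.5, 2.0])   # primary payload of sequence 2

delta_packet = build_packet(123, 2, 1, curr_y - prev_y)
_, seq, ptype, delta = parse_packet(delta_packet)
reconstructed = prev_y + delta        # previous primary + delta = current

print(seq, ptype, reconstructed)
```

Note that only the header is explicitly big-endian; the payload uses the machine's native float64 layout, which is fine for a loopback demo but would need a fixed byte order between different machines.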
Friday, September 19, 2025
Obloid inversion setup for time-dilation testing and simulation of cosmic strings and other phenomena.
#!/usr/bin/env bash
# ============================================================
# Bash build script for "obloid" simulation + ASCII diagram
# Creates a tar.gz archive with all components.
# This thing was a physics quandary: how to simulate cosmic string information and yield
# an anyon grid as a means of testbenching cosmic phenomena.
# ============================================================
# 1. Create a clean working directory
WORKDIR="obloid_package"
rm -rf "$WORKDIR"
mkdir "$WORKDIR"
# 2. Write obloid_sim_salient.py (main simulation script)
cat > "$WORKDIR/obloid_sim_salient.py" <<'PYCODE'
# obloid_sim_salient.py
# Saliency-optimized angular sweep, phase correction, and velocity prediction.
# Expansion: includes functions for Schwarzschild radius, obloid dilation,
# decay constant, activity, gamma rate, phase correction, sled velocity, gamma profile.
import numpy as np
import matplotlib.pyplot as plt
G = 6.67430e-11
c = 2.99792458e8
CURIE_TO_BQ = 3.7e10
LN2 = np.log(2)
def schwarzschild_radius(mass_kg):
    return 2 * G * mass_kg / (c**2) if mass_kg > 0 else 0.0

def obloid_dilation(mass_kg, r_m, theta_rad, a_m):
    rs = schwarzschild_radius(mass_kg)
    f = np.sqrt(r_m**2 + a_m**2 * np.cos(theta_rad)**2)
    if mass_kg <= 0 or f <= rs:
        return 0.0
    return np.sqrt(1.0 - rs / f)

def decay_constant_from_half_life_days(t12_days):
    if t12_days is None or t12_days <= 0:
        return 0.0
    return LN2 / (t12_days * 24 * 3600)

def activity_at_time(A0_Bq, lam, t_seconds):
    if lam == 0:
        return A0_Bq
    return A0_Bq * np.exp(-lam * t_seconds)

def gamma_rate_curies(A_Bq, total_yield):
    return (A_Bq * total_yield) / CURIE_TO_BQ

def gate_phase_correction(theta_array, mass_kg, r_m, a_m):
    alphas = np.array([obloid_dilation(mass_kg, r_m, th, a_m) for th in theta_array])
    inv_alpha = np.where(alphas > 0, 1.0 / alphas, np.inf)
    corr = np.cumsum(inv_alpha)
    corr -= corr[0]
    corr = (corr / corr[-1]) * 2.0 * np.pi
    return corr, alphas

def sled_velocity_profile(theta_array, base_velocity, alphas, apply_correction):
    if apply_correction:
        return np.full_like(alphas, base_velocity)
    mean_alpha = np.mean(alphas[alphas > 0]) if np.any(alphas > 0) else 1.0
    return base_velocity * (alphas / mean_alpha)

def gamma_profile(theta_array, A0_Bq, t_days, lam, alphas, total_yield, inverse_time=False):
    t_sec = t_days * 24 * 3600
    sgn = -1.0 if inverse_time else 1.0
    At = np.array([activity_at_time(A0_Bq, lam, sgn * a * t_sec) for a in alphas])
    return gamma_rate_curies(At, total_yield)

def run_obloid_demo(
    mass_kg=2.7e23,
    r_m=1.0e-3,
    a_m=0.8e-3,
    A0_Bq=1.0e9,
    t12_days=30.0,
    total_yield=0.85,
    t_days=10.0,
    base_velocity=50.0,
    n_angles=361,
    inverse_time=False,
    save_prefix=None,
    show=True
):
    theta = np.linspace(0, 2*np.pi, n_angles)
    phase_corr, alphas = gate_phase_correction(theta, mass_kg, r_m, a_m)
    lam = decay_constant_from_half_life_days(t12_days)
    gamma_uncorrected = gamma_profile(theta, A0_Bq, t_days, lam, alphas, total_yield, inverse_time)
    v_uncorrected = sled_velocity_profile(theta, base_velocity, alphas, apply_correction=False)
    v_corrected = sled_velocity_profile(theta, base_velocity, alphas, apply_correction=True)
    # Plotting
    fig1, ax1 = plt.subplots()
    ax1.plot(np.degrees(theta), alphas)
    ax1.set_title("Alpha(theta) dilation")
    ax1.set_xlabel("Angle (deg)")
    ax1.set_ylabel("alpha")
    ax1.grid(True)
    fig2, ax2 = plt.subplots()
    ax2.plot(np.degrees(theta), phase_corr * 180/np.pi)
    ax2.set_title("Gate phase correction (deg)")
    ax2.set_xlabel("Angle (deg)")
    ax2.set_ylabel("Phase (deg)")
    ax2.grid(True)
    fig3, ax3 = plt.subplots()
    ax3.plot(np.degrees(theta), v_uncorrected, label="Uncorrected")
    ax3.plot(np.degrees(theta), v_corrected, "--", label="Corrected")
    ax3.set_title("Sled velocity vs angle")
    ax3.set_xlabel("Angle (deg)")
    ax3.set_ylabel("Velocity (um/s)")
    ax3.legend()
    ax3.grid(True)
    fig4, ax4 = plt.subplots()
    ax4.plot(np.degrees(theta), gamma_uncorrected)
    ax4.set_title("Gamma rate vs angle (Curies)")
    ax4.set_xlabel("Angle (deg)")
    ax4.set_ylabel("Gamma (Ci)")
    ax4.grid(True)
    if save_prefix:
        fig1.savefig(f"{save_prefix}_alpha_theta.png", dpi=200)
        fig2.savefig(f"{save_prefix}_phase_correction.png", dpi=200)
        fig3.savefig(f"{save_prefix}_velocity_profiles.png", dpi=200)
        fig4.savefig(f"{save_prefix}_gamma_vs_angle.png", dpi=200)
    if show:
        plt.show()
    else:
        plt.close('all')
    return {
        "theta_deg": np.degrees(theta),
        "alpha": alphas,
        "phase_corr_deg": phase_corr * 180/np.pi,
        "v_uncorrected_um_s": v_uncorrected,
        "v_corrected_um_s": v_corrected,
        "gamma_curies": gamma_uncorrected
    }
PYCODE
# 3. Write export_salient.py (runner + CSV export)
cat > "$WORKDIR/export_salient.py" <<'PYCODE'
# export_salient.py
# Runs the salient demo, saves plots and a CSV.
# Declaration: imports run_obloid_demo from obloid_sim_salient and writes CSV.
import csv
from obloid_sim_salient import run_obloid_demo
if __name__ == "__main__":
    results = run_obloid_demo(
        mass_kg=2.7e23,
        r_m=1.0e-3,
        a_m=0.8e-3,
        A0_Bq=1.0e9,
        t12_days=30.0,
        total_yield=0.85,
        t_days=10.0,
        base_velocity=50.0,
        n_angles=361,
        inverse_time=False,
        save_prefix="salient",
        show=True
    )
    with open("obloid_angular_sweep.csv", "w", newline="") as f:
        w = csv.writer(f)
        w.writerow(["theta_deg", "alpha", "phase_corr_deg",
                    "velocity_uncorrected_um_s", "velocity_corrected_um_s",
                    "gamma_curies"])
        for i in range(len(results["theta_deg"])):
            w.writerow([
                results["theta_deg"][i],
                results["alpha"][i],
                results["phase_corr_deg"][i],
                results["v_uncorrected_um_s"][i],
                results["v_corrected_um_s"][i],
                results["gamma_curies"][i]
            ])
    print("Saved: salient_* plots and obloid_angular_sweep.csv")
PYCODE
# 4. Write ASCII diagram file (continued)
cat >> "$WORKDIR/obloid_ascii_diagram.txt" <<'TXT'
│ │ Racetrack Edge (graphene/hBN or Si/SiGe 2DEG) │
│ │ │
│ │ ← Chiral Edge Direction (CW under +B) │
│ │ │
│ │ ┌───────────── Pump / RF Section ──────────────┐ │
│ │ │ G1 (0°) G2 (120°) G3 (240°) │ │
│ │ │ ┌───────┐ ┌───────┐ ┌───────┐ │ │
│ │ │ │ G1 │ │ G2 │ │ G3 │ │ │
│ │ │ └───────┘ └───────┘ └───────┘ │ │
│ │ └──────────────────────────────────────────────┘ │
│ │ │
│ │ [QPC1] [QPC2] │
│ │ │
│ │ █████████ (Magnetic SLED zone) │
│ │ █ SLED █ Fe / FeCo, 200–300 nm │
│ │ █████████ ▲ │
│ │ │
│ │ Hall(0°) Hall(60°) ... Hall(300°) │
│ └──────────────────────────────────────────────────────────────┘
│
│ SAW IDT A (equator, along edge) SAW IDT B (axis spur)
└────────────────────────────────────────────────────────────────────┘
Side View (axis of obloid perpendicular to device plane)
--------------------------------------------------------
z ↑ (Obloid symmetry axis)
│
│ SLED
│ █████
│ Spacer ███████ (ALD Al2O3)
│───────────────┄┄┄┄┄┄┄────────────── (device plane, equator: θ = 90°)
│ 2DEG / Graphene Edge
│────────────── substrate ───────────
│
└────────────→ x (racetrack tangent)
Obloid Metric Proxy (not physical structure):
α(θ) = sqrt(1 - r_s / sqrt(r^2 + a^2 cos^2 θ))
Strongest redshift at θ = 90° (equator, device plane).
Weaker along θ = 0° (axis), probed via axis spur + SAW IDT B.
RF Timing Overlay:
G1: sin(ωt + φ1(φ)) G2: sin(ωt + φ2(φ)) G3: sin(ωt + φ3(φ))
with φ-corrections chosen so local phase velocity ~ constant:
dφ/ds ∝ 1/α(φ); cumulative correction wraps to 2π around track.
TXT
# 5. Create the tar.gz archive
tar -czf obloid_package.tar.gz "$WORKDIR"
# 6. Final message
echo "Archive created: obloid_package.tar.gz"
echo "Contains:"
echo " - obloid_sim_salient.py (simulation)"
echo " - export_salient.py (runner + CSV export)"
echo " - obloid_ascii_diagram.txt (functional ASCII schematic)"
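As a quick numeric check of the metric proxy α(θ) = sqrt(1 - r_s / sqrt(r² + a² cos² θ)), here is a minimal standalone sketch using the same default parameters as `run_obloid_demo`. The function name `alpha` is just for this illustration and the snippet does no plotting.

```python
import numpy as np

G = 6.67430e-11      # gravitational constant, m^3 kg^-1 s^-2
C = 2.99792458e8     # speed of light, m/s

def alpha(theta_rad, mass_kg=2.7e23, r_m=1.0e-3, a_m=0.8e-3):
    """Dilation factor of the obloid proxy at polar angle theta."""
    r_s = 2.0 * G * mass_kg / C**2
    f = np.sqrt(r_m**2 + a_m**2 * np.cos(theta_rad)**2)
    return np.sqrt(1.0 - r_s / f)

r_s = 2.0 * G * 2.7e23 / C**2
alpha_axis = alpha(0.0)              # theta = 0 deg
alpha_equator = alpha(np.pi / 2.0)   # theta = 90 deg

print(f"r_s = {r_s:.3e} m")
print(f"alpha(axis) = {alpha_axis:.4f}, alpha(equator) = {alpha_equator:.4f}")
```

With these defaults r_s is about 0.4 mm, so α is smaller at the equator than on the axis, which matches the statement in the diagram that the redshift is strongest at θ = 90°.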
Wednesday, September 17, 2025
Anyon-Edge Surface Translation Device - quantum propulsion drive
#Shouts to Copilot and associated AI deliverance for this beautiful iceskate lets make hockey dangerously fast!
Anyon-Edge Surface Translation Device
Abundant-Materials Implementation
----------------------------------
TOP-DOWN SCHEMATIC WITH RF TIMING OVERLAY (ASCII REFERENCE)
[Bond Pad Area] [Ohmic 1..4]
╔═══════════════════════════════════════════════════════════════╗
║ ┌───────────────────────────────────────────────────────┐ ║
║ │ ← Chiral Edge Direction (CW under +B field) │ ║
║ │ ┌──────────── Pump / RF Section ────────────────┐ │ ║
║ │ │ G1 (0°) G2 (120°) G3 (240°) │ │ ║
║ │ │ ┌───────┐ ┌───────┐ ┌───────┐ │ │ ║
║ │ │ │ G1 │ │ G2 │ │ G3 │ │ │ ║
║ │ │ └───────┘ └───────┘ └───────┘ │ │ ║
║ │ └───────────────────────────────────────────────┘ │ ║
║ │ [QPC1] [QPC2] │ ║
║ │ █████████ (SLED zone) │ ║
║ │ █ SLED █ │ ║
║ │ █████████ │ ║
║ │ [Hall 1] [Hall 2] │ ║
║ └───────────────────────────────────────────────────────┘ ║
╚═══════════════════════════════════════════════════════════════╝
Timing (qualitative):
G1: sin(ωt)
G2: sin(ωt − 120°)
G3: sin(ωt − 240°)
Traveling potential along edge: G1 → G2 → G3 (each gate lags the previous one by 120°; flipping the sign of the offsets reverses the direction)
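A quick way to check which way the tri-phase potential travels is to order the gates by the moment each one peaks within one RF period. The sketch below does this for both sign conventions of the 120° offsets; `crest_order` is a hypothetical helper written for this note, and it only considers the phase of an ideal sine drive (no gate capacitance or line delay).

```python
GATES = ("G1", "G2", "G3")


def crest_order(phase_offsets_deg):
    """Order the gates by when sin(wt + offset) peaks within one RF period."""
    peak_times = []
    for name, offset in zip(GATES, phase_offsets_deg):
        # sin(wt + offset) is maximal when wt + offset = 90 deg (mod 360).
        wt_peak = (90.0 - offset) % 360.0
        peak_times.append((wt_peak, name))
    return [name for _, name in sorted(peak_times)]


lagging = crest_order((0.0, -120.0, -240.0))  # each gate lags the previous one
leading = crest_order((0.0, 120.0, 240.0))    # each gate leads the previous one

print("lagging offsets:", " -> ".join(lagging))
print("leading offsets:", " -> ".join(leading))
```

With lagging offsets the crest visits G1, G2, G3 in that order; with leading offsets the order after G1 is G3, then G2, so the potential travels the opposite way. The sign convention therefore sets the pumping direction.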
Legend:
- Mesa: Racetrack 2D channel (graphene/hBN or Si/SiGe)
- G1/G2/G3: Al pump gates with 120° phase offsets
- QPC1/QPC2: Quantum point contacts for edge density control
- SLED: Pure Fe sled or AlN SAW coupling zone
- Hall sensors: Local ν readout before/after pump region
RF DRIVE SPECIFICATIONS
-----------------------
Waveform: Sine, 3 phases (0°, 120°, 240°)
Frequency: 10–50 MHz (tuned to trade off coupling strength against RF heating)
Amplitude: 0.10–0.20 Vpp at gate
Impedance: 50 Ω lines; cold attenuation as needed
Feedback: Lock filling factor via Hall; adjust DC density/QPCs
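To size the cold attenuation, it helps to express the gate amplitude as a power level. The sketch below converts Vpp to dBm, assuming a sine wave delivered into a matched 50 Ω termination at the gate; a real gate is mostly capacitive, so treat this as a first estimate. The 0 dBm source level is a hypothetical example and the function names are illustrative.

```python
import math


def vpp_to_dbm(vpp, load_ohms=50.0):
    """Power of a sine wave with peak-to-peak amplitude vpp into load_ohms, in dBm."""
    v_rms = vpp / (2.0 * math.sqrt(2.0))
    power_w = v_rms ** 2 / load_ohms
    return 10.0 * math.log10(power_w / 1e-3)


def required_attenuation_db(source_dbm, vpp_at_gate, load_ohms=50.0):
    """Total attenuation needed between the source and the gate, in dB."""
    return source_dbm - vpp_to_dbm(vpp_at_gate, load_ohms)


SOURCE_DBM = 0.0  # hypothetical generator output level

for vpp in (0.10, 0.20):
    print(f"{vpp:.2f} Vpp -> {vpp_to_dbm(vpp):6.2f} dBm, "
          f"attenuation from {SOURCE_DBM:.0f} dBm: "
          f"{required_attenuation_db(SOURCE_DBM, vpp):.2f} dB")
```

Under these assumptions the 0.10–0.20 Vpp window corresponds to roughly −16 to −10 dBm at the gate.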
CONCEPT AND OBJECTIVE
---------------------
Goal: Demonstrate directional surface translation driven by chiral edge transport, first in the integer quantum Hall (IQH) regime, then upgraded to the fractional quantum Hall (FQH) regime.
Principle: Tri-phase traveling gate potential pumps edge charge; motion via magnetic or SAW transduction.
Scope: Micro-sled motion on-chip under high B and cryogenic temperatures.
ARCHITECTURE AND LAYOUT (ABUNDANT MATERIALS)
--------------------------------------------
Platform A (FQH-capable): Graphene/hBN stack
- hBN/graphene/hBN, top/bottom hBN ~20–30 nm
- Edge contacts: Ti/Al or Cr/Al
- Gates: Al on ALD Al2O3
- Piezo: Sputtered AlN for SAW option
- Sled: Pure Fe micro-sled, 200–300 nm thick
Platform B (IQH demo): Si/SiGe 2DEG
- Contacts: Al-based ohmics
- Gates: Al on Al2O3
- Same sled/piezo options as above
Common geometry:
- Racetrack perimeter ~2 mm; track width 3–5 µm
- Three pump gates, 100 µm long, 2 µm gaps
- Two QPCs, 300–400 nm gap
- Spacer: ALD Al2O3 50–100 nm over active edge
- Hall sensors: Graphene or Si Hall crosses
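The gate geometry and drive frequency together set the phase velocity of the pump potential. The sketch below assumes one RF wavelength spans exactly three gate pitches (gate length plus gap) and ignores fringing fields; `pump_phase_velocity` is a hypothetical helper, and the default dimensions are the ones listed above.

```python
def pump_phase_velocity(freq_hz, gate_length_m=100e-6, gap_m=2e-6, n_gates=3):
    """Phase velocity of the traveling gate potential, v = f * wavelength."""
    pitch = gate_length_m + gap_m   # centre-to-centre gate spacing
    wavelength = n_gates * pitch    # assumption: one period per gate triplet
    return freq_hz * wavelength


for f_mhz in (10, 50):
    v = pump_phase_velocity(f_mhz * 1e6)
    print(f"{f_mhz} MHz -> phase velocity ~ {v:.0f} m/s")
```

Under this assumption the pattern moves at about 3 km/s at 10 MHz and about 15 km/s at 50 MHz.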
OPERATING CONDITIONS AND TARGETS
---------------------------------
Graphene/hBN:
- B-field: 6–9 T (IQH), 10–14 T (FQH ν=1/3)
- Temp: 1.5–4.2 K (IQH), 50–300 mK (FQH)
- Mobility: > 50,000 cm²/(V·s) post-fab
Si/SiGe:
- B-field: 6–9 T (IQH)
- Temp: 4.2 K
- Mobility: > 100,000 cm²/(V·s)
Drive/motion (both):
- Edge current: 0.5–10 µA modulated
- Gate drive: 0.10–0.20 Vpp, 10–50 MHz, 0°/120°/240°
- Force: ~nN scale (magnetic sled) or equivalent SAW drag
- Velocity: 1–100 µm/s
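The field targets translate into carrier densities through the filling-factor relation ν = n·h/(e·B). The sketch below solves it for n; the specific operating points (ν = 1/3 at 12 T, ν = 2 at 6 T) are hypothetical picks inside the ranges listed above, ν is taken as the total filling factor, and the helper names are illustrative.

```python
E_CHARGE = 1.602176634e-19  # elementary charge in C (exact SI value)
H_PLANCK = 6.62607015e-34   # Planck constant in J*s (exact SI value)


def density_for_filling(nu, b_tesla):
    """Sheet density (m^-2) that gives filling factor nu at field b_tesla."""
    return nu * E_CHARGE * b_tesla / H_PLANCK


def to_per_cm2(n_per_m2):
    """Convert a sheet density from m^-2 to cm^-2."""
    return n_per_m2 * 1e-4


n_fqh_cm2 = to_per_cm2(density_for_filling(1.0 / 3.0, 12.0))  # hypothetical FQH point
n_iqh_cm2 = to_per_cm2(density_for_filling(2.0, 6.0))         # hypothetical IQH point

print(f"nu = 1/3 at 12 T: n ~ {n_fqh_cm2:.2e} cm^-2")
print(f"nu = 2   at  6 T: n ~ {n_iqh_cm2:.2e} cm^-2")
```

This gives the density the DC gates and QPCs would need to hold when the Hall feedback locks the filling factor.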
BILL OF MATERIALS (ABUNDANT SOURCES)
------------------------------------
- Graphene: CVD-grown or exfoliated monolayer
- hBN: Exfoliated or CVD-grown
- Si/SiGe wafers: Commercial CMOS suppliers
- Contacts: Ti, Al, Cr (all abundant)
- Gates: Al
- Dielectric: ALD Al2O3 or SiO2
- Piezo: AlN sputter target
- Sled: Pure Fe or FeCo alloy
- Spacer: ALD Al2O3
- Wiring: Al or Cu (with barrier layer)
BUILD PLAN
----------
Phase 1 (IQH, abundant platform):
- Fabricate on Si/SiGe or graphene/hBN
- Pattern mesa, deposit Al gates, form Al or Ti/Al contacts
- Integrate Fe sled or AlN SAW
- Test at 4.2 K, 6–9 T; verify IQH plateaus and motion
Phase 2 (FQH, graphene/hBN):
- Use high-mobility encapsulated graphene
- Dilution fridge to 50–300 mK; B up to 14 T
- Tune to ν=1/3; repeat motion demo
RISKS AND MITIGATION
--------------------
RF heating: Lower Vpp, cold attenuators, pulsed drive
Sled stiction: Use SAW coupling, smoother spacer, smaller contact area
FQH sensitivity: Higher mobility, better shielding, edge smoothness
Backscattering: Optimize QPC geometry and gate alignment
MILESTONES
----------
M1: IQH plateaus, QPC control
M2: Unidirectional pumping with phase control
M3: Repeatable sled displacement vs. frequency/amplitude
M4: FQH ν = 1/3 operation with stable motion
FORCE CALCULATION (MAGNETIC SLED OPTION)
----------------------------------------
Given:
- Edge current I = 1 µA (modulated)
- Distance from edge to sled magnet center r ≈ 100 nm
- Magnetic moment of sled m ≈ M_s × V
M_s (Fe saturation magnetization) ≈ 1.7×10^6 A/m
V = 12 µm × 12 µm × 0.3 µm = 4.32×10^-17 m³
⇒ m ≈ 7.34×10^-11 A·m²
Magnetic field from edge current (Biot–Savart):
B ≈ μ₀ I / (2π r)
B ≈ (4π×10^-7 × 1×10^-6) / (2π × 1×10^-7) ≈ 2×10^-6 T
Field gradient:
∇B ≈ B / r ≈ (2×10^-6) / (1×10^-7) = 20 T/m
Force on sled:
F ≈ m × ∇B ≈ (7.34×10^-11) × 20 ≈ 1.47×10^-9 N (~1.5 nN)
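Implication:
- The estimate treats the whole sled as a point dipole at distance r, although the 12 µm sled is far larger than r ≈ 100 nm, so ~1.5 nN is an optimistic order-of-magnitude figure.
- At cryogenic temperatures with ultra-low friction, this is enough to move a sub-nanogram sled (≈0.34 ng for this Fe volume) at µm/s speeds.
- Force scales linearly with I, so raising I to 10 µA boosts it ~10×.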
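The arithmetic above can be reproduced in a few lines. This is a minimal sketch of the same point-dipole estimate (F ≈ m·∇B with ∇B ≈ B/r); the function and variable names are illustrative, and the inputs are the values listed under "Given".

```python
import math

MU_0 = 4e-7 * math.pi  # vacuum permeability in T*m/A (classical value)


def sled_force_estimate(current_a, distance_m, m_sat_a_per_m, volume_m3):
    """Point-dipole estimate of the force on a saturated sled near a line current."""
    moment = m_sat_a_per_m * volume_m3                       # A*m^2
    b_field = MU_0 * current_a / (2.0 * math.pi * distance_m)  # T, long straight wire
    gradient = b_field / distance_m                          # T/m, order-of-magnitude
    return {"moment": moment, "b_field": b_field,
            "gradient": gradient, "force": moment * gradient}


SLED_VOLUME = 12e-6 * 12e-6 * 0.3e-6  # m^3
M_SAT_FE = 1.7e6                      # A/m
R_EDGE = 100e-9                       # m

est_1ua = sled_force_estimate(1e-6, R_EDGE, M_SAT_FE, SLED_VOLUME)
est_10ua = sled_force_estimate(10e-6, R_EDGE, M_SAT_FE, SLED_VOLUME)

print(f"m     ~ {est_1ua['moment']:.3e} A*m^2")
print(f"B     ~ {est_1ua['b_field']:.2e} T")
print(f"gradB ~ {est_1ua['gradient']:.1f} T/m")
print(f"F(1 uA)  ~ {est_1ua['force']:.2e} N")
print(f"F(10 uA) ~ {est_10ua['force']:.2e} N")
```

Changing `R_EDGE` shows how sensitive the result is to the edge-to-sled spacing, since the estimate scales as 1/r².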