Sunday, October 5, 2025
OSCSL v1-4
arm æþ - Crashproofing Neuromorphic/Cordian Suite + Architecture + Debugger + Unified Webserver + Compositor
# Neuromorphic Suite + Architecture + Debugger + Unified Webserver
Epilogue:
From Errors to Insights: Building a Crash-Proof System-on-Chip (SoC)
In the world of high-performance hardware, failure is not an option. A system crash caused by a buffer overflow or a single malformed data packet can be catastrophic. But what if we could design a System-on-Chip (SoC) that doesn't just survive these events, but treats them as valuable data?
This post outlines a multi-layered architectural strategy for a high-throughput SoC that is resilient by design. We'll explore how to move beyond simple error flags to create a system that proactively prevents crashes, isolates faults, and provides deep diagnostic insights, turning potential failures into opportunities for analysis and optimization.
The Backbone: A Scalable Network-on-Chip (NoC)
For any complex SoC with multiple processing elements and shared memory, a traditional shared bus is a recipe for a bottleneck. Our architecture is built on a packet-switched Network-on-Chip (NoC). Think of it as a dedicated multi-lane highway system for data packets on the chip. This allows many parallel data streams to flow simultaneously between different hardware blocks, providing the scalability and high aggregate bandwidth essential for a demanding compositor system.
Layer 1: Proactive Flow Control with Smart Buffering
Data doesn't always flow smoothly. It arrives in bursts and must cross between parts of the chip running at different speeds (known as Clock Domain Crossings, or CDCs). This is a classic recipe for data overruns and loss.
Our first line of defense is a network of intelligent, dual-clock FIFO (First-In, First-Out) buffers. But simply adding buffers isn't enough. The key to resilience is proactive backpressure.
Instead of waiting for a buffer to be completely full, our FIFOs generate an almost_full warning signal. This signal propagates backward through the NoC, automatically telling the original data source to pause. This end-to-end, hardware-enforced flow control prevents overflows before they can even happen, allowing the system to gracefully handle intense data bursts without dropping a single packet.
Layer 2: A Hardware Firewall for Malformed Data
A common cause of system crashes is malformed or malicious data. Our architecture incorporates a dedicated Ingress Packet Validator—a hardware firewall that sits at the edge of the chip. Before any packet is allowed onto the NoC, this module performs a series of rigorous checks in a single clock cycle:
* Opcode Validation: Is this a known, valid command?
* Length Checking: Does the packet have the expected size for its command type?
* Integrity Checking: Does the packet’s payload pass a Cyclic Redundancy Check (CRC)?
If a packet fails any of these checks, it is quarantined, not processed. The invalid data is never allowed to reach the core processing logic, preventing it from corrupting system state or causing a crash. This transforms a potentially system-wide failure into a silent, contained event.
Layer 3: Fault Containment with Resource Partitioning
To handle multiple tasks with different priorities, we draw inspiration from modern GPU virtualization technology (like NVIDIA's Multi-Instance GPU). A Hardware Resource Manager (HRM) allows the SoC's processing elements to be partitioned into isolated, independent groups.
This provides two major benefits:
* Guaranteed Quality of Service (QoS): A high-priority, real-time task can be guaranteed its slice of processing power and memory bandwidth, unaffected by other tasks running on the chip.
* Fault Containment: A software bug or data-dependent error that causes a deadlock within one partition cannot monopolize shared resources or crash the entire system. The fault is completely contained within its hardware partition, allowing the rest of the SoC to operate normally.
Turning Errors into Insights: The 'Sump' Fault Logger
The most innovative component of our architecture is a dedicated on-chip fault logging unit we call the 'Sump'. When the firewall quarantines a bad packet or a buffer reports a critical event, it doesn't just disappear. The detecting module sends a detailed fault report to the Sump.
The Sump acts as the SoC's "black box recorder," storing a history of the most recent hardware exceptions in a non-volatile ring buffer. Each log entry is a rich, structured record containing:
* A high-resolution Timestamp
* The specific Fault Code (e.g., INVALID_OPCODE, FIFO_OVERFLOW)
* The unique ID of the Source Module that reported the error
* A snapshot of the offending Packet Header
To retrieve this data safely, we designed a custom extension to the standard JTAG debug interface. An external debugger can connect and drain the fault logs from the Sump via this out-of-band channel without pausing or interfering with the SoC's primary operations.
A System That Heals and Informs
By integrating these layers, we create a complete chain of resilience. A corrupted packet arrives, the firewall quarantines it, and the Sump logs a detailed report with microsecond precision—all while the system continues to process valid data without interruption. An engineer can later connect via JTAG to perform post-mortem analysis, using the timestamped logs to instantly pinpoint the root cause of the issue.
This philosophy transforms hardware design. By treating errors as data, we can build systems that are not only robust and crash-proof but also provide the deep visibility needed for rapid debugging, performance tuning, and creating truly intelligent, self-aware hardware.
Technical detail:
The refactored neuromorphic suite introduces several architectural changes designed to improve computation efficiency and control flexibility, particularly within embedded ARM/GPU hybrid environments.
Computational Improvements
The refactoring improves computation this year primarily through hardware optimization, dynamic resource management, and introduction of a specialized control execution system:
1. Hardware-Optimized Control Paths (ARM)
The system enhances performance by optimizing frequent control operations via MMIO (Memory-Mapped I/O) access using ARM short-case efficiency for hot paths.
- This is achieved by using inline AArch64 instructions (
ldr
/str
) and the__attribute__((always_inline))
attribute for fast MMIO read/write operations when running on AArch64 hardware. - When the
ENABLE_MAPPED_GPU_REGS
define is used, the runtime server performs control writes backed by MMIO, leveraging these inline assembly optimizations.
2. Dynamic Resource Management and GPU Acceleration
Computation is dynamically improved through throttling and autoscaling mechanisms integrated into the gpu_runtime_server
.
- GPU Throttling and Autoscaling: The
GlobalGpuThrottler
uses a token bucket model to manage maximum bytes per second transferred. The ThrottleAutoScaler observes actual transfer rates against the configured rate and dynamically adjusts the throttle rate to maintain atarget_util_
(defaulting to 70%). - Lane Utilization Feedback: The system incorporates neuromorphic lane utilization tracking from the hardware/VHDL map. The VHDL map includes logic for 8 ONoC (Optical Network on Chip) lanes with utilization counters. These utilization percentages are read from MMIO (e.g.,
NEURO_MMIO_ADDR
orLANE_UTIL_ADDR
) and posted to the runtime server. This allows theThrottleAutoScaler
to adjust thelane_fraction
, enabling computation to adapt based on current ONoC traffic. - GPU Acceleration with Fallback: The runtime server attempts to use GPU Tensor Core Transform via cuBLAS for accelerated vector processing. If CUDA/cuBLAS support is not available, it uses a CPU fallback mechanism.
gpu_runtime_server
to ensure the neuromorphic system remains functional even when hardware acceleration via CUDA/cuBLAS is unavailable.Here is a detailed breakdown of the mechanism:
1. Detection of GPU/CUDA Support
The decision to use the GPU or fall back to the CPU is made by checking for the presence and readiness of the CUDA/cuBLAS environment during server initialization and before processing a transformation request.
- CUDA Runtime Check: The function
has_cuda_support_runtime()
is used to determine if the CUDA runtime is available and if there is at least one detected device (devcount > 0
). - cuBLAS Initialization Check: The function
initialize_cublas()
attempts to create a cuBLAS handle (g_cublas_handle
). If the status returned bycublasCreate
is notCUBLAS_STATUS_SUCCESS
, cuBLAS is marked as unavailable (g_cublas_ready = false
). - Server Startup Logging: When the server starts, it logs the outcome of these checks:
- If
initialize_cublas()
andhas_cuda_support_runtime()
are successful, it logs:[server] cuBLAS/CUDA available
. - Otherwise, it logs:
[server] cuBLAS/CUDA NOT available; CPU fallback enabled
.
- If
2. Implementation of the Fallback in /transform
Endpoint
The actual selection between GPU processing and CPU processing occurs when the server receives a request on the /transform
endpoint.
-
The endpoint handler checks the global
cublas_ok
flag (which reflects the successful initialization of cuBLAS/CUDA). -
The output vector (
out
) is determined using a conditional call:std::vector<float> out = (cublas_ok ? gpu_tensor_core_transform(input) : cpu_tensor_transform(input));
If
cublas_ok
is true, the GPU transformation is attempted; otherwise, the CPU fallback is executed.
3. CPU Fallback Functionality
The dedicated CPU fallback function is simple, defining a direct identity transformation:
-
The function
cpu_tensor_transform
takes the input vector (in
) and returns it directly.std::vector<float> cpu_tensor_transform(const std::vector<float> &in) { return in; }
4. GPU Path Internal Fallback
Even when the GPU path (gpu_tensor_core_transform
) is selected, it contains an internal early exit fallback for immediate failure conditions:
- The
gpu_tensor_core_transform
function first checks ifinitialize_cublas()
andhas_cuda_support_runtime()
succeed again. - If either check fails (meaning the GPU environment became unavailable after startup or the initial check failed), the function executes a loop that copies the
input
vector to theoutput
vector and returns, performing a CPU copy operation instead of the GPU work.
Summary of CPU Fallback Execution
The CPU fallback condition is triggered in two main scenarios:
- System-Wide Lack of Support: If CUDA/cuBLAS is not initialized successfully at startup, the
/transform
endpoint executescpu_tensor_transform(input)
, which returns the input unchanged. - Internal GPU Failure: If the
gpu_tensor_core_transform
function is called but finds that CUDA initialization or runtime support is missing, it skips all CUDA memory allocation and cuBLAS operations, and instead copies the input vector to the output vector on the CPU. ]
3. Compact Control Execution via Short-Code VM
The introduction of a Short-Code Virtual Machine (VM) represents a refactoring for flexible and compact control execution.
- This stack-based VM is implemented in both the C++ runtime server and the C bootloader.
- The runtime server exposes a new
/execute
endpoint that accepts binary bytecode payloads for execution, allowing for compact control commands like dynamically setting the lane fraction (SYS_SET_LANES
). - The bootloader also gains an
execute <hex_string>
command, enabling low-level, intrant control bytecode execution on the bare-metal target for operations like MMIO writes or system resets. This potentially improves control latency and footprint by minimizing the communication necessary for complex control sequences.
ARM æþ v1 -Baremetal/Standalone OEM ready
ARM - Neuromorphic v2-Compositor / Boot Menu Added
ARM æþ Neuromorphic Compositor -Compositor Standalone
Neuromorphic chipset CORDIAN VHDL - Try an iCE40 or iCE65 FPGA for emulation :-0 just supply your own controller and software
Adi-Protocol-AArch64 [for GPU overhauled N-Dim Optimization] v1.0
ARM æþ Optimized GPU Local TCP suite non-HTTP
• Unconstrained (no guardrail):
R_max equals the node’s peak fabric rate. For a 16‑tile node at 1024‑bit and 2.5 GHz per tile:
• T_node,peak = 16 × 320 GB/s = 5.12 TB/s
• Therefore, bucket rate at maximum operation: 5.12 TB/s
• Within QoS guardrails (aggressive 10% cap):
• R_max = 0.10 × 5.12 TB/s = 512 GB/s
• If you adopt the optical overprovision example (peak ≈ 6.4 TB/s):
• Unconstrained: 6.4 TB/s
• 10% guardrail: 640 GB/s
Tip: Use R_max = η × T_node,peak, with η chosen to protect on‑chip QoS (commonly 2–10%).
Thursday, October 2, 2025
AI game engine prototype - Final! w/ Therapeutic training
AI Game Engine v1 - Rakshas Intl. Unltd. OSCSLv4 - Google Gemini ISC
Files:
Math Server see: Original hardened server
Create_suite.sh - Standalone dev
Gemini & Veo 3 implementation - Google AI dev
Collaborative Suite - Save and share
With some fine tuning, firebase and tone.js we arrive at the finalé;
Final Example#1 - Now .tar extractable run show!
Google Gemini FPS-metaverse! C/O Rakshas Intl. Unltd.
We at Rakshas International Unlimited are perturbed by war and as being responsible support this report and game mode POC to limit habituation to violent games as we want familial supremacy not junkie drunk dunking on cuckloaded tall poppy syndrome luckpots.
Metaversal Therapeutics Report
Here's a POC as a responsive effort to improving your competitive gaming needs!
Wednesday, October 1, 2025
Meta humans iŋ ARM æþ
Talk about super ionic humans
Here's a writeup on how ARM æþ works well in the process manufacturing of this research.
Neuromorphic computing and metamaterial stabilized TNA.
-A. Muralidhar Oct 7 2025:16:53 EDT
Future research opportunities:
Based on the architectural synthesis and the theoretical framework established, the research portends a range of advanced simulations that extend beyond the initial scope of Topological Nucleotide Assembly (TNA). The platform's design as a generic, high-performance "physicalized computation" engine allows its core components to be repurposed for simulating other complex physical and biological systems.
Here are three major avenues for further simulation that can be directly extrapolated from the current research:
1. Generalized Molecular Dynamics and Control
The TNA simulation is a specific instance of a broader class of problems: controlling molecular-level systems via a feedback loop. The architecture is well-suited to simulate other processes where a system's state must be sensed and its evolution guided by external fields.
* Simulation of Controlled Protein Folding:
* Concept: Protein folding is a complex optimization problem where a polypeptide chain seeks its lowest-energy three-dimensional structure. Misfolding is implicated in many diseases. This simulation would use the platform to guide a simulated protein into a desired stable conformation.
* Implementation:
* The HSNR Acquisition step would be repurposed as Conformational State Sensing. The ONoC would ingest data representing the protein's current fold state (e.g., from simulated atomic force microscopy or spectroscopy). [1, 2]
* The Weyl Semimetal Flux computation would model the application of precisely controlled, non-uniform electromagnetic fields. The GPU would calculate the field geometry needed to apply femtonewton-scale forces to specific amino acid residues, guiding the folding pathway and avoiding undesirable intermediate states. [3, 1]
* The Adaptive Assembly Loop would function as a real-time folding director, making iterative adjustments to the control fields based on the sensed conformational state, actively preventing the protein from getting trapped in local energy minima. [1]
* Simulation of Crystal Growth and Defect Mitigation:
* Concept: This simulation would model the epitaxial growth of complex crystals, such as the Weyl Semimetals themselves. [4, 5] The goal would be to use the control plane to actively identify and correct the formation of lattice defects in real-time.
* Implementation:
* The ONoC would simulate a high-resolution imaging sensor monitoring the crystal's growing surface.
* The ARM control plane would run algorithms to detect anomalies in the growth pattern that signal the formation of a dislocation or impurity.
* The GPU would calculate a corrective action, such as a highly localized thermal or ionic pulse, which would be actuated via the neuromorphic substrate's MMIO registers to anneal the defect before it propagates. [6]
2. Simulation of Topological Material Physics
The TNA simulation uses "Weyl Semimetal Flux" as a powerful metaphor for its computational core. The platform could be used to move beyond the metaphor and simulate the actual quantum-level physics of these exotic materials.
* Simulation of Chiral Anomaly and Anomalous Transport:
* Concept: Weyl Semimetals exhibit unique quantum phenomena, including the chiral anomaly, where applying parallel electric and magnetic fields creates an anomalous charge current. [3, 7] This simulation would model these effects, which are computationally intensive and difficult to study experimentally.
* Implementation:
* A large 3D lattice representing the crystal structure of a material like Tantalum Arsenide (TaAs) would be instantiated in GPU memory. [4]
* The gpu_tensor_core_transform kernel would be replaced with a more complex solver for the quantum field theory equations that govern electron transport in the material. [6, 8]
* The simulation would allow researchers to apply virtual electric and magnetic fields and observe the resulting charge and heat transport, including the "severe violation of the Wiedemann-Franz law" noted in the research, providing a powerful tool for fundamental physics discovery. [3]
3. Simulation of Complex, Path-Dependent Systems
The architecture's most unique features—the hardware-level Sump_Logic_Unit and the software's "branching checkpoints"—are purpose-built for exploring and debugging complex, non-deterministic processes.
* Interactive Simulation of Directed Evolution:
* Concept: This simulation would model the directed evolution of a biomolecule (like an enzyme or RNA catalyst) through rounds of mutation and selection. Because mutation is a stochastic process, many evolutionary paths are possible.
* Implementation:
* The simulation would start with a parent molecule. At each generation, the control software would simulate the introduction of random mutations.
* The branching checkpoint feature would be used to save the complete state of the system before each stochastic mutation event. [6]
* A researcher could allow the simulation to proceed down one evolutionary path. If it leads to a non-viable molecule, instead of restarting, they could instantly checkout a previous branch and explore an alternative mutation, effectively navigating the "multiverse" of possible evolutionary outcomes. [6] This transforms the platform from a simple simulator into an interactive laboratory for exploring complex, branching-path phenomena.
* Hardware-in-the-Loop Anomaly Detection:
* Concept: This simulation would test the system's ability to use its hardware triggers for ultra-fast fault detection. It would model a physical process prone to rapid, unpredictable failure modes (e.g., thermal runaway in a battery or plasma instability in a fusion reactor).
* Implementation:
* The simulation running on the GPU would model the physics of the process.
* The ARM control software would monitor the simulation's state. Its goal would be to learn the patterns on the system bus that precede a failure.
* The software would then program the Sump_Logic_Unit by writing to the radian_tune_register, configuring it to act as a hardware watchdog that can detect these specific precursor patterns and trigger an instantaneous hardware reset or safe-mode interrupt—a reaction far faster than a software-only control loop could achieve. [2] This would validate the system's use in high-stakes, real-time safety and control applications.
Sunday, September 28, 2025
Interplanetary Transport Network Cost 4 pregbonding states
Interplanetary Mission Planner (Energy vs Resource Allocation)
Monday, September 22, 2025
Compozzit my L0ve quadruple team *:%# + ARM11æþ ௐ - Baremetal skullbone
Universal Compositor Engine - C Stack Implementation
Who needs to !bang old squ[elchsw]itschez anyways?
See program #2 so my slim blim thicc priss switch witches quadruple tag my racks no? Step off with that hash :#
Sunday, September 21, 2025
Unified Metamaterial-IO server+client in one Python script.
""" Metamaterial SLS/Laser Sintering Unified Python Script - so our nuclear reactor assemblies and homebrew chemists can create safer hotboxes and technicians can endure to enjoy new material science assays. ♥ A. """
#!/usr/bin/env python3
"""
Unified Metamaterial-IO server+client in one Python script.
Implements the io_dist_full.c wire protocol (big-endian, len-prefixed)
with the same op-codes:
- 0: interpolate triples (fx, fy, fz) -> [Y][Z] over shared interpolation x
- 1: differentiate triples (fx, fy, fz) -> [dY/dx][dZ/dx] on original per-curve x
- 2: interpolate Y-only (fx, fy) -> [Y] over shared interpolation x
- 4: integrate triples (fx, fy, fz) -> cumulative trapezoidal [Y_int][Z_int] on original x
Design goals:
- Fully self-contained: one file, no external services required.
- Numpy-accelerated math paths for good performance.
- Protocol compatible with the C reference: big-endian floats, u8 op, u32 sizes, len-prefixed outputs.
- Threaded server with graceful shutdown on Ctrl+C.
Usage:
- Start server:
python metamaterial_io.py --server 0.0.0.0 5000
- Run client demo (op=2 interpolation of a single curve):
python metamaterial_io.py --client 127.0.0.1 5000
"""
import argparse
import math
import os
import socket
import struct
import sys
import threading
import time
from typing import List, Optional, Tuple
try:
import numpy as np
except ImportError:
print("This script requires numpy (pip install numpy).", file=sys.stderr)
sys.exit(1)
# ========== Wire helpers (big-endian) ==========
def be_u8_recv(sock: socket.socket) -> int:
b = sock.recv(1)
if len(b) != 1:
raise ConnectionError("read u8 failed")
return b[0]
def be_u8_send(sock: socket.socket, v: int) -> None:
sock.sendall(bytes([v & 0xFF]))
def be_u32_recv(sock: socket.socket) -> int:
b = recv_all(sock, 4)
return struct.unpack("!I", b)[0]
def be_u32_send(sock: socket.socket, v: int) -> None:
sock.sendall(struct.pack("!I", v & 0xFFFFFFFF))
def recv_all(sock: socket.socket, n: int) -> bytes:
buf = bytearray()
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
raise ConnectionError("recv timeout/closed")
buf.extend(chunk)
return bytes(buf)
def be_f32_array_recv(sock: socket.socket, count: int) -> np.ndarray:
if count == 0:
return np.empty((0,), dtype=np.float32)
raw = recv_all(sock, count * 4)
# Big-endian float32 to native
arr = np.frombuffer(raw, dtype=">f4").astype(np.float32, copy=True)
return arr
def len_prefixed_bytes_send(sock: socket.socket, payload: bytes) -> None:
be_u32_send(sock, len(payload))
if payload:
sock.sendall(payload)
def len_prefixed_error(sock: socket.socket, msg: str) -> None:
data = msg.encode("utf-8", errors="replace")
len_prefixed_bytes_send(sock, data)
def len_prefixed_f32_array_send(sock: socket.socket, arr: np.ndarray) -> None:
# Convert to big-endian f32 bytes and send with u32 length prefix
if arr is None:
be_u32_send(sock, 0)
return
a = np.asarray(arr, dtype=np.float32)
be = a.astype(">f4", copy=False).tobytes(order="C")
len_prefixed_bytes_send(sock, be)
# ========== Math kernels (numpy) ==========
def _sort_by_x(x: np.ndarray, *ys: np.ndarray) -> Tuple[np.ndarray, List[np.ndarray]]:
idx = np.argsort(x, kind="stable")
xs = x[idx]
youts = []
for y in ys:
youts.append(y[idx])
return xs, youts
def _interp_shared(xs: np.ndarray, ys: np.ndarray, xq: np.ndarray) -> np.ndarray:
# linear interpolation with edge handling (hold edges)
# assumes xs strictly increasing; if duplicates exist, consolidate by stable unique
xsu, uniq_idx = np.unique(xs, return_index=True)
if xsu.shape[0] < 2:
# not enough unique points
return np.full_like(xq, ys[uniq_idx[0]] if xsu.shape[0] == 1 else 0.0, dtype=np.float32)
ysu = ys[uniq_idx]
yq = np.interp(xq, xsu, ysu).astype(np.float32)
return yq
def _differentiate_curve(xs: np.ndarray, ys: np.ndarray) -> np.ndarray:
n = xs.shape[0]
if n < 2:
return np.zeros((n,), dtype=np.float32)
dy = np.empty((n,), dtype=np.float32)
# forward/backward for edges, central for interior
dy[0] = (ys[1] - ys[0]) / (xs[1] - xs[0])
dy[-1] = (ys[-1] - ys[-2]) / (xs[-1] - xs[-2])
if n > 2:
# central differences with nonuniform spacing: (y[i+1]-y[i-1])/(x[i+1]-x[i-1])
dy[1:-1] = (ys[2:] - ys[:-2]) / (xs[2:] - xs[:-2])
return dy
def _integrate_trap(xs: np.ndarray, ys: np.ndarray) -> np.ndarray:
n = xs.shape[0]
out = np.zeros((n,), dtype=np.float32)
if n < 2:
return out
dx = xs[1:] - xs[:-1]
# cumulative trapezoid
acc = np.cumsum(0.5 * dx * (ys[1:] + ys[:-1]), dtype=np.float64).astype(np.float32)
out[1:] = acc
return out
# ========== Request handling (server) ==========
class OpInfo:
def __init__(self, code: int, needs_triple: bool, needs_interp: bool):
self.code = code
self.needs_triple = needs_triple
self.needs_interp = needs_interp
OP_TABLE = {
0: OpInfo(0, needs_triple=True, needs_interp=True), # interp_triple
1: OpInfo(1, needs_triple=True, needs_interp=False), # differentiate
2: OpInfo(2, needs_triple=False, needs_interp=True), # interp_yonly
4: OpInfo(4, needs_triple=True, needs_interp=False), # integrate
}
def handle_connection(conn: socket.socket, addr: Tuple[str, int]) -> None:
conn.settimeout(60.0)
try:
op = be_u8_recv(conn)
if op not in OP_TABLE:
len_prefixed_error(conn, f"Error: Unsupported op {op}")
return
info = OP_TABLE[op]
N = be_u32_recv(conn)
if N == 0 or N > 1_000_000:
len_prefixed_error(conn, "Error: Invalid N")
return
fx_list: List[np.ndarray] = []
fy_list: List[np.ndarray] = []
fz_list: List[np.ndarray] = []
nx: List[int] = []
for i in range(N):
nfx = be_u32_recv(conn); fx = be_f32_array_recv(conn, nfx)
nfy = be_u32_recv(conn); fy = be_f32_array_recv(conn, nfy)
if info.needs_triple:
nfz = be_u32_recv(conn); fz = be_f32_array_recv(conn, nfz)
if not (nfx == nfy == nfz) or nfx < 3:
len_prefixed_error(conn, "Error: length mismatch or <3")
return
fz_list.append(fz)
else:
if nfx != nfy or nfx < 3:
len_prefixed_error(conn, "Error: length mismatch or <3")
return
fx_list.append(fx); fy_list.append(fy); nx.append(nfx)
xinterp: Optional[np.ndarray] = None
M = 0
if info.needs_interp:
M = be_u32_recv(conn)
xinterp = be_f32_array_recv(conn, M)
if M == 0 or M > 10_000_000:
len_prefixed_error(conn, "Error: Invalid M")
return
# Compute outputs
if info.needs_interp:
# output: N * M * (1 or 2)
outY = np.empty((N, M), dtype=np.float32)
outZ = np.empty((N, M), dtype=np.float32) if info.needs_triple else None
for i in range(N):
xs, (yY,) = _sort_by_x(fx_list[i], fy_list[i])
outY[i, :] = _interp_shared(xs, yY, xinterp)
if info.needs_triple:
xs2, (yZ,) = _sort_by_x(fx_list[i], fz_list[i])
outZ[i, :] = _interp_shared(xs2, yZ, xinterp)
# serialize row-major [all Y curves][all Z curves]
if info.needs_triple:
payload = np.concatenate([outY.reshape(-1), outZ.reshape(-1)])
else:
payload = outY.reshape(-1)
len_prefixed_f32_array_send(conn, payload)
elif op == 1:
# differentiate per curve; output: sum(nx) * (1 or 2)
partsY = []
partsZ = [] if info.needs_triple else None
for i in range(N):
xs, (yY,) = _sort_by_x(fx_list[i], fy_list[i])
partsY.append(_differentiate_curve(xs, yY))
if info.needs_triple:
xs2, (yZ,) = _sort_by_x(fx_list[i], fz_list[i])
partsZ.append(_differentiate_curve(xs2, yZ))
if info.needs_triple:
payload = np.concatenate(partsY + partsZ)
else:
payload = np.concatenate(partsY)
len_prefixed_f32_array_send(conn, payload)
elif op == 4:
# integrate (cumulative trap) per curve; output: sum(nx) * (1 or 2)
partsY = []
partsZ = [] if info.needs_triple else None
for i in range(N):
xs, (yY,) = _sort_by_x(fx_list[i], fy_list[i])
partsY.append(_integrate_trap(xs, yY))
if info.needs_triple:
xs2, (yZ,) = _sort_by_x(fx_list[i], fz_list[i])
partsZ.append(_integrate_trap(xs2, yZ))
if info.needs_triple:
payload = np.concatenate(partsY + partsZ)
else:
payload = np.concatenate(partsY)
len_prefixed_f32_array_send(conn, payload)
except Exception as e:
try:
len_prefixed_error(conn, f"Error: {e}")
except Exception:
pass
finally:
try:
conn.shutdown(socket.SHUT_RDWR)
except Exception:
pass
conn.close()
# ========== Server bootstrap ==========
def run_server(host: str, port: int, accept_threads: int = 2) -> None:
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srv.bind((host, port))
srv.listen(512)
print(f"[server] listening on {host}:{port}", flush=True)
stop_evt = threading.Event()
def accept_loop():
while not stop_evt.is_set():
try:
conn, addr = srv.accept()
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
t = threading.Thread(target=handle_connection, args=(conn, addr), daemon=True)
t.start()
except OSError:
break
threads = [threading.Thread(target=accept_loop, daemon=True) for _ in range(accept_threads)]
for t in threads:
t.start()
try:
while True:
time.sleep(0.5)
except KeyboardInterrupt:
print("\n[server] shutting down...", flush=True)
finally:
stop_evt.set()
try:
srv.close()
except Exception:
pass
for t in threads:
t.join(timeout=1.0)
print("[server] stopped.", flush=True)
# ========== Integrated client demo (op=2) ==========
def client_demo(host: str, port: int) -> int:
# One curve: fx=[1..5], fy ~ increasing, xi=[1.5, 3.5]
fx = np.array([1,2,3,4,5], dtype=np.float32)
fy = np.array([10,12,15,19,25], dtype=np.float32)
xi = np.array([1.5, 3.5], dtype=np.float32)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
s.settimeout(10.0)
s.connect((host, port))
# op=2, N=1
be_u8_send(s, 2)
be_u32_send(s, 1)
# curve 0: fx
be_u32_send(s, fx.shape[0])
s.sendall(fx.astype(">f4").tobytes(order="C"))
# fy
be_u32_send(s, fy.shape[0])
s.sendall(fy.astype(">f4").tobytes(order="C"))
# xinterp M
be_u32_send(s, xi.shape[0])
s.sendall(xi.astype(">f4").tobytes(order="C"))
# receive len-prefixed payload
nbytes = be_u32_recv(s)
payload = recv_all(s, nbytes) if nbytes > 0 else b""
s.close()
if nbytes % 4 != 0:
sys.stderr.write(payload.decode("utf-8", errors="replace") + "\n")
return 1
out = np.frombuffer(payload, dtype=">f4").astype(np.float32)
print("Y_interp:", out.tolist())
return 0
# ========== CLI ==========
def main():
ap = argparse.ArgumentParser(description="Unified Metamaterial-IO server+client (Python)")
sub = ap.add_subparsers(dest="mode", required=True)
ap_srv = sub.add_parser("--server", help="Run server")
ap_srv.add_argument("host", type=str, help="Bind host, e.g. 0.0.0.0")
ap_srv.add_argument("port", type=int, help="Bind port, e.g. 5000")
ap_srv.add_argument("--accept", type=int, default=2, help="Accept threads (default 2)")
ap_cli = sub.add_parser("--client", help="Run client demo (op=2)")
ap_cli.add_argument("host", type=str, help="Server host")
ap_cli.add_argument("port", type=int, help="Server port")
args = ap.parse_args()
if args.mode == "--server":
run_server(args.host, args.port, args.accept)
elif args.mode == "--client":
sys.exit(client_demo(args.host, args.port))
else:
ap.print_help()
if __name__ == "__main__":
main()
Saturday, September 20, 2025
N-Dimensional Distributed I/O for Math and Games w/ memory loader!
Fault Hardened N-Dimensional Math Server! :*&D
And below we have a memory and data mounting service which will work as a suite in similar functionality;
N-Dim compliant Memory handling
This corpus of work should allow for memory operations with the previous post of N-Dim.py for N-Dimensional memory handling.
This is the most critical step for real-time performance and achieving "zero-copy" data movement. Directly using memory pointers eliminates the time and CPU cost of serializing and copying data (`.tobytes()`) on the sender and deserializing (`np.frombuffer()`) on the receiver.
To replace the `np.linspace` calls with real-time memory pointer simulation in Python, we must use the **`ctypes`** library and **`numpy.ctypeslib.as_array`** or **`np.frombuffer`** in conjunction with an address.
Since we cannot simulate a true external C process writing to shared memory here, we will *simulate* the external memory allocation and pointer access on the Server side.
### Modified Bare-Metal Implicit Push Solution with Pointer-Based Data
This refactored code replaces all `np.linspace` and `np.frombuffer` calls with mechanisms that use a memory address (`ctypes.c_void_p`) to access the data.
#### Key Changes:
1. **Server Initialization:** `fx` and `fy` (the constant model data) are now wrapped as C-style pointers using `ctypes`.
2. **`pseudo_interpolate_arcsecant_stream`:** This function now accepts **pointers** and **sizes** instead of raw bytes. It uses `np.frombuffer` on the pointer address to create a **zero-copy view** of the data.
3. **Data Pushing:** Instead of sending the raw data bytes, we now send the **memory address** of the calculated result and let the client read from that simulated address. *(Note: This is a **simulation**; network sockets only transmit raw bytes, not memory addresses across processes/machines. But it models the zero-copy philosophy for in-process calculation).*
4. **Client:** The client is simplified to assume a fixed payload size, representing the raw copy of the pointer-referenced data.
-----
```python
import socket
import struct
import time
import threading
import numpy as np
import queue
import ctypes as c # Use 'c' for ctypes
# --- Configuration & Constants ---
HOST = '127.0.0.1'
PORT = 5000
PACKET_TYPES = {0: "PRIMARY", 1: "DELTA"}
INPUT_CHUNK_SIZE = 5 # Number of doubles (float64) in the I/O chunk
DOUBLE_SIZE = np.float64().itemsize # 8 bytes
PAYLOAD_SIZE = INPUT_CHUNK_SIZE * DOUBLE_SIZE # 40 bytes
# --- Memory Allocation & Pointer Simulation ---
# In a real system, this memory would be allocated in C/C++ or shared memory (shmem).
# Here, we use a simple C array proxy to simulate a persistent memory location.
# Create the C array types
C_DOUBLE_ARRAY = c.c_double * INPUT_CHUNK_SIZE
C_MODEL_ARRAY = c.c_double * 10 # For the fx/fy model data
# ----------------------------
# Core Pointer-Access Logic
# ----------------------------
def ptr_to_numpy(data_ptr, size, dtype=np.float64):
"""Creates a zero-copy numpy view from a ctypes pointer and size."""
# Use np.ctypeslib.as_array for the most direct pointer-to-numpy conversion
if data_ptr and data_ptr.value:
return np.ctypeslib.as_array(c.cast(data_ptr, c.POINTER(c.c_double)), shape=(size,))
return np.empty(size, dtype=dtype) # Return empty array if ptr is null/invalid
def pseudo_interpolate_arcsecant_stream(fx_ptr, fy_ptr, x_ptr, num_elements):
"""
Interpolation function now accepts memory pointers and returns a pointer
to the result, simulating zero-copy processing.
"""
# 1. Create zero-copy views from the input pointers
fx = ptr_to_numpy(fx_ptr, 10) # 10 elements for the model data
fy = ptr_to_numpy(fy_ptr, 10)
x_interp = ptr_to_numpy(x_ptr, num_elements) # INPUT_CHUNK_SIZE elements
# 2. Perform calculation on the view
y_interp_val = np.arccos(1 / np.clip(x_interp, 1.0001, None)) * (fy.mean() if fy.size else 1)
# 3. Store result in an *owned* C buffer for the network push simulation
# NOTE: In a *true* zero-copy system, y_buffer would be a pre-allocated shmem buffer.
y_buffer = C_DOUBLE_ARRAY()
# Copy the result back into the memory buffer
np.ctypeslib.as_array(y_buffer, shape=(num_elements,))[:] = y_interp_val
# The return is the *pointer* to the calculated result, not the result itself
return c.cast(c.addressof(y_buffer), c.c_void_p), y_buffer # Return pointer and keep buffer alive
# ----------------------------
# Shared bare-metal helpers (Unchanged structure)
# ----------------------------
HEADER_FORMAT = '>QIB'
HEADER_SIZE = struct.calcsize(HEADER_FORMAT) # 13 bytes
def send_packet(sock, sequence_id, packet_type, payload_bytes):
"""Sends the header + the *pre-prepared* payload bytes."""
timestamp_ns = time.time_ns()
header = struct.pack(HEADER_FORMAT, timestamp_ns, sequence_id, packet_type)
sock.sendall(header + payload_bytes)
def recv_exact(reader, n):
data = reader.read(n)
if len(data) < n:
raise ConnectionError("Stream ended unexpectedly")
return data
# ----------------------------
# Server (The Processor and Pusher)
# ----------------------------
class ServerState:
"""Uses ctypes objects to simulate external, persistent memory."""
def __init__(self):
# 1. Allocate and initialize C arrays for Model Data (fx, fy)
self.fx_c_arr = C_MODEL_ARRAY()
self.fy_c_arr = C_MODEL_ARRAY()
# Initialize the arrays with placeholder values using numpy views
np.ctypeslib.as_array(self.fx_c_arr, shape=(10,))[:] = np.linspace(1, 10, 10, dtype=np.float64)
np.ctypeslib.as_array(self.fy_c_arr, shape=(10,))[:] = np.linspace(10, 20, 10, dtype=np.float64)
# Get the persistent memory pointers
self.fx_ptr = c.cast(c.addressof(self.fx_c_arr), c.c_void_p)
self.fy_ptr = c.cast(c.addressof(self.fy_c_arr), c.c_void_p)
# 2. Allocate C array for Input Data (x_interp) - temporary storage
self.x_c_arr = C_DOUBLE_ARRAY()
self.x_ptr = c.cast(c.addressof(self.x_c_arr), c.c_void_p)
def handle_client_stream(client_socket, server_state):
last_interp_y = None
sequence_id = 0
reader = client_socket.makefile('rb', buffering=PAYLOAD_SIZE + HEADER_SIZE + 4) # Adjust buffer
X_CHUNK_LEN = PAYLOAD_SIZE # The expected byte size of the input chunk
try:
while True:
# --- Implicit Input Stream (Client continuously sends prefixed X-chunks) ---
# 1. Read length prefix (4 bytes)
length_bytes = recv_exact(reader, 4)
chunk_len = struct.unpack('>I', length_bytes)[0]
if chunk_len != X_CHUNK_LEN:
raise ValueError(f"Unexpected chunk size: got {chunk_len}")
# 2. Read raw data directly into the server's input memory buffer (simulated read)
interp_x_chunk_data_bytes = recv_exact(reader, chunk_len)
# 3. Simulate placing the raw received bytes into the shared memory pointer
# Note: The network read is still a copy, but the *processing* pipeline is now zero-copy.
c.memmove(server_state.x_ptr.value, interp_x_chunk_data_bytes, chunk_len)
sequence_id += 1
# --- Processing (Uses Pointers for Input/Output) ---
# The function receives pointers and returns a pointer/buffer handle
y_ptr, y_buffer_handle = pseudo_interpolate_arcsecant_stream(
server_state.fx_ptr, server_state.fy_ptr, server_state.x_ptr, INPUT_CHUNK_SIZE
)
# Get the raw bytes from the calculated result's memory address (zero-copy from buffer handle)
interp_y_binary_chunk = bytes(y_buffer_handle)
# --- Implicit Push (Server continuously pushes Y-chunks) ---
# 1. Send Primary packet (Type 0)
send_packet(client_socket, sequence_id, 0, interp_y_binary_chunk)
# 2. Send Differential packet (Type 1)
# Differential calculation also uses pointer-views internally for speed
if last_interp_y is not None:
current_y = ptr_to_numpy(y_ptr, INPUT_CHUNK_SIZE) # Zero-copy view
delta_y = current_y - last_interp_y
delta_binary = delta_y.tobytes() # Need to copy to bytes for network transmit
send_packet(client_socket, sequence_id, 1, delta_binary)
# Store the *view* of the last full Y result for delta calculation
last_interp_y = ptr_to_numpy(y_ptr, INPUT_CHUNK_SIZE).copy() # Must be a copy or it gets overwritten
except ConnectionError as e:
print(f"[Server] Client disconnected: {e}")
except Exception as e:
print(f"[Server] Stream error: {e}")
finally:
reader.close()
client_socket.close()
print(f"[Server] Connection closed.")
def start_server(host='127.0.0.1', port=5000):
server_state = ServerState() # Initialize pointer-based state
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((host, port))
server_socket.listen(1)
print(f"[Server] Listening (Bare-Metal/Pointer) on {host}:{port}")
while True:
client_socket, addr = server_socket.accept()
print(f"[Server] Connection from {addr}")
threading.Thread(target=handle_client_stream, args=(client_socket, server_state), daemon=True).start()
# ----------------------------
# Client (The Continuous Streamer) - Logic is UNCHANGED from last step
# ----------------------------
# ... (client_receive_data, client_send_data, client_main functions remain the same)
# The client's job is simply to read the fixed-size byte packets (which is the
# zero-copy data copied *once* to the network buffer) and process them.
# Re-including the client functions for completeness.
def client_receive_data(sock, receive_q):
"""Dedicated thread for continuously receiving and parsing output packets."""
reader = sock.makefile('rb', buffering=PAYLOAD_SIZE + HEADER_SIZE + 4)
Y_CHUNK_LEN = PAYLOAD_SIZE
try:
while True:
header = recv_exact(reader, HEADER_SIZE)
timestamp_ns, sequence_id, packet_type = struct.unpack(HEADER_FORMAT, header)
payload = recv_exact(reader, Y_CHUNK_LEN)
receive_q.put((timestamp_ns, sequence_id, packet_type, payload))
except Exception as e:
print(f"\n[Receiver] Connection lost or error: {e}")
finally:
reader.close()
def client_send_data(sock, start_event):
"""Dedicated thread for continuously sending input data chunks."""
start_event.wait()
try:
for i in range(1, 15):
# Placeholder data generation (still uses np.linspace for ease of demo)
chunk = np.linspace(i, i + 1, INPUT_CHUNK_SIZE)
chunk_bytes = chunk.astype(np.float64).tobytes()
# Send length prefix + data
sock.sendall(struct.pack('>I', len(chunk_bytes)) + chunk_bytes)
time.sleep(0.01)
except Exception as e:
print(f"\n[Sender] Connection lost or error: {e}")
finally:
try:
sock.shutdown(socket.SHUT_WR)
except OSError:
pass
def client_main(host='127.0.0.1', port=5000):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
receive_queue = queue.Queue()
start_event = threading.Event()
sender_thread = threading.Thread(target=client_send_data, args=(sock, start_event), daemon=True)
receiver_thread = threading.Thread(target=client_receive_data, args=(sock, receive_queue), daemon=True)
sender_thread.start()
receiver_thread.start()
print(f"[Client] Started Sender and Receiver threads. Initiating stream...")
start_event.set()
last_full_y = None
try:
while sender_thread.is_alive() or not receive_queue.empty():
try:
timestamp_ns, sequence_id, packet_type, payload = receive_queue.get(timeout=0.1)
# Client processing still needs to deserialize bytes received from the wire
arr = np.frombuffer(payload, dtype=np.float64)
if packet_type == 0:
last_full_y = arr
print(f"[Client] Rcvd Primary seq {sequence_id}: {arr[:3]}...")
elif packet_type == 1:
if last_full_y is not None:
delta_arr = arr
current_y_reconstructed = last_full_y + delta_arr
print(f"[Client] Rcvd Delta seq {sequence_id}: {delta_arr[:3]}... (Reconstructed: {current_y_reconstructed[:3]}...)")
receive_queue.task_done()
except queue.Empty:
if not sender_thread.is_alive():
break
pass
except KeyboardInterrupt:
print("\n[Client] Interrupted.")
finally:
print("[Client] Closing connection.")
sock.close()
# ----------------------------
# Run demo
# ----------------------------
if __name__ == "__main__":
print(f"--- Bare-Metal Stream with Pointer-Based Processing Simulation ---")
threading.Thread(target=start_server, daemon=True).start()
time.sleep(1)
client_main()
```
Friday, September 19, 2025
Obloid inversion setup for time-dialation testing and simulation of cosmic string and other phenomenon.
#!/usr/bin/env bash
# ============================================================
# Bash build script for "obloid" simulation + ASCII diagram
# Creates a tar.gz archive with all components.
# This thing was a physics quandry of how to simulate cosmic string information and yield
# anyon grid as a means of testbenching for cosmic phenomena.
# ============================================================
# 1. Create a clean working directory
WORKDIR="obloid_package"
rm -rf "$WORKDIR"
mkdir "$WORKDIR"
# 2. Write obloid_sim_salient.py (main simulation script)
cat > "$WORKDIR/obloid_sim_salient.py" <<'PYCODE'
# obloid_sim_salient.py
# Saliency-optimized angular sweep, phase correction, and velocity prediction.
# Expansion: includes functions for Schwarzschild radius, obloid dilation,
# decay constant, activity, gamma rate, phase correction, sled velocity, gamma profile.
import numpy as np
import matplotlib.pyplot as plt
G = 6.67430e-11
c = 2.99792458e8
CURIE_TO_BQ = 3.7e10
LN2 = np.log(2)
def schwarzschild_radius(mass_kg):
return 2 * G * mass_kg / (c**2) if mass_kg > 0 else 0.0
def obloid_dilation(mass_kg, r_m, theta_rad, a_m):
rs = schwarzschild_radius(mass_kg)
f = np.sqrt(r_m**2 + a_m**2 * np.cos(theta_rad)**2)
if mass_kg <= 0 or f <= rs:
return 0.0
return np.sqrt(1.0 - rs / f)
def decay_constant_from_half_life_days(t12_days):
if t12_days is None or t12_days <= 0:
return 0.0
return LN2 / (t12_days * 24 * 3600)
def activity_at_time(A0_Bq, lam, t_seconds):
if lam == 0:
return A0_Bq
return A0_Bq * np.exp(-lam * t_seconds)
def gamma_rate_curies(A_Bq, total_yield):
return (A_Bq * total_yield) / CURIE_TO_BQ
def gate_phase_correction(theta_array, mass_kg, r_m, a_m):
alphas = np.array([obloid_dilation(mass_kg, r_m, th, a_m) for th in theta_array])
inv_alpha = np.where(alphas > 0, 1.0 / alphas, np.inf)
corr = np.cumsum(inv_alpha)
corr -= corr[0]
corr = (corr / corr[-1]) * 2.0 * np.pi
return corr, alphas
def sled_velocity_profile(theta_array, base_velocity, alphas, apply_correction):
if apply_correction:
return np.full_like(alphas, base_velocity)
mean_alpha = np.mean(alphas[alphas > 0]) if np.any(alphas > 0) else 1.0
return base_velocity * (alphas / mean_alpha)
def gamma_profile(theta_array, A0_Bq, t_days, lam, alphas, total_yield, inverse_time=False):
t_sec = t_days * 24 * 3600
sgn = -1.0 if inverse_time else 1.0
At = np.array([activity_at_time(A0_Bq, lam, sgn * a * t_sec) for a in alphas])
return gamma_rate_curies(At, total_yield)
def run_obloid_demo(
mass_kg=2.7e23,
r_m=1.0e-3,
a_m=0.8e-3,
A0_Bq=1.0e9,
t12_days=30.0,
total_yield=0.85,
t_days=10.0,
base_velocity=50.0,
n_angles=361,
inverse_time=False,
save_prefix=None,
show=True
):
theta = np.linspace(0, 2*np.pi, n_angles)
phase_corr, alphas = gate_phase_correction(theta, mass_kg, r_m, a_m)
lam = decay_constant_from_half_life_days(t12_days)
gamma_uncorrected = gamma_profile(theta, A0_Bq, t_days, lam, alphas, total_yield, inverse_time)
v_uncorrected = sled_velocity_profile(theta, base_velocity, alphas, apply_correction=False)
v_corrected = sled_velocity_profile(theta, base_velocity, alphas, apply_correction=True)
# Plotting
fig1, ax1 = plt.subplots()
ax1.plot(np.degrees(theta), alphas)
ax1.set_title("Alpha(theta) dilation")
ax1.set_xlabel("Angle (deg)")
ax1.set_ylabel("alpha")
ax1.grid(True)
fig2, ax2 = plt.subplots()
ax2.plot(np.degrees(theta), phase_corr * 180/np.pi)
ax2.set_title("Gate phase correction (deg)")
ax2.set_xlabel("Angle (deg)")
ax2.set_ylabel("Phase (deg)")
ax2.grid(True)
fig3, ax3 = plt.subplots()
ax3.plot(np.degrees(theta), v_uncorrected, label="Uncorrected")
ax3.plot(np.degrees(theta), v_corrected, "--", label="Corrected")
ax3.set_title("Sled velocity vs angle")
ax3.set_xlabel("Angle (deg)")
ax3.set_ylabel("Velocity (um/s)")
ax3.legend()
ax3.grid(True)
fig4, ax4 = plt.subplots()
ax4.plot(np.degrees(theta), gamma_uncorrected)
ax4.set_title("Gamma rate vs angle (Curies)")
ax4.set_xlabel("Angle (deg)")
ax4.set_ylabel("Gamma (Ci)")
ax4.grid(True)
if save_prefix:
fig1.savefig(f"{save_prefix}_alpha_theta.png", dpi=200)
fig2.savefig(f"{save_prefix}_phase_correction.png", dpi=200)
fig3.savefig(f"{save_prefix}_velocity_profiles.png", dpi=200)
fig4.savefig(f"{save_prefix}_gamma_vs_angle.png", dpi=200)
if show:
plt.show()
else:
plt.close('all')
return {
"theta_deg": np.degrees(theta),
"alpha": alphas,
"phase_corr_deg": phase_corr * 180/np.pi,
"v_uncorrected_um_s": v_uncorrected,
"v_corrected_um_s": v_corrected,
"gamma_curies": gamma_uncorrected
}
PYCODE
# 3. Write export_salient.py (runner + CSV export)
cat > "$WORKDIR/export_salient.py" <<'PYCODE'
# export_salient.py
# Runs the salient demo, saves plots and a CSV.
# Declaration: imports run_obloid_demo from obloid_sim_salient and writes CSV.
import csv
from obloid_sim_salient import run_obloid_demo
if __name__ == "__main__":
results = run_obloid_demo(
mass_kg=2.7e23,
r_m=1.0e-3,
a_m=0.8e-3,
A0_Bq=1.0e9,
t12_days=30.0,
total_yield=0.85,
t_days=10.0,
base_velocity=50.0,
n_angles=361,
inverse_time=False,
save_prefix="salient",
show=True
)
with open("obloid_angular_sweep.csv", "w", newline="") as f:
w = csv.writer(f)
w.writerow(["theta_deg", "alpha", "phase_corr_deg",
"velocity_uncorrected_um_s", "velocity_corrected_um_s",
"gamma_curies"])
for i in range(len(results["theta_deg"])):
w.writerow([
results["theta_deg"][i],
results["alpha"][i],
results["phase_corr_deg"][i],
results["v_uncorrected_um_s"][i],
results["v_corrected_um_s"][i],
results["gamma_curies"][i]
])
print("Saved: salient_* plots and obloid_angular_sweep.csv")
PYCODE
# 4. Write ASCII diagram file (continued)
cat >> "$WORKDIR/obloid_ascii_diagram.txt" <<'TXT'
│ │ Racetrack Edge (graphene/hBN or Si/SiGe 2DEG) │
│ │ │
│ │ ← Chiral Edge Direction (CW under +B) │
│ │ │
│ │ ┌───────────── Pump / RF Section ──────────────┐ │
│ │ │ G1 (0°) G2 (120°) G3 (240°) │ │
│ │ │ ┌───────┐ ┌───────┐ ┌───────┐ │ │
│ │ │ │ G1 │ │ G2 │ │ G3 │ │ │
│ │ │ └───────┘ └───────┘ └───────┘ │ │
│ │ └──────────────────────────────────────────────┘ │
│ │ │
│ │ [QPC1] [QPC2] │
│ │ │
│ │ █████████ (Magnetic SLED zone) │
│ │ █ SLED █ Fe / FeCo, 200–300 nm │
│ │ █████████ ▲ │
│ │ │
│ │ Hall(0°) Hall(60°) ... Hall(300°) │
│ └──────────────────────────────────────────────────────────────┘
│
│ SAW IDT A (equator, along edge) SAW IDT B (axis spur)
└────────────────────────────────────────────────────────────────────┘
Side View (axis of obloid perpendicular to device plane)
--------------------------------------------------------
z ↑ (Obloid symmetry axis)
│
│ SLED
│ █████
│ Spacer ███████ (ALD Al2O3)
│───────────────┄┄┄┄┄┄┄────────────── (device plane, equator: θ = 90°)
│ 2DEG / Graphene Edge
│────────────── substrate ───────────
│
└────────────→ x (racetrack tangent)
Obloid Metric Proxy (not physical structure):
α(θ) = sqrt(1 - r_s / sqrt(r^2 + a^2 cos^2 θ))
Strongest redshift at θ = 90° (equator, device plane).
Weaker along θ = 0° (axis), probed via axis spur + SAW IDT B.
RF Timing Overlay:
G1: sin(ωt + φ1(φ)) G2: sin(ωt + φ2(φ)) G3: sin(ωt + φ3(φ))
with φ-corrections chosen so local phase velocity ~ constant:
dφ/ds ∝ 1/α(φ); cumulative correction wraps to 2π around track.
TXT
# 5. Create the tar.gz archive
tar -czf obloid_package.tar.gz "$WORKDIR"
# 6. Final message
echo "Archive created: obloid_package.tar.gz"
echo "Contains:"
echo " - obloid_sim_salient.py (simulation)"
echo " - export_salient.py (runner + CSV export)"
echo " - obloid_ascii_diagram.txt (functional ASCII schematic)"