#An interesting method for streaming eigenvalues
import socket
import struct
import numpy as np
import matplotlib.pyplot as plt
# --- Utility Functions ---
def _calculate_arcsecant(val):
"""
Safely calculates arcsecant (arccos(1/val)) for a given value.
Returns NaN if |val| < 1.
"""
if np.abs(val) < 1:
return np.nan
return np.arccos(1 / val) # np.abs(val) might be used if desired to map negative and positive
# eigenvalues to the same arcsecant range [0, pi/2).
# For true arcsec(x), keep as 1/val. The problem asks for "arcsecant functions"
# implying the standard definition. Let's use 1/val and handle range [0, pi].
def pseudo_interpolate_arcsecant_stream(x_data_bytes, y_data_bytes, x_interp_chunk_bytes):
"""
Pseudo-interpolates a chunk of interpolation points using pre-existing x_data and y_data.
Expects x_data, y_data, and x_interp_chunk as packed binary data (arrays of floats).
Refitted to accept eigenvalues for arcsecant packing range.
"""
try:
# Unpack binary data into NumPy arrays
num_x = len(x_data_bytes) // 8
num_y = len(y_data_bytes) // 8
if num_x != num_y or num_x < 2:
raise ValueError("X and Y data must have equal length and at least two points.")
fx = np.array(struct.unpack(f'!{num_x}d', x_data_bytes))
fy = np.array(struct.unpack(f'!{num_y}d', y_data_bytes))
# Unpack the current chunk of interpolation points
num_interp_chunk = len(x_interp_chunk_bytes) // 8
x_interp_chunk = np.array(struct.unpack(f'!{num_interp_chunk}d', x_interp_chunk_bytes))
min_x, max_x = np.min(fx), np.max(fx)
# --- Refitting for Eigenvalue Arcsecant Packing Range ---
# The 'packing range' for arcsecant means defining a representative range of inputs
# (eigenvalues) that are relevant to the problem, and then finding the min/max
# arcsecant values over that range.
# Assuming eigenvalues are typically positive and >= 1 for arcsecant.
# If eigenvalues can be in (-inf, -1], they would map to (pi/2, pi].
# For simplicity, let's assume we are primarily interested in the positive domain
# for "packing", as is common for eigenvalues in many applications.
# If eigenvalues can be small (e.g., < 1), they fall outside arcsecant's domain.
# To define a robust 'packing range', we should consider the range of 'x' values
# in the provided fx data, as this defines the context of the interpolation.
# Create a representative set of points for the arcsecant domain calculation.
# These points should be within the valid domain for arcsecant (i.e., |val| >= 1).
# We can derive this range from the `fx` data, ensuring we don't pick values in (-1, 1).
# Determine the effective range of eigenvalues for arcsecant domain.
# If min_x is less than 1 (e.g., 0.5), we should start our arcsecant domain from 1.0.
# If min_x is already >= 1, use it.
effective_min_eigenvalue = 1.0 if min_x < 1.0 else min_x
# Max eigenvalue could be max_x or a very large number if we want to represent "infinity"
# up to a practical limit. For now, use max_x as the upper bound for the representative range.
effective_max_eigenvalue = max_x if max_x >= effective_min_eigenvalue else effective_min_eigenvalue + 1.0
# Generate representative eigenvalues for the arcsecant domain.
# We ensure they are spread across the relevant range, including large values.
# Using a logarithmic spacing might be beneficial for capturing the asymptotic behavior
# towards infinity (where arcsecant approaches pi/2) more effectively if max_x is very large.
# To handle potential large spreads in 'fx', we can create a `representative_eigenvalues`
# array that covers the domain where arcsecant is defined and relevant.
# For very large eigenvalues, arcsecant values cluster near pi/2.
# Option 1: Linear spacing from 1 to max_x (or a chosen practical upper limit).
# Ensures that values >= 1 are covered.
if effective_max_eigenvalue <= effective_min_eigenvalue: # Handle cases where fx has very narrow or inverted range
representative_eigenvalues = np.array([effective_min_eigenvalue, effective_min_eigenvalue + 1.0])
else:
# Generate points log-scaled to capture the behavior near 1 and as values get very large
# We add a small epsilon to avoid log(0) if min_x is very small but we still clamp to 1.
# Example: 1 to 1000, covering relevant arcsecant range (0 to ~pi/2)
representative_eigenvalues = np.logspace(np.log10(effective_min_eigenvalue), np.log10(effective_max_eigenvalue + 1e-9), 100)
# Ensure at least one very large value is considered if max_x is not already huge
if effective_max_eigenvalue < 1e10: # Arbitrary large value
representative_eigenvalues = np.unique(np.concatenate((representative_eigenvalues, [1e10, 1e12])))
# Calculate arcsecant values for this representative range
# Filter out any NaNs if _calculate_arcsecant returns them (e.g., if any representative_eigenvalues were somehow < 1)
# Note: If `_calculate_arcsecant` uses `1/val`, values can go from [0,pi]. If `1/np.abs(val)` it is [0,pi/2).
# Let's assume standard arcsec(x) for simplicity, i.e., arccos(1/x) which maps to [0, pi].
calculated_domain_values = [_calculate_arcsecant(val) for val in representative_eigenvalues if _calculate_arcsecant(val) is not np.nan]
if not calculated_domain_values:
# Fallback if no valid arcsecant values could be generated for the domain
min_arcsecant, max_arcsecant = 0.0, np.pi # Standard range for arccos
else:
min_arcsecant = np.min(calculated_domain_values)
max_arcsecant = np.max(calculated_domain_values)
# Handle the case where min_arcsecant and max_arcsecant are identical (e.g., if all inputs are effectively infinite)
arcsecant_range_is_zero = (max_arcsecant - min_arcsecant) == 0
interp_y = []
for x in x_interp_chunk:
# Find the two closest points in fx for interpolation
idx1 = np.argmin(np.abs(x - fx))
idx2 = (idx1 + 1) % len(fx)
y1, y2 = fy[idx1], fy[idx2]
# Use the input 'x' (eigenvalue) directly for arcsecant calculation
arcsec_val = _calculate_arcsecant(x)
if np.isnan(arcsec_val) or arcsecant_range_is_zero:
# If arcsec_val is NaN (eigenvalue < 1) or the packing range is trivial,
# fall back to using y1 or a simple linear interpolation based on 'x' directly.
# For this refit, we use y1 as a conservative fallback.
interp_y.append(y1)
else:
# Apply the scaling using the dynamically determined arcsecant range
interp_y.append(y1 + (y2 - y1) * (arcsec_val - min_arcsecant) / (max_arcsecant - min_arcsecant))
return np.array(interp_y).tobytes()
except struct.error:
raise ValueError("Invalid binary data format.")
except ValueError as e:
raise ValueError(str(e))
except RuntimeWarning as e: # Catch RuntimeWarnings explicitly
print(f"Warning in pseudo_interpolate_arcsecant_stream: {e}")
raise RuntimeError(f"Interpolation warning: {e}")
except Exception as e:
raise Exception(f"An unexpected error occurred during interpolation: {e}")
# --- Server-Side Logic (Remains unchanged as the core logic change is in pseudo_interpolate_arcsecant_stream) ---
def handle_client_stream(client_socket):
"""
Handles communication with a single client for streaming pseudo-interpolation.
The client first sends fx and fy data, then continuously sends chunks of x_interp.
"""
try:
# 1. Initial setup: Receive fx and fy data (assumed to be sent once per connection)
# Receive fx length and data
fx_length_bytes = client_socket.recv(4)
if not fx_length_bytes: return
fx_length = struct.unpack('!I', fx_length_bytes)[0]
fx_data_bytes = b''
while len(fx_data_bytes) < fx_length:
chunk = client_socket.recv(4096)
if not chunk: return
fx_data_bytes += chunk
# Receive fy length and data
fy_length_bytes = client_socket.recv(4)
if not fy_length_bytes: return
fy_length = struct.unpack('!I', fy_length_bytes)[0]
fy_data_bytes = b''
while len(fy_data_bytes) < fy_length:
chunk = client_socket.recv(4096)
if not chunk: return
fy_data_bytes += chunk
print("Received initial fx and fy data from client.")
# 2. Continuous streaming: Receive and process x_interp chunks
while True:
# Receive length of the next x_interp chunk
interp_x_chunk_length_bytes = client_socket.recv(4)
if not interp_x_chunk_length_bytes:
print("Client disconnected or finished sending stream.")
break # Client has disconnected or finished sending data
interp_x_chunk_length = struct.unpack('!I', interp_x_chunk_length_bytes)[0]
if interp_x_chunk_length == 0: # A signal to indicate end of stream, if agreed upon
print("Received end-of-stream signal (zero length chunk).")
break
interp_x_chunk_data_bytes = b''
while len(interp_x_chunk_data_bytes) < interp_x_chunk_length:
chunk = client_socket.recv(4096)
if not chunk:
print("Client disconnected during chunk reception.")
return
interp_x_chunk_data_bytes += chunk
# Perform the pseudo-interpolation on the received chunk
interp_y_binary_chunk = pseudo_interpolate_arcsecant_stream(
fx_data_bytes, fy_data_bytes, interp_x_chunk_data_bytes
)
# Send the binary interpolated y data chunk back to the client
interp_y_chunk_length = len(interp_y_binary_chunk)
client_socket.sendall(struct.pack('!I', interp_y_chunk_length))
client_socket.sendall(interp_y_binary_chunk)
# print(f"Processed and sent back a chunk of {len(interp_y_binary_chunk)} bytes.")
except ValueError as e:
print(f"ValueError on server (streaming): {e}")
error_message = str(e).encode('utf-8')
client_socket.sendall(struct.pack('!I', len(error_message)))
client_socket.sendall(error_message)
except RuntimeError as e: # Catch custom RuntimeErrors from interpolation function
print(f"RuntimeError on server (streaming): {e}")
error_message = str(e).encode('utf-8')
client_socket.sendall(struct.pack('!I', len(error_message)))
client_socket.sendall(error_message)
except ConnectionResetError:
print("Client forcibly closed the connection during streaming.")
except Exception as e:
print(f"An unexpected error occurred in handle_client_stream: {e}")
finally:
client_socket.close()
print("Connection with client closed.")
def start_server_stream(host, port):
"""
Starts a server to listen for incoming binary data streams for pseudo-interpolation.
"""
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((host, port))
server_socket.listen(5)
print(f"Server listening on {host}:{port} for streaming data...")
while True:
client_socket, addr = server_socket.accept()
print(f"Accepted streaming connection from {addr}")
handle_client_stream(client_socket)
if __name__ == "__main__":
SERVER_HOST = '127.0.0.1'
SERVER_PORT = 12345
start_server_stream(SERVER_HOST, SERVER_PORT)