Friday, June 13, 2025

Eigenvalue Infinite Data Streaming

 #An interesting method for streaming eigenvalues


import socket

import struct

import numpy as np

import matplotlib.pyplot as plt



# --- Utility Functions ---

def _calculate_arcsecant(val):

    """

    Safely calculates arcsecant (arccos(1/val)) for a given value.

    Returns NaN if |val| < 1.

    """

    if np.abs(val) < 1:

        return np.nan

    return np.arccos(1 / val) # np.abs(val) might be used if desired to map negative and positive

                               # eigenvalues to the same arcsecant range [0, pi/2).

                               # For true arcsec(x), keep as 1/val. The problem asks for "arcsecant functions"

                               # implying the standard definition. Let's use 1/val and handle range [0, pi].



def pseudo_interpolate_arcsecant_stream(x_data_bytes, y_data_bytes, x_interp_chunk_bytes):

    """

    Pseudo-interpolates a chunk of interpolation points using pre-existing x_data and y_data.

    Expects x_data, y_data, and x_interp_chunk as packed binary data (arrays of floats).

    Refitted to accept eigenvalues for arcsecant packing range.

    """

    try:

        # Unpack binary data into NumPy arrays

        num_x = len(x_data_bytes) // 8

        num_y = len(y_data_bytes) // 8


        if num_x != num_y or num_x < 2:

            raise ValueError("X and Y data must have equal length and at least two points.")


        fx = np.array(struct.unpack(f'!{num_x}d', x_data_bytes))

        fy = np.array(struct.unpack(f'!{num_y}d', y_data_bytes))


        # Unpack the current chunk of interpolation points

        num_interp_chunk = len(x_interp_chunk_bytes) // 8

        x_interp_chunk = np.array(struct.unpack(f'!{num_interp_chunk}d', x_interp_chunk_bytes))


        min_x, max_x = np.min(fx), np.max(fx)

        

        # --- Refitting for Eigenvalue Arcsecant Packing Range ---

        # The 'packing range' for arcsecant means defining a representative range of inputs

        # (eigenvalues) that are relevant to the problem, and then finding the min/max

        # arcsecant values over that range.

        

        # Assuming eigenvalues are typically positive and >= 1 for arcsecant.

        # If eigenvalues can be in (-inf, -1], they would map to (pi/2, pi].

        # For simplicity, let's assume we are primarily interested in the positive domain

        # for "packing", as is common for eigenvalues in many applications.

        # If eigenvalues can be small (e.g., < 1), they fall outside arcsecant's domain.

        

        # To define a robust 'packing range', we should consider the range of 'x' values

        # in the provided fx data, as this defines the context of the interpolation.

        

        # Create a representative set of points for the arcsecant domain calculation.

        # These points should be within the valid domain for arcsecant (i.e., |val| >= 1).

        # We can derive this range from the `fx` data, ensuring we don't pick values in (-1, 1).

        

        # Determine the effective range of eigenvalues for arcsecant domain.

        # If min_x is less than 1 (e.g., 0.5), we should start our arcsecant domain from 1.0.

        # If min_x is already >= 1, use it.

        

        effective_min_eigenvalue = 1.0 if min_x < 1.0 else min_x

        # Max eigenvalue could be max_x or a very large number if we want to represent "infinity"

        # up to a practical limit. For now, use max_x as the upper bound for the representative range.

        effective_max_eigenvalue = max_x if max_x >= effective_min_eigenvalue else effective_min_eigenvalue + 1.0

        

        # Generate representative eigenvalues for the arcsecant domain.

        # We ensure they are spread across the relevant range, including large values.

        # Using a logarithmic spacing might be beneficial for capturing the asymptotic behavior

        # towards infinity (where arcsecant approaches pi/2) more effectively if max_x is very large.

        

        # To handle potential large spreads in 'fx', we can create a `representative_eigenvalues`

        # array that covers the domain where arcsecant is defined and relevant.

        # For very large eigenvalues, arcsecant values cluster near pi/2.

        

        # Option 1: Linear spacing from 1 to max_x (or a chosen practical upper limit).

        # Ensures that values >= 1 are covered.

        if effective_max_eigenvalue <= effective_min_eigenvalue: # Handle cases where fx has very narrow or inverted range

             representative_eigenvalues = np.array([effective_min_eigenvalue, effective_min_eigenvalue + 1.0])

        else:

            # Generate points log-scaled to capture the behavior near 1 and as values get very large

            # We add a small epsilon to avoid log(0) if min_x is very small but we still clamp to 1.

            # Example: 1 to 1000, covering relevant arcsecant range (0 to ~pi/2)

            representative_eigenvalues = np.logspace(np.log10(effective_min_eigenvalue), np.log10(effective_max_eigenvalue + 1e-9), 100)

            # Ensure at least one very large value is considered if max_x is not already huge

            if effective_max_eigenvalue < 1e10: # Arbitrary large value

                representative_eigenvalues = np.unique(np.concatenate((representative_eigenvalues, [1e10, 1e12])))

            

        # Calculate arcsecant values for this representative range

        # Filter out any NaNs if _calculate_arcsecant returns them (e.g., if any representative_eigenvalues were somehow < 1)

        # Note: If `_calculate_arcsecant` uses `1/val`, values can go from [0,pi]. If `1/np.abs(val)` it is [0,pi/2).

        # Let's assume standard arcsec(x) for simplicity, i.e., arccos(1/x) which maps to [0, pi].

        

        calculated_domain_values = [_calculate_arcsecant(val) for val in representative_eigenvalues if _calculate_arcsecant(val) is not np.nan]

        

        if not calculated_domain_values:

            # Fallback if no valid arcsecant values could be generated for the domain

            min_arcsecant, max_arcsecant = 0.0, np.pi # Standard range for arccos

        else:

            min_arcsecant = np.min(calculated_domain_values)

            max_arcsecant = np.max(calculated_domain_values)


        # Handle the case where min_arcsecant and max_arcsecant are identical (e.g., if all inputs are effectively infinite)

        arcsecant_range_is_zero = (max_arcsecant - min_arcsecant) == 0


        interp_y = []

        for x in x_interp_chunk:

            # Find the two closest points in fx for interpolation

            idx1 = np.argmin(np.abs(x - fx))

            idx2 = (idx1 + 1) % len(fx) 

            y1, y2 = fy[idx1], fy[idx2]


            # Use the input 'x' (eigenvalue) directly for arcsecant calculation

            arcsec_val = _calculate_arcsecant(x)

            

            if np.isnan(arcsec_val) or arcsecant_range_is_zero:

                # If arcsec_val is NaN (eigenvalue < 1) or the packing range is trivial,

                # fall back to using y1 or a simple linear interpolation based on 'x' directly.

                # For this refit, we use y1 as a conservative fallback.

                interp_y.append(y1) 

            else:

                # Apply the scaling using the dynamically determined arcsecant range

                interp_y.append(y1 + (y2 - y1) * (arcsec_val - min_arcsecant) / (max_arcsecant - min_arcsecant))


        return np.array(interp_y).tobytes()


    except struct.error:

        raise ValueError("Invalid binary data format.")

    except ValueError as e:

        raise ValueError(str(e))

    except RuntimeWarning as e: # Catch RuntimeWarnings explicitly

        print(f"Warning in pseudo_interpolate_arcsecant_stream: {e}")

        raise RuntimeError(f"Interpolation warning: {e}") 

    except Exception as e:

        raise Exception(f"An unexpected error occurred during interpolation: {e}")



# --- Server-Side Logic (Remains unchanged as the core logic change is in pseudo_interpolate_arcsecant_stream) ---



def handle_client_stream(client_socket):

    """

    Handles communication with a single client for streaming pseudo-interpolation.

    The client first sends fx and fy data, then continuously sends chunks of x_interp.

    """

    try:

        # 1. Initial setup: Receive fx and fy data (assumed to be sent once per connection)

        # Receive fx length and data

        fx_length_bytes = client_socket.recv(4)

        if not fx_length_bytes: return

        fx_length = struct.unpack('!I', fx_length_bytes)[0]

        fx_data_bytes = b''

        while len(fx_data_bytes) < fx_length:

            chunk = client_socket.recv(4096)

            if not chunk: return

            fx_data_bytes += chunk


        # Receive fy length and data

        fy_length_bytes = client_socket.recv(4)

        if not fy_length_bytes: return

        fy_length = struct.unpack('!I', fy_length_bytes)[0]

        fy_data_bytes = b''

        while len(fy_data_bytes) < fy_length:

            chunk = client_socket.recv(4096)

            if not chunk: return

            fy_data_bytes += chunk


        print("Received initial fx and fy data from client.")


        # 2. Continuous streaming: Receive and process x_interp chunks

        while True:

            # Receive length of the next x_interp chunk

            interp_x_chunk_length_bytes = client_socket.recv(4)

            if not interp_x_chunk_length_bytes:

                print("Client disconnected or finished sending stream.")

                break # Client has disconnected or finished sending data


            interp_x_chunk_length = struct.unpack('!I', interp_x_chunk_length_bytes)[0]

            if interp_x_chunk_length == 0: # A signal to indicate end of stream, if agreed upon

                print("Received end-of-stream signal (zero length chunk).")

                break


            interp_x_chunk_data_bytes = b''

            while len(interp_x_chunk_data_bytes) < interp_x_chunk_length:

                chunk = client_socket.recv(4096)

                if not chunk:

                    print("Client disconnected during chunk reception.")

                    return

                interp_x_chunk_data_bytes += chunk


            # Perform the pseudo-interpolation on the received chunk

            interp_y_binary_chunk = pseudo_interpolate_arcsecant_stream(

                fx_data_bytes, fy_data_bytes, interp_x_chunk_data_bytes

            )


            # Send the binary interpolated y data chunk back to the client

            interp_y_chunk_length = len(interp_y_binary_chunk)

            client_socket.sendall(struct.pack('!I', interp_y_chunk_length))

            client_socket.sendall(interp_y_binary_chunk)

            # print(f"Processed and sent back a chunk of {len(interp_y_binary_chunk)} bytes.")


    except ValueError as e:

        print(f"ValueError on server (streaming): {e}")

        error_message = str(e).encode('utf-8')

        client_socket.sendall(struct.pack('!I', len(error_message)))

        client_socket.sendall(error_message)

    except RuntimeError as e: # Catch custom RuntimeErrors from interpolation function

        print(f"RuntimeError on server (streaming): {e}")

        error_message = str(e).encode('utf-8')

        client_socket.sendall(struct.pack('!I', len(error_message)))

        client_socket.sendall(error_message)

    except ConnectionResetError:

        print("Client forcibly closed the connection during streaming.")

    except Exception as e:

        print(f"An unexpected error occurred in handle_client_stream: {e}")

    finally:

        client_socket.close()

        print("Connection with client closed.")



def start_server_stream(host, port):

    """

    Starts a server to listen for incoming binary data streams for pseudo-interpolation.

    """

    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    server_socket.bind((host, port))

    server_socket.listen(5)

    print(f"Server listening on {host}:{port} for streaming data...")


    while True:

        client_socket, addr = server_socket.accept()

        print(f"Accepted streaming connection from {addr}")

        handle_client_stream(client_socket)



if __name__ == "__main__":

    SERVER_HOST = '127.0.0.1'

    SERVER_PORT = 12345

    start_server_stream(SERVER_HOST, SERVER_PORT)


No comments: