This Server is an example that will handle nuclear positional data, incase anyone wanted to play with NMR Video/Audio Sources and allow networked commenting as well as superposition updating.
** Server_GraphicsBBS.py
# Server_GraphicsBBS.py
import socket
import struct
import threading
import numpy as np
import traceback
import logging # Use logging instead of print for better control
# from scipy.interpolate import interp1d # Not directly used in the current arcsecant logic, but good for general interpolation
# from scipy.misc import derivative # Deprecated, will not be used
import nmrglue as ng # Import NMRglue
# --- Configuration Constants ---
HOST = '0.0.0.0' # Listen on all interfaces
PORT = 12345
# --- Message Operation Codes (Server to Client) ---
MSG_TYPE_INTERPOLATION_RESULT = 5
MSG_TYPE_INITIAL_DATA_BROADCAST = 10
MSG_TYPE_DIFFERENTIATION_RESULT = 12
MSG_TYPE_NMR_DATA_BROADCAST = 101
MSG_TYPE_SERVER_WARNING = 252
MSG_TYPE_SERVER_ERROR = 253
MSG_TYPE_CLIENT_DISCONNECT = 254
MSG_TYPE_ID_ASSIGNMENT = 255
# --- Message Operation Codes (Client to Server) ---
# Note: Some codes are used bidirectionally based on context (e.g., initial data)
# Client sends these to request/send specific updates
REQ_TYPE_INITIAL_DATA = 0
REQ_TYPE_FX_UPDATE = 1
REQ_TYPE_FY_UPDATE = 2
REQ_TYPE_FZ_UPDATE = 3
REQ_TYPE_EVAL_X_UPDATE = 4
REQ_TYPE_COLOR_UPDATE = 6
REQ_TYPE_POSITION_UPDATE = 7
REQ_TYPE_AUDIO_UPDATE = 8
REQ_TYPE_REQUEST_ALL_DATA = 9 # Currently just a warning if sent as an update
REQ_TYPE_POSITIONAL_COMMENT = 11
REQ_TYPE_NMR_DATA = 100
REQ_TYPE_INTERPOLATION = 0 # Re-using 0 as client request for interpolation
REQ_TYPE_DIFFERENTIATION = 1 # Re-using 1 as client request for differentiation
# --- Set up logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class Server:
def __init__(self, host: str, port: int):
self.host = host
self.port = port
self.clients: list[dict] = [] # List to store client sockets and their IDs
self.clients_lock = threading.Lock() # Lock for thread-safe access to clients list
# Store all data associated with each client socket (graphics, position, audio, nmr)
# Key: client_socket, Value: dict of client data
self.client_data: dict[socket.socket, dict] = {}
self.client_data_lock = threading.Lock() # Lock for thread-safe access to client_data
self.next_client_id = 0 # Simple way to assign unique IDs
def pseudo_interpolate_arcsecant_1d_triple(self, fx: np.ndarray, fy: np.ndarray, fz: np.ndarray, x_interp: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
"""
Pseudo-interpolates 1-dimensional data with y and z datasets using an arcsecant-like function.
This method uses an unconventional scaling and may produce non-intuitive results or NaNs
if the input data or interpolation points are outside a specific range.
It's designed to mimic the original logic while improving robustness.
Args:
fx (numpy.ndarray): 1D array of x-data.
fy (numpy.ndarray): 1D array of corresponding y-data.
fz (numpy.ndarray): 1D array of corresponding z-data.
x_interp (numpy.ndarray): Array of x-values for interpolation.
Returns:
tuple of numpy.ndarray: A tuple containing two arrays of interpolated y and z values.
Raises:
ValueError: If input data lengths are not equal or insufficient, or if arcsecant domain is invalid.
"""
if not (len(fx) == len(fy) == len(fz)) or len(fx) < 2:
raise ValueError("X, Y, and Z data must have equal length and at least two points.")
# Recalculate arcsecant_domain more robustly.
# The original `np.linspace(-1.1, 1.1, 100)` combined with `1/np.abs(x)`
# is very problematic near x=0 leading to large numbers and arccos domain errors.
# A safer approach is to define a valid domain for arccos, e.g., [1, inf) for arcsecant(x).
# However, to preserve the *spirit* of the original (unconventional) approach,
# we'll use a modified scaling that tries to avoid direct arccos(1/0) or arccos(>1).
# This still may not be a true arcsecant interpolation, but keeps the non-linear scaling idea.
# A more stable way to generate the scaling factor,
# if the intent is a non-linear S-curve like behavior within a [0,1] range.
# If the actual mathematical arcsecant behavior is critical, the input scaling needs redesign.
# Here, we create an artificial "arcsecant-like" progression for the scaling factor 't'.
try:
# Generate a range for the argument of arccos that avoids invalid values [1, infinity)
# For arccos(1/x), x must be such that 1/x is in [-1, 1].
# Since we use np.abs, 1/np.abs(scaled_x) must be in [-1, 1].
# This means np.abs(scaled_x) must be >= 1.
# The original `linspace(-1.1, 1.1, 100)` implies `abs(scaled_x)` can be < 1.
# To fix the NaN issue while trying to keep the original structure:
# Scale x_interp to a domain where 1/abs(scaled_x) is always valid for arccos.
# Let's map [min_x, max_x] to [1, SomeMaxVal] for input to arccos(1/abs(val)).
# This is a significant deviation from the original's likely flawed intent,
# but it will produce valid numbers.
# If the user's intent was to use arcsecant shape, a different approach is needed.
# Re-interpreting the "arcsecant-like" to mean non-linear interpolation.
# The original implementation effectively mapped a linear t from [0,1] to another [0,1]
# using arcsecant function's shape, but the domain was wrong.
# A common non-linear interpolation, if not specifically arcsecant, could be a sigmoid.
# Given the original code's `arccos(1/np.abs(scaled_x))`, it suggests a shape where
# change is slow at ends and fast in middle.
# Let's use a simpler, more robust approach that still introduces a non-linearity.
# We will use a scaled linear interpolation factor and apply a non-linear transformation to it.
# The original "arcsecant_domain" calculation was flawed.
# Instead, we will map the linear interpolation factor `t` through a custom non-linear function
# that is somewhat similar in *effect* to what a valid arccos transformation might aim for.
interp_y = np.zeros_like(x_interp, dtype=float)
interp_z = np.zeros_like(x_interp, dtype=float)
min_x, max_x = fx[0], fx[-1] # Assuming fx is sorted
for i, x in enumerate(x_interp):
# Find the two closest points in fx
if x <= fx[0]:
idx1, idx2 = 0, 1
elif x >= fx[-1]:
idx1, idx2 = len(fx) - 2, len(fx) - 1
else:
idx1 = np.searchsorted(fx, x, side='right') - 1
idx2 = idx1 + 1
x1, x2 = fx[idx1], fx[idx2]
y1, y2 = fy[idx1], fy[idx2]
z1, z2 = fz[idx1], fz[idx2]
if (x2 - x1) == 0: # Avoid division by zero for identical x points
interp_y[i] = y1
interp_z[i] = z1
continue
# Linear interpolation factor
t_linear = (x - x1) / (x2 - x1)
# Applying a non-linear scaling to 't_linear' to simulate a "curve"
# A common non-linear transformation that gives an S-curve or slowed ends:
# For example, using a power law (e.g., t_linear**2 or sqrt(t_linear))
# or a sigmoid-like function if 'arcsecant' was meant to distort the progression.
# Given the complexity, let's aim for a simple S-curve effect without complex arcsecant math.
# The original code's arcsecant approach was trying to map a linear range to a non-linear one.
# A simple, robust non-linear transformation for t in [0,1] could be 0.5 * (1 - cos(t * pi))
# which goes from 0 to 1 with a smooth start and end. This is a common Bezier-like curve.
# Or, if the original intent was a 'peaked' non-linearity:
# `scaled_t = 1 - np.cos(t_linear * np.pi)` for a full wave, then scale to 0-1.
# Or simply `scaled_t = t_linear**power` or `scaled_t = 1 - (1-t_linear)**power` for easing.
# Sticking closer to the original's *spirit* of using an inverse trig function for non-linearity:
# Let's map t_linear from [0, 1] to [-pi/2, pi/2] and apply sin to get an S-curve.
# This makes the interpolation values change slowly at the start and end, and faster in the middle.
# This is a common "easing" function.
scaled_t = 0.5 * (1 + np.sin(np.pi * (t_linear - 0.5)))
scaled_t = np.clip(scaled_t, 0, 1) # Ensure it stays within [0, 1] due to float precision
interp_y[i] = y1 + (y2 - y1) * scaled_t
interp_z[i] = z1 + (z2 - z1) * scaled_t
return interp_y, interp_z
except Exception as e:
logging.error(f"Error in pseudo_interpolate_arcsecant_1d_triple: {e}", exc_info=True)
raise ValueError(f"Failed to perform 1D arcsecant-like interpolation: {e}")
def pseudo_interpolate_arcsecant_nd_triple(self, all_fy_data: list[np.ndarray], all_fz_data: list[np.ndarray], all_fx_data: list[np.ndarray], x_interp: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
"""
Pseudo-interpolates n-dimensional data with y and z datasets.
Performs the interpolation independently for each dimension and returns the concatenated results.
Args:
all_fy_data (list of numpy.ndarray): A list of y-data arrays for each dimension.
all_fz_data (list of numpy.ndarray): A list of z-data arrays for each dimension.
all_fx_data (list of numpy.ndarray): A list of corresponding x-data arrays for each dimension.
x_interp (numpy.ndarray): Array of x-values for interpolation.
Returns:
tuple of numpy.ndarray: A tuple containing two concatenated arrays of interpolated y and z values.
Raises:
ValueError: If the number of x, y, and z data arrays do not match or if an error occurs during 1D interpolation.
"""
all_interp_y = []
all_interp_z = []
num_dimensions = len(all_fy_data)
if not (len(all_fx_data) == num_dimensions == len(all_fz_data)):
raise ValueError("The number of x, y, and z data arrays must match across dimensions.")
for i, (fx, fy, fz) in enumerate(zip(all_fx_data, all_fy_data, all_fz_data)):
try:
interp_y, interp_z = self.pseudo_interpolate_arcsecant_1d_triple(fx, fy, fz, x_interp)
all_interp_y.extend(interp_y)
all_interp_z.extend(interp_z)
except ValueError as e:
raise ValueError(f"ValueError during arcsecant interpolation for dimension {i+1}: {e}")
except Exception as e:
logging.error(f"An unexpected error occurred during arcsecant interpolation for dimension {i+1}: {e}", exc_info=True)
raise Exception(f"An unexpected error occurred during arcsecant interpolation for dimension {i+1}: {e}")
return np.array(all_interp_y), np.array(all_interp_z)
def numerical_derivative_1d(self, y_values: np.ndarray, x_values: np.ndarray) -> np.ndarray:
"""
Calculates the numerical derivative of a 1D array of y-values with respect to x-values.
Uses numpy.gradient for robust and efficient calculation, which handles uniform
and non-uniform spacing.
Args:
y_values (numpy.ndarray): 1D array of y-values.
x_values (numpy.ndarray): 1D array of corresponding x-values.
Returns:
numpy.ndarray: 1D array of approximate derivatives.
Raises:
ValueError: If Y and X data do not have equal length or have less than two points.
"""
if len(y_values) != len(x_values) or len(y_values) < 2:
raise ValueError("Y and X data must have equal length and at least two points for derivative calculation.")
return np.gradient(y_values, x_values)
def differentiate_arcsecant_nd_triple(self, all_fy_data: list[np.ndarray], all_fz_data: list[np.ndarray], all_fx_data: list[np.ndarray], x_eval: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
"""
Calculates the numerical derivative of the pseudo-interpolated output (for y and z datasets).
Args:
all_fy_data (list of numpy.ndarray): A list of y-data arrays.
all_fz_data (list of numpy.ndarray): A list of z-data arrays.
all_fx_data (list of numpy.ndarray): A list of corresponding x-data arrays.
x_eval (numpy.ndarray): Array of x-values at which to evaluate the derivative.
Returns:
tuple of numpy.ndarray: A tuple containing two concatenated arrays of approximate derivatives (for y and z).
Raises:
ValueError: If input data arrays do not match in length/count, or if evaluation points are insufficient.
"""
num_dimensions = len(all_fy_data)
if not (len(all_fx_data) == num_dimensions == len(all_fz_data)):
raise ValueError("The number of x, y, and z data arrays must match.")
if len(x_eval) < 2:
raise ValueError("At least two evaluation points are needed for differentiation.")
all_derivatives_y = []
all_derivatives_z = []
for i, (fx, fy, fz) in enumerate(zip(all_fx_data, all_fy_data, all_fz_data)):
try:
if len(fx) != len(fy) or len(fx) < 2:
raise ValueError(f"Dimension {i+1}: X and Y data must have equal length and at least two points for differentiation.")
if len(fx) != len(fz):
raise ValueError(f"Dimension {i+1}: X and Z data must have equal length.")
# First, interpolate the data at the evaluation points
interpolated_y, interpolated_z = self.pseudo_interpolate_arcsecant_1d_triple(fx, fy, fz, x_eval)
# Then, differentiate the interpolated data with respect to the evaluation points
derivatives_y = self.numerical_derivative_1d(interpolated_y, x_eval)
derivatives_z = self.numerical_derivative_1d(interpolated_z, x_eval)
all_derivatives_y.extend(derivatives_y)
all_derivatives_z.extend(derivatives_z)
except ValueError as e:
raise ValueError(f"ValueError during differentiation for dimension {i+1}: {e}")
except Exception as e:
logging.error(f"An unexpected error occurred during differentiation for dimension {i+1}: {e}", exc_info=True)
raise Exception(f"An unexpected error occurred during differentiation of arcsecant interpolation for dimension {i+1}: {e}")
return np.array(all_derivatives_y), np.array(all_derivatives_z)
def broadcast(self, message: bytes, sender_socket: socket.socket = None, sender_id: int = None):
"""Sends a message to all clients except the sender (if specified)."""
with self.clients_lock:
clients_to_remove = []
for client_info in self.clients:
client_socket = client_info['socket']
if sender_socket is None or client_socket != sender_socket:
try:
client_socket.sendall(message)
except (socket.error, ConnectionResetError, BrokenPipeError) as e:
logging.warning(f"Error sending to client {client_info.get('id', 'N/A')}: {e}. Marking for removal.")
clients_to_remove.append(client_info)
except Exception as e:
logging.error(f"Unexpected error sending to client {client_info.get('id', 'N/A')}: {e}", exc_info=True)
clients_to_remove.append(client_info)
if clients_to_remove:
for client_info in clients_to_remove:
self.remove_client(client_info['socket'], client_info.get('id', 'N/A'))
def remove_client(self, client_socket: socket.socket, client_id: int = None):
"""Removes a client and its data."""
with self.clients_lock:
client_info_to_remove = None
for client_info in self.clients:
if client_info['socket'] == client_socket:
client_info_to_remove = client_info
break
if client_info_to_remove:
self.clients.remove(client_info_to_remove)
actual_id = client_info_to_remove.get('id', 'N/A')
logging.info(f"Client {actual_id} disconnected. Total clients: {len(self.clients)}")
# Send disconnect message to remaining clients
disconnect_message = struct.pack('!BI', MSG_TYPE_CLIENT_DISCONNECT, actual_id)
# Send in new thread to avoid blocking if broadcast takes time
threading.Thread(target=self.broadcast, args=(disconnect_message, None, None)).start()
with self.client_data_lock:
if client_socket in self.client_data:
del self.client_data[client_socket]
logging.info(f"Data for disconnected client {actual_id} removed.")
try:
client_socket.close()
except socket.error as e:
logging.error(f"Error closing socket for client {actual_id}: {e}")
else:
logging.warning(f"Attempted to remove a client that was not found (ID: {client_id}).")
def handle_client(self, client_socket: socket.socket, client_address: tuple):
"""Handles communication with a single client."""
client_id = -1 # Initialize client_id
try:
with self.clients_lock:
client_id = self.next_client_id
self.next_client_id += 1
client_info = {'socket': client_socket, 'address': client_address, 'id': client_id}
self.clients.append(client_info)
logging.info(f"Connection from {client_address}, assigned ID: {client_id}")
logging.info(f"Total clients connected: {len(self.clients)}")
# Send client ID to the new client
id_message = struct.pack('!BI', MSG_TYPE_ID_ASSIGNMENT, client_id)
client_socket.sendall(id_message)
logging.info(f"Sent ID {client_id} to the new client.")
# Initialize data storage for this client
with self.client_data_lock:
self.client_data[client_socket] = {'id': client_id}
# Send existing clients' initial data to the new client
# This needs to be done carefully to avoid deadlock or sending incomplete data.
# It's better to iterate a copy of self.clients if client list can change during iteration.
with self.clients_lock: # Acquire lock to safely iterate self.clients
for existing_client_info in list(self.clients): # Iterate over a copy
existing_socket = existing_client_info['socket']
existing_id = existing_client_info['id']
# Don't send client's own data back to itself at this stage
if existing_socket != client_socket:
with self.client_data_lock: # Acquire lock for client_data access
if existing_socket in self.client_data and 'id' in self.client_data[existing_socket]:
logging.info(f"Sending existing data of client {existing_id} to new client {client_id}")
# Attempt to pack initial data for the existing client
# Note: This assumes self.client_data[existing_socket] is fully populated
# with initial data fields if it represents a connected client.
data_to_send = self.client_data[existing_socket]
message = self._pack_initial_data(data_to_send, MSG_TYPE_INITIAL_DATA_BROADCAST, existing_id)
if message:
try:
client_socket.sendall(message)
except (socket.error, ConnectionResetError, BrokenPipeError) as e:
logging.warning(f"Error sending existing client data to new client {client_id}: {e}")
# No need to remove client here, handle_client's main loop will catch it
# Receive and process messages from the client
while True:
# Receive operation code / update type
header = self._recv_all(client_socket, 1)
if not header:
logging.info(f"Client {client_id} ({client_address}) disconnected gracefully.")
break
operation_code = struct.unpack('!B', header)[0]
if operation_code in [REQ_TYPE_INTERPOLATION, REQ_TYPE_DIFFERENTIATION]:
logging.info(f"Client {client_id} requested {'interpolation' if operation_code == REQ_TYPE_INTERPOLATION else 'differentiation'}.")
try:
# Receive the number of data dimensions
num_dimensions_bytes = self._recv_all(client_socket, 4)
if not num_dimensions_bytes:
logging.warning(f"Client {client_id} disconnected unexpectedly (no num_dimensions for operation {operation_code}).")
break
num_dimensions = struct.unpack('!I', num_dimensions_bytes)[0]
all_fx_data = []
all_fy_data = []
all_fz_data = []
for dim in range(num_dimensions):
# Receive fx data
num_fx_bytes = self._recv_all(client_socket, 4)
if not num_fx_bytes: break # Handle premature disconnect
num_fx = struct.unpack('!I', num_fx_bytes)[0]
fx_bytes = self._recv_all(client_socket, num_fx * 4)
if not fx_bytes: break
all_fx_data.append(np.frombuffer(fx_bytes, dtype=np.float32))
# Receive fy data
num_fy_bytes = self._recv_all(client_socket, 4)
if not num_fy_bytes: break
num_fy = struct.unpack('!I', num_fy_bytes)[0]
fy_bytes = self._recv_all(client_socket, num_fy * 4)
if not fy_bytes: break
all_fy_data.append(np.frombuffer(fy_bytes, dtype=np.float32))
# Receive fz data
num_fz_bytes = self._recv_all(client_socket, 4)
if not num_fz_bytes: break
num_fz = struct.unpack('!I', num_fz_bytes)[0]
fz_bytes = self._recv_all(client_socket, num_fz * 4)
if not fz_bytes: break
all_fz_data.append(np.frombuffer(fz_bytes, dtype=np.float32))
if not (len(all_fx_data) == num_dimensions and len(all_fy_data) == num_dimensions and len(all_fz_data) == num_dimensions):
logging.warning(f"Client {client_id} disconnected or sent incomplete data for operation {operation_code}.")
break # Data incomplete, break out of while loop
# Receive evaluation x values
eval_x_count_bytes = self._recv_all(client_socket, 4)
if not eval_x_count_bytes:
logging.warning(f"Client {client_id} disconnected unexpectedly (no eval_x length for operation {operation_code}).")
break
eval_x_count = struct.unpack('!I', eval_x_count_bytes)[0]
eval_x_bytes = self._recv_all(client_socket, eval_x_count * 4)
if not eval_x_bytes:
logging.warning(f"Client {client_id} disconnected unexpectedly (no eval_x data for operation {operation_code}).")
break
eval_x_data = np.frombuffer(eval_x_bytes, dtype=np.float32)
# Perform operation
result: np.ndarray
response_type: int
if operation_code == REQ_TYPE_INTERPOLATION:
result_y, result_z = self.pseudo_interpolate_arcsecant_nd_triple(all_fy_data, all_fz_data, all_fx_data, eval_x_data)
result = np.concatenate((result_y, result_z))
response_type = MSG_TYPE_INTERPOLATION_RESULT
else: # operation_code == REQ_TYPE_DIFFERENTIATION
deriv_y, deriv_z = self.differentiate_arcsecant_nd_triple(all_fy_data, all_fz_data, all_fx_data, eval_x_data)
result = np.concatenate((deriv_y, deriv_z))
response_type = MSG_TYPE_DIFFERENTIATION_RESULT
# Send result back to the client
result_bytes = result.astype(np.float32).tobytes()
result_length = len(result_bytes)
response_header = struct.pack('!BI', response_type, result_length)
client_socket.sendall(response_header + result_bytes)
logging.info(f"Sent result (Type {response_type}, {result_length} bytes) to client {client_id}")
except ValueError as ve:
logging.warning(f"ValueError for client {client_id} during interpolation/differentiation: {ve}")
self._send_error_to_client(client_socket, str(ve))
except Exception as e:
logging.error(f"Unexpected error for client {client_id} during interpolation/differentiation: {e}", exc_info=True)
self._send_error_to_client(client_socket, "Server internal error during computation.")
elif operation_code == REQ_TYPE_NMR_DATA:
logging.info(f"Client {client_id} sent NMR data.")
try:
# Receive NMR data details
ndims_bytes = self._recv_all(client_socket, 4)
if not ndims_bytes: break
ndims = struct.unpack('!I', ndims_bytes)[0]
shape_bytes = self._recv_all(client_socket, ndims * 4)
if not shape_bytes: break
shape = struct.unpack(f'!{ndims}I', shape_bytes)
data_size_bytes = self._recv_all(client_socket, 4)
if not data_size_bytes: break
data_size = struct.unpack('!I', data_size_bytes)[0]
data_bytes = self._recv_all(client_socket, data_size)
if not data_bytes: break
# Reshape the received data
nmr_data = np.frombuffer(data_bytes, dtype=np.float32).reshape(shape)
# Store the NMR data
with self.client_data_lock:
self.client_data[client_socket]['nmr_data'] = nmr_data
# Broadcast the NMR data to other clients
broadcast_message = struct.pack('!BI', MSG_TYPE_NMR_DATA_BROADCAST, client_id) # Type 101, Sender ID
broadcast_message += struct.pack('!I', ndims) # Number of dimensions
broadcast_message += struct.pack(f'!{ndims}I', *shape) # Shape of the data
broadcast_message += struct.pack('!I', data_size)
broadcast_message += data_bytes
# Broadcast to all clients except the sender
self.broadcast(broadcast_message, client_socket, client_id)
logging.info(f"Broadcasted NMR data from client {client_id}, shape: {shape}")
except ValueError as ve:
logging.warning(f"Client {client_id} sent incorrectly shaped NMR data: {ve}")
self._send_error_to_client(client_socket, f"Incorrectly shaped NMR data: {ve}")
except Exception as e:
logging.error(f"Error handling NMR data from client {client_id}: {e}", exc_info=True)
self._send_error_to_client(client_socket, "Server internal error processing NMR data.")
else:
# Original data handling logic for general updates (graphics, position, audio, etc.)
data = self._receive_data(client_socket, operation_code)
if data is None: # Indicates disconnection or error during receive
logging.info(f"Error receiving data for type {operation_code} from client {client_id}. Disconnecting.")
break # Exit the while loop to remove the client
self._process_and_broadcast(client_socket, operation_code, data, client_id)
except ConnectionResetError:
logging.info(f"Client {client_id} ({client_address}) forcibly closed the connection.")
except socket.timeout:
logging.info(f"Client {client_id} ({client_address}) timed out.")
except EOFError as e:
logging.info(f"Client {client_id} ({client_address}) disconnected while receiving data: {e}")
except ValueError as ve:
logging.warning(f"ValueError handling client {client_id} ({client_address}): {ve}")
self._send_error_to_client(client_socket, str(ve))
except Exception as e:
logging.error(f"Error handling client {client_id} ({client_address}): {e}", exc_info=True)
self._send_error_to_client(client_socket, "An unexpected server error occurred.")
finally:
logging.info(f"Cleaning up connection for client {client_id}")
self.remove_client(client_socket, client_id)
def _send_error_to_client(self, client_socket: socket.socket, message: str, message_type: int = MSG_TYPE_SERVER_ERROR):
"""Helper to send an error message back to a client."""
try:
error_bytes = message.encode('utf-8')
response = struct.pack('!BI', message_type, len(error_bytes)) + error_bytes
client_socket.sendall(response)
except Exception as send_error:
logging.error(f"Failed to send error message to client: {send_error}")
def _receive_data(self, client_socket: socket.socket, update_type: int) -> dict | tuple | None:
"""Helper function to receive data based on update type."""
try:
if update_type == REQ_TYPE_INITIAL_DATA:
return self._receive_initial_data(client_socket)
elif update_type == REQ_TYPE_FX_UPDATE:
return self._receive_axis_update(client_socket, 'fx')
elif update_type == REQ_TYPE_FY_UPDATE:
return self._receive_axis_update(client_socket, 'fy')
elif update_type == REQ_TYPE_FZ_UPDATE:
return self._receive_axis_update(client_socket, 'fz')
elif update_type == REQ_TYPE_EVAL_X_UPDATE:
return self._receive_eval_x_update(client_socket)
elif update_type == REQ_TYPE_COLOR_UPDATE:
return self._receive_color_update(client_socket)
elif update_type == REQ_TYPE_POSITION_UPDATE:
pos_bytes = self._recv_all(client_socket, 3 * 4)
if not pos_bytes: return None
return struct.unpack('!3f', pos_bytes)
elif update_type == REQ_TYPE_AUDIO_UPDATE:
return self._receive_audio_update(client_socket)
elif update_type == REQ_TYPE_REQUEST_ALL_DATA:
logging.warning(f"Client sent update type {update_type} (Request All Data) in the update loop, ignoring.")
return {} # Indicate ignored type, no actual data expected for this type as an update
elif update_type == REQ_TYPE_POSITIONAL_COMMENT:
return self._receive_positional_comment(client_socket)
# Operation types for interpolation/differentiation (0, 1) and NMR (100) are handled directly in handle_client
# Response types from server to client should not be received by server in this function
elif update_type in [MSG_TYPE_INTERPOLATION_RESULT, MSG_TYPE_INITIAL_DATA_BROADCAST, MSG_TYPE_DIFFERENTIATION_RESULT,
MSG_TYPE_NMR_DATA_BROADCAST, MSG_TYPE_SERVER_WARNING, MSG_TYPE_SERVER_ERROR,
MSG_TYPE_CLIENT_DISCONNECT, MSG_TYPE_ID_ASSIGNMENT]:
logging.warning(f"Client sent a server-to-client message type ({update_type}), ignoring.")
return {}
else:
logging.warning(f"Unknown update type {update_type} received.")
return {}
except (socket.error, struct.error, EOFError, MemoryError) as e:
logging.error(f"Error receiving data for type {update_type}: {e}")
return None # Signal error/disconnection
except Exception as e:
logging.error(f"Unexpected error receiving data for type {update_type}: {e}", exc_info=True)
return None
def _process_and_broadcast(self, client_socket: socket.socket, update_type: int, data: dict | tuple, client_id: int):
"""Helper function to process received data and broadcast."""
if data is None:
return
broadcast_message: bytes | None = None
processed_successfully = False
with self.client_data_lock:
if client_socket not in self.client_data:
logging.warning(f"Received data type {update_type} from client {client_id} before initial data or after removal. Data: {data}")
return # Stop processing if client data entry doesn't exist
client_current_data = self.client_data[client_socket]
try:
if update_type == REQ_TYPE_INITIAL_DATA:
client_current_data.update(data)
logging.info(f"Stored initial data for client {client_id}")
# Broadcast initial data using type MSG_TYPE_INITIAL_DATA_BROADCAST
broadcast_message = self._pack_initial_data(data, MSG_TYPE_INITIAL_DATA_BROADCAST, client_id)
processed_successfully = True
elif update_type == REQ_TYPE_FX_UPDATE:
client_current_data['fx'] = data['data']
num_dims = data['num_dimensions']
broadcast_message = struct.pack('!BI', REQ_TYPE_FX_UPDATE, client_id) + struct.pack('!I', num_dims)
for fx in data['data']:
broadcast_message += struct.pack('!I', len(fx)) + fx.astype(np.float32).tobytes()
processed_successfully = True
elif update_type == REQ_TYPE_FY_UPDATE:
client_current_data['fy'] = data['data']
num_dims = data['num_dimensions']
broadcast_message = struct.pack('!BI', REQ_TYPE_FY_UPDATE, client_id) + struct.pack('!I', num_dims)
for fy in data['data']:
broadcast_message += struct.pack('!I', len(fy)) + fy.astype(np.float32).tobytes()
processed_successfully = True
elif update_type == REQ_TYPE_FZ_UPDATE:
client_current_data['fz'] = data['data']
num_dims = data['num_dimensions']
broadcast_message = struct.pack('!BI', REQ_TYPE_FZ_UPDATE, client_id) + struct.pack('!I', num_dims)
for fz in data['data']:
broadcast_message += struct.pack('!I', len(fz)) + fz.astype(np.float32).tobytes()
processed_successfully = True
elif update_type == REQ_TYPE_EVAL_X_UPDATE:
client_current_data['eval_x'] = data['data']
num_eval_x = data['num_eval_x']
broadcast_message = struct.pack('!BI', REQ_TYPE_EVAL_X_UPDATE, client_id) + struct.pack('!I', num_eval_x) + data['data'].astype(np.float32).tobytes()
processed_successfully = True
elif update_type == REQ_TYPE_COLOR_UPDATE:
client_current_data['color'] = data['data']
num_dims = data['num_dimensions']
broadcast_message = struct.pack('!BI', REQ_TYPE_COLOR_UPDATE, client_id) + struct.pack('!I', num_dims)
for color_array in data['data']: # color_array is np.ndarray of shape (N, 3)
num_colors = len(color_array)
broadcast_message += struct.pack('!I', num_colors) + color_array.astype(np.float32).tobytes()
processed_successfully = True
elif update_type == REQ_TYPE_POSITION_UPDATE:
client_current_data['position'] = data # data is already the tuple (x, y, z)
broadcast_message = struct.pack('!BI', REQ_TYPE_POSITION_UPDATE, client_id) + struct.pack('!3f', *data)
processed_successfully = True
elif update_type == REQ_TYPE_AUDIO_UPDATE:
client_current_data['audio_features'] = data['features']
num_sources = data['num_sources']
broadcast_message = struct.pack('!BI', REQ_TYPE_AUDIO_UPDATE, client_id) + struct.pack('!I', num_sources)
for feature in data['features']:
frequency = feature.get('frequency', 0.0)
amplitude = feature.get('amplitude', 0.0)
broadcast_message += struct.pack('!ff', frequency, amplitude)
processed_successfully = True
elif update_type == REQ_TYPE_POSITIONAL_COMMENT:
position = data['position']
comment = data['comment']
logging.info(f"Client {client_id} commented at {position}: {comment}")
comment_bytes = comment.encode('utf-8')
broadcast_message = struct.pack('!BI', REQ_TYPE_POSITIONAL_COMMENT, client_id) # Type 11, Sender ID
broadcast_message += struct.pack('!3f', *position)
broadcast_message += struct.pack('!I', len(comment_bytes))
broadcast_message += comment_bytes
processed_successfully = True
else:
logging.warning(f"No processing defined for update type {update_type} from client {client_id}.")
except Exception as e:
logging.error(f"Exception processing update type {update_type} from client {client_id}: {e}", exc_info=True)
self._send_error_to_client(client_socket, f"Server error processing update for type {update_type}.")
return # Exit the processing function on error.
if processed_successfully and broadcast_message:
self.broadcast(broadcast_message, client_socket, client_id)
def _pack_initial_data(self, data: dict, message_type: int, client_id: int) -> bytes | None:
"""
Packs initial data into a byte string for sending to a client.
Args:
data (dict): Dictionary containing initial data.
message_type (int): The message type (e.g., MSG_TYPE_INITIAL_DATA_BROADCAST).
client_id (int): The ID of the client sending the data (or the client whose data is being sent).
Returns:
bytes: Packed byte string, or None on error.
"""
try:
if not data:
return None
message = struct.pack('!BI', message_type, client_id)
# Helper to pack lists of numpy arrays
def pack_np_arrays(arr_list):
packed = struct.pack('!I', len(arr_list)) # Number of arrays
for arr in arr_list:
packed += struct.pack('!I', len(arr)) + arr.astype(np.float32).tobytes()
return packed
message += pack_np_arrays(data.get('fx', []))
message += pack_np_arrays(data.get('fy', []))
message += pack_np_arrays(data.get('fz', []))
# Pack eval_x data (single numpy array)
eval_x_data = data.get('eval_x', np.array([]))
message += struct.pack('!I', len(eval_x_data)) + eval_x_data.astype(np.float32).tobytes()
# Pack color data (list of numpy arrays)
color_data = data.get('color', [])
message += struct.pack('!I', len(color_data)) # Number of color arrays
for color_array in color_data:
message += struct.pack('!I', len(color_array)) + color_array.astype(np.float32).tobytes() # Each color_array is N x 3
# Pack position data
position = data.get('position', (0.0, 0.0, 0.0))
message += struct.pack('!3f', *position)
# Pack audio data
audio_features = data.get('audio_features', [])
message += struct.pack('!I', len(audio_features))
for feature in audio_features:
frequency = feature.get('frequency', 0.0)
amplitude = feature.get('amplitude', 0.0)
message += struct.pack('!ff', frequency, amplitude)
return message
except struct.error as e:
logging.error(f"Error packing initial data due to struct: {e}")
return None
except Exception as e:
logging.error(f"Unexpected error packing initial data: {e}", exc_info=True)
return None
def _receive_initial_data(self, client_socket: socket.socket) -> dict | None:
"""Receives initial data from a client."""
data = {}
try:
# Helper to receive lists of numpy arrays
def receive_np_arrays():
num_arrays_bytes = self._recv_all(client_socket, 4)
if not num_arrays_bytes: raise EOFError("Connection closed while receiving num_arrays.")
num_arrays = struct.unpack('!I', num_arrays_bytes)[0]
arrays = []
for _ in range(num_arrays):
num_elements_bytes = self._recv_all(client_socket, 4)
if not num_elements_bytes: raise EOFError("Connection closed while receiving num_elements.")
num_elements = struct.unpack('!I', num_elements_bytes)[0]
array_bytes = self._recv_all(client_socket, num_elements * 4)
if not array_bytes: raise EOFError("Connection closed while receiving array data.")
arrays.append(np.frombuffer(array_bytes, dtype=np.float32))
return arrays
data['fx'] = receive_np_arrays()
data['fy'] = receive_np_arrays()
data['fz'] = receive_np_arrays()
# Receive eval_x data (single numpy array)
num_eval_x_bytes = self._recv_all(client_socket, 4)
if not num_eval_x_bytes: return None
num_eval_x = struct.unpack('!I', num_eval_x_bytes)[0]
eval_x_bytes = self._recv_all(client_socket, num_eval_x * 4)
if not eval_x_bytes: return None
data['eval_x'] = np.frombuffer(eval_x_bytes, dtype=np.float32)
# Receive color data (list of numpy arrays, where each array is N x 3)
num_color_arrays_bytes = self._recv_all(client_socket, 4)
if not num_color_arrays_bytes: return None
num_color_arrays = struct.unpack('!I', num_color_arrays_bytes)[0]
color_data = []
for _ in range(num_color_arrays):
num_colors_bytes = self._recv_all(client_socket, 4)
if not num_colors_bytes: return None
num_colors = struct.unpack('!I', num_colors_bytes)[0]
color_bytes = self._recv_all(client_socket, num_colors * 3 * 4) # 3 floats per color
if not color_bytes: return None
color_data.append(np.frombuffer(color_bytes, dtype=np.float32).reshape(num_colors, 3))
data['color'] = color_data
# Receive position data
pos_bytes = self._recv_all(client_socket, 3 * 4)
if not pos_bytes: return None
data['position'] = struct.unpack('!3f', pos_bytes)
# Receive audio data
num_sources_bytes = self._recv_all(client_socket, 4)
if not num_sources_bytes: return None
num_sources = struct.unpack('!I', num_sources_bytes)[0]
audio_features = []
for _ in range(num_sources):
feature_bytes = self._recv_all(client_socket, 2 * 4) # frequency (float), amplitude (float)
if not feature_bytes: return None
frequency, amplitude = struct.unpack('!ff', feature_bytes)
audio_features.append({'frequency': frequency, 'amplitude': amplitude})
data['audio_features'] = audio_features
return data
except (struct.error, EOFError) as e:
logging.error(f"Error unpacking initial data: {e}")
return None
except Exception as e:
logging.error(f"Unexpected error receiving initial data: {e}", exc_info=True)
return None
def _receive_axis_update(self, client_socket: socket.socket, axis_name: str) -> dict | None:
"""Receives axis update data (fx, fy, or fz)."""
try:
num_dimensions_bytes = self._recv_all(client_socket, 4)
if not num_dimensions_bytes: return None
num_dimensions = struct.unpack('!I', num_dimensions_bytes)[0]
axis_data = []
for _ in range(num_dimensions):
num_axis_bytes = self._recv_all(client_socket, 4)
if not num_axis_bytes: return None
num_axis = struct.unpack('!I', num_axis_bytes)[0]
axis_bytes = self._recv_all(client_socket, num_axis * 4)
if not axis_bytes: return None
axis_data.append(np.frombuffer(axis_bytes, dtype=np.float32))
return {'data': axis_data, 'num_dimensions': num_dimensions}
except (struct.error, EOFError) as e:
logging.error(f"Error unpacking {axis_name} update: {e}")
return None
except Exception as e:
logging.error(f"Unexpected error receiving {axis_name} update: {e}", exc_info=True)
return None
def _receive_eval_x_update(self, client_socket: socket.socket) -> dict | None:
"""Receives the evaluation x-axis update."""
try:
num_eval_x_bytes = self._recv_all(client_socket, 4)
if not num_eval_x_bytes: return None
num_eval_x = struct.unpack('!I', num_eval_x_bytes)[0]
eval_x_bytes = self._recv_all(client_socket, num_eval_x * 4)
if not eval_x_bytes: return None
eval_x_data = np.frombuffer(eval_x_bytes, dtype=np.float32)
return {'data': eval_x_data, 'num_eval_x': num_eval_x}
except (struct.error, EOFError) as e:
logging.error(f"Error unpacking eval_x update: {e}")
return None
except Exception as e:
logging.error(f"Unexpected error receiving eval_x update: {e}", exc_info=True)
return None
def _receive_color_update(self, client_socket: socket.socket) -> dict | None:
"""Receives color update data."""
try:
num_dimensions_bytes = self._recv_all(client_socket, 4)
if not num_dimensions_bytes: return None
num_dimensions = struct.unpack('!I', num_dimensions_bytes)[0]
color_data = []
for _ in range(num_dimensions):
num_colors_bytes = self._recv_all(client_socket, 4)
if not num_colors_bytes: return None
num_colors = struct.unpack('!I', num_colors_bytes)[0]
color_bytes = self._recv_all(client_socket, num_colors * 3 * 4) # 3 floats per color (RGB)
if not color_bytes: return None
color_data.append(np.frombuffer(color_bytes, dtype=np.float32).reshape(num_colors, 3))
return {'data': color_data, 'num_dimensions': num_dimensions}
except (struct.error, EOFError) as e:
logging.error(f"Error unpacking color update: {e}")
return None
except Exception as e:
logging.error(f"Unexpected error receiving color update: {e}", exc_info=True)
return None
def _receive_audio_update(self, client_socket: socket.socket) -> dict | None:
"""Receives audio feature update."""
try:
num_sources_bytes = self._recv_all(client_socket, 4)
if not num_sources_bytes: return None
num_sources = struct.unpack('!I', num_sources_bytes)[0]
audio_features = []
for _ in range(num_sources):
feature_bytes = self._recv_all(client_socket, 2 * 4) # frequency (float), amplitude (float)
if not feature_bytes: return None
frequency, amplitude = struct.unpack('!ff', feature_bytes)
audio_features.append({'frequency': frequency, 'amplitude': amplitude})
return {'features': audio_features, 'num_sources': num_sources}
except (struct.error, EOFError) as e:
logging.error(f"Error unpacking audio update: {e}")
return None
except Exception as e:
logging.error(f"Unexpected error receiving audio update: {e}", exc_info=True)
return None
def _receive_positional_comment(self, client_socket: socket.socket) -> dict | None:
"""Receives a positional comment."""
try:
pos_bytes = self._recv_all(client_socket, 3 * 4)
if not pos_bytes: return None
position = struct.unpack('!3f', pos_bytes)
comment_length_bytes = self._recv_all(client_socket, 4)
if not comment_length_bytes: return None
comment_length = struct.unpack('!I', comment_length_bytes)[0]
comment_bytes = self._recv_all(client_socket, comment_length)
if not comment_bytes: return None
comment = comment_bytes.decode('utf-8')
return {'position': position, 'comment': comment}
except (struct.error, EOFError) as e:
logging.error(f"Error unpacking positional comment: {e}")
return None
except UnicodeDecodeError:
logging.error("Error decoding comment text.")
return None
except Exception as e:
logging.error(f"Unexpected error receiving positional comment: {e}", exc_info=True)
return None
def _recv_all(self, client_socket: socket.socket, num_bytes: int) -> bytes | None:
"""Helper function to receive a specific number of bytes from a socket."""
try:
all_data = b''
# Loop until all expected bytes are received or connection closes
while len(all_data) < num_bytes:
bytes_to_receive = num_bytes - len(all_data)
chunk = client_socket.recv(bytes_to_receive)
if not chunk: # Connection closed unexpectedly
logging.warning(f"Connection closed by peer while trying to receive {num_bytes} bytes (received {len(all_data)}).")
return None
all_data += chunk
return all_data
except socket.timeout:
logging.warning(f"Socket timeout while trying to receive {num_bytes} bytes.")
return None
except socket.error as e:
logging.error(f"Socket error during _recv_all: {e}")
return None
except Exception as e:
logging.error(f"Unexpected error in _recv_all: {e}", exc_info=True)
return None
def start(self):
"""Starts the server."""
server_socket: socket.socket | None = None
try:
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Allow re-use of address
server_socket.bind((self.host, self.port))
server_socket.listen(5)
logging.info(f"Server listening on {self.host}:{self.port}")
while True:
client_socket, client_address = server_socket.accept()
# Set a timeout on the client socket for read operations.
client_socket.settimeout(300) # 5 minutes
client_thread = threading.Thread(target=self.handle_client, args=(client_socket, client_address))
client_thread.daemon = True # Allows main program to exit even if threads are running
client_thread.start()
except socket.error as e:
logging.critical(f"Socket error: {e}")
except Exception as e:
logging.critical(f"An unexpected error occurred during server startup: {e}", exc_info=True)
finally:
if server_socket:
server_socket.close()
logging.info("Server stopped.")
if __name__ == "__main__":
server = Server(HOST, PORT)
server.start()
##Client_GraphicsBBS.py
import socket
import struct
import threading
import numpy as np
import time
import matplotlib.pyplot as plt
from collections import defaultdict
import traceback
# --- Protocol Message Types (Enums for clarity) ---
class MessageType:
# Client to Server
INITIAL_DATA_CLIENT = 0
REQUEST_INTERPOLATION = 0 # Overlaps, need to clarify or separate. For now, assume context.
REQUEST_DIFFERENTIATION = 1
POSITION_UPDATE = 7
AUDIO_UPDATE = 8
POSITIONAL_COMMENT = 11
# Server to Client (Broadcast/Direct)
ASSIGN_CLIENT_ID = 255
CLIENT_DISCONNECTED = 254
SERVER_WARNING = 252
SERVER_ERROR = 253
COMPUTATION_INTERPOLATION_RESULT = 5
COMPUTATION_DIFFERENTIATION_RESULT = 12
INITIAL_DATA_BROADCAST = 10 # Server broadcasts initial data of a new client
FX_UPDATE_BROADCAST = 1
FY_UPDATE_BROADCAST = 2
FZ_UPDATE_BROADCAST = 3
EVAL_X_UPDATE_BROADCAST = 4
COLOR_UPDATE_BROADCAST = 6
POSITION_UPDATE_BROADCAST = 7 # This means 7 is used by client and server. If server broadcasts client position updates, they should be different message types.
AUDIO_UPDATE_BROADCAST = 8 # Similar to position update, consider distinct types.
class Client:
def __init__(self, host, port):
self.host = host
self.port = port
self.socket = None
self.client_id = -1
self.running = True
self.receiver_thread = None
self.data_lock = threading.Lock()
self.all_clients_data = defaultdict(lambda: {
'position': (0.0, 0.0, 0.0),
'color': [],
'audio_features': [],
'fx': [],
'fy': [],
'fz': [],
'eval_x': np.array([], dtype=np.float32)
})
# Computation results, now with a dedicated eval_x
self.interpolation_result_y = np.array([], dtype=np.float32)
self.interpolation_result_z = np.array([], dtype=np.float32)
self.differentiation_result_y = np.array([], dtype=np.float32)
self.differentiation_result_z = np.array([], dtype=np.float32)
self.computation_eval_x_received = np.array([], dtype=np.float32) # eval_x received with the computation result
# Matplotlib setup
self.fig = None
self.ax_3d = None
self.ax_2d = None
self.scatter_plot = None
self.interp_line_y = None
self.interp_line_z = None
self.deriv_line_y = None
self.deriv_line_z = None
self.current_plot_type = None # 'interpolation' or 'differentiation'
def connect(self):
"""Connects to the server and starts the receiver thread."""
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(5.0) # Set a timeout for connect and initial receive
self.socket.connect((self.host, self.port))
print(f"Connected to server at {self.host}:{self.port}")
self.receiver_thread = threading.Thread(target=self.receive_thread)
self.receiver_thread.daemon = True
self.receiver_thread.start()
# Wait for the server to assign and send the client ID with a timeout
start_time = time.time()
while self.client_id == -1 and (time.time() - start_time) < 5.0:
time.sleep(0.05) # Small sleep to avoid busy-waiting
if self.client_id == -1:
print("Error: Did not receive client ID from server within expected time. Disconnecting.")
self.running = False
except socket.timeout:
print(f"Connection attempt timed out to {self.host}:{self.port}")
self.running = False
except socket.error as e:
print(f"Connection error: {e}")
self.running = False
except Exception as e:
print(f"An unexpected error occurred during connection: {e}")
traceback.print_exc()
self.running = False
def receive_thread(self):
"""Thread function to continuously receive data from the server."""
while self.running:
try:
# Read the message type (1 byte)
header = self._recv_all(1)
if not header:
print("Server disconnected gracefully.")
self.running = False
break
message_type = struct.unpack('!B', header)[0]
# --- Handle different message types using constants ---
if message_type == MessageType.ASSIGN_CLIENT_ID:
id_bytes = self._recv_all(4)
self.client_id = struct.unpack('!I', id_bytes)[0]
print(f"Received assigned client ID: {self.client_id}")
with self.data_lock:
# Initialize data for self
self.all_clients_data[self.client_id]['position'] = (0.0, 0.0, 0.0)
self.all_clients_data[self.client_id]['color'] = []
self.all_clients_data[self.client_id]['audio_features'] = []
self.all_clients_data[self.client_id]['fx'] = []
self.all_clients_data[self.client_id]['fy'] = []
self.all_clients_data[self.client_id]['fz'] = []
self.all_clients_data[self.client_id]['eval_x'] = np.array([], dtype=np.float32)
elif message_type == MessageType.CLIENT_DISCONNECTED:
disconnected_id_bytes = self._recv_all(4)
disconnected_id = struct.unpack('!I', disconnected_id_bytes)[0]
print(f"Client {disconnected_id} disconnected.")
with self.data_lock:
if disconnected_id in self.all_clients_data:
del self.all_clients_data[disconnected_id]
elif message_type in [MessageType.SERVER_WARNING, MessageType.SERVER_ERROR]:
length_bytes = self._recv_all(4)
message_length = struct.unpack('!I', length_bytes)[0]
message_bytes = self._recv_all(message_length)
message = message_bytes.decode('utf-8')
if message_type == MessageType.SERVER_WARNING:
print(f"Server Warning: {message}")
else:
print(f"Server Error: {message}")
elif message_type in [MessageType.COMPUTATION_INTERPOLATION_RESULT, MessageType.COMPUTATION_DIFFERENTIATION_RESULT]:
# Protocol change: Server sends eval_x along with results
length_bytes = self._recv_all(4)
total_result_length = struct.unpack('!I', length_bytes)[0]
# Read eval_x length and data
eval_x_len_bytes = self._recv_all(4)
eval_x_length = struct.unpack('!I', eval_x_len_bytes)[0]
eval_x_bytes = self._recv_all(eval_x_length * 4) # Each float is 4 bytes
received_eval_x = np.frombuffer(eval_x_bytes, dtype=np.float32)
# Remaining data is y and z results
remaining_result_length = total_result_length - (4 + eval_x_length * 4) # total_length includes its own length and eval_x length
# If the server sent a different total_length than expected, handle it
if remaining_result_length < 0:
print("Protocol error: Negative remaining result length for computation.")
continue
result_bytes = self._recv_all(remaining_result_length)
num_elements_yz = remaining_result_length // 4 // 2 # Assuming y and z have equal length
result_data = np.frombuffer(result_bytes, dtype=np.float32)
with self.data_lock:
self.computation_eval_x_received = received_eval_x # Store the received eval_x
if message_type == MessageType.COMPUTATION_INTERPOLATION_RESULT:
print("Received Interpolation Result")
self.interpolation_result_y = result_data[:num_elements_yz]
self.interpolation_result_z = result_data[num_elements_yz:]
self.differentiation_result_y = np.array([], dtype=np.float32) # Clear old diff result
self.differentiation_result_z = np.array([], dtype=np.float32)
self.current_plot_type = 'interpolation'
elif message_type == MessageType.COMPUTATION_DIFFERENTIATION_RESULT:
print("Received Differentiation Result")
self.differentiation_result_y = result_data[:num_elements_yz]
self.differentiation_result_z = result_data[num_elements_yz:]
self.interpolation_result_y = np.array([], dtype=np.float32) # Clear old interp result
self.interpolation_result_z = np.array([], dtype=np.float32)
self.current_plot_type = 'differentiation'
elif message_type in [
MessageType.INITIAL_DATA_BROADCAST,
MessageType.FX_UPDATE_BROADCAST,
MessageType.FY_UPDATE_BROADCAST,
MessageType.FZ_UPDATE_BROADCAST,
MessageType.EVAL_X_UPDATE_BROADCAST,
MessageType.COLOR_UPDATE_BROADCAST,
MessageType.POSITION_UPDATE_BROADCAST, # If server re-broadcasts client positions
MessageType.AUDIO_UPDATE_BROADCAST,
MessageType.POSITIONAL_COMMENT
]:
sender_id_bytes = self._recv_all(4)
sender_id = struct.unpack('!I', sender_id_bytes)[0]
with self.data_lock:
if sender_id not in self.all_clients_data:
# Initialize data for a new client if this is the first update received from them
print(f"New client {sender_id} detected via broadcast.")
self.all_clients_data[sender_id]['position'] = (0.0, 0.0, 0.0)
self.all_clients_data[sender_id]['color'] = []
self.all_clients_data[sender_id]['audio_features'] = []
self.all_clients_data[sender_id]['fx'] = []
self.all_clients_data[sender_id]['fy'] = []
self.all_clients_data[sender_id]['fz'] = []
self.all_clients_data[sender_id]['eval_x'] = np.array([], dtype=np.float32)
if message_type == MessageType.INITIAL_DATA_BROADCAST:
self._unpack_initial_data(sender_id)
elif message_type == MessageType.FX_UPDATE_BROADCAST:
self.all_clients_data[sender_id]['fx'] = self._unpack_axis_data()
elif message_type == MessageType.FY_UPDATE_BROADCAST:
self.all_clients_data[sender_id]['fy'] = self._unpack_axis_data()
elif message_type == MessageType.FZ_UPDATE_BROADCAST:
self.all_clients_data[sender_id]['fz'] = self._unpack_axis_data()
elif message_type == MessageType.EVAL_X_UPDATE_BROADCAST:
num_eval_x_bytes = self._recv_all(4)
num_eval_x = struct.unpack('!I', num_eval_x_bytes)[0]
eval_x_bytes = self._recv_all(num_eval_x * 4)
self.all_clients_data[sender_id]['eval_x'] = np.frombuffer(eval_x_bytes, dtype=np.float32)
elif message_type == MessageType.COLOR_UPDATE_BROADCAST:
self.all_clients_data[sender_id]['color'] = self._unpack_color_data()
elif message_type == MessageType.POSITION_UPDATE_BROADCAST:
pos_bytes = self._recv_all(3 * 4)
self.all_clients_data[sender_id]['position'] = struct.unpack('!3f', pos_bytes)
elif message_type == MessageType.AUDIO_UPDATE_BROADCAST:
self.all_clients_data[sender_id]['audio_features'] = self._unpack_audio_data()
elif message_type == MessageType.POSITIONAL_COMMENT:
position_bytes = self._recv_all(3 * 4)
position = struct.unpack('!3f', position_bytes)
length_bytes = self._recv_all(4)
comment_length = struct.unpack('!I', length_bytes)[0]
comment_bytes = self._recv_all(comment_length)
comment = comment_bytes.decode('utf-8')
print(f"Comment from client {sender_id} at {position}: {comment}")
else:
print(f"Received unknown or unhandled message type: {message_type}")
except EOFError:
print("Server closed the connection or sent incomplete data.")
self.running = False
break
except socket.timeout:
print("Socket timeout during receive operation. Connection might be slow or dead.")
# This could be a good place for a re-connection attempt or a warning.
# For now, let's just continue, assuming temporary network issue.
pass
except struct.error as e:
print(f"Struct unpacking error: {e}. Data might be corrupted or protocol violated.")
traceback.print_exc()
# Decide: Should the client disconnect on protocol violation? Often yes.
self.running = False
break
except ConnectionResetError:
print("Connection reset by peer. Server forcibly closed the connection.")
self.running = False
break
except Exception as e:
print(f"An unexpected error occurred in receiver thread: {e}")
traceback.print_exc()
self.running = False # Exit thread on unexpected error
print("Receiver thread stopped.")
def _recv_all(self, n):
"""Helper function to receive exactly n bytes from the socket."""
data = bytearray()
while len(data) < n:
packet = self.socket.recv(n - len(data))
if not packet:
# If packet is empty, the connection has been closed
raise EOFError("Connection closed by peer.")
data.extend(packet)
return bytes(data)
def _unpack_initial_data(self, sender_id):
"""Unpacks the initial data broadcast (Type 10)."""
# ... (logic remains largely the same, ensuring consistent _recv_all calls)
num_dimensions_bytes = self._recv_all(4)
num_dimensions = struct.unpack('!I', num_dimensions_bytes)[0]
all_fx_data = []
all_fy_data = []
all_fz_data = []
all_color_data = []
for _ in range(num_dimensions):
# Receive fx
num_fx_bytes = self._recv_all(4)
num_fx = struct.unpack('!I', num_fx_bytes)[0]
fx_bytes = self._recv_all(num_fx * 4)
all_fx_data.append(np.frombuffer(fx_bytes, dtype=np.float32))
# Receive fy
num_fy_bytes = self._recv_all(4)
num_fy = struct.unpack('!I', num_fy_bytes)[0]
fy_bytes = self._recv_all(num_fy * 4)
all_fy_data.append(np.frombuffer(fy_bytes, dtype=np.float32))
# Receive fz
num_fz_bytes = self._recv_all(4)
num_fz = struct.unpack('!I', num_fz_bytes)[0]
fz_bytes = self._recv_all(num_fz * 4)
all_fz_data.append(np.frombuffer(fz_bytes, dtype=np.float32))
# Receive color data
num_color_points_bytes = self._recv_all(4)
num_color_points = struct.unpack('!I', num_color_points_bytes)[0]
color_bytes = self._recv_all(num_color_points * 3 * 4) # 3 floats per color
color_array = np.frombuffer(color_bytes, dtype=np.float32).reshape(-1, 3)
all_color_data.append(color_array)
# Receive eval_x_data
num_eval_x_bytes = self._recv_all(4)
num_eval_x = struct.unpack('!I', num_eval_x_bytes)[0]
eval_x_bytes = self._recv_all(num_eval_x * 4)
eval_x_data = np.frombuffer(eval_x_bytes, dtype=np.float32)
# Receive position
position_bytes = self._recv_all(3 * 4)
position = struct.unpack('!3f', position_bytes)
# Receive audio features
num_audio_sources_bytes = self._recv_all(4)
num_audio_sources = struct.unpack('!I', num_audio_sources_bytes)[0]
audio_features = []
for _ in range(num_audio_sources):
audio_bytes = self._recv_all(8) # 2 floats (freq, amp)
freq, amp = struct.unpack('!ff', audio_bytes)
audio_features.append({'frequency': freq, 'amplitude': amp})
with self.data_lock:
self.all_clients_data[sender_id].update({
'fx': all_fx_data,
'fy': all_fy_data,
'fz': all_fz_data,
'color': all_color_data,
'eval_x': eval_x_data,
'position': position,
'audio_features': audio_features
})
def _unpack_axis_data(self):
"""Unpacks data for fx, fy, or fz updates (Types 1, 2, 3)."""
num_dimensions_bytes = self._recv_all(4)
num_dimensions = struct.unpack('!I', num_dimensions_bytes)[0]
updated_data = []
for _ in range(num_dimensions):
num_elements_bytes = self._recv_all(4)
num_elements = struct.unpack('!I', num_elements_bytes)[0]
data_bytes = self._recv_all(num_elements * 4)
updated_data.append(np.frombuffer(data_bytes, dtype=np.float32))
return updated_data
def _unpack_color_data(self):
"""Unpacks color data updates (Type 6)."""
num_dimensions_bytes = self._recv_all(4)
num_dimensions = struct.unpack('!I', num_dimensions_bytes)[0]
updated_color_data = []
for _ in range(num_dimensions):
num_color_points_bytes = self._recv_all(4)
num_color_points = struct.unpack('!I', num_color_points_bytes)[0]
color_bytes = self._recv_all(num_color_points * 3 * 4)
color_array = np.frombuffer(color_bytes, dtype=np.float32).reshape(-1, 3)
updated_color_data.append(color_array)
return updated_color_data
def _unpack_audio_data(self):
"""Unpacks audio feature updates (Type 8)."""
num_audio_sources_bytes = self._recv_all(4)
num_audio_sources = struct.unpack('!I', num_audio_sources_bytes)[0]
updated_features = []
for _ in range(num_audio_sources):
audio_bytes = self._recv_all(8) # freq, amp
freq, amp = struct.unpack('!ff', audio_bytes)
updated_features.append({'frequency': freq, 'amplitude': amp})
return updated_features
def send_message(self, message):
"""Helper to send a raw byte message."""
if self.socket and self.running:
try:
self.socket.sendall(message)
except socket.error as e:
print(f"Error sending message: {e}")
self.running = False # Assume connection is broken
except Exception as e:
print(f"Unexpected error sending message: {e}")
self.running = False
def send_initial_data(self, fx_data, fy_data, fz_data, color_data, eval_x_data, position=(0.0, 0.0, 0.0), audio_features=[]):
"""Sends the initial data packet (Type 0)."""
if not (len(fx_data) == len(fy_data) == len(fz_data) == len(color_data)):
print("Error: fx, fy, fz, and color data lists must have the same number of dimensions.")
return
message = struct.pack('!B', MessageType.INITIAL_DATA_CLIENT) # Use the constant
num_dimensions = len(fx_data)
message += struct.pack('!I', num_dimensions)
for i in range(num_dimensions):
fx = np.array(fx_data[i], dtype=np.float32)
fy = np.array(fy_data[i], dtype=np.float32)
fz = np.array(fz_data[i], dtype=np.float32)
color = np.array(color_data[i], dtype=np.float32)
message += struct.pack('!I', len(fx)) + fx.tobytes()
message += struct.pack('!I', len(fy)) + fy.tobytes()
message += struct.pack('!I', len(fz)) + fz.tobytes()
message += struct.pack('!I', len(color)) + color.tobytes()
eval_x = np.array(eval_x_data, dtype=np.float32)
message += struct.pack('!I', len(eval_x)) + eval_x.tobytes()
message += struct.pack('!3f', *position)
message += struct.pack('!I', len(audio_features))
for feature in audio_features:
message += struct.pack('!ff', feature.get('frequency', 0.0), feature.get('amplitude', 0.0))
self.send_message(message)
print("Sent initial data.")
def send_position_update(self, position):
"""Sends a position update (Type 7)."""
message = struct.pack('!B', MessageType.POSITION_UPDATE) # Use the constant
message += struct.pack('!3f', *position)
self.send_message(message)
def send_audio_update(self, audio_features):
"""Sends an audio features update (Type 8)."""
message = struct.pack('!B', MessageType.AUDIO_UPDATE) # Use the constant
message += struct.pack('!I', len(audio_features))
for feature in audio_features:
message += struct.pack('!ff', feature.get('frequency', 0.0), feature.get('amplitude', 0.0))
self.send_message(message)
def send_positional_comment(self, position, comment):
"""Sends a positional comment (Type 11)."""
message = struct.pack('!B', MessageType.POSITIONAL_COMMENT) # Use the constant
message += struct.pack('!3f', *position)
comment_bytes = comment.encode('utf-8')
message += struct.pack('!I', len(comment_bytes))
message += comment_bytes
self.send_message(message)
print(f"Sent comment: '{comment}' at {position}")
def request_computation(self, operation_type, fx_data, fy_data, fz_data, eval_x_data):
"""
Sends a request for interpolation (type 0) or differentiation (type 1).
This is intended as the *first* message after receiving the client ID.
"""
if self.client_id == -1:
print("Cannot send computation request: Client ID not yet assigned.")
return
# Ensure correct message type for requests
if operation_type == 0:
msg_type = MessageType.REQUEST_INTERPOLATION
elif operation_type == 1:
msg_type = MessageType.REQUEST_DIFFERENTIATION
else:
print(f"Invalid operation type for computation request: {operation_type}")
return
if not (len(fx_data) == len(fy_data) == len(fz_data)):
print("Error: fx, fy, and fz data lists must have the same number of dimensions for computation.")
return
message = struct.pack('!B', msg_type) # Use the constant
num_dimensions = len(fx_data)
message += struct.pack('!I', num_dimensions)
for i in range(num_dimensions):
fx = np.array(fx_data[i], dtype=np.float32)
fy = np.array(fy_data[i], dtype=np.float32)
fz = np.array(fz_data[i], dtype=np.float32)
message += struct.pack('!I', len(fx)) + fx.tobytes()
message += struct.pack('!I', len(fy)) + fy.tobytes()
message += struct.pack('!I', len(fz)) + fz.tobytes()
eval_x = np.array(eval_x_data, dtype=np.float32)
message += struct.pack('!I', len(eval_x)) + eval_x.tobytes()
self.send_message(message)
print(f"Sent computation request (Type {operation_type}).")
def setup_visualization(self):
"""Sets up the Matplotlib figure and axes."""
plt.ion() # Turn on interactive mode
self.fig = plt.figure(figsize=(12, 6))
# 3D Scatter plot for client positions
self.ax_3d = self.fig.add_subplot(121, projection='3d')
self.ax_3d.set_title('Client Positions')
self.ax_3d.set_xlabel('X')
self.ax_3d.set_ylabel('Y')
self.ax_3d.set_zlabel('Z')
self.scatter_plot = self.ax_3d.scatter([], [], [], marker='o')
# 2D Plot for computation results
self.ax_2d = self.fig.add_subplot(122)
self.ax_2d.set_title('Computation Result')
self.ax_2d.set_xlabel('Evaluation X')
self.ax_2d.set_ylabel('Result Value')
self.interp_line_y, = self.ax_2d.plot([], [], 'b-', label='Interp Y')
self.interp_line_z, = self.ax_2d.plot([], [], 'g-', label='Interp Z')
self.deriv_line_y, = self.ax_2d.plot([], [], 'r--', label='Deriv Y')
self.deriv_line_z, = self.ax_2d.plot([], [], 'm--', label='Deriv Z')
self.ax_2d.legend()
plt.tight_layout()
plt.show(block=False)
def update_visualization(self):
"""Updates the Matplotlib plots with the latest data."""
if not self.fig or not plt.get_fignums():
return
with self.data_lock:
# Update 3D Client Positions
client_ids = list(self.all_clients_data.keys())
positions = [self.all_clients_data[cid]['position'] for cid in client_ids]
if positions:
x_data, y_data, z_data = zip(*positions)
self.scatter_plot._offsets3d = (list(x_data), list(y_data), list(z_data))
# Update axis limits dynamically
# Add a small buffer to limits, and handle cases with single/no data points
x_min, x_max = (min(x_data) - 1, max(x_data) + 1) if len(set(x_data)) > 1 else (x_data[0] - 1, x_data[0] + 1) if x_data else (-1, 1)
y_min, y_max = (min(y_data) - 1, max(y_data) + 1) if len(set(y_data)) > 1 else (y_data[0] - 1, y_data[0] + 1) if y_data else (-1, 1)
z_min, z_max = (min(z_data) - 1, max(z_data) + 1) if len(set(z_data)) > 1 else (z_data[0] - 1, z_data[0] + 1) if z_data else (-1, 1)
self.ax_3d.set_xlim([x_min, x_max])
self.ax_3d.set_ylim([y_min, y_max])
self.ax_3d.set_zlim([z_min, z_max])
# Add labels for client IDs
for txt in self.ax_3d.texts: # Clear previous annotations
txt.remove()
for i, cid in enumerate(client_ids):
self.ax_3d.text(x_data[i], y_data[i], z_data[i], str(cid))
else:
self.ax_3d.set_xlim([-1, 1])
self.ax_3d.set_ylim([-1, 1])
self.ax_3d.set_zlim([-1, 1])
# Update 2D Computation Result
self.interp_line_y.set_data([], [])
self.interp_line_z.set_data([], [])
self.deriv_line_y.set_data([], [])
self.deriv_line_z.set_data([], [])
eval_x_for_plot = self.computation_eval_x_received # Use the received eval_x
if self.current_plot_type == 'interpolation' and len(self.interpolation_result_y) > 0 and len(eval_x_for_plot) == len(self.interpolation_result_y):
self.interp_line_y.set_data(eval_x_for_plot, self.interpolation_result_y)
self.interp_line_z.set_data(eval_x_for_plot, self.interpolation_result_z)
self.ax_2d.set_title('Interpolation Result')
all_yish = np.concatenate((self.interpolation_result_y, self.interpolation_result_z))
elif self.current_plot_type == 'differentiation' and len(self.differentiation_result_y) > 0 and len(eval_x_for_plot) == len(self.differentiation_result_y):
self.deriv_line_y.set_data(eval_x_for_plot, self.differentiation_result_y)
self.deriv_line_z.set_data(eval_x_for_plot, self.differentiation_result_z)
self.ax_2d.set_title('Differentiation Result')
all_yish = np.concatenate((self.differentiation_result_y, self.differentiation_result_z))
else:
all_yish = np.array([]) # No data to plot
self.ax_2d.set_title('No Computation Result')
if len(eval_x_for_plot) > 1:
self.ax_2d.set_xlim([np.min(eval_x_for_plot), np.max(eval_x_for_plot)])
else:
self.ax_2d.set_xlim([-1, 1]) # Default if no or single point eval_x
if len(all_yish) > 1:
self.ax_2d.set_ylim([np.min(all_yish), np.max(all_yish)])
else:
self.ax_2d.set_ylim([-1, 1]) # Default if no or single point yish data
self.fig.canvas.draw_idle()
self.fig.canvas.flush_events()
def run(self):
"""Main client loop for sending updates and visualizing."""
self.connect()
if not self.running:
return
self.setup_visualization()
example_fx = [np.array([0, 1, 2, 3, 4], dtype=np.float32)]
example_fy = [np.array([0, 1, 4, 9, 16], dtype=np.float32)]
example_fz = [np.array([0, -1, -2, -3, -4], dtype=np.float32)]
example_color = [np.array([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0], [1.0, 1.0, 0.0], [0.0, 1.0, 1.0]], dtype=np.float32)]
example_eval_x = np.linspace(0, 4, 50, dtype=np.float32)
example_position = (0.0, 0.0, 0.0)
example_audio = [{'frequency': 440.0, 'amplitude': 0.5}]
# Wait for ID assignment before sending any data
if self.client_id != -1:
print("Requesting interpolation...")
self.request_computation(0, example_fx, example_fy, example_fz, example_eval_x) # Request Interpolation
time.sleep(1) # Give server time to process request and send result
print("Sending initial data after computation request...")
self.send_initial_data(example_fx, example_fy, example_fz, example_color, example_eval_x, example_position, example_audio)
time.sleep(1)
update_interval = 0.1 # seconds
last_update_time = time.time()
while self.running:
try:
current_time = time.time()
if current_time - last_update_time > update_interval:
if self.client_id != -1:
# Simulate movement
t = time.time() * 0.1
new_pos = (np.sin(t), np.cos(t), np.sin(t + np.pi / 2))
self.send_position_update(new_pos)
# Example: Send a positional comment occasionally
if int(current_time) % 10 == 0 and int(last_update_time) % 10 != 0:
self.send_positional_comment(new_pos, f"Hello from client {self.client_id}!")
last_update_time = current_time
self.update_visualization()
plt.pause(0.01) # Pause briefly to allow GUI events
except Exception as e:
print(f"Error in main run loop: {e}")
traceback.print_exc()
self.running = False # Exit loop on error
self.disconnect()
def disconnect(self):
"""Disconnects from the server and cleans up."""
print("Disconnecting from server...")
self.running = False
if self.socket:
try:
# Attempt graceful shutdown
# SHUT_RDWR prevents further sends/receives
self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close()
except socket.error as e:
print(f"Error during socket shutdown/close: {e}")
if self.receiver_thread and self.receiver_thread.is_alive():
self.receiver_thread.join(timeout=2.0) # Give it a bit more time
if self.receiver_thread.is_alive():
print("Receiver thread did not terminate gracefully. Forcing exit.")
print("Client disconnected.")
if self.fig:
plt.close(self.fig)
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description="Graphics BBS Client")
parser.add_argument('--host', type=str, default='127.0.0.1', help='Server host IP address')
parser.add_argument('--port', type=int, default=12345, help='Server port number')
args = parser.parse_args()
client = Client(args.host, args.port)
try:
client.run()
except KeyboardInterrupt:
print("\nClient stopped manually.")
finally:
client.disconnect()
No comments:
Post a Comment