## Server Code
import socket
import struct
import numpy as np
import threading
import json
from scipy.integrate import simpson # SciPy required for N-D integration; 'simps' was removed in recent SciPy releases
# --- Constants ---
OPERATION_INTERPOLATE = 0
OPERATION_DIFFERENTIATE = 1
OPERATION_CALCULATE_GRADIENT_1D = 2
OPERATION_HYPERBOLIC_INTERCEPT_HANDLER = 3
OPERATION_INTEGRATE = 4
OPERATION_INTEGRATE_ND = 5 # Existing for single N-D integral call
OPERATION_WORKFLOW = 6 # NEW: For relational compositions
SERVER_HOST = '127.0.0.1'
SERVER_PORT = 12345
# --- Helper Functions ---
def _recvall(sock, n):
"""
Helper function to reliably receive exactly 'n' bytes from a socket.
"""
data = b''
while len(data) < n:
packet = sock.recv(n - len(data))
if not packet:
return None
data += packet
return data
def _sendall_data(sock, data_array):
    """Helper to send a numpy array as big-endian float32, length-prefixed."""
    # Big-endian ('>f4') so the bytes match the '!...f' unpack on the receiving side.
    data_bytes = data_array.astype('>f4').tobytes()
    sock.sendall(struct.pack('!I', len(data_bytes)))
    sock.sendall(data_bytes)
def _recvall_data(sock):
"""Helper to receive a numpy array's bytes."""
data_len_bytes = _recvall(sock, 4)
if data_len_bytes is None:
raise ConnectionResetError("Incomplete data (length) received.")
data_len = struct.unpack('!I', data_len_bytes)[0]
data_bytes = _recvall(sock, data_len)
if data_bytes is None:
raise ConnectionResetError("Incomplete data (content) received.")
return np.array(struct.unpack(f'!{data_len // 4}f', data_bytes), dtype=np.float32)
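# Illustrative sketch of the wire format these helpers implement (not part of the
# protocol logic itself): every array travels as a 4-byte big-endian length prefix
# followed by the raw big-endian float32 payload. For example, the 3-element array
# [1.0, 2.0, 3.0] is framed as
#   struct.pack('!I', 12) + np.array([1.0, 2.0, 3.0]).astype('>f4').tobytes()
# i.e. b'\x00\x00\x00\x0c' followed by 12 payload bytes, which _recvall_data
# unpacks back into a float32 array of length 12 // 4 == 3.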
# --- Core Data Processing Functions ---
# These functions remain mostly as defined previously, but some might be called
# directly with pre-processed numpy arrays instead of raw socket data.
def pseudo_interpolate_arcsecant_1d_triple(x_data, y_data, z_data, x_interp_points):
if not (len(x_data) == len(y_data) == len(z_data)) or len(x_data) < 2:
raise ValueError("X, Y, and Z data must have equal length and at least two points for pseudo-interpolation.")
min_x, max_x = np.min(x_data), np.max(x_data)
if np.isclose(max_x, min_x):
return np.full_like(x_interp_points, y_data[0]), np.full_like(x_interp_points, z_data[0])
interp_y_results = np.zeros_like(x_interp_points, dtype=float)
interp_z_results = np.zeros_like(x_interp_points, dtype=float)
LARGE_U_MAX = 1000.0
for i, x_val in enumerate(x_interp_points):
idx = np.searchsorted(x_data, x_val)
if idx == 0:
idx1, idx2 = 0, 1
elif idx == len(x_data):
idx1, idx2 = len(x_data) - 2, len(x_data) - 1
else:
if np.isclose(x_data[idx], x_val):
idx1 = idx
idx2 = idx
else:
idx1 = idx - 1
idx2 = idx
y1, y2 = y_data[idx1], y_data[idx2]
z1, z2 = z_data[idx1], z_data[idx2]
normalized_x = (x_val - min_x) / (max_x - min_x)
u_interp = 1.0 + normalized_x * (LARGE_U_MAX - 1.0)
arcsec_val = np.arccos(1.0 / u_interp)
interpolation_factor = arcsec_val / (np.pi / 2.0)
interp_y_results[i] = y1 + (y2 - y1) * interpolation_factor
interp_z_results[i] = z1 + (z2 - z1) * interpolation_factor
return interp_y_results, interp_z_results
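# A worked micro-example of the arcsecant weighting above (illustrative only):
# normalized_x in [0, 1] is mapped to u in [1, LARGE_U_MAX], and the interpolation
# factor is arccos(1/u) / (pi/2). At normalized_x = 0, u = 1 and the factor is 0;
# at normalized_x = 1, u = 1000 and the factor is arccos(0.001) / (pi/2) ~ 0.99936.
# The factor therefore sweeps (almost) the full [0, 1] range, but nonlinearly:
# it rises steeply for small normalized_x and flattens out as u grows.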
def pseudo_interpolate_arcsecant_nd_triple(all_fx_data, all_fy_data, all_fz_data, x_interp_points):
num_dimensions = len(all_fy_data)
if not (len(all_fx_data) == num_dimensions == len(all_fz_data)):
raise ValueError("The number of x, y, and z data arrays must match for all N 3D curves.")
all_interp_y = []
all_interp_z = []
for dim_idx, (fx, fy, fz) in enumerate(zip(all_fx_data, all_fy_data, all_fz_data)):
try:
if not (len(fx) == len(fy) == len(fz)):
raise ValueError(f"3D Curve {dim_idx+1}: X, Y, and Z data arrays must have equal length.")
if len(fx) < 2:
raise ValueError(f"3D Curve {dim_idx+1}: Each data array (X, Y, Z) must have at least two points for interpolation.")
sort_indices = np.argsort(fx)
fx_sorted = fx[sort_indices]
fy_sorted = fy[sort_indices]
fz_sorted = fz[sort_indices]
interp_y, interp_z = pseudo_interpolate_arcsecant_1d_triple(fx_sorted, fy_sorted, fz_sorted, x_interp_points)
all_interp_y.extend(interp_y)
all_interp_z.extend(interp_z)
except ValueError as e:
raise ValueError(f"Error in pseudo-interpolation for 3D Curve {dim_idx+1}: {e}")
except Exception as e:
raise Exception(f"An unexpected error occurred during arcsecant interpolation for 3D Curve {dim_idx+1}: {e}")
return np.array(all_interp_y), np.array(all_interp_z)
def numerical_derivative_1d(y_values, x_values):
if len(y_values) != len(x_values) or len(y_values) < 2:
raise ValueError("Y and X data must have equal length and at least two points for derivative calculation.")
if not np.all(np.diff(x_values) > 0):
raise ValueError("X values must be strictly increasing for derivative calculation.")
derivatives = np.zeros_like(y_values, dtype=float)
h = np.diff(x_values)
if np.isclose(h[0], 0):
derivatives[0] = 0.0
else:
derivatives[0] = (y_values[1] - y_values[0]) / h[0]
for i in range(1, len(y_values) - 1):
denominator = x_values[i + 1] - x_values[i - 1]
if np.isclose(denominator, 0):
derivatives[i] = 0.0
else:
derivatives[i] = (y_values[i + 1] - y_values[i - 1]) / denominator
if np.isclose(h[-1], 0):
derivatives[-1] = 0.0
else:
derivatives[-1] = (y_values[-1] - y_values[-2]) / h[-1]
return derivatives
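# Sanity-check sketch (illustrative; assumes uniformly spaced x): for uniform
# spacing this scheme matches numpy.gradient, which likewise uses central
# differences in the interior and one-sided differences at the boundaries.
# >>> x = np.array([0.0, 1.0, 2.0, 3.0])
# >>> numerical_derivative_1d(x ** 2, x)   # -> [1., 2., 4., 5.]
# >>> np.gradient(x ** 2, x)               # -> [1., 2., 4., 5.]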
def differentiate_arcsecant_nd_triple(all_fx_data, all_fy_data, all_fz_data, x_eval_points):
num_dimensions = len(all_fy_data)
if not (len(all_fx_data) == num_dimensions == len(all_fz_data)):
raise ValueError("The number of x, y, and z data arrays must match for all N 3D curves.")
if len(x_eval_points) < 2:
raise ValueError("At least two evaluation points are needed for differentiation.")
if not np.all(np.diff(x_eval_points) > 0):
raise ValueError("Evaluation X values must be strictly increasing for derivative calculation.")
all_derivatives_y = []
all_derivatives_z = []
for dim_idx, (fx, fy, fz) in enumerate(zip(all_fx_data, all_fy_data, all_fz_data)):
try:
if not (len(fx) == len(fy) == len(fz)):
raise ValueError(f"3D Curve {dim_idx+1}: X, Y, and Z data arrays must have equal length.")
if len(fx) < 2:
raise ValueError(f"3D Curve {dim_idx+1}: Each data array (X, Y, Z) must have at least two points for differentiation.")
sort_indices = np.argsort(fx)
fx_sorted = fx[sort_indices]
fy_sorted = fy[sort_indices]
fz_sorted = fz[sort_indices]
interpolated_y, interpolated_z = pseudo_interpolate_arcsecant_1d_triple(fx_sorted, fy_sorted, fz_sorted, x_eval_points)
derivatives_y = numerical_derivative_1d(interpolated_y, x_eval_points)
derivatives_z = numerical_derivative_1d(interpolated_z, x_eval_points)
all_derivatives_y.extend(derivatives_y)
all_derivatives_z.extend(derivatives_z)
except ValueError as e:
raise ValueError(f"Error in derivative calculation for 3D Curve {dim_idx+1}: {e}")
except Exception as e:
raise Exception(f"An unexpected error occurred during differentiation of arcsecant interpolation for 3D Curve {dim_idx+1}: {e}")
return np.array(all_derivatives_y), np.array(all_derivatives_z)
def calculate_gradient_nd_triple(all_fx_data, all_fy_data, all_fz_data):
num_dimensions = len(all_fy_data)
if not (len(all_fx_data) == num_dimensions == len(all_fz_data)):
raise ValueError("The number of x, y, and z data arrays must match for all N 3D curves.")
all_gradient_y = []
all_gradient_z = []
for dim_idx, (fx, fy, fz) in enumerate(zip(all_fx_data, all_fy_data, all_fz_data)):
try:
if not (len(fx) == len(fy) == len(fz)):
raise ValueError(f"3D Curve {dim_idx+1}: X, Y, and Z data arrays must have equal length.")
if len(fx) < 2:
raise ValueError(f"3D Curve {dim_idx+1}: Each data array (X, Y, Z) must have at least two points for gradient calculation.")
sort_indices = np.argsort(fx)
fx_sorted = fx[sort_indices]
fy_sorted = fy[sort_indices]
fz_sorted = fz[sort_indices]
gradient_y_dim = numerical_derivative_1d(fy_sorted, fx_sorted)
gradient_z_dim = numerical_derivative_1d(fz_sorted, fx_sorted)
all_gradient_y.extend(gradient_y_dim)
all_gradient_z.extend(gradient_z_dim)
except ValueError as e:
raise ValueError(f"Error in gradient calculation for 3D Curve {dim_idx+1}: {e}")
except Exception as e:
raise Exception(f"An unexpected error occurred during gradient calculation for 3D Curve {dim_idx+1}: {e}")
return np.array(all_gradient_y), np.array(all_gradient_z)
def hyperbolic_intercept_handler_nd_triple(all_fx_data, all_fy_data, all_fz_data,
sensitivity_factor=10.0,
zero_threshold=1e-6):
num_dimensions = len(all_fy_data)
if not (len(all_fx_data) == num_dimensions == len(all_fz_data)):
raise ValueError("The number of x, y, and z data arrays must match for all N 3D curves.")
if not isinstance(sensitivity_factor, (int, float)) or sensitivity_factor <= 0:
raise ValueError("sensitivity_factor must be a positive number.")
if not isinstance(zero_threshold, (int, float)) or zero_threshold < 0:
raise ValueError("zero_threshold must be a non-negative number.")
all_transformed_slopes_y = []
all_transformed_slopes_z = []
for dim_idx, (fx, fy, fz) in enumerate(zip(all_fx_data, all_fy_data, all_fz_data)):
try:
if not (len(fx) == len(fy) == len(fz)):
raise ValueError(f"3D Curve {dim_idx+1}: X, Y, and Z data arrays must have equal length.")
if len(fx) < 2:
raise ValueError(f"3D Curve {dim_idx+1}: Each data array (X, Y, Z) must have at least two points for slope calculation.")
sort_indices = np.argsort(fx)
fx_sorted = fx[sort_indices]
fy_sorted = fy[sort_indices]
fz_sorted = fz[sort_indices]
slopes_y = numerical_derivative_1d(fy_sorted, fx_sorted)
slopes_z = numerical_derivative_1d(fz_sorted, fx_sorted)
intercept_indices_y = np.where(
(np.diff(np.sign(slopes_y)) != 0) |
(np.abs(slopes_y[:-1]) < zero_threshold) |
(np.abs(slopes_y[1:]) < zero_threshold)
)[0]
if len(slopes_y) > 0 and np.abs(slopes_y[-1]) < zero_threshold:
intercept_indices_y = np.append(intercept_indices_y, len(slopes_y) - 1)
intercept_indices_y = np.unique(intercept_indices_y).astype(int)
intercept_indices_z = np.where(
(np.diff(np.sign(slopes_z)) != 0) |
(np.abs(slopes_z[:-1]) < zero_threshold) |
(np.abs(slopes_z[1:]) < zero_threshold)
)[0]
if len(slopes_z) > 0 and np.abs(slopes_z[-1]) < zero_threshold:
intercept_indices_z = np.append(intercept_indices_z, len(slopes_z) - 1)
intercept_indices_z = np.unique(intercept_indices_z).astype(int)
transformed_slopes_y = np.zeros_like(slopes_y)
transformed_slopes_z = np.zeros_like(slopes_z)
for j in range(len(slopes_y)):
if len(intercept_indices_y) > 0:
nearest_intercept_dist_y = np.min(np.abs(intercept_indices_y - j))
proximity_factor_y = 1.0 / np.cosh(nearest_intercept_dist_y * sensitivity_factor * (1.0/len(fx_sorted)))
transformed_slopes_y[j] = slopes_y[j] * proximity_factor_y
else:
transformed_slopes_y[j] = slopes_y[j]
for j in range(len(slopes_z)):
if len(intercept_indices_z) > 0:
nearest_intercept_dist_z = np.min(np.abs(intercept_indices_z - j))
proximity_factor_z = 1.0 / np.cosh(nearest_intercept_dist_z * sensitivity_factor * (1.0/len(fx_sorted)))
transformed_slopes_z[j] = slopes_z[j] * proximity_factor_z
else:
transformed_slopes_z[j] = slopes_z[j]
all_transformed_slopes_y.extend(transformed_slopes_y)
all_transformed_slopes_z.extend(transformed_slopes_z)
except ValueError as e:
raise ValueError(f"Error in hyperbolic intercept handling for 3D Curve {dim_idx+1}: {e}")
except Exception as e:
raise Exception(f"An unexpected error occurred during hyperbolic intercept handling for 3D Curve {dim_idx+1}: {e}")
return np.array(all_transformed_slopes_y), np.array(all_transformed_slopes_z)
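# Quick intuition for the sech damping above (illustrative numbers): the proximity
# factor is 1 / cosh(d * sensitivity_factor / n), where d is the index distance to
# the nearest intercept and n = len(fx_sorted). With n = 100 and the default
# sensitivity_factor = 10.0, a slope at an intercept (d = 0) is kept unchanged
# (factor 1.0), while a slope 10 samples away is damped to 1 / cosh(1.0) ~ 0.648.
# Slopes are thus attenuated smoothly with distance from sign changes and zeros.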
def numerical_integration_1d(y_values, x_values):
if len(y_values) != len(x_values) or len(y_values) < 2:
raise ValueError("Y and X data must have equal length and at least two points for integration.")
if not np.all(np.diff(x_values) > 0):
raise ValueError("X values must be strictly increasing for integration.")
cumulative_integral = np.zeros_like(y_values, dtype=float)
cumulative_integral[0] = 0.0
for i in range(1, len(x_values)):
dx = x_values[i] - x_values[i-1]
avg_y = (y_values[i] + y_values[i-1]) / 2.0
current_area = avg_y * dx
cumulative_integral[i] = cumulative_integral[i-1] + current_area
return cumulative_integral
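# Sanity-check sketch: this is the cumulative trapezoidal rule, so it should agree
# with scipy.integrate.cumulative_trapezoid(y, x, initial=0.0). For y = x on [0, 1]
# the running integral is x**2 / 2, which the trapezoidal rule reproduces exactly
# because the integrand is linear:
# >>> x = np.linspace(0.0, 1.0, 5)
# >>> numerical_integration_1d(x, x)   # -> [0., 0.03125, 0.125, 0.28125, 0.5]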
def integrate_arcsecant_nd_triple(all_fx_data, all_fy_data, all_fz_data):
num_dimensions = len(all_fy_data)
if not (len(all_fx_data) == num_dimensions == len(all_fz_data)):
raise ValueError("The number of x, y, and z data arrays must match for all N 3D curves.")
all_integrals_y = []
all_integrals_z = []
for dim_idx, (fx, fy, fz) in enumerate(zip(all_fx_data, all_fy_data, all_fz_data)):
try:
if not (len(fx) == len(fy) == len(fz)):
raise ValueError(f"3D Curve {dim_idx+1}: X, Y, and Z data arrays must have equal length.")
if len(fx) < 2:
raise ValueError(f"3D Curve {dim_idx+1}: Each data array (X, Y, Z) must have at least two points for integration.")
sort_indices = np.argsort(fx)
fx_sorted = fx[sort_indices]
fy_sorted = fy[sort_indices]
fz_sorted = fz[sort_indices]
integral_y_dim = numerical_integration_1d(fy_sorted, fx_sorted)
integral_z_dim = numerical_integration_1d(fz_sorted, fx_sorted)
all_integrals_y.extend(integral_y_dim)
all_integrals_z.extend(integral_z_dim)
except ValueError as e:
raise ValueError(f"Error in integration for 3D Curve {dim_idx+1}: {e}")
except Exception as e:
raise Exception(f"An unexpected error occurred during integration for 3D Curve {dim_idx+1}: {e}")
return np.array(all_integrals_y), np.array(all_integrals_z)
def numerical_integration_nd(num_dims_for_integral, grid_shape, integration_ranges, y_values_grid, z_values_grid):
"""
    Performs N-dimensional numerical integration by applying Simpson's rule
    (scipy.integrate.simpson) iteratively, one axis at a time.
"""
if not isinstance(integration_ranges, list) or \
any(not isinstance(r, tuple) or len(r) != 2 for r in integration_ranges):
raise ValueError("integration_ranges must be a list of (min, max) tuples.")
if len(grid_shape) != num_dims_for_integral:
raise ValueError("grid_shape dimensions must match num_dims_for_integral.")
x_axes_coords = []
for i in range(num_dims_for_integral):
min_val, max_val = integration_ranges[i]
x_axes_coords.append(np.linspace(min_val, max_val, grid_shape[i]))
integral_y_val = 0.0
integral_z_val = 0.0
if num_dims_for_integral == 0: # Should not happen, but for safety
return np.array([0.0]), np.array([0.0])
# Perform iterated integration for N > 0
current_integral_y = y_values_grid
current_integral_z = z_values_grid
    for dim_idx in range(num_dims_for_integral - 1, -1, -1): # Integrate from the innermost axis outward
        current_integral_y = simpson(current_integral_y, x=x_axes_coords[dim_idx], axis=dim_idx)
        if current_integral_z is not None:
            current_integral_z = simpson(current_integral_z, x=x_axes_coords[dim_idx], axis=dim_idx)
integral_y_val = current_integral_y
integral_z_val = current_integral_z if current_integral_z is not None else 0.0
return np.array([integral_y_val]), np.array([integral_z_val])
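# Sanity-check sketch: Simpson's rule is exact for quadratics, so integrating
# f(x, y) = x**2 + y**2 over [0, 1] x [0, 1] on even a coarse 3x3 grid should give
# exactly 2/3:
# >>> xs = np.linspace(0.0, 1.0, 3)
# >>> X, Y = np.meshgrid(xs, xs, indexing='ij')
# >>> numerical_integration_nd(2, (3, 3), [(0.0, 1.0), (0.0, 1.0)], X**2 + Y**2, None)
# (array([0.66666667]), array([0.]))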
# --- Server Communication Logic (Refitted for WORKFLOW) ---
def handle_client(client_socket, addr):
print(f"Handling client: {addr}")
try:
operation_code_bytes = _recvall(client_socket, 1)
if operation_code_bytes is None:
print(f"Client {addr} disconnected (no operation code received).")
return
operation_code = struct.unpack('!B', operation_code_bytes)[0]
print(f"Client {addr} requested operation code: {operation_code}")
if operation_code == OPERATION_WORKFLOW:
print(f"Client {addr} sent a WORKFLOW request.")
workflow_len_bytes = _recvall(client_socket, 4)
if workflow_len_bytes is None:
raise ConnectionResetError("Incomplete workflow data (length).")
workflow_len = struct.unpack('!I', workflow_len_bytes)[0]
workflow_bytes = _recvall(client_socket, workflow_len)
if workflow_bytes is None:
raise ConnectionResetError("Incomplete workflow data (content).")
workflow_json = workflow_bytes.decode('utf-8')
workflow_steps = json.loads(workflow_json)
# Store intermediate results by their output_id
intermediate_results = {}
final_result_y, final_result_z = None, None
for step_idx, step in enumerate(workflow_steps):
op_type = step.get('operation_type')
input_spec = step.get('input_data')
parameters = step.get('parameters', {})
output_id = step.get('output_id') # Optional, for chaining
print(f" Executing workflow step {step_idx + 1}: {op_type}")
# Determine input data for the current step
all_fx_data, all_fy_data, all_fz_data = [], [], []
num_dims_for_integral = 0
grid_shape = tuple()
integration_ranges = []
y_values_grid, z_values_grid = None, None
if input_spec and input_spec.get('type') == 'direct':
# Raw data provided directly for this step (e.g., first step)
all_fx_data = [np.array(arr, dtype=np.float32) for arr in input_spec.get('fx_data', [])]
all_fy_data = [np.array(arr, dtype=np.float32) for arr in input_spec.get('fy_data', [])]
all_fz_data = [np.array(arr, dtype=np.float32) for arr in input_spec.get('fz_data', [])]
# For OPERATION_INTEGRATE_ND, direct input would be different
if op_type == 'INTEGRATE_ND':
num_dims_for_integral = input_spec.get('num_dims_integration')
grid_shape = tuple(input_spec.get('grid_shape'))
integration_ranges = [(r[0], r[1]) for r in input_spec.get('integration_ranges', [])]
y_values_grid = np.array(input_spec.get('flat_y_values', []), dtype=np.float32).reshape(grid_shape)
z_values_grid_flat = input_spec.get('flat_z_values')
if z_values_grid_flat is not None:
z_values_grid = np.array(z_values_grid_flat, dtype=np.float32).reshape(grid_shape)
else:
z_values_grid = None # No Z for N-D integral
                elif input_spec and input_spec.get('type') == 'reference':
                    # Input is a reference to a previous step's output.
                    # To make chaining possible, intermediate_results stores structured
                    # dicts rather than a single concatenated array, so each step can
                    # recover properly shaped inputs (including the x data, which must
                    # be carried through alongside y and z). The two shapes are:
                    #   {'fx_data': [...], 'fy_data': [...], 'fz_data': [...]}
                    # for 3D-curve-based results, and
                    #   {'y_grid_data': ..., 'z_grid_data': ..., 'grid_shape': ...,
                    #    'num_dims': ..., 'ranges': ...}
                    # for N-D grid-based results.
                    source_id = input_spec.get('source_id')
                    if source_id not in intermediate_results:
                        raise ValueError(f"Reference to unknown output_id '{source_id}' in workflow step {step_idx + 1}.")
                    ref_data = intermediate_results[source_id]
                    if 'fx_data' in ref_data:  # 3D-curve-based result
                        all_fx_data = ref_data['fx_data']
                        all_fy_data = ref_data['fy_data']
                        all_fz_data = ref_data['fz_data']
                    elif 'y_grid_data' in ref_data:  # N-D grid-based result
                        y_values_grid = ref_data['y_grid_data']
                        z_values_grid = ref_data['z_grid_data']
                        grid_shape = ref_data['grid_shape']
                        num_dims_for_integral = ref_data['num_dims']
                        integration_ranges = ref_data['ranges']
                    else:
                        raise ValueError(f"Unsupported reference data type for source_id '{source_id}'.")
                else:
                    # Every step must state where its input comes from.
                    raise ValueError(f"Workflow step {step_idx + 1} requires an input specification (direct or reference).")
result_y, result_z = None, None
if op_type == 'INTERPOLATE':
result_y, result_z = pseudo_interpolate_arcsecant_nd_triple(
all_fx_data, all_fy_data, all_fz_data, np.array(parameters.get('x_interp_points'), dtype=np.float32)
)
elif op_type == 'DIFFERENTIATE':
result_y, result_z = differentiate_arcsecant_nd_triple(
all_fx_data, all_fy_data, all_fz_data, np.array(parameters.get('x_eval_points'), dtype=np.float32)
)
elif op_type == 'CALCULATE_GRADIENT_1D':
result_y, result_z = calculate_gradient_nd_triple(
all_fx_data, all_fy_data, all_fz_data
)
elif op_type == 'HYPERBOLIC_INTERCEPT_HANDLER':
result_y, result_z = hyperbolic_intercept_handler_nd_triple(
all_fx_data, all_fy_data, all_fz_data,
sensitivity_factor=parameters.get('sensitivity_factor', 10.0),
zero_threshold=parameters.get('zero_threshold', 1e-6)
)
elif op_type == 'INTEGRATE':
result_y, result_z = integrate_arcsecant_nd_triple(
all_fx_data, all_fy_data, all_fz_data
)
elif op_type == 'INTEGRATE_ND':
result_y, result_z = numerical_integration_nd(
num_dims_for_integral, grid_shape, integration_ranges,
y_values_grid, z_values_grid
)
else:
raise ValueError(f"Unknown operation type in workflow: {op_type}")
                # Store results for chaining if an output_id is provided
                if output_id:
                    if op_type in ['INTERPOLATE', 'DIFFERENTIATE', 'CALCULATE_GRADIENT_1D', 'HYPERBOLIC_INTERCEPT_HANDLER', 'INTEGRATE']:
                        # Split the concatenated per-curve results back into one array per curve.
                        num_curves = len(all_fx_data)
                        chunk = len(result_y) // num_curves
                        # INTERPOLATE/DIFFERENTIATE results live on the evaluation grid, so
                        # that grid becomes the new x axis for downstream steps; otherwise
                        # the (sorted) original x data is carried through.
                        if op_type in ['INTERPOLATE', 'DIFFERENTIATE']:
                            eval_pts = parameters.get('x_interp_points') or parameters.get('x_eval_points')
                            new_fx = [np.array(eval_pts, dtype=np.float32)] * num_curves
                        else:
                            new_fx = [np.sort(fx) for fx in all_fx_data]
                        intermediate_results[output_id] = {
                            'fx_data': new_fx,
                            'fy_data': [result_y[i * chunk:(i + 1) * chunk] for i in range(num_curves)],
                            'fz_data': [result_z[i * chunk:(i + 1) * chunk] for i in range(num_curves)]
                        }
                    elif op_type == 'INTEGRATE_ND':
                        # Store the integral results (scalars for a definite integral)
                        # together with the original grid info for context.
                        intermediate_results[output_id] = {
                            'y_grid_data': result_y,
                            'z_grid_data': result_z,
                            'grid_shape': grid_shape,
                            'num_dims': num_dims_for_integral,
                            'ranges': integration_ranges
                        }
final_result_y = result_y
final_result_z = result_z
# Send the final results of the last operation in the workflow
result = np.concatenate((final_result_y.flatten(), final_result_z.flatten())).astype(np.float32)
_sendall_data(client_socket, result)
print(f"Successfully processed workflow and sent {len(result)} float results to {addr}.")
elif operation_code == OPERATION_INTEGRATE_ND:
# --- Protocol for single N-Dimensional Volume Integration ---
num_dims_for_integral_bytes = _recvall(client_socket, 4)
if num_dims_for_integral_bytes is None:
raise ConnectionResetError(f"Incomplete N-dim integral data (num_dims) from {addr}.")
num_dims_for_integral = struct.unpack('!I', num_dims_for_integral_bytes)[0]
grid_shape_len_bytes = _recvall(client_socket, 4)
if grid_shape_len_bytes is None:
raise ConnectionResetError(f"Incomplete N-dim integral data (grid_shape_len) from {addr}.")
grid_shape_len = struct.unpack('!I', grid_shape_len_bytes)[0]
grid_shape_bytes = _recvall(client_socket, grid_shape_len)
if grid_shape_bytes is None:
raise ConnectionResetError(f"Incomplete N-dim integral data (grid_shape) from {addr}.")
grid_shape = struct.unpack(f'!{grid_shape_len // 4}I', grid_shape_bytes)
print(f"Client {addr} sending N-D grid with shape: {grid_shape}")
integration_ranges_len_bytes = _recvall(client_socket, 4)
if integration_ranges_len_bytes is None:
raise ConnectionResetError(f"Incomplete N-dim integral data (ranges_len) from {addr}.")
integration_ranges_len = struct.unpack('!I', integration_ranges_len_bytes)[0]
integration_ranges_bytes = _recvall(client_socket, integration_ranges_len)
if integration_ranges_bytes is None:
raise ConnectionResetError(f"Incomplete N-dim integral data (ranges) from {addr}.")
integration_ranges_flat = np.array(struct.unpack(f'!{integration_ranges_len // 4}f', integration_ranges_bytes), dtype=np.float32)
integration_ranges = [(integration_ranges_flat[i], integration_ranges_flat[i+1]) for i in range(0, len(integration_ranges_flat), 2)]
print(f"Client {addr} sending N-D integration ranges: {integration_ranges}")
flat_y_values_len_bytes = _recvall(client_socket, 4)
if flat_y_values_len_bytes is None:
raise ConnectionResetError(f"Incomplete N-dim integral data (y_values_len) from {addr}.")
flat_y_values_len = struct.unpack('!I', flat_y_values_len_bytes)[0]
flat_y_values_bytes = _recvall(client_socket, flat_y_values_len)
if flat_y_values_bytes is None:
raise ConnectionResetError(f"Incomplete N-dim integral data (y_values) from {addr}.")
y_values_grid = np.array(struct.unpack(f'!{flat_y_values_len // 4}f', flat_y_values_bytes), dtype=np.float32).reshape(grid_shape)
            flat_z_values_len_bytes = _recvall(client_socket, 4)
            if flat_z_values_len_bytes is None: # Z values are optional for N-D integration
                z_values_grid = None
            else:
                flat_z_values_len = struct.unpack('!I', flat_z_values_len_bytes)[0]
                if flat_z_values_len == 0:
                    # The client signals "no Z values" with a zero-length prefix.
                    z_values_grid = None
                    print("No Z values provided for N-D integration.")
                else:
                    flat_z_values_bytes = _recvall(client_socket, flat_z_values_len)
                    if flat_z_values_bytes is None:
                        raise ConnectionResetError(f"Incomplete N-dim integral data (z_values) from {addr}.")
                    z_values_grid = np.array(struct.unpack(f'!{flat_z_values_len // 4}f', flat_z_values_bytes), dtype=np.float32).reshape(grid_shape)
result_y, result_z = numerical_integration_nd(
num_dims_for_integral, grid_shape, integration_ranges, y_values_grid, z_values_grid
)
result = np.concatenate((result_y.flatten(), result_z.flatten())).astype(np.float32)
_sendall_data(client_socket, result)
else: # Existing Protocol for N Independent 3D Curves
num_dimensions_bytes = _recvall(client_socket, 4)
if num_dimensions_bytes is None:
print(f"Client {addr} disconnected (no number of 3D curves received).")
return
num_dimensions = struct.unpack('!I', num_dimensions_bytes)[0]
print(f"Client {addr} sending {num_dimensions} independent 3D curves.")
all_fx_data = []
all_fy_data = []
all_fz_data = []
            for dim in range(num_dimensions):
                # Each array arrives as a byte-length prefix plus float32 payload,
                # exactly as produced by the client's _sendall_data helper.
                all_fx_data.append(_recvall_data(client_socket))
                all_fy_data.append(_recvall_data(client_socket))
                all_fz_data.append(_recvall_data(client_socket))
            eval_x_data_sorted = None
            if operation_code in [OPERATION_INTERPOLATE, OPERATION_DIFFERENTIATE]:
                eval_x_data = _recvall_data(client_socket)
                eval_x_data_sorted = np.sort(eval_x_data)
elif operation_code in [OPERATION_CALCULATE_GRADIENT_1D, OPERATION_HYPERBOLIC_INTERCEPT_HANDLER, OPERATION_INTEGRATE]:
pass
else:
raise ValueError(f"Invalid operation code received from {addr}: {operation_code}")
result_y, result_z = None, None
if operation_code == OPERATION_INTERPOLATE:
print(f"Performing interpolation for client {addr}...")
result_y, result_z = pseudo_interpolate_arcsecant_nd_triple(all_fx_data, all_fy_data, all_fz_data, eval_x_data_sorted)
elif operation_code == OPERATION_DIFFERENTIATE:
print(f"Performing differentiation of interpolated data for client {addr}...")
result_y, result_z = differentiate_arcsecant_nd_triple(all_fx_data, all_fy_data, all_fz_data, eval_x_data_sorted)
elif operation_code == OPERATION_CALCULATE_GRADIENT_1D:
print(f"Calculating 1D gradients for client {addr}...")
result_y, result_z = calculate_gradient_nd_triple(all_fx_data, all_fy_data, all_fz_data)
elif operation_code == OPERATION_HYPERBOLIC_INTERCEPT_HANDLER:
print(f"Performing hyperbolic intercept handling for client {addr}...")
result_y, result_z = hyperbolic_intercept_handler_nd_triple(all_fx_data, all_fy_data, all_fz_data)
elif operation_code == OPERATION_INTEGRATE:
print(f"Performing integration for client {addr}...")
result_y, result_z = integrate_arcsecant_nd_triple(all_fx_data, all_fy_data, all_fz_data)
result = np.concatenate((result_y, result_z)).astype(np.float32)
_sendall_data(client_socket, result)
print(f"Successfully processed and sent {len(result)} float results to {addr}.")
except (ValueError, ConnectionResetError, json.JSONDecodeError) as e:
print(f"Error handling client {addr}: {e}")
error_message = str(e).encode('utf-8')
try:
client_socket.sendall(struct.pack('!I', len(error_message)))
client_socket.sendall(error_message)
except Exception as send_e:
print(f"Failed to send error message to {addr}: {send_e}")
except Exception as e:
print(f"An unexpected internal server error occurred for client {addr}: {e}")
error_message = f"Server internal error: {e}".encode('utf-8')
try:
client_socket.sendall(struct.pack('!I', len(error_message)))
client_socket.sendall(error_message)
except Exception as send_e:
print(f"Failed to send error message to {addr}: {send_e}")
finally:
client_socket.close()
print(f"Connection with client {addr} closed.")
def start_server(host, port):
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
server_socket.bind((host, port))
server_socket.listen(5)
print(f"Server listening on {host}:{port}")
while True:
client_socket, addr = server_socket.accept()
print(f"Accepted connection from {addr}")
client_thread = threading.Thread(target=handle_client, args=(client_socket, addr))
client_thread.daemon = True
client_thread.start()
except OSError as e:
print(f"Server binding error: {e}. This might mean another instance is running or the port is in use.")
except KeyboardInterrupt:
print("\nServer shutting down...")
except Exception as e:
print(f"An unexpected server error occurred: {e}")
finally:
server_socket.close()
print("Server socket closed.")
if __name__ == "__main__":
start_server(SERVER_HOST, SERVER_PORT)
## Client Code
import socket
import struct
import numpy as np
import json
import time
# --- Constants (Must match server) ---
OPERATION_INTERPOLATE = 0
OPERATION_DIFFERENTIATE = 1
OPERATION_CALCULATE_GRADIENT_1D = 2
OPERATION_HYPERBOLIC_INTERCEPT_HANDLER = 3
OPERATION_INTEGRATE = 4
OPERATION_INTEGRATE_ND = 5
OPERATION_WORKFLOW = 6 # NEW: For relational compositions
SERVER_HOST = '127.0.0.1'
SERVER_PORT = 12345
# --- Helper Functions ---
def _sendall_data(sock, data_array):
    """Helper to send a numpy array as big-endian float32, length-prefixed."""
    # Big-endian ('>f4') so the bytes match the server's '!...f' unpack.
    data_bytes = data_array.astype('>f4').tobytes()
    sock.sendall(struct.pack('!I', len(data_bytes)))
    sock.sendall(data_bytes)
def _recvall(sock, n):
"""
Helper function to reliably receive exactly 'n' bytes from a socket.
"""
data = b''
while len(data) < n:
packet = sock.recv(n - len(data))
if not packet:
return None
data += packet
return data
def _recvall_data(sock):
"""Helper to receive a numpy array's bytes or an error message."""
data_len_bytes = _recvall(sock, 4)
if data_len_bytes is None:
raise ConnectionResetError("Incomplete data (length) received from server.") data_len = struct.unpack('!I', data_len_bytes)[0]data_bytes = _recvall(sock, data_len)
if data_bytes is None:
raise ConnectionResetError("Incomplete data (content) received from server.")# Check if the received data is an error message
try:
error_message = data_bytes.decode('utf-8')if "Error" in error_message or "Server internal error" in error_message:
raise RuntimeError(f"Server returned an error: {error_message}")except UnicodeDecodeError:
# Not an error message, proceed as normal float data
pass
return np.array(struct.unpack(f'!{data_len // 4}f', data_bytes), dtype=np.float32)# --- Client Workflow Execution Function ---
def execute_workflow(workflow_steps):
"""
Executes a sequence of operations on the server as a single workflow.
Args:
workflow_steps (list): A list of dictionaries, where each dictionary
defines a single operation in the workflow.
Each operation dictionary should contain:
- "operation_type" (str): The name of the operation
(e.g., "INTERPOLATE", "DIFFERENTIATE", "INTEGRATE_ND").
These strings correspond to internal server logic.
- "input_data" (dict): Specifies the input for this step.
- "type" (str): "direct" if data is provided directly,
"reference" if referring to a previous step's output.
- If "type" is "direct":
- "fx_data", "fy_data", "fz_data" (list of lists of floats):
For 3D curve operations.
- For "INTEGRATE_ND":
- "num_dims_integration" (int)
- "grid_shape" (list of ints)
- "integration_ranges" (list of lists/tuples of floats, e.g., [[min1, max1], [min2, max2]])
- "flat_y_values" (list of floats): Flattened y-grid data.
- "flat_z_values" (list of floats, optional): Flattened z-grid data.
- If "type" is "reference":
- "source_id" (str): The 'output_id' of a previous step
whose result should be used as input for this step.
- "parameters" (dict, optional): Operation-specific parameters.
- For "INTERPOLATE": {"x_interp_points": list of floats} - For "DIFFERENTIATE": {"x_eval_points": list of floats}- For "HYPERBOLIC_INTERCEPT_HANDLER":
{"sensitivity_factor": float, "zero_threshold": float}- "output_id" (str, optional): A unique identifier for this step's
output. This allows subsequent steps to reference it.
Only the last step in a workflow can omit this, as its output
is the final result returned to the client.
Returns:
numpy.ndarray: The final result from the last operation in the workflow,
or None if an error occurred.
"""
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
client_socket.connect((SERVER_HOST, SERVER_PORT))
print(f"Connected to server at {SERVER_HOST}:{SERVER_PORT}")# 1. Send OPERATION_WORKFLOW code
client_socket.sendall(struct.pack('!B', OPERATION_WORKFLOW))# 2. Serialize workflow steps to JSON
workflow_json = json.dumps(workflow_steps)
workflow_bytes = workflow_json.encode('utf-8')# 3. Send workflow length and then workflow bytes
client_socket.sendall(struct.pack('!I', len(workflow_bytes)))client_socket.sendall(workflow_bytes)
print("Workflow sent to server. Waiting for result...")# 4. Receive the final result from the server
result = _recvall_data(client_socket)
return result
except Exception as e:
print(f"Client error during workflow execution: {e}")return None
finally:
client_socket.close()
print("Connection closed.")def execute_single_operation_nd_triple(operation_code, all_fx_data, all_fy_data, all_fz_data, eval_x_points=None):
"""
Executes a single operation for N independent 3D curves.
Args:
operation_code (int): The specific operation to perform
(e.g., OPERATION_INTERPOLATE, OPERATION_DIFFERENTIATE).
all_fx_data (list of numpy.ndarray): List of x-coordinate arrays for N curves.
all_fy_data (list of numpy.ndarray): List of y-coordinate arrays for N curves.
all_fz_data (list of numpy.ndarray): List of z-coordinate arrays for N curves.
eval_x_points (numpy.ndarray, optional): Points at which to interpolate or differentiate.
Required for INTERPOLATE and DIFFERENTIATE.
Returns:
numpy.ndarray: The concatenated results (Y, then Z) for all curves, or None on error.
"""
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
client_socket.connect((SERVER_HOST, SERVER_PORT))
print(f"Connected to server at {SERVER_HOST}:{SERVER_PORT}")# Send the operation code
client_socket.sendall(struct.pack('!B', operation_code))# Send the number of 3D curves
num_dimensions = len(all_fx_data)
client_socket.sendall(struct.pack('!I', num_dimensions))# Send data for each 3D curve
for dim in range(num_dimensions):
_sendall_data(client_socket, all_fx_data[dim])
_sendall_data(client_socket, all_fy_data[dim])
_sendall_data(client_socket, all_fz_data[dim])
# Send evaluation points if required by the operation
if operation_code in [OPERATION_INTERPOLATE, OPERATION_DIFFERENTIATE] and eval_x_points is not None:
_sendall_data(client_socket, eval_x_points)
print(f"Data for operation {operation_code} sent. Waiting for result...")result = _recvall_data(client_socket)
return result
except Exception as e:
print(f"Client error for single 3D curve operation: {e}")return None
finally:
client_socket.close()
print("Connection closed.")def execute_single_operation_nd_integral(num_dims, grid_shape, integration_ranges, flat_y_values, flat_z_values=None):
"""
Executes a single N-dimensional volume integration operation.
Args:
num_dims (int): The number of dimensions for the integral.
grid_shape (tuple): The shape of the N-dimensional data grid.
integration_ranges (list of tuples): List of (min, max) tuples for each dimension's integration range.
flat_y_values (numpy.ndarray): Flattened N-dimensional y-values grid.
flat_z_values (numpy.ndarray, optional): Flattened N-dimensional z-values grid.
Returns:
numpy.ndarray: The resulting scalar integral value (Y, then Z), or None on error.
"""
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
client_socket.connect((SERVER_HOST, SERVER_PORT))
print(f"Connected to server at {SERVER_HOST}:{SERVER_PORT}")# Send OPERATION_INTEGRATE_ND code
client_socket.sendall(struct.pack('!B', OPERATION_INTEGRATE_ND))# Send integral parameters
client_socket.sendall(struct.pack('!I', num_dims)) # num_dimensions_for_integral grid_shape_bytes = struct.pack(f'!{len(grid_shape)}I', *grid_shape) client_socket.sendall(struct.pack('!I', len(grid_shape_bytes)))client_socket.sendall(grid_shape_bytes)
integration_ranges_flat = np.array([val for r in integration_ranges for val in r], dtype=np.float32)
        ranges_bytes = integration_ranges_flat.astype('>f4').tobytes() # big-endian, matching the server's '!...f' unpack
        client_socket.sendall(struct.pack('!I', len(ranges_bytes)))
        client_socket.sendall(ranges_bytes)
# Send flattened Y and Z values
_sendall_data(client_socket, flat_y_values)
if flat_z_values is not None:
_sendall_data(client_socket, flat_z_values)
else:
            client_socket.sendall(struct.pack('!I', 0)) # Indicate "no Z values" with a zero length
        print("Data for N-D integral sent. Waiting for result...")
result = _recvall_data(client_socket)
return result
except Exception as e:
print(f"Client error for single N-D integral operation: {e}")return None
finally:
client_socket.close()
print("Connection closed.")# --- Demonstration of Usage ---
if __name__ == "__main__":
# Example 1: Testing single 3D curve interpolation (existing functionality)
print("\n--- Testing single 3D curve interpolation ---")fx_data_single = [np.array([1.0, 2.0, 3.0, 4.0, 5.0], dtype=np.float32)]
fy_data_single = [np.array([10.0, 12.0, 15.0, 19.0, 25.0], dtype=np.float32)]
fz_data_single = [np.array([20.0, 21.0, 23.0, 26.0, 30.0], dtype=np.float32)]
interp_points_single = np.array([1.5, 3.5], dtype=np.float32)
interp_result_single = execute_single_operation_nd_triple(
OPERATION_INTERPOLATE, fx_data_single, fy_data_single, fz_data_single, interp_points_single
)
if interp_result_single is not None:
# Expected output is concatenated Y and Z results
# For a single curve and 2 interpolation points, result will be [y1, y2, z1, z2]
interp_y = interp_result_single[:len(interp_points_single)]
interp_z = interp_result_single[len(interp_points_single):]
print(f"Single interpolation result (Y values): {interp_y}") print(f"Single interpolation result (Z values): {interp_z}")time.sleep(1) # Give server a moment
# Example 2: Testing single N-dimensional integral operation (2D)
print("\n--- Testing single N-dimensional integral (2D) ---")# Define a simple 2D function f(x,y) = x^2 + y^2 on a 3x3 grid
x_coords_2d = np.linspace(0, 1, 3)
y_coords_2d = np.linspace(0, 1, 3)
X_grid, Y_grid = np.meshgrid(x_coords_2d, y_coords_2d, indexing='ij') # Use 'ij' for consistent reshaping
F_values_grid = X_grid**2 + Y_grid**2 # Simulate y = F(x,y)
G_values_grid = X_grid + Y_grid # Simulate z = G(x,y)
integral_result_nd = execute_single_operation_nd_integral(
num_dims=2,
grid_shape=(3, 3),
integration_ranges=[(0, 1), (0, 1)], # Ranges for x and y dimensions
flat_y_values=F_values_grid.flatten(),
flat_z_values=G_values_grid.flatten()
)
if integral_result_nd is not None:
# Result is a scalar for Y integral and Z integral
integral_y = integral_result_nd[0]
integral_z = integral_result_nd[1]
print(f"Single N-D integral result (Y): {integral_y}") print(f"Single N-D integral result (Z): {integral_z}")time.sleep(1) # Give server a moment
# Example 3: Relational Composition Workflow - Interpolate then Differentiate
print("\n--- Testing relational composition workflow: Interpolate then Differentiate ---")# Data for the first step (interpolation)
initial_fx_data_workflow = [np.array([1.0, 2.0, 3.0, 4.0, 5.0], dtype=np.float32)]
initial_fy_data_workflow = [np.array([10.0, 12.0, 15.0, 19.0, 25.0], dtype=np.float32)]
initial_fz_data_workflow = [np.array([20.0, 21.0, 23.0, 26.0, 30.0], dtype=np.float32)]
# Points for interpolation, which will then be evaluation points for differentiation
common_eval_points_workflow = np.array([1.5, 2.5, 3.5, 4.5], dtype=np.float32)
workflow_interpolate_then_differentiate = [
{"operation_type": "INTERPOLATE",
"input_data": {"type": "direct", # This step takes direct input data
"fx_data": [arr.tolist() for arr in initial_fx_data_workflow], # Convert numpy to list for JSON serialization
"fy_data": [arr.tolist() for arr in initial_fy_data_workflow],
"fz_data": [arr.tolist() for arr in initial_fz_data_workflow]
},
"parameters": {"x_interp_points": common_eval_points_workflow.tolist()
},
"output_id": "interpolated_data" # Assign an ID to this step's output so it can be referenced
},
{"operation_type": "DIFFERENTIATE",
"input_data": {"type": "reference", # This step takes input from a previous step
"source_id": "interpolated_data" # Reference the output generated by the "INTERPOLATE" step
},
"parameters": {"x_eval_points": common_eval_points_workflow.tolist() # X-values are still needed for differentiation
}
# No "output_id" for the last step, as its result is the final return to the client
}
]
workflow_result_diff = execute_workflow(workflow_interpolate_then_differentiate)
if workflow_result_diff is not None:
# The result will be concatenated Y and Z derivatives
# For 1 curve and 4 evaluation points, result is [y_deriv_1, y_deriv_2, y_deriv_3, y_deriv_4, z_deriv_1, ...]
num_results_per_curve_eval = len(common_eval_points_workflow)
num_curves_workflow = len(initial_fx_data_workflow) # In this case, 1
y_derivatives_workflow = workflow_result_diff[ : num_results_per_curve_eval * num_curves_workflow]
z_derivatives_workflow = workflow_result_diff[num_results_per_curve_eval * num_curves_workflow : ]
print(f"Workflow result (Y derivatives): {y_derivatives_workflow}") print(f"Workflow result (Z derivatives): {z_derivatives_workflow}")time.sleep(1) # Give server a moment
# Example 4: Relational Composition Workflow - More complex (e.g., two curves, interpolate, then gradient)
print("\n--- Testing relational composition workflow: Two Curves, Interpolate then Gradient ---")fx_c1 = np.array([1, 2, 3], dtype=np.float32)
fy_c1 = np.array([5, 6, 7], dtype=np.float32)
fz_c1 = np.array([10, 12, 14], dtype=np.float32)
fx_c2 = np.array([10, 11, 12], dtype=np.float32)
fy_c2 = np.array([20, 21, 22], dtype=np.float32)
fz_c2 = np.array([30, 31, 32], dtype=np.float32)
initial_fx_data_multi = [fx_c1, fx_c2]
initial_fy_data_multi = [fy_c1, fy_c2]
initial_fz_data_multi = [fz_c1, fz_c2]
interp_points_multi = np.array([1.5, 2.5, 10.5, 11.5], dtype=np.float32) # Interpolate at points relevant to both curves
workflow_multi_curve = [
{"operation_type": "INTERPOLATE",
"input_data": {"type": "direct",
"fx_data": [arr.tolist() for arr in initial_fx_data_multi],
"fy_data": [arr.tolist() for arr in initial_fy_data_multi],
"fz_data": [arr.tolist() for arr in initial_fz_data_multi]
},
"parameters": {"x_interp_points": interp_points_multi.tolist()
},
"output_id": "interpolated_multi_data"
},
{"operation_type": "CALCULATE_GRADIENT_1D", # Gradient calculation also works on 3D curve data
"input_data": {"type": "reference",
"source_id": "interpolated_multi_data"
# Note: The server-side gradient function will internally re-sort x for each curve
# from the 'fx_data' carried through with the reference.
}
}
]
workflow_result_multi = execute_workflow(workflow_multi_curve)
    if workflow_result_multi is not None:
        # Result is concatenated Y and Z gradients for both curves. Each curve is
        # interpolated at every point in interp_points_multi, so for 2 curves and
        # 4 points the result holds 8 Y values followed by 8 Z values.
        num_points_per_curve = len(interp_points_multi)
        total_y_results = num_points_per_curve * len(initial_fx_data_multi)
        y_gradients_multi = workflow_result_multi[:total_y_results]
        z_gradients_multi = workflow_result_multi[total_y_results:]
print(f"Workflow result (Y gradients for both curves): {y_gradients_multi}") print(f"Workflow result (Z gradients for both curves): {z_gradients_multi}")#####################
To make the JSON query formulation user-friendly, here is a detailed explanation of the structure the server expects for workflow operations.

The server's OPERATION_WORKFLOW mode expects a single JSON document (serialized as a UTF-8 string) containing a list of operation steps. This lets you chain multiple mathematical operations together.

## JSON Workflow Structure

The top-level JSON structure for a workflow request is a list of operation dictionaries. (Note: the server parses the payload directly as a list, so it is not wrapped in an enclosing object.)

[
    { ... first operation dictionary ... },
    { ... second operation dictionary ... }
]

## Operation Dictionary Structure

Each dictionary in the list represents a single step (operation) in your computation pipeline. It can contain the following keys.

"operation_type" (string, required):
- Specifies the mathematical operation to perform in this step.
- Valid values correspond to the functionalities implemented on the server:
  - "INTERPOLATE": pseudo-interpolation of 3D curves.
  - "DIFFERENTIATE": differentiation of 3D curves (after interpolation).
  - "CALCULATE_GRADIENT_1D": gradient calculation for 3D curves.
  - "HYPERBOLIC_INTERCEPT_HANDLER": hyperbolic intercept processing of 3D curves.
  - "INTEGRATE": integration of 3D curves.
  - "INTEGRATE_ND": N-dimensional volume integration.

"input_data" (object, required):
- Defines where the data for this operation comes from.
- It has a "type" key, which is either "direct" or "reference".

input_data type "direct":
- Used for the first operation in a workflow, or for any operation whose raw data is provided directly.
- The structure depends on the operation_type.
- For 3D curve operations (INTERPOLATE, DIFFERENTIATE, CALCULATE_GRADIENT_1D, HYPERBOLIC_INTERCEPT_HANDLER, INTEGRATE), where each inner list represents a 1D NumPy array in Python:

"input_data": {
    "type": "direct",
    "fx_data": [[x1_1, x1_2, ...], [x2_1, x2_2, ...]],
    "fy_data": [[y1_1, y1_2, ...], [y2_1, y2_2, ...]],
    "fz_data": [[z1_1, z1_2, ...], [z2_1, z2_2, ...]]
}

- For N-dimensional integral operations (INTEGRATE_ND), where "flat_y_values" is the flattened N-D grid of y-values and "flat_z_values" is an optional flattened grid of z-values:

"input_data": {
    "type": "direct",
    "num_dims_integration": 2,
    "grid_shape": [3, 3],
    "integration_ranges": [[0.0, 1.0], [0.0, 1.0]],
    "flat_y_values": [y1, y2, y3, ...],
    "flat_z_values": [z1, z2, z3, ...]
}

input_data type "reference":
- Used by subsequent operations in a workflow to consume the output of a previous step as input.
- You specify the output_id of the previous step; the server passes the stored data along in the format appropriate to the referenced output.

"input_data": {
    "type": "reference",
    "source_id": "unique_id_of_previous_step_output"
}

"parameters" (object, optional):
- A dictionary holding any operation-specific parameters.
- For INTERPOLATE: {"x_interp_points": [1.5, 2.5, 3.5]} -- the x-coordinates at which to interpolate.
- For DIFFERENTIATE: {"x_eval_points": [1.5, 2.5, 3.5]} -- the x-coordinates at which to differentiate.
- For HYPERBOLIC_INTERCEPT_HANDLER: {"sensitivity_factor": 10.0, "zero_threshold": 0.000001} -- both optional, with defaults 10.0 and 1e-6.
- The other operations (CALCULATE_GRADIENT_1D, INTEGRATE, INTEGRATE_ND) typically require no parameters beyond their input data.

"output_id" (string, optional):
- A unique string identifier for the result of this step.
- If provided, the server stores this step's output under the ID, making it available to later "reference"-type input_data in the workflow.
- This is what makes chaining possible. The last step in a workflow typically omits it, since its result is the final output returned to the client.

## Example Workflow JSON

Here is a workflow that interpolates a 3D curve and then differentiates the interpolated result:

[
    {
        "operation_type": "INTERPOLATE",
        "input_data": {
            "type": "direct",
            "fx_data": [[1.0, 2.0, 3.0, 4.0, 5.0]],
            "fy_data": [[10.0, 12.0, 15.0, 19.0, 25.0]],
            "fz_data": [[20.0, 21.0, 23.0, 26.0, 30.0]]
        },
        "parameters": {"x_interp_points": [1.5, 2.5, 3.5, 4.5]},
        "output_id": "my_interpolated_curve"
    },
    {
        "operation_type": "DIFFERENTIATE",
        "input_data": {
            "type": "reference",
            "source_id": "my_interpolated_curve"
        },
        "parameters": {"x_eval_points": [1.5, 2.5, 3.5, 4.5]}
    }
]

By structuring your JSON requests this way, you can build complex computational pipelines that combine the server's functionalities in a flexible and user-friendly manner.