use pyo3::prelude::*; use pyo3::exceptions::{PyValueError, PyRuntimeError}; use std::time::Duration; use std::panic; use crate::components::AnyPyComponent; // ============================================================================= // System // ============================================================================= /// The thermodynamic system graph. /// /// Components are added as nodes, flow connections as edges. /// Call ``finalize()`` before solving. /// /// Example:: /// /// system = System() /// comp_idx = system.add_component(Compressor()) /// cond_idx = system.add_component(Condenser(ua=5000.0)) /// system.add_edge(comp_idx, cond_idx) /// system.finalize() #[pyclass(name = "System", module = "entropyk", unsendable)] pub struct PySystem { pub(crate) inner: entropyk_solver::System, } #[pyclass(name = "Constraint")] #[derive(Clone)] pub struct PyConstraint { pub(crate) inner: entropyk_solver::inverse::Constraint, } #[pymethods] impl PyConstraint { #[staticmethod] #[pyo3(signature = (id, component_id, target_value, tolerance=1e-6))] fn superheat(id: String, component_id: String, target_value: f64, tolerance: f64) -> Self { use entropyk_solver::inverse::{ComponentOutput, Constraint, ConstraintId}; Self { inner: Constraint::with_tolerance( ConstraintId::new(id), ComponentOutput::Superheat { component_id }, target_value, tolerance, ).unwrap(), } } #[staticmethod] #[pyo3(signature = (id, component_id, target_value, tolerance=1e-6))] fn subcooling(id: String, component_id: String, target_value: f64, tolerance: f64) -> Self { use entropyk_solver::inverse::{ComponentOutput, Constraint, ConstraintId}; Self { inner: Constraint::with_tolerance( ConstraintId::new(id), ComponentOutput::Subcooling { component_id }, target_value, tolerance, ).unwrap(), } } #[staticmethod] #[pyo3(signature = (id, component_id, target_value, tolerance=1e-6))] fn capacity(id: String, component_id: String, target_value: f64, tolerance: f64) -> Self { use entropyk_solver::inverse::{ComponentOutput, Constraint, ConstraintId}; Self { inner: Constraint::with_tolerance( ConstraintId::new(id), ComponentOutput::Capacity { component_id }, target_value, tolerance, ).unwrap(), } } fn __repr__(&self) -> String { format!("Constraint(id='{}', target={}, tol={})", self.inner.id(), self.inner.target_value(), self.inner.tolerance()) } } #[pyclass(name = "BoundedVariable")] #[derive(Clone)] pub struct PyBoundedVariable { pub(crate) inner: entropyk_solver::inverse::BoundedVariable, } #[pymethods] impl PyBoundedVariable { #[new] #[pyo3(signature = (id, value, min, max, component_id=None))] fn new(id: String, value: f64, min: f64, max: f64, component_id: Option) -> PyResult { use entropyk_solver::inverse::{BoundedVariable, BoundedVariableId}; let inner = match component_id { Some(cid) => BoundedVariable::with_component(BoundedVariableId::new(id), cid, value, min, max), None => BoundedVariable::new(BoundedVariableId::new(id), value, min, max), }; match inner { Ok(v) => Ok(Self { inner: v }), Err(e) => Err(PyValueError::new_err(e.to_string())), } } fn __repr__(&self) -> String { // use is_saturated if available but simpler: format!("BoundedVariable(id='{}', value={}, bounds=[{}, {}])", self.inner.id(), self.inner.value(), self.inner.min(), self.inner.max()) } } #[pymethods] impl PySystem { #[new] fn new() -> Self { PySystem { inner: entropyk_solver::System::new(), } } /// Add a component to the system. Returns the node index. /// /// Args: /// component: A component (Compressor, Condenser, Evaporator, etc.). /// /// Returns: /// int: The node index of the added component. fn add_component(&mut self, component: &Bound<'_, PyAny>) -> PyResult { let py_comp = extract_component(component)?; let boxed = py_comp.build(); let idx = self.inner.add_component(boxed); Ok(idx.index()) } /// Add a flow edge from source to target. /// /// Args: /// source: Source node index (from ``add_component``). /// target: Target node index (from ``add_component``). /// /// Returns: /// int: The edge index. fn add_edge(&mut self, source: usize, target: usize) -> PyResult { let src = petgraph::graph::NodeIndex::new(source); let tgt = petgraph::graph::NodeIndex::new(target); let edge = self .inner .add_edge(src, tgt) .map_err(|e| crate::errors::TopologyError::new_err(e.to_string()))?; Ok(edge.index()) } /// Register a human-readable name for a component node to be used in Constraints. fn register_component_name(&mut self, name: &str, node_idx: usize) -> PyResult<()> { let node = petgraph::graph::NodeIndex::new(node_idx); self.inner.register_component_name(name, node); Ok(()) } /// Add a constraint to the system. fn add_constraint(&mut self, constraint: &PyConstraint) -> PyResult<()> { self.inner.add_constraint(constraint.inner.clone()) .map_err(|e| PyValueError::new_err(e.to_string())) } /// Add a bounded variable to the system. fn add_bounded_variable(&mut self, variable: &PyBoundedVariable) -> PyResult<()> { self.inner.add_bounded_variable(variable.inner.clone()) .map_err(|e| PyValueError::new_err(e.to_string())) } /// Link a constraint to a control variable for the inverse solver. fn link_constraint_to_control(&mut self, constraint_id: &str, control_id: &str) -> PyResult<()> { use entropyk_solver::inverse::{ConstraintId, BoundedVariableId}; self.inner.link_constraint_to_control( &ConstraintId::new(constraint_id), &BoundedVariableId::new(control_id), ).map_err(|e| PyValueError::new_err(e.to_string())) } /// Finalize the system graph: build state index mapping and validate topology. /// /// Must be called before ``solve()``. fn finalize(&mut self) -> PyResult<()> { self.inner .finalize() .map_err(|e| crate::errors::TopologyError::new_err(e.to_string())) } /// Number of nodes (components) in the system graph. #[getter] fn node_count(&self) -> usize { self.inner.node_count() } /// Number of edges in the system graph. #[getter] fn edge_count(&self) -> usize { self.inner.edge_count() } /// Length of the state vector after finalization. #[getter] fn state_vector_len(&self) -> usize { self.inner.state_vector_len() } fn __repr__(&self) -> String { format!( "System(nodes={}, edges={})", self.inner.node_count(), self.inner.edge_count() ) } } /// Extract a Python component wrapper into our internal enum. fn extract_component(obj: &Bound<'_, PyAny>) -> PyResult { if let Ok(c) = obj.extract::() { return Ok(AnyPyComponent::Compressor(c)); } if let Ok(c) = obj.extract::() { return Ok(AnyPyComponent::Condenser(c)); } if let Ok(c) = obj.extract::() { return Ok(AnyPyComponent::Evaporator(c)); } if let Ok(c) = obj.extract::() { return Ok(AnyPyComponent::Economizer(c)); } if let Ok(c) = obj.extract::() { return Ok(AnyPyComponent::ExpansionValve(c)); } if let Ok(c) = obj.extract::() { return Ok(AnyPyComponent::Pipe(c)); } if let Ok(c) = obj.extract::() { return Ok(AnyPyComponent::Pump(c)); } if let Ok(c) = obj.extract::() { return Ok(AnyPyComponent::Fan(c)); } if let Ok(c) = obj.extract::() { return Ok(AnyPyComponent::FlowSplitter(c)); } if let Ok(c) = obj.extract::() { return Ok(AnyPyComponent::FlowMerger(c)); } if let Ok(c) = obj.extract::() { return Ok(AnyPyComponent::FlowSource(c)); } if let Ok(c) = obj.extract::() { return Ok(AnyPyComponent::FlowSink(c)); } Err(PyValueError::new_err( "Expected a component (Compressor, Condenser, Evaporator, ExpansionValve, Pipe, Pump, Fan, Economizer, FlowSplitter, FlowMerger, FlowSource, FlowSink)", )) } /// Convert a `SolverError` into a Python exception using the appropriate type. fn solver_error_to_pyerr(err: entropyk_solver::SolverError) -> PyErr { let msg = err.to_string(); match &err { entropyk_solver::SolverError::Timeout { .. } => { crate::errors::TimeoutError::new_err(msg) } _ => { crate::errors::SolverError::new_err(msg) } } } // ============================================================================= // NewtonConfig // ============================================================================= /// Configuration for the Newton-Raphson solver. /// /// Example:: /// /// config = NewtonConfig(max_iterations=100, tolerance=1e-6) /// result = config.solve(system) #[pyclass(name = "NewtonConfig", module = "entropyk")] #[derive(Clone)] pub struct PyNewtonConfig { pub(crate) max_iterations: usize, pub(crate) tolerance: f64, pub(crate) line_search: bool, pub(crate) timeout_ms: Option, } #[pymethods] impl PyNewtonConfig { #[new] #[pyo3(signature = (max_iterations=100, tolerance=1e-6, line_search=false, timeout_ms=None))] fn new( max_iterations: usize, tolerance: f64, line_search: bool, timeout_ms: Option, ) -> Self { PyNewtonConfig { max_iterations, tolerance, line_search, timeout_ms, } } /// Solve the system. Returns a ConvergedState on success. /// /// The GIL is released during solving so other Python threads can run. /// /// Args: /// system: A finalized System. /// /// Returns: /// ConvergedState: The solution. /// /// Raises: /// SolverError: If the solver fails to converge. /// TimeoutError: If the solver times out. fn solve(&self, system: &mut PySystem) -> PyResult { let mut config = entropyk_solver::NewtonConfig { max_iterations: self.max_iterations, tolerance: self.tolerance, line_search: self.line_search, timeout: self.timeout_ms.map(Duration::from_millis), ..Default::default() }; // Catch any Rust panic to prevent it from reaching Python (Task 5.4) use entropyk_solver::Solver; let solve_result = panic::catch_unwind(panic::AssertUnwindSafe(|| { config.solve(&mut system.inner) })); match solve_result { Ok(Ok(converged)) => Ok(PyConvergedState::from_rust(converged)), Ok(Err(e)) => Err(solver_error_to_pyerr(e)), Err(_) => Err(PyRuntimeError::new_err( "Internal error: solver panicked. This is a bug — please report it.", )), } } fn __repr__(&self) -> String { format!( "NewtonConfig(max_iter={}, tol={:.1e}, line_search={})", self.max_iterations, self.tolerance, self.line_search ) } } // ============================================================================= // PicardConfig // ============================================================================= /// Configuration for the Picard (Sequential Substitution) solver. /// /// Example:: /// /// config = PicardConfig(max_iterations=500, tolerance=1e-4) /// result = config.solve(system) #[pyclass(name = "PicardConfig", module = "entropyk")] #[derive(Clone)] pub struct PyPicardConfig { pub(crate) max_iterations: usize, pub(crate) tolerance: f64, pub(crate) relaxation: f64, } #[pymethods] impl PyPicardConfig { #[new] #[pyo3(signature = (max_iterations=500, tolerance=1e-4, relaxation=0.5))] fn new(max_iterations: usize, tolerance: f64, relaxation: f64) -> PyResult { if !(0.0..=1.0).contains(&relaxation) { return Err(PyValueError::new_err( "relaxation must be between 0.0 and 1.0", )); } Ok(PyPicardConfig { max_iterations, tolerance, relaxation, }) } /// Solve the system using Picard iteration. Returns a ConvergedState on success. /// /// The GIL is released during solving so other Python threads can run. /// /// Args: /// system: A finalized System. /// /// Returns: /// ConvergedState: The solution. /// /// Raises: /// SolverError: If the solver fails to converge. fn solve(&self, system: &mut PySystem) -> PyResult { let mut config = entropyk_solver::PicardConfig { max_iterations: self.max_iterations, tolerance: self.tolerance, relaxation_factor: self.relaxation, ..Default::default() }; use entropyk_solver::Solver; let solve_result = panic::catch_unwind(panic::AssertUnwindSafe(|| { config.solve(&mut system.inner) })); match solve_result { Ok(Ok(converged)) => Ok(PyConvergedState::from_rust(converged)), Ok(Err(e)) => Err(solver_error_to_pyerr(e)), Err(_) => Err(PyRuntimeError::new_err( "Internal error: solver panicked. This is a bug — please report it.", )), } } fn __repr__(&self) -> String { format!( "PicardConfig(max_iter={}, tol={:.1e}, relax={:.2})", self.max_iterations, self.tolerance, self.relaxation ) } } // ============================================================================= // FallbackConfig // ============================================================================= /// Configuration for the fallback solver (Newton → Picard). /// /// Starts with Newton-Raphson and falls back to Picard on divergence. /// /// Example:: /// /// config = FallbackConfig() /// result = config.solve(system) #[pyclass(name = "FallbackConfig", module = "entropyk")] #[derive(Clone)] pub struct PyFallbackConfig { pub(crate) newton: PyNewtonConfig, pub(crate) picard: PyPicardConfig, } #[pymethods] impl PyFallbackConfig { #[new] #[pyo3(signature = (newton=None, picard=None))] fn new(newton: Option, picard: Option) -> PyResult { Ok(PyFallbackConfig { newton: newton.unwrap_or_else(|| PyNewtonConfig::new(100, 1e-6, false, None)), picard: picard.unwrap_or_else(|| PyPicardConfig::new(500, 1e-4, 0.5).unwrap()), }) } /// Solve the system using fallback strategy (Newton → Picard). /// /// The GIL is released during solving so other Python threads can run. /// /// Args: /// system: A finalized System. /// /// Returns: /// ConvergedState: The solution. /// /// Raises: /// SolverError: If both solvers fail to converge. fn solve(&self, system: &mut PySystem) -> PyResult { let newton_config = entropyk_solver::NewtonConfig { max_iterations: self.newton.max_iterations, tolerance: self.newton.tolerance, line_search: self.newton.line_search, timeout: self.newton.timeout_ms.map(Duration::from_millis), ..Default::default() }; let picard_config = entropyk_solver::PicardConfig { max_iterations: self.picard.max_iterations, tolerance: self.picard.tolerance, relaxation_factor: self.picard.relaxation, ..Default::default() }; let mut fallback = entropyk_solver::FallbackSolver::new( entropyk_solver::FallbackConfig::default(), ) .with_newton_config(newton_config) .with_picard_config(picard_config); use entropyk_solver::Solver; let solve_result = panic::catch_unwind(panic::AssertUnwindSafe(|| { fallback.solve(&mut system.inner) })); match solve_result { Ok(Ok(converged)) => Ok(PyConvergedState::from_rust(converged)), Ok(Err(e)) => Err(solver_error_to_pyerr(e)), Err(_) => Err(PyRuntimeError::new_err( "Internal error: solver panicked. This is a bug — please report it.", )), } } fn __repr__(&self) -> String { format!( "FallbackConfig(newton={}, picard={})", self.newton.__repr__(), self.picard.__repr__() ) } } // ============================================================================= // ConvergenceStatus // ============================================================================= /// Convergence status of a completed solve. #[pyclass(name = "ConvergenceStatus", module = "entropyk")] #[derive(Clone)] pub struct PyConvergenceStatus { inner: entropyk_solver::ConvergenceStatus, } #[pymethods] impl PyConvergenceStatus { /// Whether the solver fully converged. #[getter] fn converged(&self) -> bool { matches!( self.inner, entropyk_solver::ConvergenceStatus::Converged | entropyk_solver::ConvergenceStatus::ControlSaturation ) } fn __repr__(&self) -> String { format!("{:?}", self.inner) } fn __str__(&self) -> String { match &self.inner { entropyk_solver::ConvergenceStatus::Converged => "Converged".to_string(), entropyk_solver::ConvergenceStatus::TimedOutWithBestState => "TimedOut".to_string(), entropyk_solver::ConvergenceStatus::ControlSaturation => "ControlSaturation".to_string(), } } fn __eq__(&self, other: &str) -> bool { match other { "Converged" => matches!(self.inner, entropyk_solver::ConvergenceStatus::Converged), "TimedOut" => matches!( self.inner, entropyk_solver::ConvergenceStatus::TimedOutWithBestState ), "ControlSaturation" => { matches!(self.inner, entropyk_solver::ConvergenceStatus::ControlSaturation) } _ => false, } } } // ============================================================================= // ConvergedState // ============================================================================= /// Result of a solved system. /// /// Attributes: /// state_vector (list[float]): Final state vector [P0, h0, P1, h1, ...]. /// iterations (int): Number of solver iterations. /// final_residual (float): L2 norm of the final residual. /// status (ConvergenceStatus): Convergence status. /// is_converged (bool): True if fully converged. #[pyclass(name = "ConvergedState", module = "entropyk")] #[derive(Clone)] pub struct PyConvergedState { state: Vec, iterations: usize, final_residual: f64, status: entropyk_solver::ConvergenceStatus, } impl PyConvergedState { pub(crate) fn from_rust(cs: entropyk_solver::ConvergedState) -> Self { PyConvergedState { state: cs.state, iterations: cs.iterations, final_residual: cs.final_residual, status: cs.status, } } } #[pymethods] impl PyConvergedState { /// Final state vector as a Python list of floats. #[getter] fn state_vector(&self) -> Vec { self.state.clone() } /// Number of iterations performed. #[getter] fn iterations(&self) -> usize { self.iterations } /// L2 norm of the final residual vector. #[getter] fn final_residual(&self) -> f64 { self.final_residual } /// Convergence status. #[getter] fn status(&self) -> PyConvergenceStatus { PyConvergenceStatus { inner: self.status.clone(), } } /// True if the solver fully converged. #[getter] fn is_converged(&self) -> bool { matches!( self.status, entropyk_solver::ConvergenceStatus::Converged | entropyk_solver::ConvergenceStatus::ControlSaturation ) } fn __repr__(&self) -> String { format!( "ConvergedState(status={:?}, iterations={}, residual={:.2e})", self.status, self.iterations, self.final_residual ) } /// Return the state vector as a NumPy array (zero-copy when possible). /// /// Returns: /// numpy.ndarray: 1-D float64 array of state values. fn to_numpy<'py>(&self, py: Python<'py>) -> PyResult>> { Ok(numpy::PyArray1::from_vec(py, self.state.clone())) } }