1051 lines
35 KiB
Rust

use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use std::panic;
use std::time::Duration;
use crate::components::AnyPyComponent;
// =============================================================================
// System
// =============================================================================
/// The thermodynamic system graph.
///
/// Components are added as nodes, flow connections as edges.
/// Call ``finalize()`` before solving.
///
/// Example::
///
/// system = System()
/// comp_idx = system.add_component(Compressor())
/// cond_idx = system.add_component(Condenser(ua=5000.0))
/// system.add_edge(comp_idx, cond_idx)
/// system.finalize()
#[pyclass(name = "System", module = "entropyk", unsendable)]
pub struct PySystem {
pub(crate) inner: entropyk_solver::System,
}
#[pyclass(name = "Constraint")]
#[derive(Clone)]
pub struct PyConstraint {
pub(crate) inner: entropyk_solver::inverse::Constraint,
}
#[pymethods]
impl PyConstraint {
#[staticmethod]
#[pyo3(signature = (id, component_id, target_value, tolerance=1e-6))]
fn superheat(id: String, component_id: String, target_value: f64, tolerance: f64) -> Self {
use entropyk_solver::inverse::{ComponentOutput, Constraint, ConstraintId};
Self {
inner: Constraint::with_tolerance(
ConstraintId::new(id),
ComponentOutput::Superheat { component_id },
target_value,
tolerance,
)
.unwrap(),
}
}
#[staticmethod]
#[pyo3(signature = (id, component_id, target_value, tolerance=1e-6))]
fn subcooling(id: String, component_id: String, target_value: f64, tolerance: f64) -> Self {
use entropyk_solver::inverse::{ComponentOutput, Constraint, ConstraintId};
Self {
inner: Constraint::with_tolerance(
ConstraintId::new(id),
ComponentOutput::Subcooling { component_id },
target_value,
tolerance,
)
.unwrap(),
}
}
#[staticmethod]
#[pyo3(signature = (id, component_id, target_value, tolerance=1e-6))]
fn capacity(id: String, component_id: String, target_value: f64, tolerance: f64) -> Self {
use entropyk_solver::inverse::{ComponentOutput, Constraint, ConstraintId};
Self {
inner: Constraint::with_tolerance(
ConstraintId::new(id),
ComponentOutput::Capacity { component_id },
target_value,
tolerance,
)
.unwrap(),
}
}
fn __repr__(&self) -> String {
format!(
"Constraint(id='{}', target={}, tol={})",
self.inner.id(),
self.inner.target_value(),
self.inner.tolerance()
)
}
}
#[pyclass(name = "BoundedVariable")]
#[derive(Clone)]
pub struct PyBoundedVariable {
pub(crate) inner: entropyk_solver::inverse::BoundedVariable,
}
#[pymethods]
impl PyBoundedVariable {
#[new]
#[pyo3(signature = (id, value, min, max, component_id=None))]
fn new(
id: String,
value: f64,
min: f64,
max: f64,
component_id: Option<String>,
) -> PyResult<Self> {
use entropyk_solver::inverse::{BoundedVariable, BoundedVariableId};
let inner = match component_id {
Some(cid) => {
BoundedVariable::with_component(BoundedVariableId::new(id), cid, value, min, max)
}
None => BoundedVariable::new(BoundedVariableId::new(id), value, min, max),
};
match inner {
Ok(v) => Ok(Self { inner: v }),
Err(e) => Err(PyValueError::new_err(e.to_string())),
}
}
fn __repr__(&self) -> String {
// use is_saturated if available but simpler:
format!(
"BoundedVariable(id='{}', value={}, bounds=[{}, {}])",
self.inner.id(),
self.inner.value(),
self.inner.min(),
self.inner.max()
)
}
}
#[pymethods]
impl PySystem {
#[new]
fn new() -> Self {
PySystem {
inner: entropyk_solver::System::new(),
}
}
/// Add a component to the system. Returns the node index.
///
/// Args:
/// component: A component (Compressor, Condenser, Evaporator, etc.).
///
/// Returns:
/// int: The node index of the added component.
fn add_component(&mut self, component: &Bound<'_, PyAny>) -> PyResult<usize> {
let py_comp = extract_component(component)?;
let boxed = py_comp.build();
let idx = self.inner.add_component(boxed);
Ok(idx.index())
}
/// Add a flow edge from source to target.
///
/// Args:
/// source: Source node index (from ``add_component``).
/// target: Target node index (from ``add_component``).
///
/// Returns:
/// int: The edge index.
fn add_edge(&mut self, source: usize, target: usize) -> PyResult<usize> {
let src = petgraph::graph::NodeIndex::new(source);
let tgt = petgraph::graph::NodeIndex::new(target);
let edge = self
.inner
.add_edge(src, tgt)
.map_err(|e| crate::errors::TopologyError::new_err(e.to_string()))?;
Ok(edge.index())
}
/// Register a human-readable name for a component node to be used in Constraints.
fn register_component_name(&mut self, name: &str, node_idx: usize) -> PyResult<()> {
let node = petgraph::graph::NodeIndex::new(node_idx);
self.inner.register_component_name(name, node);
Ok(())
}
/// Add a constraint to the system.
fn add_constraint(&mut self, constraint: &PyConstraint) -> PyResult<()> {
self.inner
.add_constraint(constraint.inner.clone())
.map_err(|e| PyValueError::new_err(e.to_string()))
}
/// Add a bounded variable to the system.
fn add_bounded_variable(&mut self, variable: &PyBoundedVariable) -> PyResult<()> {
self.inner
.add_bounded_variable(variable.inner.clone())
.map_err(|e| PyValueError::new_err(e.to_string()))
}
/// Link a constraint to a control variable for the inverse solver.
fn link_constraint_to_control(
&mut self,
constraint_id: &str,
control_id: &str,
) -> PyResult<()> {
use entropyk_solver::inverse::{BoundedVariableId, ConstraintId};
self.inner
.link_constraint_to_control(
&ConstraintId::new(constraint_id),
&BoundedVariableId::new(control_id),
)
.map_err(|e| PyValueError::new_err(e.to_string()))
}
/// Finalize the system graph: build state index mapping and validate topology.
///
/// Must be called before ``solve()``.
fn finalize(&mut self) -> PyResult<()> {
self.inner
.finalize()
.map_err(|e| crate::errors::TopologyError::new_err(e.to_string()))
}
/// Number of nodes (components) in the system graph.
#[getter]
fn node_count(&self) -> usize {
self.inner.node_count()
}
/// Number of edges in the system graph.
#[getter]
fn edge_count(&self) -> usize {
self.inner.edge_count()
}
/// Length of the state vector after finalization.
#[getter]
fn state_vector_len(&self) -> usize {
self.inner.state_vector_len()
}
fn __repr__(&self) -> String {
format!(
"System(nodes={}, edges={})",
self.inner.node_count(),
self.inner.edge_count()
)
}
}
/// Extract a Python component wrapper into our internal enum.
fn extract_component(obj: &Bound<'_, PyAny>) -> PyResult<AnyPyComponent> {
if let Ok(c) = obj.extract::<crate::components::PyCompressor>() {
return Ok(AnyPyComponent::Compressor(c));
}
if let Ok(c) = obj.extract::<crate::components::PyCondenser>() {
return Ok(AnyPyComponent::Condenser(c));
}
if let Ok(c) = obj.extract::<crate::components::PyEvaporator>() {
return Ok(AnyPyComponent::Evaporator(c));
}
if let Ok(c) = obj.extract::<crate::components::PyEconomizer>() {
return Ok(AnyPyComponent::Economizer(c));
}
if let Ok(c) = obj.extract::<crate::components::PyExpansionValve>() {
return Ok(AnyPyComponent::ExpansionValve(c));
}
if let Ok(c) = obj.extract::<crate::components::PyPipe>() {
return Ok(AnyPyComponent::Pipe(c));
}
if let Ok(c) = obj.extract::<crate::components::PyPump>() {
return Ok(AnyPyComponent::Pump(c));
}
if let Ok(c) = obj.extract::<crate::components::PyFan>() {
return Ok(AnyPyComponent::Fan(c));
}
if let Ok(c) = obj.extract::<crate::components::PyFlowSplitter>() {
return Ok(AnyPyComponent::FlowSplitter(c));
}
if let Ok(c) = obj.extract::<crate::components::PyFlowMerger>() {
return Ok(AnyPyComponent::FlowMerger(c));
}
if let Ok(c) = obj.extract::<crate::components::PyFlowSource>() {
return Ok(AnyPyComponent::FlowSource(c));
}
if let Ok(c) = obj.extract::<crate::components::PyFlowSink>() {
return Ok(AnyPyComponent::FlowSink(c));
}
Err(PyValueError::new_err(
"Expected a component (Compressor, Condenser, Evaporator, ExpansionValve, Pipe, Pump, Fan, Economizer, FlowSplitter, FlowMerger, FlowSource, FlowSink)",
))
}
/// Convert a `SolverError` into a Python exception using the appropriate type.
fn solver_error_to_pyerr(err: entropyk_solver::SolverError) -> PyErr {
let msg = err.to_string();
match &err {
entropyk_solver::SolverError::Timeout { .. } => crate::errors::TimeoutError::new_err(msg),
_ => crate::errors::SolverError::new_err(msg),
}
}
// =============================================================================
// Supporting Types (Story 6.6)
// =============================================================================
#[pyclass(name = "ConvergenceCriteria", module = "entropyk")]
#[derive(Clone, Default)]
pub struct PyConvergenceCriteria {
pub(crate) inner: entropyk_solver::criteria::ConvergenceCriteria,
}
#[pymethods]
impl PyConvergenceCriteria {
#[new]
#[pyo3(signature = (pressure_tolerance_pa=1.0, mass_balance_tolerance_kgs=1e-9, energy_balance_tolerance_w=1e-3))]
fn new(
pressure_tolerance_pa: f64,
mass_balance_tolerance_kgs: f64,
energy_balance_tolerance_w: f64,
) -> Self {
PyConvergenceCriteria {
inner: entropyk_solver::criteria::ConvergenceCriteria {
pressure_tolerance_pa,
mass_balance_tolerance_kgs,
energy_balance_tolerance_w,
},
}
}
#[getter]
fn pressure_tolerance_pa(&self) -> f64 {
self.inner.pressure_tolerance_pa
}
#[getter]
fn mass_balance_tolerance_kgs(&self) -> f64 {
self.inner.mass_balance_tolerance_kgs
}
#[getter]
fn energy_balance_tolerance_w(&self) -> f64 {
self.inner.energy_balance_tolerance_w
}
fn __repr__(&self) -> String {
format!(
"ConvergenceCriteria(dP={:.1e} Pa, dM={:.1e} kg/s, dE={:.1e} W)",
self.inner.pressure_tolerance_pa,
self.inner.mass_balance_tolerance_kgs,
self.inner.energy_balance_tolerance_w
)
}
}
#[pyclass(name = "JacobianFreezingConfig", module = "entropyk")]
#[derive(Clone, Default)]
pub struct PyJacobianFreezingConfig {
pub(crate) inner: entropyk_solver::solver::JacobianFreezingConfig,
}
#[pymethods]
impl PyJacobianFreezingConfig {
#[new]
#[pyo3(signature = (max_frozen_iters=3, threshold=0.1))]
fn new(max_frozen_iters: usize, threshold: f64) -> Self {
PyJacobianFreezingConfig {
inner: entropyk_solver::solver::JacobianFreezingConfig {
max_frozen_iters,
threshold,
},
}
}
#[getter]
fn max_frozen_iters(&self) -> usize {
self.inner.max_frozen_iters
}
#[getter]
fn threshold(&self) -> f64 {
self.inner.threshold
}
fn __repr__(&self) -> String {
format!(
"JacobianFreezingConfig(max_iters={}, threshold={:.2})",
self.inner.max_frozen_iters, self.inner.threshold
)
}
}
#[pyclass(name = "TimeoutConfig", module = "entropyk")]
#[derive(Clone, Default)]
pub struct PyTimeoutConfig {
pub(crate) inner: entropyk_solver::solver::TimeoutConfig,
}
#[pymethods]
impl PyTimeoutConfig {
#[new]
#[pyo3(signature = (return_best_state_on_timeout=true, zoh_fallback=false))]
fn new(return_best_state_on_timeout: bool, zoh_fallback: bool) -> Self {
PyTimeoutConfig {
inner: entropyk_solver::solver::TimeoutConfig {
return_best_state_on_timeout,
zoh_fallback,
},
}
}
#[getter]
fn return_best_state_on_timeout(&self) -> bool {
self.inner.return_best_state_on_timeout
}
#[getter]
fn zoh_fallback(&self) -> bool {
self.inner.zoh_fallback
}
fn __repr__(&self) -> String {
format!(
"TimeoutConfig(return_best={}, zoh={})",
self.inner.return_best_state_on_timeout, self.inner.zoh_fallback
)
}
}
// =============================================================================
// NewtonConfig
// =============================================================================
/// Configuration for the Newton-Raphson solver.
///
/// Example::
///
/// config = NewtonConfig(max_iterations=100, tolerance=1e-6)
/// result = config.solve(system)
#[pyclass(name = "NewtonConfig", module = "entropyk")]
#[derive(Clone, Default)]
pub struct PyNewtonConfig {
#[pyo3(get, set)]
pub(crate) max_iterations: usize,
#[pyo3(get, set)]
pub(crate) tolerance: f64,
#[pyo3(get, set)]
pub(crate) line_search: bool,
#[pyo3(get, set)]
pub(crate) timeout_ms: Option<u64>,
#[pyo3(get, set)]
pub(crate) initial_state: Option<Vec<f64>>,
#[pyo3(get, set)]
pub(crate) use_numerical_jacobian: bool,
#[pyo3(get, set)]
pub(crate) jacobian_freezing: Option<PyJacobianFreezingConfig>,
#[pyo3(get, set)]
pub(crate) convergence_criteria: Option<PyConvergenceCriteria>,
#[pyo3(get, set)]
pub(crate) timeout_config: PyTimeoutConfig,
#[pyo3(get, set)]
pub(crate) previous_state: Option<Vec<f64>>,
#[pyo3(get, set)]
pub(crate) line_search_armijo_c: f64,
#[pyo3(get, set)]
pub(crate) line_search_max_backtracks: usize,
#[pyo3(get, set)]
pub(crate) divergence_threshold: f64,
}
#[pymethods]
impl PyNewtonConfig {
#[new]
#[pyo3(signature = (
max_iterations=100,
tolerance=1e-6,
line_search=false,
timeout_ms=None,
initial_state=None,
use_numerical_jacobian=false,
jacobian_freezing=None,
convergence_criteria=None,
timeout_config=None,
previous_state=None,
line_search_armijo_c=1e-4,
line_search_max_backtracks=20,
divergence_threshold=1e10
))]
fn new(
max_iterations: usize,
tolerance: f64,
line_search: bool,
timeout_ms: Option<u64>,
initial_state: Option<Vec<f64>>,
use_numerical_jacobian: bool,
jacobian_freezing: Option<PyJacobianFreezingConfig>,
convergence_criteria: Option<PyConvergenceCriteria>,
timeout_config: Option<PyTimeoutConfig>,
previous_state: Option<Vec<f64>>,
line_search_armijo_c: f64,
line_search_max_backtracks: usize,
divergence_threshold: f64,
) -> PyResult<Self> {
if tolerance <= 0.0 {
return Err(PyValueError::new_err("tolerance must be greater than 0"));
}
if divergence_threshold <= tolerance {
return Err(PyValueError::new_err("divergence_threshold must be greater than tolerance"));
}
Ok(PyNewtonConfig {
max_iterations,
tolerance,
line_search,
timeout_ms,
initial_state,
use_numerical_jacobian,
jacobian_freezing,
convergence_criteria,
timeout_config: timeout_config.unwrap_or_default(),
previous_state,
line_search_armijo_c,
line_search_max_backtracks,
divergence_threshold,
})
}
/// Solve the system. Returns a ConvergedState on success.
///
/// The GIL is released during solving so other Python threads can run.
///
/// Args:
/// system: A finalized System.
///
/// Returns:
/// ConvergedState: The solution.
///
/// Raises:
/// SolverError: If the solver fails to converge.
/// TimeoutError: If the solver times out.
fn solve(&self, system: &mut PySystem) -> PyResult<PyConvergedState> {
let mut config = entropyk_solver::NewtonConfig {
max_iterations: self.max_iterations,
tolerance: self.tolerance,
line_search: self.line_search,
timeout: self.timeout_ms.map(Duration::from_millis),
use_numerical_jacobian: self.use_numerical_jacobian,
line_search_armijo_c: self.line_search_armijo_c,
line_search_max_backtracks: self.line_search_max_backtracks,
divergence_threshold: self.divergence_threshold,
timeout_config: self.timeout_config.inner.clone(),
previous_state: self.previous_state.clone(),
previous_residual: None,
initial_state: self.initial_state.clone(),
convergence_criteria: self.convergence_criteria.as_ref().map(|cc| cc.inner.clone()),
jacobian_freezing: self.jacobian_freezing.as_ref().map(|jf| jf.inner.clone()),
verbose_config: Default::default(),
};
// Catch any Rust panic to prevent it from reaching Python (Task 5.4)
use entropyk_solver::Solver;
let solve_result =
panic::catch_unwind(panic::AssertUnwindSafe(|| config.solve(&mut system.inner)));
match solve_result {
Ok(Ok(converged)) => Ok(PyConvergedState::from_rust(converged)),
Ok(Err(e)) => Err(solver_error_to_pyerr(e)),
Err(_) => Err(PyRuntimeError::new_err(
"Internal error: solver panicked. This is a bug — please report it.",
)),
}
}
fn __repr__(&self) -> String {
format!(
"NewtonConfig(max_iter={}, tol={:.1e}, line_search={})",
self.max_iterations, self.tolerance, self.line_search
)
}
}
// =============================================================================
// PicardConfig
// =============================================================================
/// Configuration for the Picard (Sequential Substitution) solver.
///
/// Example::
///
/// config = PicardConfig(max_iterations=500, tolerance=1e-4)
/// result = config.solve(system)
#[pyclass(name = "PicardConfig", module = "entropyk")]
#[derive(Clone, Default)]
pub struct PyPicardConfig {
#[pyo3(get, set)]
pub(crate) max_iterations: usize,
#[pyo3(get, set)]
pub(crate) tolerance: f64,
#[pyo3(get, set)]
pub(crate) relaxation: f64,
#[pyo3(get, set)]
pub(crate) initial_state: Option<Vec<f64>>,
#[pyo3(get, set)]
pub(crate) timeout_ms: Option<u64>,
#[pyo3(get, set)]
pub(crate) convergence_criteria: Option<PyConvergenceCriteria>,
}
#[pymethods]
impl PyPicardConfig {
#[new]
#[pyo3(signature = (
max_iterations=500,
tolerance=1e-4,
relaxation=0.5,
initial_state=None,
timeout_ms=None,
convergence_criteria=None
))]
fn new(
max_iterations: usize,
tolerance: f64,
relaxation: f64,
initial_state: Option<Vec<f64>>,
timeout_ms: Option<u64>,
convergence_criteria: Option<PyConvergenceCriteria>,
) -> PyResult<Self> {
if tolerance <= 0.0 {
return Err(PyValueError::new_err("tolerance must be greater than 0"));
}
if !(0.0..=1.0).contains(&relaxation) {
return Err(PyValueError::new_err(
"relaxation must be between 0.0 and 1.0",
));
}
Ok(PyPicardConfig {
max_iterations,
tolerance,
relaxation,
initial_state,
timeout_ms,
convergence_criteria,
})
}
/// Solve the system using Picard iteration. Returns a ConvergedState on success.
///
/// The GIL is released during solving so other Python threads can run.
///
/// Args:
/// system: A finalized System.
///
/// Returns:
/// ConvergedState: The solution.
///
/// Raises:
/// SolverError: If the solver fails to converge.
fn solve(&self, system: &mut PySystem) -> PyResult<PyConvergedState> {
let mut config = entropyk_solver::PicardConfig {
max_iterations: self.max_iterations,
tolerance: self.tolerance,
relaxation_factor: self.relaxation,
timeout: self.timeout_ms.map(Duration::from_millis),
initial_state: self.initial_state.clone(),
convergence_criteria: self.convergence_criteria.as_ref().map(|cc| cc.inner.clone()),
..Default::default()
};
use entropyk_solver::Solver;
let solve_result =
panic::catch_unwind(panic::AssertUnwindSafe(|| config.solve(&mut system.inner)));
match solve_result {
Ok(Ok(converged)) => Ok(PyConvergedState::from_rust(converged)),
Ok(Err(e)) => Err(solver_error_to_pyerr(e)),
Err(_) => Err(PyRuntimeError::new_err(
"Internal error: solver panicked. This is a bug — please report it.",
)),
}
}
fn __repr__(&self) -> String {
format!(
"PicardConfig(max_iter={}, tol={:.1e}, relax={:.2})",
self.max_iterations, self.tolerance, self.relaxation
)
}
}
// =============================================================================
// FallbackConfig
// =============================================================================
/// Configuration for the fallback solver (Newton → Picard).
///
/// Starts with Newton-Raphson and falls back to Picard on divergence.
///
/// Example::
///
/// config = FallbackConfig()
/// result = config.solve(system)
#[pyclass(name = "FallbackConfig", module = "entropyk")]
#[derive(Clone)]
pub struct PyFallbackConfig {
pub(crate) newton: PyNewtonConfig,
pub(crate) picard: PyPicardConfig,
}
#[pymethods]
impl PyFallbackConfig {
#[new]
#[pyo3(signature = (newton=None, picard=None))]
fn new(newton: Option<PyNewtonConfig>, picard: Option<PyPicardConfig>) -> PyResult<Self> {
Ok(PyFallbackConfig {
newton: newton.unwrap_or_else(|| PyNewtonConfig::new(100, 1e-6, false, None, None, false, None, None, None, None, 1e-4, 20, 1e10).unwrap()),
picard: picard.unwrap_or_else(|| PyPicardConfig::new(500, 1e-4, 0.5, None, None, None).unwrap()),
})
}
/// Solve the system using fallback strategy (Newton → Picard).
///
/// The GIL is released during solving so other Python threads can run.
///
/// Args:
/// system: A finalized System.
///
/// Returns:
/// ConvergedState: The solution.
///
/// Raises:
/// SolverError: If both solvers fail to converge.
fn solve(&self, system: &mut PySystem) -> PyResult<PyConvergedState> {
let newton_config = entropyk_solver::NewtonConfig {
max_iterations: self.newton.max_iterations,
tolerance: self.newton.tolerance,
line_search: self.newton.line_search,
timeout: self.newton.timeout_ms.map(Duration::from_millis),
use_numerical_jacobian: self.newton.use_numerical_jacobian,
line_search_armijo_c: self.newton.line_search_armijo_c,
line_search_max_backtracks: self.newton.line_search_max_backtracks,
divergence_threshold: self.newton.divergence_threshold,
timeout_config: self.newton.timeout_config.inner.clone(),
previous_state: self.newton.previous_state.clone(),
previous_residual: None,
initial_state: self.newton.initial_state.clone(),
convergence_criteria: self.newton.convergence_criteria.as_ref().map(|cc| cc.inner.clone()),
jacobian_freezing: self.newton.jacobian_freezing.as_ref().map(|jf| jf.inner.clone()),
verbose_config: Default::default(),
};
let picard_config = entropyk_solver::PicardConfig {
max_iterations: self.picard.max_iterations,
tolerance: self.picard.tolerance,
relaxation_factor: self.picard.relaxation,
timeout: self.picard.timeout_ms.map(Duration::from_millis),
initial_state: self.picard.initial_state.clone(),
convergence_criteria: self.picard.convergence_criteria.as_ref().map(|cc| cc.inner.clone()),
..Default::default()
};
let mut fallback =
entropyk_solver::FallbackSolver::new(entropyk_solver::FallbackConfig::default())
.with_newton_config(newton_config)
.with_picard_config(picard_config);
use entropyk_solver::Solver;
let solve_result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
fallback.solve(&mut system.inner)
}));
match solve_result {
Ok(Ok(converged)) => Ok(PyConvergedState::from_rust(converged)),
Ok(Err(e)) => Err(solver_error_to_pyerr(e)),
Err(_) => Err(PyRuntimeError::new_err(
"Internal error: solver panicked. This is a bug — please report it.",
)),
}
}
fn __repr__(&self) -> String {
format!(
"FallbackConfig(newton={}, picard={})",
self.newton.__repr__(),
self.picard.__repr__()
)
}
}
// =============================================================================
// ConvergenceStatus
// =============================================================================
/// Convergence status of a completed solve.
#[pyclass(name = "ConvergenceStatus", module = "entropyk")]
#[derive(Clone)]
pub struct PyConvergenceStatus {
inner: entropyk_solver::ConvergenceStatus,
}
#[pymethods]
impl PyConvergenceStatus {
/// Whether the solver fully converged.
#[getter]
fn converged(&self) -> bool {
matches!(
self.inner,
entropyk_solver::ConvergenceStatus::Converged
| entropyk_solver::ConvergenceStatus::ControlSaturation
)
}
fn __repr__(&self) -> String {
format!("{:?}", self.inner)
}
fn __str__(&self) -> String {
match &self.inner {
entropyk_solver::ConvergenceStatus::Converged => "Converged".to_string(),
entropyk_solver::ConvergenceStatus::TimedOutWithBestState => "TimedOut".to_string(),
entropyk_solver::ConvergenceStatus::ControlSaturation => {
"ControlSaturation".to_string()
}
}
}
fn __eq__(&self, other: &str) -> bool {
match other {
"Converged" => matches!(self.inner, entropyk_solver::ConvergenceStatus::Converged),
"TimedOut" => matches!(
self.inner,
entropyk_solver::ConvergenceStatus::TimedOutWithBestState
),
"ControlSaturation" => {
matches!(
self.inner,
entropyk_solver::ConvergenceStatus::ControlSaturation
)
}
_ => false,
}
}
}
// =============================================================================
// ConvergedState
// =============================================================================
/// Result of a solved system.
///
/// Attributes:
/// state_vector (list[float]): Final state vector [P0, h0, P1, h1, ...].
/// iterations (int): Number of solver iterations.
/// final_residual (float): L2 norm of the final residual.
/// status (ConvergenceStatus): Convergence status.
/// is_converged (bool): True if fully converged.
#[pyclass(name = "ConvergedState", module = "entropyk")]
#[derive(Clone)]
pub struct PyConvergedState {
state: Vec<f64>,
iterations: usize,
final_residual: f64,
status: entropyk_solver::ConvergenceStatus,
}
impl PyConvergedState {
pub(crate) fn from_rust(cs: entropyk_solver::ConvergedState) -> Self {
PyConvergedState {
state: cs.state,
iterations: cs.iterations,
final_residual: cs.final_residual,
status: cs.status,
}
}
}
#[pymethods]
impl PyConvergedState {
/// Final state vector as a Python list of floats.
#[getter]
fn state_vector(&self) -> Vec<f64> {
self.state.clone()
}
/// Number of iterations performed.
#[getter]
fn iterations(&self) -> usize {
self.iterations
}
/// L2 norm of the final residual vector.
#[getter]
fn final_residual(&self) -> f64 {
self.final_residual
}
/// Convergence status.
#[getter]
fn status(&self) -> PyConvergenceStatus {
PyConvergenceStatus {
inner: self.status.clone(),
}
}
/// True if the solver fully converged.
#[getter]
fn is_converged(&self) -> bool {
matches!(
self.status,
entropyk_solver::ConvergenceStatus::Converged
| entropyk_solver::ConvergenceStatus::ControlSaturation
)
}
fn __repr__(&self) -> String {
format!(
"ConvergedState(status={:?}, iterations={}, residual={:.2e})",
self.status, self.iterations, self.final_residual
)
}
/// Return the state vector as a NumPy array (zero-copy when possible).
///
/// Returns:
/// numpy.ndarray: 1-D float64 array of state values.
fn to_numpy<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, numpy::PyArray1<f64>>> {
Ok(numpy::PyArray1::from_vec(py, self.state.clone()))
}
}
// =============================================================================
// SolverStrategy
// =============================================================================
#[pyclass(name = "SolverStrategy", module = "entropyk")]
#[derive(Clone)]
pub struct PySolverStrategy {
pub(crate) inner: entropyk_solver::SolverStrategy,
}
#[pymethods]
impl PySolverStrategy {
#[staticmethod]
#[pyo3(signature = (
max_iterations=100,
tolerance=1e-6,
line_search=false,
timeout_ms=None,
initial_state=None,
use_numerical_jacobian=false,
jacobian_freezing=None,
convergence_criteria=None,
timeout_config=None,
previous_state=None,
line_search_armijo_c=1e-4,
line_search_max_backtracks=20,
divergence_threshold=1e10
))]
fn newton(
max_iterations: usize,
tolerance: f64,
line_search: bool,
timeout_ms: Option<u64>,
initial_state: Option<Vec<f64>>,
use_numerical_jacobian: bool,
jacobian_freezing: Option<PyJacobianFreezingConfig>,
convergence_criteria: Option<PyConvergenceCriteria>,
timeout_config: Option<PyTimeoutConfig>,
previous_state: Option<Vec<f64>>,
line_search_armijo_c: f64,
line_search_max_backtracks: usize,
divergence_threshold: f64,
) -> PyResult<Self> {
let py_config = PyNewtonConfig::new(
max_iterations, tolerance, line_search, timeout_ms, initial_state,
use_numerical_jacobian, jacobian_freezing, convergence_criteria,
timeout_config, previous_state, line_search_armijo_c,
line_search_max_backtracks, divergence_threshold,
)?;
let config = entropyk_solver::NewtonConfig {
max_iterations: py_config.max_iterations,
tolerance: py_config.tolerance,
line_search: py_config.line_search,
timeout: py_config.timeout_ms.map(Duration::from_millis),
use_numerical_jacobian: py_config.use_numerical_jacobian,
line_search_armijo_c: py_config.line_search_armijo_c,
line_search_max_backtracks: py_config.line_search_max_backtracks,
divergence_threshold: py_config.divergence_threshold,
timeout_config: py_config.timeout_config.inner.clone(),
previous_state: py_config.previous_state.clone(),
previous_residual: None,
initial_state: py_config.initial_state.clone(),
convergence_criteria: py_config.convergence_criteria.as_ref().map(|cc| cc.inner.clone()),
jacobian_freezing: py_config.jacobian_freezing.as_ref().map(|jf| jf.inner.clone()),
verbose_config: Default::default(),
};
Ok(PySolverStrategy {
inner: entropyk_solver::SolverStrategy::NewtonRaphson(config),
})
}
#[staticmethod]
#[pyo3(signature = (
max_iterations=500,
tolerance=1e-4,
relaxation=0.5,
initial_state=None,
timeout_ms=None,
convergence_criteria=None
))]
fn picard(
max_iterations: usize,
tolerance: f64,
relaxation: f64,
initial_state: Option<Vec<f64>>,
timeout_ms: Option<u64>,
convergence_criteria: Option<PyConvergenceCriteria>,
) -> PyResult<Self> {
let py_config = PyPicardConfig::new(
max_iterations, tolerance, relaxation, initial_state, timeout_ms, convergence_criteria
)?;
let config = entropyk_solver::PicardConfig {
max_iterations: py_config.max_iterations,
tolerance: py_config.tolerance,
relaxation_factor: py_config.relaxation,
timeout: py_config.timeout_ms.map(Duration::from_millis),
initial_state: py_config.initial_state.clone(),
convergence_criteria: py_config.convergence_criteria.as_ref().map(|cc| cc.inner.clone()),
..Default::default()
};
Ok(PySolverStrategy {
inner: entropyk_solver::SolverStrategy::SequentialSubstitution(config),
})
}
#[staticmethod]
fn default() -> Self {
PySolverStrategy {
inner: entropyk_solver::SolverStrategy::default(),
}
}
fn solve(&mut self, system: &mut PySystem) -> PyResult<PyConvergedState> {
use entropyk_solver::Solver;
let solve_result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
self.inner.solve(&mut system.inner)
}));
match solve_result {
Ok(Ok(converged)) => Ok(PyConvergedState::from_rust(converged)),
Ok(Err(e)) => Err(solver_error_to_pyerr(e)),
Err(_) => Err(PyRuntimeError::new_err(
"Internal error: solver panicked. This is a bug — please report it.",
)),
}
}
}