Subclass MeasurementModel in Python to define any nonlinear measurement function. The Rust EKF calls your Python predict() method during filtering, and computes the measurement Jacobian via finite differences automatically unless you provide an analytical override.
importnumpyasnpimportbraheasbhbh.initialize_eop()# Define a custom range measurement modelclassRangeModel(bh.MeasurementModel):"""Measures distance from a ground station to the satellite."""def__init__(self,station_eci,sigma):super().__init__()self.station_eci=np.array(station_eci)self.sigma=sigmadefpredict(self,epoch,state):pos=state[:3]returnnp.array([np.linalg.norm(pos-self.station_eci)])defnoise_covariance(self):returnnp.array([[self.sigma**2]])defmeasurement_dim(self):return1defname(self):return"Range"# Set up orbit and truth propagatorepoch=bh.Epoch(2024,1,1,0,0,0.0)r=bh.R_EARTH+500e3v=(bh.GM_EARTH/r)**0.5true_state=np.array([r,0.0,0.0,0.0,v,0.0])truth_prop=bh.NumericalOrbitPropagator(epoch,true_state,bh.NumericalPropagationConfig.default(),bh.ForceModelConfig.two_body(),)# Ground station (approximately on the equator)station=np.array([bh.R_EARTH,0.0,0.0])# Create EKF with both a built-in model and our custom range modelposition_model=bh.InertialPositionMeasurementModel(10.0)range_model=RangeModel(station,100.0)initial_state=true_state.copy()initial_state[0]+=500.0# 500m position offsetp0=np.diag([1e6,1e6,1e6,1e2,1e2,1e2])ekf=bh.ExtendedKalmanFilter(epoch,initial_state,p0,measurement_models=[position_model,range_model],propagation_config=bh.NumericalPropagationConfig.default(),force_config=bh.ForceModelConfig.two_body(),)# Alternate between position and range observationsdt=60.0foriinrange(1,21):obs_epoch=epoch+dt*itruth_prop.propagate_to(obs_epoch)truth_st=truth_prop.current_state()ifi%2==0:# Position observation (model_index=0)obs=bh.Observation(obs_epoch,truth_st[:3],model_index=0)else:# Range observation (model_index=1)true_range=np.linalg.norm(truth_st[:3]-station)obs=bh.Observation(obs_epoch,np.array([true_range]),model_index=1)record=ekf.process_observation(obs)print(f" {record.measurement_name:20s} prefit residual norm: "f"{np.linalg.norm(record.prefit_residual):.3f}")# Summarytruth_prop.propagate_to(ekf.current_epoch())pos_error=np.linalg.norm(ekf.current_state()[:3]-truth_prop.current_state()[:3])print(f"\nFinal position error: {pos_error:.2f} m")print(f"Records: {len(ekf.records())} "f"(InertialPosition: {sum(1forrinekf.records()ifr.measurement_name=='InertialPosition')}, "f"Range: {sum(1forrinekf.records()ifr.measurement_name=='Range')})")
usebraheasbh;usenalgebra::{DMatrix,DVector};/// Custom range measurement model: distance from a station to the satellite.structRangeModel{station_eci:DVector<f64>,noise_cov:DMatrix<f64>,}implRangeModel{fnnew(station_eci:DVector<f64>,sigma:f64)->Self{letnoise_cov=DMatrix::from_element(1,1,sigma*sigma);Self{station_eci,noise_cov}}}implbh::estimation::MeasurementModelforRangeModel{fnpredict(&self,_epoch:&bh::time::Epoch,state:&DVector<f64>,_params:Option<&DVector<f64>>,)->Result<DVector<f64>,bh::utils::errors::BraheError>{letrange=(state.rows(0,3)-&self.station_eci).norm();Ok(DVector::from_vec(vec![range]))}fnnoise_covariance(&self)->DMatrix<f64>{self.noise_cov.clone()}fnmeasurement_dim(&self)->usize{1}fnname(&self)->&str{"Range"}}fnmain(){bh::initialize_eop().unwrap();// Set up orbit and truth propagatorletepoch=bh::time::Epoch::from_datetime(2024,1,1,0,0,0.0,0.0,bh::time::TimeSystem::UTC);letr=bh::constants::physical::R_EARTH+500e3;letv=(bh::constants::physical::GM_EARTH/r).sqrt();lettrue_state=DVector::from_vec(vec![r,0.0,0.0,0.0,v,0.0]);letmuttruth_prop=bh::propagators::DNumericalOrbitPropagator::new(epoch,true_state.clone(),bh::propagators::NumericalPropagationConfig::default(),bh::propagators::force_model_config::ForceModelConfig::two_body_gravity(),None,None,None,None,).unwrap();// Ground station on the equatorletstation=DVector::from_vec(vec![bh::constants::physical::R_EARTH,0.0,0.0]);// Create EKF with both a built-in model and a custom range modelletposition_model=bh::estimation::InertialPositionMeasurementModel::new(10.0);letrange_model=RangeModel::new(station.clone(),100.0);letmutinitial_state=true_state.clone();initial_state[0]+=500.0;letp0=DMatrix::from_diagonal(&DVector::from_vec(vec![1e6,1e6,1e6,1e2,1e2,1e2,]));letmodels:Vec<Box<dynbh::estimation::MeasurementModel>>=vec![Box::new(position_model),Box::new(range_model),];letmutekf=bh::estimation::ExtendedKalmanFilter::new(epoch,initial_state,p0,bh::propagators::NumericalPropagationConfig::default(),bh::propagators::force_model_config::ForceModelConfig::two_body_gravity(),None,None,None,models,bh::estimation::EKFConfig::default(),).unwrap();// Alternate between position and range observationsletdt=60.0;foriin1..=20{letobs_epoch=epoch+dt*iasf64;truth_prop.propagate_to(obs_epoch);lettruth_st=truth_prop.current_state();letobs=ifi%2==0{// Position observation (model_index=0)bh::estimation::Observation::new(obs_epoch,truth_st.rows(0,3).into_owned(),0)}else{// Range observation (model_index=1)lettrue_range=(truth_st.rows(0,3)-&station).norm();bh::estimation::Observation::new(obs_epoch,DVector::from_vec(vec![true_range]),1)};letrecord=ekf.process_observation(&obs).unwrap();println!(" {:20} prefit residual norm: {:.3}",record.measurement_name,record.prefit_residual.norm());}// Summaryusebh::propagators::traits::DStatePropagator;truth_prop.propagate_to(ekf.current_epoch());letpos_error=(ekf.current_state().rows(0,3)-truth_prop.current_state().rows(0,3)).norm();println!("\nFinal position error: {:.2} m",pos_error);println!("Records: {} (InertialPosition: {}, Range: {})",ekf.records().len(),ekf.records().iter().filter(|r|r.measurement_name=="InertialPosition").count(),ekf.records().iter().filter(|r|r.measurement_name=="Range").count());}
Range prefit residual norm: 365.806
InertialPosition prefit residual norm: 354.296
Range prefit residual norm: 47.384
InertialPosition prefit residual norm: 80.298
Range prefit residual norm: 0.007
InertialPosition prefit residual norm: 0.202
Range prefit residual norm: 0.004
InertialPosition prefit residual norm: 0.085
Range prefit residual norm: 0.007
InertialPosition prefit residual norm: 0.051
Range prefit residual norm: 0.007
InertialPosition prefit residual norm: 0.035
Range prefit residual norm: 0.008
InertialPosition prefit residual norm: 0.027
Range prefit residual norm: 0.008
InertialPosition prefit residual norm: 0.021
Range prefit residual norm: 0.008
InertialPosition prefit residual norm: 0.018
Range prefit residual norm: 0.008
InertialPosition prefit residual norm: 0.015
Final position error: 0.01 m
Records: 20 (InertialPosition: 10, Range: 10)
The RangeModel above measures the Euclidean distance from a ground station to the satellite. It implements four required methods and relies on the default finite-difference Jacobian. The EKF processes range observations alongside built-in position observations by assigning different model_index values.
Every custom model must implement these four methods:
predict(epoch, state) -> numpy.ndarray -- compute the predicted measurement \(h(\mathbf{x}, t)\). The epoch is a brahe.Epoch and state is a 1D numpy array. Return a 1D numpy array of length measurement_dim().
noise_covariance() -> numpy.ndarray -- return the measurement noise covariance matrix \(R\) as a 2D numpy array of shape (m, m). This is called once at construction and cached, so it must not depend on epoch or state.
measurement_dim() -> int -- return the dimension of the measurement vector. Also called once and cached.
name() -> str -- return a human-readable name. This appears in FilterRecord entries and is useful for filtering residuals by model type.
By default, the measurement Jacobian \(H = \partial h / \partial \mathbf{x}\) is computed via central finite differences using the same perturbation strategy as the propagator Jacobians. This calls your predict() method \(2n\) times (where \(n\) is the state dimension), which works but adds Python function-call overhead.
To provide an analytical Jacobian, override jacobian() and return a 2D numpy array:
A single EKF can use multiple measurement models -- both built-in and custom. Each Observation carries a model_index that selects which model processes it:
ekf=bh.ExtendedKalmanFilter(epoch,state,p0,measurement_models=[bh.InertialPositionMeasurementModel(10.0),# index 0RangeModel(station,100.0),# index 1],propagation_config=bh.NumericalPropagationConfig.default(),force_config=bh.ForceModelConfig.two_body(),)# Position observation uses model 0obs_pos=bh.Observation(t1,position_measurement,model_index=0)# Range observation uses model 1obs_range=bh.Observation(t2,np.array([range_km]),model_index=1)
Built-in models passed to the EKF execute entirely in Rust with no Python overhead. Custom Python models incur GIL acquisition on each predict() and jacobian() call. For performance-critical applications with many observations, consider implementing custom models in Rust via the MeasurementModel trait.