OEM-Based Access Prediction
This example demonstrates an end-to-end workflow for computing ground station access windows using a CCSDS Orbit Ephemeris Message (OEM). We'll generate an artificial OEM from an ISS-like orbit, save and reload it from disk, convert it to an OrbitTrajectory, and compute access windows against three ground stations. Finally, we compare the OEM-based results against direct propagator results to validate fidelity.
Setup
Import the necessary libraries and initialize Earth orientation parameters.
| import brahe as bh
import numpy as np
from brahe.ccsds import OEM
bh.initialize_eop()
|
Generate OEM
Define an ISS-like orbit (~408 km altitude, 51.6° inclination), propagate it for 24 hours using a Keplerian propagator, and build an OEM with the Cartesian ECI states. Since OEM files store position and velocity data, we sample the propagator's state_eci() method to get Cartesian states regardless of its internal representation.
| # Define an ISS-like orbit: ~408 km altitude, 51.6° inclination
epoch = bh.Epoch.from_datetime(2024, 6, 15, 0, 0, 0.0, 0.0, bh.TimeSystem.UTC)
oe = np.array([bh.R_EARTH + 408e3, 0.0001, 51.6, 15.0, 30.0, 0.0])
# Create a Keplerian propagator with 60-second step size
prop = bh.KeplerianPropagator.from_keplerian(epoch, oe, bh.AngleFormat.DEGREES, 60.0)
# Propagate for 24 hours to generate the full trajectory
end_epoch = epoch + 86400.0
prop.propagate_to(end_epoch)
# Build an OEM from the propagated trajectory
oem = OEM(originator="BRAHE_EXAMPLE")
oem.classification = "unclassified"
oem.message_id = "OEM-ISS-EXAMPLE"
seg_idx = oem.add_segment(
object_name="ISS (ZARYA)",
object_id="1998-067A",
center_name="EARTH",
ref_frame="EME2000",
time_system="UTC",
start_time=epoch,
stop_time=end_epoch,
interpolation="LAGRANGE",
interpolation_degree=7,
)
# Add trajectory states to the OEM — frame conversion is automatic.
# The segment's ref_frame ("EME2000") tells add_trajectory() which frame to use,
# so it works regardless of the propagator's internal representation.
seg = oem.segments[seg_idx]
seg.add_trajectory(prop.trajectory)
print(f"Generated OEM with {seg.num_states} states over 24 hours")
|
Save and Load OEM
Write the OEM to disk in CCSDS KVN format, then load it back to demonstrate the round-trip file I/O workflow.
| # Write the OEM to a file in KVN format
oem_path = "/tmp/brahe_iss_ephemeris.oem"
oem.to_file(oem_path, "KVN")
print(f"Written OEM to {oem_path}")
# Load the OEM back from file
oem_loaded = OEM.from_file(oem_path)
seg_loaded = oem_loaded.segments[0]
print(f"Loaded OEM: object={seg_loaded.object_name}, states={seg_loaded.num_states}")
print(f" Time span: {seg_loaded.start_time} to {seg_loaded.stop_time}")
|
Create OrbitTrajectory
Convert the loaded OEM segment into an OrbitTrajectory object, which can be used directly with Brahe's access computation pipeline.
| # Convert the loaded OEM segment into an OrbitTrajectory
traj = oem_loaded.segment_to_trajectory(0)
traj = traj.with_name("ISS")
print(f"Created OrbitTrajectory '{traj.get_name()}' with {len(traj)} states")
|
Define Ground Stations
Create PointLocation objects for San Francisco, New York, and London.
| # Define ground stations: San Francisco, New York, London
sf = bh.PointLocation(-122.4194, 37.7749, 0.0)
sf.set_name("San Francisco")
nyc = bh.PointLocation(-74.006, 40.7128, 0.0)
nyc.set_name("New York")
london = bh.PointLocation(-0.1276, 51.5074, 0.0)
london.set_name("London")
stations = [sf, nyc, london]
print(f"Defined {len(stations)} ground stations")
|
Compute Access Windows from OEM
Pass the OEM-loaded trajectory directly to location_accesses() with a 10° minimum elevation constraint. This is the key step — OrbitTrajectory objects work as state providers for access computation, just like propagators.
| # Compute access windows using the OEM-loaded trajectory
constraint = bh.ElevationConstraint(min_elevation_deg=10.0)
oem_windows = bh.location_accesses(stations, traj, epoch, end_epoch, constraint)
print(f"OEM trajectory: {len(oem_windows)} access windows found")
|
Compute Reference Windows
For validation, compute access windows using the original Keplerian propagator over the same time window and with the same constraint.
| # Compute access windows directly from the propagator for comparison
direct_windows = bh.location_accesses(stations, prop, epoch, end_epoch, constraint)
print(f"Direct propagator: {len(direct_windows)} access windows found")
|
Compare Results
Compare each OEM-based access window against the corresponding direct propagator window. With 60-second OEM spacing and Lagrange interpolation, we expect sub-5-second timing agreement and sub-0.5° elevation agreement.
| # Compare OEM-based and direct propagator access windows
print(f"\n{'Comparison':=^80}")
print(f"OEM windows: {len(oem_windows)}, Direct windows: {len(direct_windows)}")
assert len(oem_windows) == len(direct_windows), (
f"Window count mismatch: OEM={len(oem_windows)}, Direct={len(direct_windows)}"
)
# Sort both sets by start time for comparison
oem_sorted = sorted(oem_windows, key=lambda w: w.start.mjd())
direct_sorted = sorted(direct_windows, key=lambda w: w.start.mjd())
max_start_diff = 0.0
max_end_diff = 0.0
max_elev_diff = 0.0
print(
f"\n{'Station':<16} {'Start Diff (s)':>14} {'End Diff (s)':>14} {'Elev Diff (°)':>14}"
)
print("-" * 62)
for oem_w, direct_w in zip(oem_sorted, direct_sorted):
start_diff = abs((oem_w.start - direct_w.start))
end_diff = abs((oem_w.end - direct_w.end))
elev_diff = abs(oem_w.properties.elevation_max - direct_w.properties.elevation_max)
max_start_diff = max(max_start_diff, start_diff)
max_end_diff = max(max_end_diff, end_diff)
max_elev_diff = max(max_elev_diff, elev_diff)
print(
f"{oem_w.location_name or 'N/A':<16} "
f"{start_diff:>14.6f} "
f"{end_diff:>14.6f} "
f"{elev_diff:>14.6f}"
)
print(f"\nMax start time difference: {max_start_diff:.6f} s")
print(f"Max end time difference: {max_end_diff:.6f} s")
print(f"Max elevation difference: {max_elev_diff:.6f}°")
# Verify differences are within tolerance
# With 60-second OEM spacing and Lagrange interpolation, sub-5-second timing
# and sub-0.5° elevation agreement demonstrates good fidelity
assert max_start_diff < 5.0, f"Start time diff too large: {max_start_diff:.6f} s"
assert max_end_diff < 5.0, f"End time diff too large: {max_end_diff:.6f} s"
assert max_elev_diff < 0.5, f"Elevation diff too large: {max_elev_diff:.6f}°"
print("\nAll differences within tolerance — OEM round-trip preserves access fidelity.")
|
Display Results
Print all OEM-based access windows grouped by station with summary statistics.
| # Display all OEM-based access windows grouped by station
print(f"\n{'OEM Access Windows':=^80}")
print(
f"{'Station':<16} {'Start (UTC)':<28} {'End (UTC)':<28} {'Dur (min)':>9} {'Max El (°)':>10}"
)
print("-" * 95)
# Group by station
for station_name in ["San Francisco", "New York", "London"]:
station_windows = [w for w in oem_sorted if w.location_name == station_name]
for w in station_windows:
start_str = str(w.start).split(".")[0]
end_str = str(w.end).split(".")[0]
dur_min = w.duration / 60.0
max_elev = w.properties.elevation_max
print(
f"{station_name:<16} {start_str:<28} {end_str:<28} {dur_min:>9.1f} {max_elev:>10.1f}"
)
if station_windows:
avg_dur = sum(w.duration for w in station_windows) / len(station_windows) / 60.0
print(f" => {len(station_windows)} passes, avg duration: {avg_dur:.1f} min")
print()
|
Full Code Example
| oem_access_prediction.py |
|---|
| import brahe as bh
import numpy as np
from brahe.ccsds import OEM
bh.initialize_eop()
# Define an ISS-like orbit: ~408 km altitude, 51.6° inclination
epoch = bh.Epoch.from_datetime(2024, 6, 15, 0, 0, 0.0, 0.0, bh.TimeSystem.UTC)
oe = np.array([bh.R_EARTH + 408e3, 0.0001, 51.6, 15.0, 30.0, 0.0])
# Create a Keplerian propagator with 60-second step size
prop = bh.KeplerianPropagator.from_keplerian(epoch, oe, bh.AngleFormat.DEGREES, 60.0)
# Propagate for 24 hours to generate the full trajectory
end_epoch = epoch + 86400.0
prop.propagate_to(end_epoch)
# Build an OEM from the propagated trajectory
oem = OEM(originator="BRAHE_EXAMPLE")
oem.classification = "unclassified"
oem.message_id = "OEM-ISS-EXAMPLE"
seg_idx = oem.add_segment(
object_name="ISS (ZARYA)",
object_id="1998-067A",
center_name="EARTH",
ref_frame="EME2000",
time_system="UTC",
start_time=epoch,
stop_time=end_epoch,
interpolation="LAGRANGE",
interpolation_degree=7,
)
# Add trajectory states to the OEM — frame conversion is automatic.
# The segment's ref_frame ("EME2000") tells add_trajectory() which frame to use,
# so it works regardless of the propagator's internal representation.
seg = oem.segments[seg_idx]
seg.add_trajectory(prop.trajectory)
print(f"Generated OEM with {seg.num_states} states over 24 hours")
# Write the OEM to a file in KVN format
oem_path = "/tmp/brahe_iss_ephemeris.oem"
oem.to_file(oem_path, "KVN")
print(f"Written OEM to {oem_path}")
# Load the OEM back from file
oem_loaded = OEM.from_file(oem_path)
seg_loaded = oem_loaded.segments[0]
print(f"Loaded OEM: object={seg_loaded.object_name}, states={seg_loaded.num_states}")
print(f" Time span: {seg_loaded.start_time} to {seg_loaded.stop_time}")
# Convert the loaded OEM segment into an OrbitTrajectory
traj = oem_loaded.segment_to_trajectory(0)
traj = traj.with_name("ISS")
print(f"Created OrbitTrajectory '{traj.get_name()}' with {len(traj)} states")
# Define ground stations: San Francisco, New York, London
sf = bh.PointLocation(-122.4194, 37.7749, 0.0)
sf.set_name("San Francisco")
nyc = bh.PointLocation(-74.006, 40.7128, 0.0)
nyc.set_name("New York")
london = bh.PointLocation(-0.1276, 51.5074, 0.0)
london.set_name("London")
stations = [sf, nyc, london]
print(f"Defined {len(stations)} ground stations")
# Compute access windows using the OEM-loaded trajectory
constraint = bh.ElevationConstraint(min_elevation_deg=10.0)
oem_windows = bh.location_accesses(stations, traj, epoch, end_epoch, constraint)
print(f"OEM trajectory: {len(oem_windows)} access windows found")
# Compute access windows directly from the propagator for comparison
direct_windows = bh.location_accesses(stations, prop, epoch, end_epoch, constraint)
print(f"Direct propagator: {len(direct_windows)} access windows found")
# Compare OEM-based and direct propagator access windows
print(f"\n{'Comparison':=^80}")
print(f"OEM windows: {len(oem_windows)}, Direct windows: {len(direct_windows)}")
assert len(oem_windows) == len(direct_windows), (
f"Window count mismatch: OEM={len(oem_windows)}, Direct={len(direct_windows)}"
)
# Sort both sets by start time for comparison
oem_sorted = sorted(oem_windows, key=lambda w: w.start.mjd())
direct_sorted = sorted(direct_windows, key=lambda w: w.start.mjd())
max_start_diff = 0.0
max_end_diff = 0.0
max_elev_diff = 0.0
print(
f"\n{'Station':<16} {'Start Diff (s)':>14} {'End Diff (s)':>14} {'Elev Diff (°)':>14}"
)
print("-" * 62)
for oem_w, direct_w in zip(oem_sorted, direct_sorted):
start_diff = abs((oem_w.start - direct_w.start))
end_diff = abs((oem_w.end - direct_w.end))
elev_diff = abs(oem_w.properties.elevation_max - direct_w.properties.elevation_max)
max_start_diff = max(max_start_diff, start_diff)
max_end_diff = max(max_end_diff, end_diff)
max_elev_diff = max(max_elev_diff, elev_diff)
print(
f"{oem_w.location_name or 'N/A':<16} "
f"{start_diff:>14.6f} "
f"{end_diff:>14.6f} "
f"{elev_diff:>14.6f}"
)
print(f"\nMax start time difference: {max_start_diff:.6f} s")
print(f"Max end time difference: {max_end_diff:.6f} s")
print(f"Max elevation difference: {max_elev_diff:.6f}°")
# Verify differences are within tolerance
# With 60-second OEM spacing and Lagrange interpolation, sub-5-second timing
# and sub-0.5° elevation agreement demonstrates good fidelity
assert max_start_diff < 5.0, f"Start time diff too large: {max_start_diff:.6f} s"
assert max_end_diff < 5.0, f"End time diff too large: {max_end_diff:.6f} s"
assert max_elev_diff < 0.5, f"Elevation diff too large: {max_elev_diff:.6f}°"
print("\nAll differences within tolerance — OEM round-trip preserves access fidelity.")
# Display all OEM-based access windows grouped by station
print(f"\n{'OEM Access Windows':=^80}")
print(
f"{'Station':<16} {'Start (UTC)':<28} {'End (UTC)':<28} {'Dur (min)':>9} {'Max El (°)':>10}"
)
print("-" * 95)
# Group by station
for station_name in ["San Francisco", "New York", "London"]:
station_windows = [w for w in oem_sorted if w.location_name == station_name]
for w in station_windows:
start_str = str(w.start).split(".")[0]
end_str = str(w.end).split(".")[0]
dur_min = w.duration / 60.0
max_elev = w.properties.elevation_max
print(
f"{station_name:<16} {start_str:<28} {end_str:<28} {dur_min:>9.1f} {max_elev:>10.1f}"
)
if station_windows:
avg_dur = sum(w.duration for w in station_windows) / len(station_windows) / 60.0
print(f" => {len(station_windows)} passes, avg duration: {avg_dur:.1f} min")
print()
|
See Also