Skip to content

Calculating Imaging Data Latency

In this example we'll analyze the imaging data latency for a satellite constellation. Data latency is defined as the time between a satellite exiting an imaging region (Area of Interest) and the start of its next ground station contact for data downlink.

For this analysis we'll use the Capella constellation (a commercial SAR imaging constellation) and the KSAT ground station network. The Area of Interest is the continental United States.

This metric is critical for understanding how quickly collected imagery can be delivered to end users, which is a key performance indicator for Earth observation missions.


Setup

First, we'll import the necessary libraries and initialize Earth orientation parameters:

import csv
import os
import pathlib
import time

import brahe as bh
import matplotlib.pyplot as plt
import numpy as np
from shapely.geometry import Point, Polygon

import cartopy.crs as ccrs

bh.initialize_eop()

We download all active satellite TLEs from CelesTrak and filter for Capella satellites:

# Download TLE data for all active satellites and filter for Capella
print("Downloading active satellite TLEs from CelesTrak...")
start_time = time.time()
all_active_props = bh.datasets.celestrak.get_tles_as_propagators("active", 60.0)

# Filter for Capella satellites (name contains "CAPELLA")
capella_props = [
    prop for prop in all_active_props if "CAPELLA" in prop.get_name().upper()
]
elapsed = time.time() - start_time
print(f"Found {len(capella_props)} Capella satellites in {elapsed:.2f} seconds")
for prop in capella_props:
    print(f"  - {prop.get_name()}")

Next, we load the KSAT ground station network:

1
2
3
4
# Load all KSAT ground stations
print("\nLoading KSAT ground stations...")
ksat_stations = bh.datasets.groundstations.load("ksat")
print(f"Loaded {len(ksat_stations)} KSAT ground stations")

Define the Area of Interest

We define the AOI as a polygon covering the continental United States using a detailed GeoJSON outline:

# Define AOI polygon for continental United States (CONUS)
# Using a simplified polygon that captures the main landmass
print("\nDefining continental US AOI polygon...")
aoi_vertices = [
    (-117.1, 32.5),
    (-114.6, 32.7),
    (-111.0, 31.3),
    (-108.2, 31.3),
    (-106.5, 31.8),
    (-103.0, 29.0),
    (-97.1, 25.9),
    (-90.1, 30.9),
    (-80.0, 24.5),
    (-80.1, 31.0),
    (-75.5, 35.2),
    (-73.9, 40.5),
    (-70.0, 41.5),
    (-66.9, 44.8),
    (-67.0, 47.5),
    (-82.5, 41.7),
    (-83.5, 46.0),
    (-92.0, 48.6),
    (-104.0, 49.0),
    (-117.0, 49.0),
    (-124.7, 48.4),
    (-125.0, 42.0),
    (-124.0, 39.0),
    (-117.1, 32.5),  # Close polygon
]

print(f"Defined US AOI with {len(aoi_vertices)} vertices")

# Create PolygonLocation for visualization
aoi_polygon = bh.PolygonLocation(
    [[lon, lat, 0.0] for lon, lat in aoi_vertices]
).with_name("Continental US")

Filter Ground Stations

We filter out ground stations that are inside the AOI. The reasoning is that a satellite cannot begin a downlink pass while it is still over the imaging region - it must first exit the AOI:

# Filter out ground stations that are inside the AOI
# (We assume you can't start a downlink while still over the imaging region)
aoi_shapely = Polygon(aoi_vertices)


def station_in_aoi(station):
    """Check if station is inside AOI using point-in-polygon test."""
    point = Point(station.lon, station.lat)
    return aoi_shapely.contains(point)


# Keep only stations outside the AOI
external_stations = [s for s in ksat_stations if not station_in_aoi(s)]
filtered_count = len(ksat_stations) - len(external_stations)
print(f"\nFiltered out {filtered_count} stations inside AOI")
print(f"Using {len(external_stations)} external stations for downlink:")
for station in external_stations:
    print(f"  - {station.get_name()} ({station.lon:.2f}, {station.lat:.2f})")

Compute AOI Exit Events

Using the AOIExitEvent detector with SGPPropagator, we detect every time a Capella satellite exits the US imaging region:

# Compute AOI exit events for each Capella satellite
print("\nComputing AOI exit events...")
start_time = time.time()

# Define analysis period using the EARLIEST satellite epoch
# (different satellites have different TLE epochs)
epoch_start = min(prop.epoch for prop in capella_props)
epoch_end = epoch_start + 7 * 86400.0  # 7 days

# Add AOI exit event detector to each propagator
for prop in capella_props:
    sat_name = prop.get_name()

    # Create AOI exit event detector
    exit_event = bh.AOIExitEvent.from_coordinates(
        aoi_vertices, f"{sat_name}_AOI_Exit", bh.AngleFormat.DEGREES
    )

    # Add event detector to propagator
    prop.add_event_detector(exit_event)

# Propagate all satellites in parallel
bh.par_propagate_to(capella_props, epoch_end)

# Collect all AOI exit events from all propagators
aoi_exits = []
for prop in capella_props:
    sat_name = prop.get_name()
    for event in prop.event_log():
        if "AOI_Exit" in event.name:
            aoi_exits.append(
                {"satellite": sat_name, "exit_time": event.window_open, "event": event}
            )

elapsed = time.time() - start_time
print(f"Found {len(aoi_exits)} AOI exit events in {elapsed:.2f} seconds")

Compute Ground Contacts

We reset the propagators and use the access computation pipeline to find all ground station contacts over the 7-day period:

# Reset propagators and compute ground contacts
print("\nComputing ground station contacts...")
start_time = time.time()

# Reset all propagators
for prop in capella_props:
    prop.reset()

# Propagate all satellites in parallel for 7 days
bh.par_propagate_to(capella_props, epoch_end)

# Compute access windows with 5 degree minimum elevation
constraint = bh.ElevationConstraint(min_elevation_deg=5.0)
windows = bh.location_accesses(
    external_stations, capella_props, epoch_start, epoch_end, constraint
)

elapsed = time.time() - start_time
print(f"Computed {len(windows)} ground contact windows in {elapsed:.2f} seconds")

Calculate Latencies

For each AOI exit event, we find the next ground contact for that satellite and compute the latency (time difference):

# For each AOI exit, find the next ground contact and compute latency
print("\nComputing imaging data latencies...")
start_time = time.time()

# Group contacts by satellite
satellite_contacts = {}
for window in windows:
    sat_name = window.satellite_name
    if sat_name not in satellite_contacts:
        satellite_contacts[sat_name] = []
    satellite_contacts[sat_name].append(window)

# Sort each satellite's contacts by start time
for sat_name in satellite_contacts:
    satellite_contacts[sat_name].sort(key=lambda w: w.start.jd())

# Calculate latency for each AOI exit
latencies = []
for exit_info in aoi_exits:
    sat_name = exit_info["satellite"]
    exit_time = exit_info["exit_time"]

    # Get contacts for this satellite
    contacts = satellite_contacts.get(sat_name, [])

    # Find the first contact that starts AFTER the exit time
    for contact in contacts:
        if contact.start > exit_time:
            latency = contact.start - exit_time  # Duration in seconds
            latencies.append(
                {
                    "satellite": sat_name,
                    "exit_time": exit_time,
                    "contact_start": contact.start,
                    "contact_end": contact.end,
                    "station": contact.location_name,
                    "latency": latency,
                }
            )
            break

elapsed = time.time() - start_time
print(f"Computed {len(latencies)} latency values in {elapsed:.2f} seconds")

Results

Top 5 Worst Latencies

The table below shows the 5 longest imaging data latencies - these represent the worst-case scenarios for data delivery:

Satellite AOI Exit (UTC) Contact Start (UTC) Station Latency
CAPELLA-15 (ACADIA-5) 2026-01-04 16:34:58 2026-01-04 17:29:23 Bangalore 54m 24s
CAPELLA-15 (ACADIA-5) 2026-01-03 16:27:08 2026-01-03 17:21:14 Bangalore 54m 5s
CAPELLA-16 (ACADIA-6) 2026-01-09 16:27:53 2026-01-09 17:21:57 Bangalore 54m 4s
CAPELLA-17 (ACADIA-7) 2026-01-09 19:01:12 2026-01-09 19:55:13 Bangalore 54m 1s
CAPELLA-15 (ACADIA-5) 2026-01-05 16:42:47 2026-01-05 17:32:20 Mauritius 49m 33s

Latency Statistics

Summary statistics for all imaging data latencies over the 7-day period:

Metric Value
Worst (Maximum) 54m 24s
Average 12m 59s
Median 5m 32s
Best (Minimum) 11s
Total AOI Exits 233
Matched Latencies 233
# Sort latencies by duration (longest first)
latencies.sort(key=lambda x: x["latency"], reverse=True)

# Compute statistics
latency_values = [x["latency"] for x in latencies]
if latency_values:
    worst_latency = max(latency_values)
    best_latency = min(latency_values)
    avg_latency = np.mean(latency_values)
    median_latency = np.median(latency_values)
else:
    worst_latency = best_latency = avg_latency = median_latency = 0.0

print("\n" + "=" * 100)
print("Imaging Data Latency Statistics")
print("=" * 100)
print(f"  Worst (Max):  {bh.format_time_string(worst_latency)}")
print(f"  Best (Min):   {bh.format_time_string(best_latency)}")
print(f"  Average:      {bh.format_time_string(avg_latency)}")
print(f"  Median:       {bh.format_time_string(median_latency)}")
print("=" * 100)

# Print top 5 worst latencies
print("\n" + "=" * 120)
print("Top 5 Worst Imaging Data Latencies")
print("=" * 120)
print(
    f"{'Satellite':<25} {'AOI Exit Time':<28} {'Contact Start':<28} {'Station':<20} {'Latency':>15}"
)
print("-" * 120)
for entry in latencies[:5]:
    exit_str = str(entry["exit_time"]).split(".")[0]
    contact_str = str(entry["contact_start"]).split(".")[0]
    latency_str = bh.format_time_string(entry["latency"], short=False)
    print(
        f"{entry['satellite']:<25} {exit_str:<28} {contact_str:<28} {entry['station']:<20} {latency_str:>15}"
    )
print("=" * 120)

# Export top 5 latencies to CSV
csv_top5_path = OUTDIR / f"{SCRIPT_NAME}_top5.csv"
with open(csv_top5_path, "w", newline="") as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(
        ["Satellite", "AOI Exit (UTC)", "Contact Start (UTC)", "Station", "Latency"]
    )
    for entry in latencies[:5]:
        exit_str = str(entry["exit_time"]).split(".")[0]
        contact_str = str(entry["contact_start"]).split(".")[0]
        latency_str = bh.format_time_string(entry["latency"], short=True)
        writer.writerow(
            [entry["satellite"], exit_str, contact_str, entry["station"], latency_str]
        )
print(f"\n✓ Exported top 5 latencies to {csv_top5_path}")

# Export statistics to CSV
csv_stats_path = OUTDIR / f"{SCRIPT_NAME}_stats.csv"
with open(csv_stats_path, "w", newline="") as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(["Metric", "Value"])
    writer.writerow(
        ["Worst (Maximum)", bh.format_time_string(worst_latency, short=True)]
    )
    writer.writerow(["Average", bh.format_time_string(avg_latency, short=True)])
    writer.writerow(["Median", bh.format_time_string(median_latency, short=True)])
    writer.writerow(["Best (Minimum)", bh.format_time_string(best_latency, short=True)])
    writer.writerow(["Total AOI Exits", str(len(aoi_exits))])
    writer.writerow(["Matched Latencies", str(len(latencies))])
print(f"✓ Exported statistics to {csv_stats_path}")

Visualization

The ground track plot below shows the satellite paths during the top 3 worst-case latency periods. The green dashed line indicates the US AOI boundary, and the blue circles show the ground station communication cones:

Imaging Data Latency Ground Tracks Imaging Data Latency Ground Tracks

The colored tracks show the satellite ground paths from the moment of AOI exit until the start of the next ground contact:

  • Red: Longest latency (worst case)
  • Orange: Second longest latency
  • Yellow: Third longest latency

This visualization helps identify geographic regions where additional ground stations might reduce data latency.

# Create ground track visualization for top 3 worst latencies
print("\nCreating ground track visualization for top 3 worst latencies...")
start_time = time.time()

# Get the top 3 worst latencies
top_3_latencies = latencies[:3]
gap_colors = ["red", "orange", "yellow"]

gap_segments_all = []

for lat_idx, lat_entry in enumerate(top_3_latencies):
    sat_name = lat_entry["satellite"]
    exit_time = lat_entry["exit_time"]
    contact_start = lat_entry["contact_start"]
    station_name = lat_entry["station"]
    total_latency = lat_entry["latency"]

    latency_str = bh.format_time_string(total_latency, short=True)
    gap_label = f"{sat_name}{station_name}"

    # Find the propagator for this satellite
    sat_prop = None
    for prop in capella_props:
        if prop.get_name() == sat_name:
            sat_prop = prop
            break

    if sat_prop is None:
        print(f"Warning: Could not find propagator for {sat_name}")
        continue

    # Get trajectory data from the stored trajectory
    traj = sat_prop.trajectory
    states = traj.to_matrix()
    epochs = traj.epochs()

    # Extract ALL ground track points from AOI exit to ground contact
    gap_lons = []
    gap_lats = []

    for i, ep in enumerate(epochs):
        if exit_time <= ep <= contact_start:
            ecef_state = bh.state_eci_to_ecef(ep, states[i])
            lon, lat, alt = bh.position_ecef_to_geodetic(
                ecef_state[:3], bh.AngleFormat.RADIANS
            )
            gap_lons.append(np.degrees(lon))
            gap_lats.append(np.degrees(lat))

    if len(gap_lons) < 2:
        print(f"Warning: Insufficient points for gap {lat_idx + 1}")
        continue

    # Split ground track at antimeridian crossings to avoid wrap-around lines
    segments = bh.split_ground_track_at_antimeridian(gap_lons, gap_lats)

    gap_segments_all.append(
        {
            "segments": segments,
            "color": gap_colors[lat_idx],
            "label": gap_label,
            "satellite": sat_name,
            "latency": total_latency,
        }
    )

    print(
        f"  Gap {lat_idx + 1}: {len(gap_lons)} points, {len(segments)} segments, latency={latency_str}"
    )

# Create ground track plot with stations and AOI
fig_groundtrack = bh.plot_groundtrack(
    ground_stations=[{"stations": external_stations, "color": "red", "alpha": 0.2}],
    gs_cone_altitude=np.min(
        [prop.semi_major_axis - bh.R_EARTH for prop in capella_props]
    ),
    gs_min_elevation=5.0,
    basemap="stock",
    show_borders=False,
    show_coastlines=False,
    backend="matplotlib",
)

ax = fig_groundtrack.get_axes()[0]

# Plot AOI boundary
aoi_lons = [v[0] for v in aoi_vertices]
aoi_lats = [v[1] for v in aoi_vertices]
ax.plot(
    aoi_lons,
    aoi_lats,
    color="green",
    linewidth=2,
    linestyle="--",
    transform=ccrs.Geodetic(),
    zorder=8,
    label="US AOI Boundary",
)
ax.fill(
    aoi_lons, aoi_lats, color="green", alpha=0.1, transform=ccrs.Geodetic(), zorder=7
)

# Plot each latency gap segment
for gap_data in gap_segments_all:
    for i, (lon_seg, lat_seg) in enumerate(gap_data["segments"]):
        if ccrs is not None:
            ax.plot(
                lon_seg,
                lat_seg,
                color=gap_data["color"],
                linewidth=2.5,
                transform=ccrs.Geodetic(),
                zorder=10,
                label=gap_data["label"] if i == 0 else "",
            )

# Add legend
legend = ax.legend(loc="lower left", fontsize=9)
legend.set_zorder(100)

# Add title
ax.set_title(
    "Capella Constellation: Top 3 Worst Imaging Data Latencies\n"
    f"KSAT Network ({len(external_stations)} stations outside US, 5° elevation)",
    fontsize=11,
)

Full Code Example

imaging_data_latency.py
import csv
import os
import pathlib
import time

import brahe as bh
import matplotlib.pyplot as plt
import numpy as np
from shapely.geometry import Point, Polygon

import cartopy.crs as ccrs

bh.initialize_eop()

# Configuration for output files
SCRIPT_NAME = pathlib.Path(__file__).stem
OUTDIR = pathlib.Path(os.getenv("BRAHE_FIGURE_OUTPUT_DIR", "./docs/figures/"))
os.makedirs(OUTDIR, exist_ok=True)

# Download TLE data for all active satellites and filter for Capella
print("Downloading active satellite TLEs from CelesTrak...")
start_time = time.time()
all_active_props = bh.datasets.celestrak.get_tles_as_propagators("active", 60.0)

# Filter for Capella satellites (name contains "CAPELLA")
capella_props = [
    prop for prop in all_active_props if "CAPELLA" in prop.get_name().upper()
]
elapsed = time.time() - start_time
print(f"Found {len(capella_props)} Capella satellites in {elapsed:.2f} seconds")
for prop in capella_props:
    print(f"  - {prop.get_name()}")

# Load all KSAT ground stations
print("\nLoading KSAT ground stations...")
ksat_stations = bh.datasets.groundstations.load("ksat")
print(f"Loaded {len(ksat_stations)} KSAT ground stations")

# Define AOI polygon for continental United States (CONUS)
# Using a simplified polygon that captures the main landmass
print("\nDefining continental US AOI polygon...")
aoi_vertices = [
    (-117.1, 32.5),
    (-114.6, 32.7),
    (-111.0, 31.3),
    (-108.2, 31.3),
    (-106.5, 31.8),
    (-103.0, 29.0),
    (-97.1, 25.9),
    (-90.1, 30.9),
    (-80.0, 24.5),
    (-80.1, 31.0),
    (-75.5, 35.2),
    (-73.9, 40.5),
    (-70.0, 41.5),
    (-66.9, 44.8),
    (-67.0, 47.5),
    (-82.5, 41.7),
    (-83.5, 46.0),
    (-92.0, 48.6),
    (-104.0, 49.0),
    (-117.0, 49.0),
    (-124.7, 48.4),
    (-125.0, 42.0),
    (-124.0, 39.0),
    (-117.1, 32.5),  # Close polygon
]

print(f"Defined US AOI with {len(aoi_vertices)} vertices")

# Create PolygonLocation for visualization
aoi_polygon = bh.PolygonLocation(
    [[lon, lat, 0.0] for lon, lat in aoi_vertices]
).with_name("Continental US")

# Filter out ground stations that are inside the AOI
# (We assume you can't start a downlink while still over the imaging region)
aoi_shapely = Polygon(aoi_vertices)


def station_in_aoi(station):
    """Check if station is inside AOI using point-in-polygon test."""
    point = Point(station.lon, station.lat)
    return aoi_shapely.contains(point)


# Keep only stations outside the AOI
external_stations = [s for s in ksat_stations if not station_in_aoi(s)]
filtered_count = len(ksat_stations) - len(external_stations)
print(f"\nFiltered out {filtered_count} stations inside AOI")
print(f"Using {len(external_stations)} external stations for downlink:")
for station in external_stations:
    print(f"  - {station.get_name()} ({station.lon:.2f}, {station.lat:.2f})")

# Compute AOI exit events for each Capella satellite
print("\nComputing AOI exit events...")
start_time = time.time()

# Define analysis period using the EARLIEST satellite epoch
# (different satellites have different TLE epochs)
epoch_start = min(prop.epoch for prop in capella_props)
epoch_end = epoch_start + 7 * 86400.0  # 7 days

# Add AOI exit event detector to each propagator
for prop in capella_props:
    sat_name = prop.get_name()

    # Create AOI exit event detector
    exit_event = bh.AOIExitEvent.from_coordinates(
        aoi_vertices, f"{sat_name}_AOI_Exit", bh.AngleFormat.DEGREES
    )

    # Add event detector to propagator
    prop.add_event_detector(exit_event)

# Propagate all satellites in parallel
bh.par_propagate_to(capella_props, epoch_end)

# Collect all AOI exit events from all propagators
aoi_exits = []
for prop in capella_props:
    sat_name = prop.get_name()
    for event in prop.event_log():
        if "AOI_Exit" in event.name:
            aoi_exits.append(
                {"satellite": sat_name, "exit_time": event.window_open, "event": event}
            )

elapsed = time.time() - start_time
print(f"Found {len(aoi_exits)} AOI exit events in {elapsed:.2f} seconds")

# Reset propagators and compute ground contacts
print("\nComputing ground station contacts...")
start_time = time.time()

# Reset all propagators
for prop in capella_props:
    prop.reset()

# Propagate all satellites in parallel for 7 days
bh.par_propagate_to(capella_props, epoch_end)

# Compute access windows with 5 degree minimum elevation
constraint = bh.ElevationConstraint(min_elevation_deg=5.0)
windows = bh.location_accesses(
    external_stations, capella_props, epoch_start, epoch_end, constraint
)

elapsed = time.time() - start_time
print(f"Computed {len(windows)} ground contact windows in {elapsed:.2f} seconds")

# For each AOI exit, find the next ground contact and compute latency
print("\nComputing imaging data latencies...")
start_time = time.time()

# Group contacts by satellite
satellite_contacts = {}
for window in windows:
    sat_name = window.satellite_name
    if sat_name not in satellite_contacts:
        satellite_contacts[sat_name] = []
    satellite_contacts[sat_name].append(window)

# Sort each satellite's contacts by start time
for sat_name in satellite_contacts:
    satellite_contacts[sat_name].sort(key=lambda w: w.start.jd())

# Calculate latency for each AOI exit
latencies = []
for exit_info in aoi_exits:
    sat_name = exit_info["satellite"]
    exit_time = exit_info["exit_time"]

    # Get contacts for this satellite
    contacts = satellite_contacts.get(sat_name, [])

    # Find the first contact that starts AFTER the exit time
    for contact in contacts:
        if contact.start > exit_time:
            latency = contact.start - exit_time  # Duration in seconds
            latencies.append(
                {
                    "satellite": sat_name,
                    "exit_time": exit_time,
                    "contact_start": contact.start,
                    "contact_end": contact.end,
                    "station": contact.location_name,
                    "latency": latency,
                }
            )
            break

elapsed = time.time() - start_time
print(f"Computed {len(latencies)} latency values in {elapsed:.2f} seconds")

# Sort latencies by duration (longest first)
latencies.sort(key=lambda x: x["latency"], reverse=True)

# Compute statistics
latency_values = [x["latency"] for x in latencies]
if latency_values:
    worst_latency = max(latency_values)
    best_latency = min(latency_values)
    avg_latency = np.mean(latency_values)
    median_latency = np.median(latency_values)
else:
    worst_latency = best_latency = avg_latency = median_latency = 0.0

print("\n" + "=" * 100)
print("Imaging Data Latency Statistics")
print("=" * 100)
print(f"  Worst (Max):  {bh.format_time_string(worst_latency)}")
print(f"  Best (Min):   {bh.format_time_string(best_latency)}")
print(f"  Average:      {bh.format_time_string(avg_latency)}")
print(f"  Median:       {bh.format_time_string(median_latency)}")
print("=" * 100)

# Print top 5 worst latencies
print("\n" + "=" * 120)
print("Top 5 Worst Imaging Data Latencies")
print("=" * 120)
print(
    f"{'Satellite':<25} {'AOI Exit Time':<28} {'Contact Start':<28} {'Station':<20} {'Latency':>15}"
)
print("-" * 120)
for entry in latencies[:5]:
    exit_str = str(entry["exit_time"]).split(".")[0]
    contact_str = str(entry["contact_start"]).split(".")[0]
    latency_str = bh.format_time_string(entry["latency"], short=False)
    print(
        f"{entry['satellite']:<25} {exit_str:<28} {contact_str:<28} {entry['station']:<20} {latency_str:>15}"
    )
print("=" * 120)

# Export top 5 latencies to CSV
csv_top5_path = OUTDIR / f"{SCRIPT_NAME}_top5.csv"
with open(csv_top5_path, "w", newline="") as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(
        ["Satellite", "AOI Exit (UTC)", "Contact Start (UTC)", "Station", "Latency"]
    )
    for entry in latencies[:5]:
        exit_str = str(entry["exit_time"]).split(".")[0]
        contact_str = str(entry["contact_start"]).split(".")[0]
        latency_str = bh.format_time_string(entry["latency"], short=True)
        writer.writerow(
            [entry["satellite"], exit_str, contact_str, entry["station"], latency_str]
        )
print(f"\n✓ Exported top 5 latencies to {csv_top5_path}")

# Export statistics to CSV
csv_stats_path = OUTDIR / f"{SCRIPT_NAME}_stats.csv"
with open(csv_stats_path, "w", newline="") as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(["Metric", "Value"])
    writer.writerow(
        ["Worst (Maximum)", bh.format_time_string(worst_latency, short=True)]
    )
    writer.writerow(["Average", bh.format_time_string(avg_latency, short=True)])
    writer.writerow(["Median", bh.format_time_string(median_latency, short=True)])
    writer.writerow(["Best (Minimum)", bh.format_time_string(best_latency, short=True)])
    writer.writerow(["Total AOI Exits", str(len(aoi_exits))])
    writer.writerow(["Matched Latencies", str(len(latencies))])
print(f"✓ Exported statistics to {csv_stats_path}")

# Create ground track visualization for top 3 worst latencies
print("\nCreating ground track visualization for top 3 worst latencies...")
start_time = time.time()

# Get the top 3 worst latencies
top_3_latencies = latencies[:3]
gap_colors = ["red", "orange", "yellow"]

gap_segments_all = []

for lat_idx, lat_entry in enumerate(top_3_latencies):
    sat_name = lat_entry["satellite"]
    exit_time = lat_entry["exit_time"]
    contact_start = lat_entry["contact_start"]
    station_name = lat_entry["station"]
    total_latency = lat_entry["latency"]

    latency_str = bh.format_time_string(total_latency, short=True)
    gap_label = f"{sat_name}{station_name}"

    # Find the propagator for this satellite
    sat_prop = None
    for prop in capella_props:
        if prop.get_name() == sat_name:
            sat_prop = prop
            break

    if sat_prop is None:
        print(f"Warning: Could not find propagator for {sat_name}")
        continue

    # Get trajectory data from the stored trajectory
    traj = sat_prop.trajectory
    states = traj.to_matrix()
    epochs = traj.epochs()

    # Extract ALL ground track points from AOI exit to ground contact
    gap_lons = []
    gap_lats = []

    for i, ep in enumerate(epochs):
        if exit_time <= ep <= contact_start:
            ecef_state = bh.state_eci_to_ecef(ep, states[i])
            lon, lat, alt = bh.position_ecef_to_geodetic(
                ecef_state[:3], bh.AngleFormat.RADIANS
            )
            gap_lons.append(np.degrees(lon))
            gap_lats.append(np.degrees(lat))

    if len(gap_lons) < 2:
        print(f"Warning: Insufficient points for gap {lat_idx + 1}")
        continue

    # Split ground track at antimeridian crossings to avoid wrap-around lines
    segments = bh.split_ground_track_at_antimeridian(gap_lons, gap_lats)

    gap_segments_all.append(
        {
            "segments": segments,
            "color": gap_colors[lat_idx],
            "label": gap_label,
            "satellite": sat_name,
            "latency": total_latency,
        }
    )

    print(
        f"  Gap {lat_idx + 1}: {len(gap_lons)} points, {len(segments)} segments, latency={latency_str}"
    )

# Create ground track plot with stations and AOI
fig_groundtrack = bh.plot_groundtrack(
    ground_stations=[{"stations": external_stations, "color": "red", "alpha": 0.2}],
    gs_cone_altitude=np.min(
        [prop.semi_major_axis - bh.R_EARTH for prop in capella_props]
    ),
    gs_min_elevation=5.0,
    basemap="stock",
    show_borders=False,
    show_coastlines=False,
    backend="matplotlib",
)

ax = fig_groundtrack.get_axes()[0]

# Plot AOI boundary
aoi_lons = [v[0] for v in aoi_vertices]
aoi_lats = [v[1] for v in aoi_vertices]
ax.plot(
    aoi_lons,
    aoi_lats,
    color="green",
    linewidth=2,
    linestyle="--",
    transform=ccrs.Geodetic(),
    zorder=8,
    label="US AOI Boundary",
)
ax.fill(
    aoi_lons, aoi_lats, color="green", alpha=0.1, transform=ccrs.Geodetic(), zorder=7
)

# Plot each latency gap segment
for gap_data in gap_segments_all:
    for i, (lon_seg, lat_seg) in enumerate(gap_data["segments"]):
        if ccrs is not None:
            ax.plot(
                lon_seg,
                lat_seg,
                color=gap_data["color"],
                linewidth=2.5,
                transform=ccrs.Geodetic(),
                zorder=10,
                label=gap_data["label"] if i == 0 else "",
            )

# Add legend
legend = ax.legend(loc="lower left", fontsize=9)
legend.set_zorder(100)

# Add title
ax.set_title(
    "Capella Constellation: Top 3 Worst Imaging Data Latencies\n"
    f"KSAT Network ({len(external_stations)} stations outside US, 5° elevation)",
    fontsize=11,
)

elapsed = time.time() - start_time
print(f"Created ground track visualization in {elapsed:.2f} seconds.")

See Also