import concurrent.futures
import logging
import multiprocessing
import time
from abc import ABC, abstractmethod
from collections import namedtuple
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from typing import Any
import numpy as np
from astropy import units as u
from astropy.coordinates import EarthLocation
from astropy.time import Time, TimeDelta
from sgp4.api import Satrec
from skyfield.api import EarthSatellite, wgs84
from skyfield.nutationlib import iau2000b
from api.adapters.repositories.ephemeris_repository import AbstractEphemerisRepository
from api.domain.models.interpolable_ephemeris import InterpolableEphemeris
from api.utils import coordinate_systems, output_utils
from api.utils.coordinate_systems import (
az_el_to_ra_dec,
calculate_current_position,
calculate_satellite_observer_relative,
ecef_to_enu,
ecef_to_itrs,
enu_to_az_el,
get_phase_angle,
icrf2radec,
is_illuminated,
is_illuminated_vectorized,
itrs_to_gcrs,
load_earth_sun,
teme_to_ecef,
)
from api.utils.interpolation_utils import (
InterpolatedSplinesDict,
generate_and_propagate_sigma_points,
get_interpolated_sigma_points_KI,
interpolate_sigma_pointsKI,
reconstruct_covariance_at_time,
)
from api.utils.skyfield_loader import load
from api.utils.time_utils import jd_to_gst
logger = logging.getLogger(__name__)
_ts = load.timescale()
[docs]
def get_timescale():
return _ts
"""
A named tuple containing the following fields:
ra (float):
The right ascension of the satellite relative to observer
coordinates in ICRS reference frame in degrees. Range of response
is [0, 360).
dec (float):
The declination of the satellite relative to observer
coordinates in ICRS reference frame in degrees. Range of response
is [-90, 90].
alt (float):
The altitude of the satellite relative to observer
coordinates in ICRS reference frame in degrees. Range of response
is [0, 90].
az (float):
The azimuth of the satellite relative to observer
coordinates in ICRS reference frame in degrees. Range of response
is [0, 360).
distance (float):
Range from observer to object in km.
dracosdec (float):
Rate of change of right ascension.
ddec (float):
Rate of change of declination.
ddistance (float):
Rate of change of distance.
phase_angle (float):
Phase angle between the satellite, observer, and the Sun.
sat_altitude_km (float):
Satellite altitude above Earth's surface in km.
solar_elevation_deg (float):
Solar elevation angle in degrees.
solar_azimuth_deg (float):
Solar azimuth angle in degrees.
illuminated (bool):
Whether the satellite is illuminated.
satellite_gcrs (list):
Satellite coordinates in GCRS.
observer_gcrs (list):
Observer coordinates in GCRS.
julian_date (float):
The input Julian date.
"""
satellite_position = namedtuple(
"satellite_position",
[
"ra",
"dec",
"dracosdec",
"ddec",
"alt",
"az",
"distance",
"ddistance",
"phase_angle",
"sat_altitude_km",
"solar_elevation_deg",
"solar_azimuth_deg",
"illuminated",
"satellite_gcrs",
"observer_gcrs",
"julian_date",
],
)
satellite_position_fov = namedtuple(
"satellite_position_fov",
[
"ra",
"dec",
"covariance",
"angle",
"altitude",
"azimuth",
"range_km",
"julian_date",
"name",
"norad_id",
"orbital_data_epoch",
"orbital_data_source",
],
)
[docs]
class BasePropagationStrategy(ABC):
"""Base class for all propagation strategies."""
[docs]
@abstractmethod
def propagate(
self,
julian_dates: float | list[float] | np.ndarray,
tle_line_1: str,
tle_line_2: str,
latitude: float,
longitude: float,
elevation: float,
**kwargs,
) -> (
satellite_position
| list[satellite_position]
| list[satellite_position_fov]
| list[dict[str, Any]]
):
"""
Propagate satellite positions.
Args:
julian_dates: Single Julian date or array of Julian dates
tle_line_1: First line of TLE
tle_line_2: Second line of TLE
latitude: Observer latitude in degrees
longitude: Observer longitude in degrees
elevation: Observer elevation in meters
**kwargs: Additional strategy-specific parameters
Returns:
Propagated position(s) in the strategy's format
"""
pass
[docs]
class SkyfieldPropagationStrategy(BasePropagationStrategy):
[docs]
def propagate(
self,
julian_dates: float | list[float] | np.ndarray,
tle_line_1: str,
tle_line_2: str,
latitude: float,
longitude: float,
elevation: float,
**kwargs,
) -> list[satellite_position]:
"""
Use Skyfield (https://rhodesmill.org/skyfield/earth-satellites.html)
to propagate satellite and observer states.
Args:
julian_dates: Single Julian date or array of Julian dates
tle_line_1: TLE line 1
tle_line_2: TLE line 2
latitude: The observer WGS84 latitude in degrees
longitude: The observers WGS84 longitude in degrees (positive value
represents east, negative value represents west)
elevation: The observer elevation above WGS84 ellipsoid in meters
**kwargs: Additional parameters (not used in this strategy)
Returns:
List of propagated positions
Raises:
RuntimeError: If propagation fails due to invalid TLE
or numerical instability
"""
# Convert single date to list for consistent handling
if isinstance(julian_dates, (float, int)):
julian_dates = [julian_dates]
try:
ts = get_timescale()
satellite = EarthSatellite(tle_line_1, tle_line_2, ts=ts)
curr_pos = wgs84.latlon(latitude, longitude, elevation)
except Exception as e:
raise RuntimeError(f"Failed to initialize propagation: {str(e)}") from e
results = []
for julian_date in julian_dates:
try:
jd = Time(julian_date, format="jd", scale="ut1")
if jd.jd == 0:
# Use ts.ut1_jd instead of ts.from_astropy because from_astropy uses
# astropy.Time.TT.jd instead of UT1
t = ts.ut1_jd(satellite.model.jdsatepoch)
else:
t = ts.ut1_jd(jd.jd)
difference = satellite - curr_pos
topocentric = difference.at(t)
position_norm = np.linalg.norm(topocentric.position.km)
if position_norm == 0:
raise RuntimeError(
f"Zero magnitude position vector " f"at JD {julian_date}"
)
topocentricn = topocentric.position.km / position_norm
ra, dec, distance = topocentric.radec()
alt, az, distance = topocentric.altaz()
dtday = TimeDelta(1, format="sec")
tplusdt = ts.ut1_jd((jd + dtday).jd)
tminusdt = ts.ut1_jd((jd - dtday).jd)
dtsec = 1
dtx2 = 2 * dtsec
sat_gcrs = satellite.at(t).position.km
# satn = sat / np.linalg.norm(sat)
# satpdt = satellite.at(tplusdt).position.km
# satmdt = satellite.at(tminusdt).position.km
# vsat = (satpdt - satmdt) / dtx2
sattop = difference.at(t).position.km
sattopr = np.linalg.norm(sattop)
if sattopr == 0:
raise RuntimeError(
f"Zero magnitude topocentric vector " f"at JD {julian_date}"
)
sattopn = sattop / sattopr
sattoppdt = difference.at(tplusdt).position.km
sattopmdt = difference.at(tminusdt).position.km
ratoppdt, dectoppdt = icrf2radec(sattoppdt)
ratopmdt, dectopmdt = icrf2radec(sattopmdt)
vsattop = (sattoppdt - sattopmdt) / dtx2
ddistance = np.dot(vsattop, sattopn)
rxy = np.dot(sattop[0:2], sattop[0:2])
if rxy == 0:
raise RuntimeError(
f"Zero magnitude XY projection at JD {julian_date}"
)
dra = (sattop[1] * vsattop[0] - sattop[0] * vsattop[1]) / rxy
denominator = np.sqrt(1 - sattopn[2] * sattopn[2])
if denominator == 0:
raise RuntimeError(
f"Invalid position vector for declination rate "
f"at JD {julian_date}"
)
ddec = vsattop[2] / denominator
dracosdec = dra * np.cos(dec.radians)
dra = (ratoppdt - ratopmdt) / dtx2
ddec = (dectoppdt - dectopmdt) / dtx2
dracosdec = dra * np.cos(dec.radians)
# drav, ddecv = icrf2radec(vsattop / sattopr, unit_vector=True)
# dracosdecv = drav * np.cos(dec.radians)
obs_gcrs = curr_pos.at(t).position.km
phase_angle = get_phase_angle(topocentricn, sat_gcrs, julian_date)
# Get solar altitude and azimuth
earth, sun = load_earth_sun()
sun_relative_to_earth = sun - earth
sun_relative_to_observer = sun_relative_to_earth - curr_pos
solar_alt, solar_az, _ = sun_relative_to_observer.at(t).altaz()
solar_elevation_deg = solar_alt.degrees
solar_azimuth_deg = solar_az.degrees
illuminated = is_illuminated(sat_gcrs, julian_date)
# Calculate satellite altitude above Earth's surface
sat_altitude_km = wgs84.height_of(satellite.at(t)).km
results.append(
satellite_position(
ra._degrees,
dec.degrees,
dracosdec,
ddec,
alt._degrees,
az._degrees,
distance.km,
ddistance,
phase_angle,
sat_altitude_km,
solar_elevation_deg,
solar_azimuth_deg,
illuminated,
sat_gcrs.tolist(),
obs_gcrs.tolist(),
julian_date,
)
)
except Exception as e:
raise RuntimeError(
f"Propagation failed at JD {julian_date}: {str(e)}"
) from e
return results
[docs]
class SGP4PropagationStrategy(BasePropagationStrategy): # pragma: no cover
[docs]
def propagate(
self,
julian_dates: float | list[float] | np.ndarray,
tle_line_1: str,
tle_line_2: str,
latitude: float,
longitude: float,
elevation: float,
**kwargs,
) -> satellite_position | list[satellite_position]:
"""
Propagates satellite and observer states using the SGP4 propagation model.
Args:
julian_dates: Single Julian date or array of Julian dates
tle_line_1: First line of TLE
tle_line_2: Second line of TLE
latitude: Observer latitude in degrees
longitude: Observer longitude in degrees
elevation: Observer elevation in meters above the WGS84 ellipsoid.
**kwargs: Additional parameters (not used in this strategy)
Returns:
Single satellite position or list of positions
"""
# Convert single date to list for consistent handling
if isinstance(julian_dates, (float, int)):
julian_dates = [julian_dates]
results = []
observer_location = EarthLocation(
lat=latitude * u.deg, lon=longitude * u.deg, height=elevation * u.m
)
location_itrs = observer_location.itrs.cartesian.xyz.value / 1000
for julian_date in julian_dates:
# Compute the coordinates of the CIP (Celestial Intermediate Pole)
dpsi, deps = iau2000b(julian_date)
# Compute the nutation in longitude
nutation_arcsec = dpsi / 10000000 # Convert from arcseconds to degrees
nutation = nutation_arcsec / 3600
location_itrs = np.array(location_itrs)
theta_gst = jd_to_gst(julian_date, nutation)
obs_gcrs = observer_location.get_gcrs(
obstime=Time(julian_date, format="jd")
)
# Split Julian Date into integer and fractional parts for full accuracy
# See Usage note here: https://pypi.org/project/sgp4/
jd_int = int(julian_date)
jd_frac = julian_date - jd_int
# Propagate satellite
satellite = Satrec.twoline2rv(tle_line_1, tle_line_2)
error, r, v = satellite.sgp4(jd_int, jd_frac)
r_ecef = teme_to_ecef(r, theta_gst)
difference = r_ecef - location_itrs
latitude = observer_location.lat.value
longitude = observer_location.lon.value
r_enu = ecef_to_enu(difference, latitude, longitude)
# Az, Alt, RA, Dec
az, alt = enu_to_az_el(r_enu)
ra, dec = az_el_to_ra_dec(az, alt, latitude, longitude, julian_date)
# Convert ECEF to ITRS
r_itrs = ecef_to_itrs(r_ecef)
# Convert ITRS to GCRS
sat_gcrs = itrs_to_gcrs(r_itrs, julian_date)
topocentric_gcrs = itrs_to_gcrs(ecef_to_itrs(difference), julian_date)
topocentric_gcrs_norm = topocentric_gcrs / np.linalg.norm(topocentric_gcrs)
# phase angle
phase_angle = get_phase_angle(topocentric_gcrs_norm, sat_gcrs, julian_date)
illuminated = is_illuminated(sat_gcrs, julian_date)
# TODO: Implement distance, dracosdec, ddec, ddistance for SGP4
dracosdec = None
ddec = None
distance = None
ddistance = None
obs_gcrs = observer_location.get_gcrs_posvel(
obstime=Time(julian_date, format="jd")
)
obs_gcrs = obs_gcrs[0].xyz.value / 1000
results.append(
satellite_position(
ra,
dec,
dracosdec,
ddec,
alt,
az,
distance,
ddistance,
phase_angle,
None, # sat_altitude_km - not calculated in SGP4
None,
None,
illuminated,
sat_gcrs.tolist() if sat_gcrs is not None else None,
obs_gcrs.tolist() if obs_gcrs is not None else None,
julian_date,
)
)
return results[0] if len(results) == 1 else results
[docs]
class TestPropagationStrategy(BasePropagationStrategy): # pragma: no cover
[docs]
def propagate(
self,
julian_dates: float | list[float] | np.ndarray,
tle_line_1: str,
tle_line_2: str,
latitude: float,
longitude: float,
elevation: float,
**kwargs,
) -> satellite_position | list[satellite_position]:
"""
Test propagation strategy that uses Skyfield implementation.
Args:
julian_dates: Single Julian date or array of Julian dates
tle_line_1: First line of TLE
tle_line_2: Second line of TLE
latitude: Observer latitude in degrees
longitude: Observer longitude in degrees
elevation: Observer elevation in meters
**kwargs: Additional parameters (not used in this strategy)
Returns:
Single satellite position or list of positions
"""
# Convert single date to list for consistent handling
if isinstance(julian_dates, (float, int)):
julian_dates = [julian_dates]
results = []
for julian_date in julian_dates:
ts = get_timescale()
eph = load("de430t.bsp")
earth = eph["Earth"]
sun = eph["Sun"]
satellite = EarthSatellite(tle_line_1, tle_line_2, ts=ts)
curr_pos = calculate_current_position(latitude, longitude, elevation)
jd = Time(julian_date, format="jd", scale="ut1")
if jd.jd == 0:
t = ts.ut1_jd(satellite.model.jdsatepoch)
else:
t = ts.ut1_jd(jd.jd)
difference = satellite - curr_pos
topocentric = difference.at(t)
topocentricn = topocentric.position.km / np.linalg.norm(
topocentric.position.km
)
ra, dec, distance = topocentric.radec()
alt, az, distance = topocentric.altaz()
dtday = TimeDelta(1, format="sec")
tplusdt = ts.ut1_jd((jd + dtday).jd)
tminusdt = ts.ut1_jd((jd - dtday).jd)
dtsec = 1
dtx2 = 2 * dtsec
sat = satellite.at(t).position.km
sattop = difference.at(t).position.km
sattopr = np.linalg.norm(sattop)
sattopn = sattop / sattopr
sattoppdt = difference.at(tplusdt).position.km
sattopmdt = difference.at(tminusdt).position.km
ratoppdt, dectoppdt = icrf2radec(sattoppdt)
ratopmdt, dectopmdt = icrf2radec(sattopmdt)
vsattop = (sattoppdt - sattopmdt) / dtx2
ddistance = np.dot(vsattop, sattopn)
rxy = np.dot(sattop[0:2], sattop[0:2])
dra = (sattop[1] * vsattop[0] - sattop[0] * vsattop[1]) / rxy
ddec = vsattop[2] / np.sqrt(1 - sattopn[2] * sattopn[2])
dracosdec = dra * np.cos(dec.radians)
dra = (ratoppdt - ratopmdt) / dtx2
ddec = (dectoppdt - dectopmdt) / dtx2
dracosdec = dra * np.cos(dec.radians)
earthp = earth.at(t).position.km
sunp = sun.at(t).position.km
earthsun = sunp - earthp
earthsunn = earthsun / np.linalg.norm(earthsun)
satsun = sat - earthsun
satsunn = satsun / np.linalg.norm(satsun)
phase_angle = np.rad2deg(np.arccos(np.dot(satsunn, topocentricn)))
illuminated = is_illuminated(sat, earthsunn)
results.append(
satellite_position(
ra._degrees,
dec.degrees,
dracosdec,
ddec,
alt.degrees,
az.degrees,
distance.km,
ddistance,
phase_angle,
None,
None,
None,
illuminated,
None, # satellite_gcrs
None, # observer_gcrs
jd.jd,
)
)
return results[0] if len(results) == 1 else results
# remove pragma: no cover if another use is found for this
[docs]
class FOVPropagationStrategy(BasePropagationStrategy):
[docs]
def propagate(
self,
julian_dates: float | list[float] | np.ndarray,
tle_line_1: str,
tle_line_2: str,
latitude: float,
longitude: float,
elevation: float,
fov_center: tuple[float, float] = (0.0, 0.0), # Default to (0,0)
fov_radius: float = 0.0, # Default to 0 degrees
**kwargs,
) -> list[dict[str, Any]]: # pragma: no cover
"""
Propagate satellite positions and check if they fall within FOV.
Args:
julian_dates: Single Julian date or array of Julian dates
tle_line_1: First line of TLE
tle_line_2: Second line of TLE
latitude: Observer latitude in degrees
longitude: Observer longitude in degrees
elevation: Observer elevation in meters
fov_center: Tuple of (RA, Dec) in degrees. Defaults to (0,0)
fov_radius: FOV radius in degrees. Defaults to 0
**kwargs: Additional parameters (not used in this strategy)
Returns:
List of dictionaries containing position data for points in FOV
Raises:
RuntimeError: If propagation fails due to invalid TLE
or numerical instability
"""
# Convert single date to list for consistent handling
if isinstance(julian_dates, (float, int)):
julian_dates = [julian_dates]
try:
ts = get_timescale()
t = ts.ut1_jd(julian_dates)
# Set up observer and FOV vectors
curr_pos = wgs84.latlon(latitude, longitude, elevation)
icrf = coordinate_systems.radec2icrf(fov_center[0], fov_center[1]).reshape(
3, 1
)
# Create satellite and get positions
satellite = EarthSatellite(tle_line_1, tle_line_2, ts=ts)
difference = satellite - curr_pos
topocentric = difference.at(t)
position_norm = np.linalg.norm(
topocentric.position.km, axis=0, keepdims=True
)
if np.any(position_norm == 0):
raise RuntimeError("Zero magnitude position vector detected")
topocentricn = topocentric.position.km / position_norm
# Vectorized angle calculation
sat_fov_angles = np.arccos(np.sum(topocentricn * icrf, axis=0))
in_fov_mask = np.degrees(sat_fov_angles) < fov_radius
if not np.any(in_fov_mask):
return []
# Get alt/az for points in FOV
alt, az, distance = topocentric.altaz()
fov_indices = np.where(in_fov_mask)[0]
# Vectorized creation of results
positions = topocentricn[:, fov_indices]
ra_decs = np.array(
[coordinate_systems.icrf2radec(pos) for pos in positions.T]
)
return [
{
"ra": ra_dec[0],
"dec": ra_dec[1],
"altitude": float(alt._degrees[idx]),
"azimuth": float(az._degrees[idx]),
"range_km": float(distance.km[idx]),
"julian_date": julian_dates[idx],
"angle": np.degrees(sat_fov_angles[idx]),
}
for idx, ra_dec in zip(fov_indices, ra_decs, strict=True)
]
except Exception as e:
raise RuntimeError(f"FOV propagation failed: {str(e)}") from e
[docs]
def process_satellite_batch(
args: tuple[Any, ...],
) -> tuple[list[dict[str, Any]], int, float]:
"""Process a batch of satellites for FOV calculations.
Returns:
tuple: (batch_results, satellites_processed, execution_time)
"""
batch_start = time.time()
(
tle_batch,
julian_dates,
lat,
lon,
elev,
fov_center,
fov_radius,
include_tles,
illuminated_only,
) = args
# Convert single date to list for consistent handling
if isinstance(julian_dates, (float, int)):
julian_dates = [julian_dates]
if not isinstance(julian_dates, np.ndarray):
julian_dates = np.array(julian_dates)
ts = get_timescale()
t = ts.ut1_jd(julian_dates)
# Set up observer and FOV vectors
curr_pos = wgs84.latlon(lat, lon, elev) # do here because of serialization
icrf = coordinate_systems.radec2icrf(fov_center[0], fov_center[1]).reshape(3, 1)
batch_results = []
satellites_processed = 0
for tle in tle_batch:
try:
# Create satellite
satellite = EarthSatellite(tle.tle_line1, tle.tle_line2, ts=ts)
difference = satellite - curr_pos
topocentric = difference.at(t)
topocentricn = topocentric.position.km / np.linalg.norm(
topocentric.position.km, axis=0, keepdims=True
)
# Vectorized angle calculation
sat_fov_angles = np.arccos(np.sum(topocentricn * icrf, axis=0))
in_fov_mask = (
np.degrees(sat_fov_angles) < fov_radius * 1.2
) # add 20% margin
if illuminated_only:
# only show points that are illuminated
sat_gcrs = satellite.at(t).position.km
# positions for each julian date
sat_gcrs_list = [sat_gcrs[:, i] for i in range(len(julian_dates))]
illuminated = is_illuminated_vectorized(sat_gcrs_list, julian_dates)
visible_mask = np.logical_and(in_fov_mask, illuminated)
else:
visible_mask = in_fov_mask
if not np.any(visible_mask):
satellites_processed += 1
continue
# Get alt/az for points in FOV
alt, az, distance = topocentric.altaz()
fov_indices = np.where(in_fov_mask)[0]
# Vectorized creation of results
positions = topocentricn[:, fov_indices]
ra_decs = np.array(
[coordinate_systems.icrf2radec(pos) for pos in positions.T]
)
# Prepare results with conditional TLE data
result_entries = []
for idx, ra_dec in zip(fov_indices, ra_decs, strict=True):
result = {
"ra": ra_dec[0],
"dec": ra_dec[1],
"altitude": float(alt._degrees[idx]),
"azimuth": float(az._degrees[idx]),
"range_km": float(distance.km[idx]),
"julian_date": julian_dates[idx],
"angle": np.degrees(sat_fov_angles[idx]),
"name": tle.satellite.sat_name,
"norad_id": tle.satellite.sat_number,
"tle_epoch": output_utils.format_date(tle.epoch),
}
# Only include TLE data if requested
if include_tles:
result["tle_data"] = {
"tle_line1": tle.tle_line1,
"tle_line2": tle.tle_line2,
"source": tle.data_source,
}
result_entries.append(result)
batch_results.extend(result_entries)
satellites_processed += 1
except Exception as e:
logger.warning(
"Error processing satellite %s: %s", tle.satellite.sat_name, e
)
satellites_processed += 1
execution_time = time.time() - batch_start
return batch_results, satellites_processed, execution_time
[docs]
def _run_batches_threadpool(
args_list: list[tuple],
max_workers: int,
progress_callback: Callable[..., None] | None,
) -> tuple[list[dict[str, Any]], int]:
"""
Run batches using ThreadPoolExecutor (sync path).
"""
all_results = []
satellites_processed = 0
total_batches = len(args_list)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
completed = 0
futures = [executor.submit(process_satellite_batch, args) for args in args_list]
for future in concurrent.futures.as_completed(futures):
try:
batch_results, batch_sats, _ = future.result()
all_results.extend(batch_results)
satellites_processed += batch_sats
completed += 1
if progress_callback:
progress_percent = int((completed / total_batches) * 100)
progress_callback(
progress_percent, completed, total_batches, satellites_processed
)
except Exception as e:
logger.exception("Error processing batch: %s", e)
return all_results, satellites_processed
[docs]
class FOVParallelPropagationStrategy:
"""
Propagate satellite positions and check if they fall within FOV.
Supports two execution modes:
- In-process: uses ThreadPoolExecutor (sync path; GIL-limited)
- Distributed: batch_executor runs batches (e.g. Celery chord in fov_service);
batch_serializer converts TLE batches to serializable form for the executor
"""
[docs]
def propagate(
self,
all_tles,
jd_times,
location,
fov_center,
fov_radius,
batch_size=1000,
max_workers=None,
include_tles=True,
illuminated_only=False,
progress_callback=None,
*,
batch_executor: Callable[..., list[tuple[list, int, float]]] | None = None,
batch_serializer: Callable[[list], list] | None = None,
) -> tuple[list[dict[str, Any]], float, int]:
"""
Propagate satellite positions and check if they fall within FOV.
Args:
all_tles: List of TLE objects
jd_times: Array of Julian dates
location: Observer's location
fov_center: Tuple of (RA, Dec) in degrees. Defaults to (0,0)
fov_radius: FOV radius in degrees. Defaults to 0
batch_size: Number of satellites to process in each batch
max_workers: Maximum number of workers (in-process mode only)
include_tles: Whether to include TLE data in results
illuminated_only: Whether to only include illuminated satellites
progress_callback: Optional callback for progress updates
batch_executor: Callable(serialized_batches, common_args) -> list of
(results, sats). When provided, runs distributed (e.g. Celery).
batch_serializer: Callable(batch) -> serialized_batch. Required when
batch_executor is provided; converts TLE batches for message passing.
Returns:
tuple: (
results: List of dictionaries containing position data for points
in FOV,
execution_time: Total execution time in seconds,
satellites_processed: Number of satellites processed
)
"""
all_results = []
start_time = time.time()
satellites_processed = 0
lat = location.lat.value
lon = location.lon.value
elev = location.height.value
satellite_batches = [
all_tles[i : i + batch_size] for i in range(0, len(all_tles), batch_size)
]
if batch_executor is not None:
# Distributed mode
if batch_serializer is None:
raise ValueError("batch_serializer required when using batch_executor")
serialized_batches = [
batch_serializer(batch) for batch in satellite_batches
]
common_args = {
"jd_times": jd_times,
"location_lat": lat,
"location_lon": lon,
"location_height": elev,
"fov_center": fov_center,
"fov_radius": fov_radius,
"include_tles": include_tles,
"illuminated_only": illuminated_only,
}
batch_results = batch_executor(serialized_batches, common_args)
for item in batch_results:
batch_result = item[0]
batch_sats = item[1]
all_results.extend(batch_result)
satellites_processed += batch_sats
else:
# In-process mode (sync)
if max_workers is None:
max_workers = min(multiprocessing.cpu_count(), 16)
args_list = [
(
batch,
jd_times,
lat,
lon,
elev,
fov_center,
fov_radius,
include_tles,
illuminated_only,
)
for batch in satellite_batches
]
logger.debug(
"Processing %d satellites in %d batches (%d workers)",
len(all_tles),
len(satellite_batches),
max_workers,
)
all_results, satellites_processed = _run_batches_threadpool(
args_list, max_workers, progress_callback
)
execution_time = time.time() - start_time
logger.debug("Total execution time: %.2f seconds", execution_time)
return all_results, execution_time, satellites_processed
[docs]
class KroghPropagationStrategy(BasePropagationStrategy): # pragma: no cover
[docs]
def __init__(self):
"""Initialize the Krogh propagation strategy."""
self.ephemeris_data: InterpolableEphemeris | None = None
self.sigma_points_dict: dict | None = None
self.interpolated_splines: InterpolatedSplinesDict | None = None
[docs]
def load_ephemeris(
self, ephemeris: InterpolableEphemeris, ephem_repo: AbstractEphemerisRepository
) -> None:
"""
Load and parse ephemeris data from a file.
Args:
sat_number: Satellite NORAD number
epoch: Epoch time for the ephemeris
"""
self.ephemeris_data = ephemeris
# Generate sigma points for this specific ephemeris
import time
start_time = time.time()
print(f"Generating sigma points for ephemeris {ephemeris.id}")
self.sigma_points_dict = generate_and_propagate_sigma_points(
self.ephemeris_data
)
sigma_time = time.time() - start_time
print(f"Sigma points generation took {sigma_time:.2f} seconds")
# Generate splines on-demand since we disabled database storage
start_time = time.time()
print(f"Generating interpolated splines for ephemeris {ephemeris.id}")
if ephemeris.id is None:
raise ValueError(
"Ephemeris ID is None, cannot retrieve interpolator splines"
)
interpolated_splines_obj = ephem_repo.get_interpolator_splines(ephemeris.id)
if interpolated_splines_obj is None:
interpolated_splines = interpolate_sigma_pointsKI(self.sigma_points_dict)
else:
interpolated_splines = interpolated_splines_obj.get_interpolated_splines()
self.interpolated_splines = interpolated_splines
spline_time = time.time() - start_time
print(f"Spline generation took {spline_time:.2f} seconds")
[docs]
def propagate(
self,
julian_dates: float | list[float] | np.ndarray,
tle_line_1: str,
tle_line_2: str,
latitude: float,
longitude: float,
elevation: float,
**kwargs,
) -> list[satellite_position_fov]:
"""
Propagate satellite positions using Krogh interpolation.
Args:
julian_dates: Single Julian date or array of Julian dates
tle_line_1: First line of TLE (not used in this strategy)
tle_line_2: Second line of TLE (not used in this strategy)
latitude: Observer latitude in degrees
longitude: Observer longitude in degrees
elevation: Observer elevation in meters
**kwargs: Additional parameters (not used in this strategy)
Returns:
List of satellite positions
"""
if self.interpolated_splines is None:
raise ValueError("No ephemeris data loaded. Call load_ephemeris() first.")
# Convert single date to list for consistent handling
if isinstance(julian_dates, (float, int)):
julian_dates = [julian_dates]
results = []
# Calculate observer position in GCRS
observer_location = EarthLocation(
lat=latitude * u.deg, lon=longitude * u.deg, height=elevation * u.m
)
for jd in julian_dates:
# Get interpolated sigma points
interpolated_points = get_interpolated_sigma_points_KI(
self.interpolated_splines, jd
)
# Get observer position in GCRS for this time
obs_gcrs = (
observer_location.get_gcrs(
obstime=Time(jd, format="jd")
).cartesian.xyz.value
/ 1000
) # Convert to km
# Transform from satellite GCRS positions to topocentric GCRS positions
transformed_points = np.zeros_like(interpolated_points)
for i in range(len(interpolated_points)):
# position
satellite_position_gcrs = interpolated_points[i][:3]
topocentric_gcrs = satellite_position_gcrs - obs_gcrs
transformed_points[i][:3] = topocentric_gcrs
# keep original velocity components?
transformed_points[i][3:] = interpolated_points[i][3:]
# Reconstruct mean state and covariance from transformed points
mean_state, covariance = reconstruct_covariance_at_time(transformed_points)
topocentric_gcrs = mean_state[:3]
ra, dec = icrf2radec(topocentric_gcrs)
# Calculate observer-relative coordinates for altitude/azimuth
altitude, azimuth, range_km = calculate_satellite_observer_relative(
topocentric_gcrs, latitude, longitude, jd
)
# Convert to satellite position format
results.append(
satellite_position_fov(
ra=ra,
dec=dec,
covariance=covariance,
angle=None,
altitude=altitude,
azimuth=azimuth,
range_km=range_km,
julian_date=float(jd), # Ensure jd is a float
name=None, # added in fov_service
norad_id=None, # added in fov_service
orbital_data_epoch=None, # fov_service / fov_tasks when ephemeris
orbital_data_source=None, # added in fov_service
)
)
return results
[docs]
class PropagationInfo:
[docs]
def __init__(
self,
propagation_strategy,
tle_line_1,
tle_line_2,
julian_date,
latitude,
longitude,
elevation,
):
self.propagation_strategy = propagation_strategy
self.tle_line_1 = tle_line_1
self.tle_line_2 = tle_line_2
self.julian_date = julian_date
self.latitude = latitude
self.longitude = longitude
self.elevation = elevation
[docs]
def propagate(self):
return self.propagation_strategy.propagate(
self.julian_date,
self.tle_line_1,
self.tle_line_2,
self.latitude,
self.longitude,
self.elevation,
)