src.api.services.tasks.development_tasks module

import datetime import logging import time

import numpy as np from astropy import units as u from astropy.coordinates import AltAz, EarthLocation, SkyCoord from astropy.time import Time from flask import current_app from sgp4.api import Satrec from skyfield.api import EarthSatellite

from api.utils.skyfield_loader import load from skyfield.framelib import itrs as sk_itrs from skyfield.nutationlib import iau2000b from skyfield.sgp4lib import TEME as sk_TEME # noqa: N811 from sqlalchemy import and_, func

from api.celery_app import celery from api.domain import models from api.entrypoints.extensions import db from api.services.tasks.ephemeris_tasks import (

propagate_satellite_sgp4, propagate_satellite_skyfield,

) from api.utils.coordinate_systems import (

az_el_to_ra_dec, ecef_to_enu, enu_to_az_el, teme_to_ecef,

) from api.utils.time_utils import jd_to_gst

logger = logging.getLogger(__name__)

@celery.task def compare(): # pragma: no cover

# Two-line element set (TLE) for ISS tle_line_1 = “1 25544U 98067A 24087.50963912 .00035317 00000+0 63250-3 0 9993” tle_line_2 = “2 25544 51.6411 0.3077 0004676 8.9972 162.1343 15.49609731445878”

# Observer’s location lat = 33.0 long = -117.0 height = 0.0 date = 2460402.32304

# Create EarthLocation object for observer’s location location = EarthLocation(lat=33.0 * u.deg, lon=-117.0 * u.deg, height=0.0 * u.m) location_itrs = location.itrs.cartesian.xyz.value / 1000.0

# Load timescale and create time object ts = load.timescale() t = ts.ut1_jd(date)

# Compute the x, y coordinates of the CIP (Celestial Intermediate Pole) dpsi, deps = iau2000b(date)

# Compute the nutation in longitude nutation_arcsec = dpsi / 10000000 # Convert from arcseconds to degrees nutation = nutation_arcsec / 3600

# Propagate satellite position satellite_position = propagate_satellite_skyfield(

tle_line_1, tle_line_2, lat, long, height, date

)

# Calculate Greenwich Sidereal Time (GST) theta_gst = jd_to_gst(date, nutation)

# Create satellite object from TLE satellite = Satrec.twoline2rv(tle_line_1, tle_line_2)

# Calculate satellite position and velocity error, r, v = satellite.sgp4(date, 0)

# Create EarthSatellite object satellite = EarthSatellite(tle_line_1, tle_line_2, ts=ts)

# Log satellite position current_app.logger.error(f”r: {r} “) # ~20m is the issue current_app.logger.error(f”satellite - skyfield: {satellite.at(t).position.km} “)

# Convert location from ITRS to AU AU_KM = 149597870.700 # noqa: N806 location_itrs_au = location_itrs / AU_KM

# new_r calculation makes the TEME vector match the one from skyfield, # but using this one results in even less correct results # Rotate location # ITRS to XYZ RT = np.rollaxis(sk_itrs.rotation_at(t), 1) # noqa: N806 r1 = np.dot(

RT, location_itrs_au

) # np.einsum(‘ij…,j…->i…’,RT, location_itrs_au)

# Rotate satellite position from TEME to XYZ R = np.rollaxis(sk_TEME.rotation_at(t), 1) # noqa: N806 current_app.logger.error(f”: {np} “) r2 = np.einsum(“ij…,j…->i…”, R, r) current_app.logger.error(f”R.T: {R.T} “) current_app.logger.error(f”R inverse: {np.linalg.inv(R)} “) current_app.logger.error(f”diff: {R.T - np.linalg.inv(R)} “)

# Add rotated location and satellite position new_r = np.dot(R.T, r1 + r2) current_app.logger.error(f”new_r: {new_r} “)

# Convert satellite position from TEME to ECEF r_ecef = teme_to_ecef(new_r, theta_gst)

# Calculate difference between satellite position and observer’s location difference = r_ecef - location_itrs current_app.logger.error(f”difference: {difference} “)

# Convert difference from ECEF to ENU r_enu = ecef_to_enu(difference, lat, long)

# Convert ENU coordinates to azimuth and elevation az, el = enu_to_az_el(r_enu)

# Repeat the above steps for the original satellite position r_ecef = teme_to_ecef(r, theta_gst) difference = r_ecef - location_itrs current_app.logger.error(f”difference: {difference} “) r_enu = ecef_to_enu(difference, lat, long) az2, el2 = enu_to_az_el(r_enu)

# Convert azimuth and elevation to right ascension and declination ra, dec = az_el_to_ra_dec(az, el, lat, long, date) ra2, dec2 = az_el_to_ra_dec(az2, el2, lat, long, date)

# Log results current_app.logger.error(f”new ra: {ra} new dec: {dec}”) current_app.logger.error(f”sgp only ra: {ra2} sgp dec: {dec2}”)

current_app.logger.error(f”new az: {az} new alt: {el}”) current_app.logger.error(f”sgp only az: {az2} sgp alt: {el2}”) current_app.logger.error(

f”old az: {satellite_position.az} old alt: {satellite_position.alt}”

)

return “done”

@celery.task def test_fov(): # pragma: no cover

# for each satellite in the database, propagate the satellite to # the current date/time using the most recent TLE # Define the target date target_date = datetime.datetime.now() started = time.time() try:

subquery = (
db.session.query(

models.TLE.sat_id, func.min(

func.abs(

func.extract(“epoch”, models.TLE.date_collected) - func.extract(“epoch”, target_date)

)

).label(“min_difference”),

) .group_by(models.TLE.sat_id) .subquery()

)

query = (

db.session.query(models.TLE, models.Satellite) .filter_by(data_source=”celestrak”) .join(

subquery, and_(

models.TLE.sat_id == subquery.c.sat_id, func.abs(

func.extract(“epoch”, models.TLE.date_collected) - func.extract(“epoch”, target_date)

) == subquery.c.min_difference,

),

) # noqa: E501 .join(models.Satellite, models.TLE.sat_id == models.Satellite.id)

)

results = query.all()

except Exception:

return None

positions = []

data = [] jd = 2460402.61351 # Julian Date

# get alt/az of center of FOV fov = [177.07062666895, 56.29124903308] # ra/dec of center of FOV # in arcminutes radius = 30 c1 = SkyCoord(ra=fov[0], dec=fov[1], unit=”deg”) # Define the time and location of the observer

observer_location = EarthLocation(

lat=33 * u.deg, lon=-117 * u.deg, height=0 * u.m

) # Replace with actual location observer_time = Time(“2024-04-02T2:43:27”, scale=”utc”) # Replace with actual time # location_itrs = observer_location.itrs.cartesian.xyz.value / 1000 target_altaz = c1.transform_to(

AltAz(obstime=observer_time, location=observer_location)

)

for tle, sat in results:
position = propagate_satellite_sgp4(

tle.tle_line1, tle.tle_line2, 33, -117, 0, jd

)

if (

position.alt > 0 and position.alt < target_altaz.alt.deg + 2 and position.alt > target_altaz.alt.deg - 2

):

data.append((tle, sat.sat_name))

for d in data:
position = propagate_satellite_skyfield(

d[0].tle_line1, d[0].tle_line2, 33, -117, 0, jd

) c2 = SkyCoord(ra=position.ra, dec=position.dec, unit=”deg”) sep = c2.separation(c1).arcmin if sep <= radius:

positions.append((position.ra, position.dec)) current_app.logger.error(f”tle1: {d[0].tle_line1}”) current_app.logger.error(f”sep: {sep}”) current_app.logger.error(f”az: {position.az} alt: {position.alt}”)

elapsed = time.time() - started current_app.logger.error(f”Elapsed time: {elapsed}”) current_app.logger.error(f”Objects: {len(positions)}”) current_app.logger.error(f”Total checked: {len(data)}”) current_app.logger.error(f”Total results: {len(results)}”) current_app.logger.error(f”Center: {c1.ra.deg} {c1.dec.deg}”)

for p in positions:

current_app.logger.error(f”ra: {p[0]} dec: {p[1]}”)

return positions