Source code for sorcha.ephemeris.simulation_driver

from dataclasses import dataclass
from collections import defaultdict
from csv import writer
from io import StringIO

import numpy as np
import pandas as pd
import spiceypy as spice

from sorcha.ephemeris.simulation_setup import (
    create_assist_ephemeris,
    furnish_spiceypy,
    generate_simulations,
)
from sorcha.ephemeris.simulation_constants import *
from sorcha.ephemeris.simulation_geometry import *
from sorcha.ephemeris.simulation_parsing import *
from sorcha.ephemeris.pixel_dict import PixelDict
from sorcha.modules.PPOutput import PPOutWriteCSV, PPOutWriteHDF5


@dataclass
[docs] class EphemerisGeometryParameters: """Data class for holding parameters related to ephemeris geometry"""
[docs] obj_id: str = None
[docs] mjd_tai: float = None
[docs] rho: float = None
[docs] rho_hat: float = None
[docs] rho_mag: float = None
[docs] r_ast: float = None
[docs] v_ast: float = None
[docs] def get_vec(row, vecname): """ Extracts a vector from a Pandas dataframe row. Parameters ---------- row : row from the dataframe vecname : name of the vector Returns ------- : 3D numpy array """ return np.asarray([row[f"{vecname}_x"], row[f"{vecname}_y"], row[f"{vecname}_z"]])
[docs] def create_ephemeris(orbits_df, pointings_df, args, sconfigs): """Generate a set of observations given a collection of orbits and set of pointings. Parameters ---------- orbits_df : pandas dataframe The dataframe containing the collection of orbits. pointings_df : pandas dataframe The dataframe containing the collection of telescope/camera pointings. args : Various arguments necessary for the calculation sconfigs: Dataclass of configuration file arguments. Various configuration parameters necessary for the calculation ang_fov : float The angular size (deg) of the field of view buffer : float The angular size (deg) of the buffer around the field of view. A buffer is required to allow for some motion between the time of the observation and the time of the picket (t_picket) picket_interval : float The interval (days) between picket calculations. This is 1 day by default. Current there is only one such interval, used for all objects. It is currently possible for extremely fast-moving objects to be missed. This will be remedied in future releases. obsCode : string The MPC code for the observatory. (This is current a configuration parameter, but these should be included in the visit information, to allow for multiple observatories. nside : integer The nside value used for the HEALPIx calculations. Must be a power of 2 (1, 2, 4, ...) nside=64 is current default. n_sub_intervals: int Number of sub-intervals for the Lagrange interpolation (default: 101) Returns ------- observations: pandas dataframe The dataframe of observations needed for Sorcha to continue Notes ------- This works by calculating and regularly updating the sky-plane locations (unit vectors) of all the objects in the collection of orbits. The HEALPix index for each of the locations is calculated. A dictionary with pixel indices as keys and lists of ObjIDs for those objects in each HEALPix tile as values is generated. An individual one of these calculations is called a 'picket', as one element of a long picket fence. Typically, the interval between pickets is one day. Given a specific pointing, the set of HEALPix tiles that are overlapped by the pointing (and a buffer region) is computed. Then the precise locations of just those objects within that set of HEALPix tiles are computed. Details for those that actually do land within the field of view are passed along. """ verboselog = args.pplogger.info if args.loglevel else lambda *a, **k: None ang_fov = sconfigs.simulation.ar_ang_fov buffer = sconfigs.simulation.ar_fov_buffer ang_fov_buffer = ang_fov + buffer picket_interval = sconfigs.simulation.ar_picket obsCode = sconfigs.simulation.ar_obs_code nside = 2**sconfigs.simulation.ar_healpix_order n_sub_intervals = sconfigs.simulation.ar_n_sub_intervals ephemeris_csv_filename = None if args.output_ephemeris_file and args.outpath: ephemeris_csv_filename = os.path.join(args.outpath, args.output_ephemeris_file) verboselog("Building ASSIST ephemeris object.") ephem, gm_sun, gm_total = create_assist_ephemeris(args, sconfigs.auxiliary) verboselog("Furnishing SPICE kernels.") furnish_spiceypy(args, sconfigs.auxiliary) verboselog("Generating ASSIST+REBOUND simulations.") sim_dict = generate_simulations(ephem, gm_sun, gm_total, orbits_df, args) observatories = Observatory(args, sconfigs.auxiliary) output = StringIO() in_memory_csv = writer(output) column_names = ( "ObjID", "FieldID", "fieldMJD_TAI", "fieldJD_TDB", "Range_LTC_km", "RangeRate_LTC_km_s", "RA_deg", "RARateCosDec_deg_day", "Dec_deg", "DecRate_deg_day", "Obj_Sun_x_LTC_km", "Obj_Sun_y_LTC_km", "Obj_Sun_z_LTC_km", "Obj_Sun_vx_LTC_km_s", "Obj_Sun_vy_LTC_km_s", "Obj_Sun_vz_LTC_km_s", "Obs_Sun_x_km", "Obs_Sun_y_km", "Obs_Sun_z_km", "Obs_Sun_vx_km_s", "Obs_Sun_vy_km_s", "Obs_Sun_vz_km_s", "phase_deg", ) column_types = defaultdict(ObjID=str, FieldID=str).setdefault(float) in_memory_csv.writerow(column_names) # t_picket is the last time at which the sky positions of all the objects # were calculated and placed into a healpix dictionary, i.e. the # update_pixel_dict() function is called. That calculation is redone at # regular (tunable) intervals. # Setting t_picket to 0 ensures that the function is called on the # first run. t_picket = 0.0 verboselog("Generating ephemeris...") pixdict = PixelDict( pointings_df["fieldJD_TDB"].iloc[0], sim_dict, ephem, obsCode, observatories, picket_interval, nside, n_sub_intervals=n_sub_intervals, ) for _, pointing in pointings_df.iterrows(): mjd_tai = float(pointing["observationMidpointMJD_TAI"]) # If the observation time is too far from the # time of the last set of ballpark sky position, # compute a new set desigs = pixdict.get_designations( pointing["fieldJD_TDB"], pointing["fieldRA_deg"], pointing["fieldDec_deg"], ang_fov ) unit_vectors = pixdict.interpolate_unit_vectors(desigs, pointing["fieldJD_TDB"]) visit_vector = get_vec(pointing, "visit_vector") r_obs = get_vec(pointing, "r_obs") for k, uv in unit_vectors.items(): ephem_geom_params = EphemerisGeometryParameters() ephem_geom_params.obj_id = k ephem_geom_params.mjd_tai = mjd_tai v = sim_dict[k] sim, ex = v["sim"], v["ex"] uv /= np.linalg.norm(uv) ang = np.arccos(np.dot(uv, visit_vector)) * 180 / np.pi if ang < ang_fov_buffer: ( ephem_geom_params.rho, ephem_geom_params.rho_mag, _, ephem_geom_params.r_ast, ephem_geom_params.v_ast, ) = integrate_light_time(sim, ex, pointing["fieldJD_TDB"] - ephem.jd_ref, r_obs, lt0=0.01) ephem_geom_params.rho_hat = ephem_geom_params.rho / ephem_geom_params.rho_mag ang_from_center = 180 / np.pi * np.arccos(np.dot(ephem_geom_params.rho_hat, visit_vector)) if ang_from_center < ang_fov_buffer: out_tuple = calculate_rates_and_geometry(pointing, ephem_geom_params) in_memory_csv.writerow(out_tuple) verboselog("Ephemeris generated.") # reset to the beginning of the in-memory CSV output.seek(0) ephemeris_df = pd.read_csv(output, dtype=column_types) # if the user has defined an output file name for the ephemeris results, write out to that file if ephemeris_csv_filename: verboselog("Writing out ephemeris results to file.") write_out_ephemeris_file(ephemeris_df, ephemeris_csv_filename, args, sconfigs) # join the ephemeris and input orbits dataframe, take special care to make # sure the 'ObjID' column types match. verboselog("Joining ephemeris to orbits dataframe.") ephemeris_df["ObjID"] = ephemeris_df["ObjID"].astype("string") orbits_df["ObjID"] = orbits_df["ObjID"].astype("string") observations = ephemeris_df.join(orbits_df.set_index("ObjID"), on="ObjID") spice.kclear() # Return the dataframe needed for Sorcha to continue return observations
[docs] def get_residual_vectors(v1): """ Decomposes the vector into two unit vectors to facilitate computation of on-sky angles The decomposition is such that A = (-sin (RA), cos(RA), 0) is in the direction of increasing RA, and D = (-sin(dec)cos (RA), -sin(dec) sin(RA), cos(dec)) is in the direction of increasing Dec The triplet (A,D,v1) forms an orthonormal basis of the 3D vector space. Parameters ---------- v1 : array, shape = (3,)) The vector to be decomposed Returns ------- A : array, shape = (3,)) A vector D : array, shape = (3,)) D vector """ x, y, z = v1 cosd = np.sqrt(1 - z * z) A = np.array((-y, x, 0.0)) / cosd D = np.array((-z * x / cosd, -z * y / cosd, cosd)) return A, D
[docs] def calculate_rates_and_geometry(pointing: pd.DataFrame, ephem_geom_params: EphemerisGeometryParameters): """Calculate rates and geometry for objects within the field of view Parameters ---------- pointing : pandas dataframe The dataframe containing the pointing database. ephem_geom_params : EphemerisGeometryParameters Various parameters necessary to calculate the ephemeris Returns ------- : tuple Tuple containing the ephemeris parameters needed for Sorcha post processing. """ r_sun = get_vec(pointing, "r_sun") r_obs = get_vec(pointing, "r_obs") v_sun = get_vec(pointing, "v_sun") v_obs = get_vec(pointing, "v_obs") ra0, dec0 = vec2ra_dec(ephem_geom_params.rho_hat) drhodt = ephem_geom_params.v_ast - v_obs drho_magdt = (1 / ephem_geom_params.rho_mag) * np.dot(ephem_geom_params.rho, drhodt) ddeltatdt = drho_magdt / (SPEED_OF_LIGHT) drhodt = ephem_geom_params.v_ast * (1 - ddeltatdt) - v_obs A, D = get_residual_vectors(ephem_geom_params.rho_hat) drho_hatdt = ( drhodt / ephem_geom_params.rho_mag - drho_magdt * ephem_geom_params.rho_hat / ephem_geom_params.rho_mag ) dradt = np.dot(A, drho_hatdt) ddecdt = np.dot(D, drho_hatdt) r_ast_sun = ephem_geom_params.r_ast - r_sun v_ast_sun = ephem_geom_params.v_ast - v_sun r_ast_obs = ephem_geom_params.r_ast - r_obs phase_angle = np.arccos( np.dot(r_ast_sun, r_ast_obs) / (np.linalg.norm(r_ast_sun) * np.linalg.norm(r_ast_obs)) ) obs_sun = r_obs - r_sun dobs_sundt = v_obs - v_sun return ( ephem_geom_params.obj_id, pointing["FieldID"], ephem_geom_params.mjd_tai, pointing["fieldJD_TDB"], ephem_geom_params.rho_mag * AU_KM, drho_magdt * AU_KM / (24 * 60 * 60), ra0, dradt * 180 / np.pi, dec0, ddecdt * 180 / np.pi, r_ast_sun[0] * AU_KM, r_ast_sun[1] * AU_KM, r_ast_sun[2] * AU_KM, v_ast_sun[0] * AU_KM / (24 * 60 * 60), v_ast_sun[1] * AU_KM / (24 * 60 * 60), v_ast_sun[2] * AU_KM / (24 * 60 * 60), obs_sun[0] * AU_KM, obs_sun[1] * AU_KM, obs_sun[2] * AU_KM, dobs_sundt[0] * AU_KM / (24 * 60 * 60), dobs_sundt[1] * AU_KM / (24 * 60 * 60), dobs_sundt[2] * AU_KM / (24 * 60 * 60), phase_angle * 180 / np.pi, )
[docs] def write_out_ephemeris_file(ephemeris_df, ephemeris_csv_filename, args, sconfigs): """Writes the ephemeris out to an external file. Parameters ---------- ephemeris_df : Pandas DataFrame The data frame of ephemeris information to be written out. ephemeris_csv_filename : string The filepath (without extension) to write the ephemeris file to. args: sorchaArguments object or similar Command-line arguments from Sorcha. sconfigs: dataclass Dataclass of configuration file arguments. Returns ------- None. """ verboselog = args.pplogger.info if args.loglevel else lambda *a, **k: None if sconfigs.input.eph_format == "csv": verboselog("Outputting ephemeris to CSV file...") PPOutWriteCSV(ephemeris_df, ephemeris_csv_filename + ".csv") elif sconfigs.input.eph_format == "whitespace": verboselog("Outputting ephemeris to whitespaced CSV file...") PPOutWriteCSV(ephemeris_df, ephemeris_csv_filename + ".csv", separator=" ") elif sconfigs.input.eph_format == "hdf5" or sconfigs.output.output_format == "h5": verboselog("Outputting ephemeris to HDF5 binary file...") PPOutWriteHDF5(ephemeris_df, ephemeris_csv_filename + ".h5", "sorcha_ephemeris")