Source code for sorcha.modules.DESDiscoveryFilter

import numpy as np
import astropy.units as u
import numba
import pandas as pd
from numba.typed import List

## filter for the DES object discovery requirements.
#   either:
#       nunique >=9
#   or:
#       nunique >=7
#       An ARCCUT limit (has to be at least 2 objects not in a triplet discovery season)
#       the observation of the discovery triplets have to be within 60/90 days (depending on distance) of eachother


#  compute_arccut, compute_triplet and compute_nunique are from DESTNOSIM: https://github.com/bernardinelli/DESTNOSIM

[docs] bound = ((50 * u.au).to(u.km).value) ** 2 # square of the boundary 50au
[docs] def DESDiscoveryFilter( observations, objectId="ObjID", mjdTime="fieldMJD_TAI", x_km="Obj_Sun_x_LTC_km", y_km="Obj_Sun_y_LTC_km", z_km="Obj_Sun_z_LTC_km", ): """ Filter for the DES object discovery requirements. This filter checks for an ARCCUT limit (has to be at least 2 objects not in a triplet discovery season) and that the observation of the discovery triplets have to be within 60/90 days (depending on distance) of eachother. Parameters ----------- observations : Pandas dataframe Dataframe of observations containing midPointTai and distance from Sun in cartesian co-ordinates. Returns ---------- observations : Pandas dataframe Modified 'observations' dataframe without observations that could not be observed. """ # creating a numpy array of the obervations obsv = observations[[objectId, mjdTime, x_km, y_km, z_km]] # new dataframe of reduced columns nameLen = obsv[objectId].str.len().max() # allows strings in objectid to be fixed length for numby array obsv = obsv.to_records( index=False, column_dtypes=dict(objectId=f"S{nameLen}", midPointTai="f8", x_km="f8", y_km="f8", z_km="f8"), ) # converts pd.dataframe to numpy array i = np.argsort(obsv[objectId]) # index of objects sorted _, idx = np.unique(obsv[objectId][i], return_index=True) # making an idx for each unique object splits = np.split(i, idx[1:]) # splitting the objects into their detections # creating a mask for dropping observations that dont meet requirments mask = np.zeros(len(obsv), dtype=bool) for obsv_indices in splits: # loop for each object thisObsv = obsv[obsv_indices] # boundary condition for triplet detection (depends on minimum distance) distance_sq = np.min(thisObsv[x_km] ** 2 + thisObsv[y_km] ** 2 + thisObsv[z_km] ** 2) window = 90 if distance_sq >= bound else 60 # sort the object's observations by time mjd i_times = np.argsort(thisObsv[mjdTime]) thisObsv = thisObsv[i_times] # check conditions are meet for detection. if compute_nunique(thisObsv[mjdTime]) >= 9: mask[obsv_indices] = True elif ( compute_arccut(thisObsv[mjdTime]) and compute_triplet(thisObsv[mjdTime], window) and compute_nunique(thisObsv[mjdTime]) >= 7 ): mask[obsv_indices] = True else: continue observations = observations[mask] return observations.sort_values(mjdTime).reset_index(drop=True)
@numba.jit( "b1(f8[:])", nopython=True, )
[docs] def compute_arccut(times): """ Computes ARCCUT, the time between the first and last detection dropping one night of detection Parameters ----------- times: list list of times, must be in DAYS Returns ---------- flag: bool returns true if ARCCUT is greater than 6 months """ arccut = 0.0 t1 = np.min(times) t2 = np.max(times) t1a = t2 t2a = t1 n = len(times) for i in range(n): if times[i] - t1 > 0.7 and times[i] < t1a: t1a = times[i] if t2 - times[i] > 0.7 and times[i] > t2a: t2a = times[i] arccut = min(t2 - t1a, t2a - t1) return arccut > 0.5 * 365.25
@numba.jit( "b1(f8[:], f8)", nopython=True, )
[docs] def compute_triplet(times, thresh): """ Computes the time difference between triplets, returns if a triplet is formed Parameters ----------- times: list list of times, must be in DAYS thresh: threshold time for pairs Returns ---------- flag: bool returns true if triplet is formed """ first_pair = 999.0 second_pair = 999.0 det = List() det.append(times[0]) n = len(times) for i in range(n - 1): if times[i + 1] - times[i] > 0.4: det.append(times[i + 1]) n = len(det) for i in range(1, n - 1): if det[i + 1] - det[i] < first_pair and det[i] - det[i - 1] < second_pair: first_pair = det[i + 1] - det[i] second_pair = det[i] - det[i - 1] if first_pair < thresh and second_pair < thresh: return True else: return False
@numba.jit( "i8(f8[:])", nopython=True, )
[docs] def compute_nunique(times): """ Computes NUNIQUE, the number of unique nights an object has been observed Parameters -------------- times : list list of times, must be in DAYS Returns ---------- nunique : int Number of unique nightly detections """ nunique = 0 n = len(times) unique = True for i in range(n): unique = True for j in range(0, i): if abs(times[j] - times[i]) < 0.4: unique = False if unique: nunique += 1 return nunique