Source code for sorcha.readers.CombinedDataReader

"""The CombinedDataReader class supports loading the entire input data
for the simulator post processing by using individuals reader classes
to read individual input files and combining the data into a single table.

The CombinedDataReader object reads the data in blocks to limit memory usage.
For each blocks, it uses two stages:
1) It reads a range of individual rows from the ``primary_reader``. By default this
   reader is the first auxiliary data reader, but can be set to the ephemeris reader.
   This reader is used to extract a list of object IDs for this block.
2) For each of the readers (ephemeris and auxiliary data) load in all the rows
   corresponding to the object IDs extracted in stage 1.

For example, if the ephemeris file is used as the primary reader, the algorithm
will load data in blocks of the ephemeris rows and join in the auxiliary data
for just the object IDs on those rows. It is not guaranteed to include all
rows for the current objects.
"""

import logging
import pandas as pd
import sys
import collections


[docs] class CombinedDataReader: def __init__(self, ephem_primary=False, **kwargs): """ Parameters ---------- ephem_primary: boolean, optional Use the ephemeris reader as the primary reader. Otherwise uses the first auxiliary data reader. Default = False **kwargs : dictionary, optional Extra arguments """
[docs] self.ephem_reader = None
[docs] self.aux_data_readers = []
[docs] self.block_start = 0
[docs] self.ephem_primary = ephem_primary
[docs] def add_ephem_reader(self, new_reader): """Add a new reader for ephemeris data. Parameters ---------- new_reader : ObjectDataReader The reader for a specific input file. """ pplogger = logging.getLogger(__name__) if self.ephem_reader is not None: pplogger.error("ERROR: Ephemeris reader already set.") sys.exit("ERROR: Ephemeris reader already set.") self.ephem_reader = new_reader
[docs] def add_aux_data_reader(self, new_reader): """Add a new object reader that corresponds to an auxiliary input data type.. Parameters ---------- new_reader : ObjectDataReader The reader for a specific input file. """ self.aux_data_readers.append(new_reader)
[docs] def check_aux_object_ids(self): """Checks the ObjIDs in all of the auxiliary data readers to make sure both files contain exactly the same ObjIDs. """ pplogger = logging.getLogger(__name__) if len(self.aux_data_readers) <= 1: pplogger.error("ERROR: Only one or zero aux_data_readers set.") sys.exit("ERROR: Only one or zero aux_data_readers set.") for i in range(0, len(self.aux_data_readers)): self.aux_data_readers[i]._build_id_map() if i == 0: primary_ids = self.aux_data_readers[i].obj_id_table continue raise_error = False if len(primary_ids) != len(self.aux_data_readers[i].obj_id_table): pplogger.error( "ERROR: mismatched ObjIDs in auxiliary input files. IDs in {} do not match {}.".format( self.aux_data_readers[0].filename, self.aux_data_readers[i].filename ) ) sys.exit( "ERROR: mismatched ObjIDs in auxiliary input files. IDs in {} do not match {}.".format( self.aux_data_readers[0].filename, self.aux_data_readers[i].filename ) ) elif collections.Counter(primary_ids["ObjID"].values) != collections.Counter( self.aux_data_readers[i].obj_id_table["ObjID"].values ): pplogger.error( "ERROR: mismatched ObjIDs in auxiliary input files. IDs in {} do not match {}.".format( self.aux_data_readers[0].filename, self.aux_data_readers[i].filename ) ) sys.exit( "ERROR: mismatched ObjIDs in auxiliary input files. IDs in {} do not match {}.".format( self.aux_data_readers[0].filename, self.aux_data_readers[i].filename ) )
[docs] def read_block(self, block_size=None, verbose=False, **kwargs): """Reads in a set number of rows from the input, performs post-processing and validation, and returns a data frame. Parameters ----------- block_size: integer, optional the number of rows to read in. Use block_size=None to read in all available data. Default = None verbose : boolean, optional Use verbose logging. Default = False **kwargs : dictionary, optional Extra arguments Returns ----------- res_df : pandas dataframe dataframe of the combined object data. """ pplogger = logging.getLogger(__name__) verboselog = pplogger.info if verbose else lambda *a, **k: None if self.ephem_reader is None: pplogger.error("ERROR: No ephemeris reader provided.") sys.exit("ERROR: No ephemeris reader provided.") if len(self.aux_data_readers) == 0: pplogger.error("ERROR: No auxiliary readers provided.") sys.exit("ERROR: No auxiliary readers provided.") # Load object IDs from the primary table. if self.ephem_primary: verboselog(f"Loading object IDs from: {self.ephem_reader.get_reader_info()}") ephem_df = self.ephem_reader.read_rows(self.block_start, block_size) self.block_start += len(ephem_df) if not "ObjID" in ephem_df.columns: pplogger.error("ERROR: No ObjID provided for ephemerides.") sys.exit("ERROR: No ObjID provided for ephemerides.") obj_ids = ephem_df["ObjID"].unique().tolist() primary_df = None else: verboselog(f"Loading object IDs from: {self.aux_data_readers[0].get_reader_info()}") primary_df = self.aux_data_readers[0].read_rows(self.block_start, block_size) self.block_start += len(primary_df) if not "ObjID" in primary_df.columns: pplogger.error("ERROR: No ObjID provided.") sys.exit("ERROR: No ObjID provided.") obj_ids = primary_df["ObjID"].unique().tolist() ephem_df = None # No more data to load. if len(obj_ids) == 0: return None # Load in the data for this block. if ephem_df is None: ephem_df = self.ephem_reader.read_objects(obj_ids) ephem_ids = set(ephem_df["ObjID"].unique().tolist()) for i, reader in enumerate(self.aux_data_readers): # Skip reading the data from the first auxiliary file if we already read it. if i == 0 and primary_df is not None: current_df = primary_df else: verboselog(f"Reading input file: {reader.get_reader_info()}") current_df = reader.read_objects(obj_ids) # Check that the new dataframe has at least the object IDs matching # the ephemeris frame. verboselog("Checking Object IDs in auxiliary data") current_ids = set(pd.unique(current_df["ObjID"]).astype(str)) if not ephem_ids.issubset(current_ids): # pragma: no cover pplogger.error(f"ERROR: At least one missing ObjID in {reader.get_reader_info()}") sys.exit(f"ERROR: At least one missing ObjID {reader.get_reader_info()}") verboselog("Joining auxiliary data with ephemeris") ephem_df = ephem_df.join(current_df.set_index("ObjID"), on="ObjID") return ephem_df
[docs] def read_aux_block(self, block_size=None, verbose=False, **kwargs): """Reads in a set number of rows from the input, performs post-processing and validation, and returns a data frame. This function DOES NOT include the ephemeris data in the returned data frame. It is to be used when generating the ephemeris during the execution of Sorcha. Parameters ----------- block_size : integer, optional the number of rows to read in. Use block_size=None to read in all available data. Default = None verbose : boolean, optional use verbose logging. Default = False **kwargs : dictionary, optional Extra arguments Returns ----------- res_df : pandas dataframe dataframe of the combined object data, excluding any ephemeris data. """ pplogger = logging.getLogger(__name__) verboselog = pplogger.info if verbose else lambda *a, **k: None if len(self.aux_data_readers) == 0: pplogger.error("ERROR: No auxiliary readers provided.") sys.exit("ERROR: No auxiliary readers provided.") # Load object IDs from the primary table. verboselog(f"Loading object IDs from: {self.aux_data_readers[0].get_reader_info()}") primary_df = self.aux_data_readers[0].read_rows(self.block_start, block_size) self.block_start += len(primary_df) if not "ObjID" in primary_df.columns: pplogger.error("ERROR: No ObjID provided.") sys.exit("ERROR: No ObjID provided.") obj_ids = primary_df["ObjID"].unique().tolist() # No more data to load. if len(obj_ids) == 0: return None aux_data_df = None for i, reader in enumerate(self.aux_data_readers): # Skip reading the data from the first auxiliary file if we already read it. if i == 0 and primary_df is not None: current_df = primary_df else: verboselog(f"Reading input file: {reader.get_reader_info()}") current_df = reader.read_objects(obj_ids) # Check that the new dataframe has at least the object IDs matching # the ephemeris frame. verboselog("Checking Object IDs in auxiliary data") current_ids = set(pd.unique(current_df["ObjID"]).astype(str)) if not current_ids.issubset(obj_ids): # pragma: no cover pplogger.error(f"ERROR: At least one missing ObjID in {reader.get_reader_info()}") sys.exit(f"ERROR: At least one missing ObjID {reader.get_reader_info()}") if i == 0: aux_data_df = current_df else: verboselog("Joining auxiliary data without ephemeris") aux_data_df = aux_data_df.join(current_df.set_index("ObjID"), on="ObjID") return aux_data_df