import pandas as pd
import logging
import sys
from sorcha.readers.ObjectDataReader import ObjectDataReader
[docs]
class CSVDataReader(ObjectDataReader):
"""A class to read in object data files stored as CSV or whitespace
separated values.
Requires that the file's first column is ObjID.
"""
def __init__(self, filename, sep="csv", header=-1, **kwargs):
    """Set up a reader for object data stored in a delimited text file.

    Parameters
    ----------
    filename : string
        Location/name of the data file.
    sep : string, optional
        Format of input file ("whitespace"/"comma"/"csv").
        Default = csv
    header : integer, optional
        The row number of the header. If negative, the header line is
        located with an automatic search.
        Default = -1
    **kwargs: dictionary, optional
        Extra arguments, forwarded to the parent class constructor.

    Notes
    -----
    Logs an error and terminates via ``sys.exit`` when ``sep`` is not one
    of the recognized formats.
    """
    super().__init__(**kwargs)
    self.filename = filename

    if sep not in ["whitespace", "csv", "comma"]:
        pplogger = logging.getLogger(__name__)
        pplogger.error(f"ERROR: Unrecognized delimiter ({sep})")
        sys.exit(f"ERROR: Unrecognized delimiter ({sep})")
    # Kept on the instance because the read methods pick their pandas
    # delimiter based on this value.
    self.sep = sep

    # Pre-validate the file and record the index of the header line; the
    # read methods use it to skip anything that precedes the header.
    self.header_row = self._find_and_validate_header_line(header)

    # A table holding just the object ID for each row. Only populated
    # if we try to read data for specific object IDs.
    self.obj_id_table = None
[docs]
def get_reader_info(self):
    """Describe this reader and its input for logging and output.

    Returns
    --------
    name : string
        The reader name followed by the name of the file being read.
    """
    return "CSVDataReader:{}".format(self.filename)
[docs]
def _find_and_validate_header_line(self, header=-1):
"""Read and validate the header line. If no line number is provided, use
a heuristic match to find the header line. This is used in cases
where the header is not the first line and we want to skip down.
Parameters
----------
header : integer, optional
The row number of the header. If not provided, does an automatic search.
Default = -1
Returns
--------
: integer
The line index of the header.
"""
pplogger = logging.getLogger(__name__)
with open(self.filename) as fh:
for i, line in enumerate(fh):
# Check we have either found the specified line or no line is specified and
# our heuristic matches.
if (header >= 0 and header == i) or (header < 0 and line.startswith("ObjID")):
pplogger.info(f"Reading line {i} of {self.filename} as header:\n{line}")
self._check_header_line(line)
return i
# Give up after 100 lines.
if i > 100: # pragma: no cover
break
error_str = (
f"ERROR: CSVReader: column headings not found in the first 100 lines of {self.filename}. "
"Ensure column headings exist in input files and first column is ObjID."
)
pplogger.error(error_str)
sys.exit(error_str)
return 0
[docs]
def _validate_csv(self, header):
"""Perform a validation of the CSV file, such as checking for blank lines.
This is an expensive test and should only be performed when something
has gone wrong. This is needed because panda's read_csv() function can
given vague errors (such as failing with an index error if the file
has blank lines at the end).
Parameters
----------
header : integer
The row number of the header.
Returns
-------
: bool
True indicating success.
"""
pplogger = logging.getLogger(__name__)
with open(self.filename) as fh:
for i, line in enumerate(fh):
if i >= header:
# Check for blank lines. We do this explicitly because pandas read_csv()
# has problems when skipping lines and finding blank lines at the end.
if len(line) == 0 or line.isspace():
error_str = f"ERROR: CSVReader: found a blank line on line {i} of {self.filename}."
pplogger.error(error_str)
sys.exit(error_str)
return True
[docs]
def _read_rows_internal(self, block_start=0, block_size=None, **kwargs):
"""Reads in a set number of rows from the input.
Parameters
-----------
block_start : integer, optional
The 0-indexed row number from which
to start reading the data. For example in a CSV file
block_start=2 would skip the first two lines after the header
and return data starting on row=2. Default =0
block_size: integer, optional, default=None
The number of rows to read in.
Use block_size=None to read in all available data.
default =None
**kwargs : dictionary, optional
Extra arguments
Returns
-----------
res_df : pandas dataframe
Dataframe of the object data.
"""
# Skip the rows before the header and then begin_loc rows after the header.
skip_rows = []
if self.header_row > 0:
skip_rows = [i for i in range(0, self.header_row)]
if block_start > 0:
skip_rows.extend([i for i in range(self.header_row + 1, self.header_row + 1 + block_start)])
# Read the rows.
if self.sep == "whitespace":
res_df = pd.read_csv(
self.filename,
sep="\\s+",
skiprows=skip_rows,
nrows=block_size,
)
else:
res_df = pd.read_csv(
self.filename,
delimiter=",",
skiprows=skip_rows,
nrows=block_size,
)
return res_df
[docs]
def _build_id_map(self):
"""Builds a table of just the object IDs"""
if self.obj_id_table is not None:
return
if self.sep == "whitespace":
self.obj_id_table = pd.read_csv(
self.filename,
sep="\\s+",
usecols=["ObjID"],
header=self.header_row,
)
else:
self.obj_id_table = pd.read_csv(
self.filename,
delimiter=",",
usecols=["ObjID"],
header=self.header_row,
)
self.obj_id_table = self._validate_object_id_column(self.obj_id_table)
[docs]
def _read_objects_internal(self, obj_ids, **kwargs):
"""Read in a chunk of data for given object IDs.
Parameters
-----------
obj_ids : list
A list of object IDs to use.
**kwargs : dictionary, optional
Extra arguments
Returns
-----------
res_df : pandas dataframe
The dataframe for the object data.
"""
self._build_id_map()
# Create list of only the matching rows for these object IDs and the header row.
skipped_row = [True] * self.header_row # skip the pre-header
skipped_row.extend([False]) # Keep the the column header
skipped_row.extend(~self.obj_id_table["ObjID"].isin(obj_ids).values)
# Read the rows.
try:
if self.sep == "whitespace":
res_df = pd.read_csv(
self.filename,
sep="\\s+",
skiprows=(lambda x: skipped_row[x]),
)
else:
res_df = pd.read_csv(
self.filename,
delimiter=",",
skiprows=(lambda x: skipped_row[x]),
)
except IndexError as current_exc:
# Check if there is a more understandable error we can raise.
self._validate_csv(self.header_row)
# If we do not detect a problem with _validate_csv, reraise the error.
raise current_exc
return res_df