Source code for data_transfer

# Module metadata.
__authors__ = 'Greg Cohn'
__version__ = '0'

import csv
import pandas as pd
from os.path import join, isfile, getmtime
from os import makedirs, listdir
from datetime import datetime
from post_gce_qc import qaqc
from pyarrow import string, float32

class ProvisionalDataFormat:
    """
    Hold format information about the flat files exported from GCE provisional QA.

    This is the base class of the load and write classes, which both require this
    format information to function.

    See `normalize_cols.m` from GCE Tools for normalization context:
    https://bitbucket.org/hjandrews/im_gcetoolbox/src/master/core/normalize_cols.m
    """

    def __init__(self, file_n='./config.yaml', skip_nrows=5, fname_base='MS043PPT_PPT_L1_5min_',
                 cols=None, strtyr=2018, endyr=2022):
        """
        Initiate data transfer classes with basic information about where file directories
        are located and establish a file naming convention.

        :param file_n: str. File path to config.yaml containing file location and format
            information. A value that does not contain '.yaml' or '.yml' is used directly as
            the already-loaded configuration; it must provide 'data_path' and 'data_header'.
        :param skip_nrows: int. Number of header rows to skip in files.
        :param fname_base: str. Base name of the files; the year is appended to it.
        :param cols: list. List of column names. If None (or empty), column names are parsed
            from line index 2 of the first year's file.
        :param strtyr: int. First year of data, primarily used for file naming.
        :param endyr: int. Last year of data, primarily used for file naming.
        :raises FileNotFoundError: if column names must be parsed and the first year's file
            does not exist.
        """
        self.skip_nrows = skip_nrows
        self.fname_base = fname_base
        self.strtyr = strtyr
        self.endyr = endyr

        if '.yaml' in file_n or '.yml' in file_n:
            config = qaqc._load_yaml(file_n)
        else:
            config = file_n
        self.data_dir = config['data_path']
        self.header = config['data_header']

        if cols:
            self.cols = cols
        else:
            # use the first year's file to load column names
            fpath = join(self.data_dir, f"{fname_base}{strtyr}.csv")
            try:
                self._find_col(fpath, line_num=2)
            except FileNotFoundError as err:
                raise FileNotFoundError(
                    f'Data path provided does not exist:\n{fpath}\nEdit ./config.yaml') from err
        self.df = pd.DataFrame()

    def _find_col(self, fpath, line_num=2):
        """
        Parse column names from one line of a GCE file and store them in ``self.cols``.

        Names are split on commas and stripped of surrounding double quotes. If the file has
        fewer than ``line_num + 1`` lines, ``self.cols`` is set to an empty list.

        :param fpath: str. Path to the GCE csv file.
        :param line_num: int. Zero-based index of the line holding the column names.
        """
        col = []
        with open(fpath, 'r') as f:
            for j, line in enumerate(f):
                if j == line_num:
                    col = [name.strip('"') for name in line.strip().split(',')]
                    break
        self.cols = col

    @staticmethod
    def find_probe(df, search_list=None, search_col='Parameter'):
        """
        Find probe name in GCE output.

        GCE output is in a flat format. As of April 2023 there are the following columns:

        Example::

            Date                 Parameter                 Value    Flag_Value
            2018-09-30 23:55:00  UPL_PRECIP_INST_455_0_01  45.050   <NA>
            2018-09-30 23:55:00  UPL_PRECIP_TOT_455_0_01   0.000    <NA>
            2018-09-30 23:55:00  UPL_PRECIP_ACC_455_0_01   2372.92  <NA>

        So to find all the data for probe 1 at UPLO, you need to query for 3 different
        parameters from the flat file. This function looks at all the unique component names
        in ``search_col`` and keeps those containing every identifier in ``search_list``
        (such as site and probe number, or site and probe height).

        :param df: Pandas dataframe containing GCE output.
        :param search_list: list of strings to identify a probe. Only names that contain every
            string (as a substring) are returned. None or empty matches every name.
        :param search_col: Column to search for probe names.
        :return: list of names (in order of first appearance) that contain data for the
            searched probe.
        :raises NameError: if no name contains all of the search strings.
        """
        search_list = [] if search_list is None else list(search_list)
        has_probe = [param for param in df[search_col].unique()
                     if all(search in param for search in search_list)]
        if not has_probe:
            # Report every search term; the old message assumed exactly [site, probe_num]
            # and raised IndexError for shorter search lists.
            raise NameError(f'No parameters could be found in {search_col!r} that contain: '
                            f'{search_list}')
        return has_probe
class LoadProvisionalData(ProvisionalDataFormat):
    """
    Load files from multiple water years into a single pd.DataFrame and pivot the
    parameters of an individual probe into a table.

    The base class establishes the location and file naming convention of the data;
    :meth:`load_ppt_data` loads all files for the water years requested, and
    :meth:`pivot_on_probe` creates pivot tables for individual probes.
    """

    def __init__(self, **kwargs):
        """Initiate the loader; keyword arguments are passed to :class:`ProvisionalDataFormat`."""
        super().__init__(**kwargs)

    def load_ppt_data(self, **kwargs):
        """
        Load GCE files for multiple water years and store them, concatenated, in ``self.df``.

        One file is read for every year from ``self.strtyr`` to ``self.endyr`` (inclusive);
        all files must be in ``self.data_dir``. Any keyword argument accepted by
        :ref:`pandas:/reference/api/pandas.read_csv.rst` can be supplied to this function and
        will be passed to `pandas.read_csv()`, overriding the defaults set here (including
        ``names``, ``parse_dates`` and ``engine``).

        .. Warning:: Data is filtered by Water Year (WY). Even if a file contains data spanning
            a different date range, it will be trimmed to ``10/1/{y - 1} 00:05`` through
            ``10/1/{y} 00:00`` (inclusive).

        .. Warning:: Assumes filename is format <fname_base><year>.csv. The method only works
            with year as suffix.

        :param kwargs: keyword arguments forwarded to `pandas.read_csv()`.
        :raises ValueError: if ``self.strtyr`` is greater than ``self.endyr``.
        """
        # kwargs for pd.read_csv(); caller-supplied values take precedence.
        kwargs.setdefault('dtype', {'Flag_Value': pd.ArrowDtype(string()),
                                    'Parameter': 'category',
                                    'Value': pd.ArrowDtype(float32())})
        # 50% time reduction using pyarrow types.
        # Output will be equivalent to defining dtypes as 'string[pyarrow]' or pd.StringDtype('pyarrow')
        kwargs.setdefault('dtype_backend', 'pyarrow')
        kwargs.setdefault('skiprows', self.skip_nrows)
        kwargs.setdefault('index_col', 0)
        kwargs.setdefault('names', self.cols)
        kwargs.setdefault('parse_dates', True)
        kwargs.setdefault('engine', 'pyarrow')

        years = range(self.strtyr, self.endyr + 1)
        if not years:
            raise ValueError(f'No water years to load: strtyr ({self.strtyr}) is after '
                             f'endyr ({self.endyr}).')

        df_list = []
        for y in years:
            fpath = join(self.data_dir, f"{self.fname_base}{y}.csv")
            dfnew = pd.read_csv(fpath, **kwargs)
            # Files start at midnight on the first day of the WY, but they also end at midnight
            # on the last day of the WY, so midnight is repeated when multiple files are
            # appended. Starting at 00:05 enforces non-overlapping water years.
            wystrt = pd.to_datetime(f"10/1/{y - 1} 0005")
            wyend = pd.to_datetime(f"10/1/{y} 0000")
            df_list.append(dfnew.loc[(dfnew.index >= wystrt) & (dfnew.index <= wyend), :])
        self.df = pd.concat(df_list)

    @classmethod
    def pivot_on_probe(cls, df, site, probe_num, keep_col_name=None, probeid_col='Parameter'):
        """
        Create pivot table of data for a single probe from GCE flat file.

        As of April 2023, GCE precip output is a flat file with separate labels for 3
        components. This method finds the components for the requested probe and returns a
        pivot table.

        Components:

        * INST - The instantaneous measure of tank height
        * TOT - The total precipitation measured since the last timestep
        * ACC - The accumulated precip, a cumulative sum of WY to date.

        Example::

            Flat format
            Date                 Parameter                 Value    Flag_Value
            2018-09-30 23:55:00  CEN_PRECIP_INST_625_0_02  44.150   <NA>
            2018-09-30 23:55:00  CEN_PRECIP_TOT_625_0_02   0.000    <NA>
            2018-09-30 23:55:00  CEN_PRECIP_ACC_625_0_02   1739.51  <NA>

            Pivot
            Date                 INST   INST_Flag  TOT   TOT_Flag  ACC      ACC_Flag
            2018-09-30 23:55:00  44.15  <NA>       0.00  <NA>      1739.51  <NA>

        Missing TOT values are set to 0 and INST/ACC (when present) are forward filled.

        :param df: Pandas DataFrame of a GCE flat file.
        :param site: str containing 3 character site ID.
        :param probe_num: str containing 2 character probe num.
        :param keep_col_name: list of column names to keep in final output; the first is the
            value column and the second the flag column. Defaults to ['Value', 'Flag_Value'].
        :param probeid_col: str. Column name containing the parameter (probe) names.
        :return: Pandas DataFrame. Pivot table of data and flags for a single probe.
        :raises NameError: if no parameter matches the site and probe number.
        :raises KeyError: if the probe has no TOT component.
        """
        # A list (not tuple) so .loc treats it as a column selection, not a MultiIndex key.
        keep_col_name = ['Value', 'Flag_Value'] if keep_col_name is None else list(keep_col_name)
        value_col, flag_col = keep_col_name[0], keep_col_name[1]

        # find all parameter names for this probe, searching the same column we filter on
        params = cls.find_probe(df, search_list=[site, probe_num], search_col=probeid_col)

        # collect values and flags for all data components of this probe
        components = {}
        for param in params:
            # extract 'ACC', 'TOT', or 'INST' from param name and add to column name
            name = param.split('_')[2]
            dfp = df.loc[df[probeid_col] == param, keep_col_name].rename(
                columns={value_col: name, flag_col: f'{name}_Flag'})
            for col in dfp.columns:
                components[col] = dfp[col]
        tbl = pd.DataFrame(data=components).sort_index()

        # Fill out any missing data before returning table
        tbl['TOT'] = tbl['TOT'].fillna(0)
        for c in ("INST", "ACC"):
            # Tipping buckets do not have an INST field.
            if c in tbl.columns:
                # for QA analyses, large stretches of nan values obscure a lot of analytical efforts
                tbl[c] = tbl[c].ffill()
        return tbl
class WriteProvisionalData(ProvisionalDataFormat):
    """
    Provide methods to create output files that match the GCE normalized format.

    Data are imported in the GCE normalized (flat) format and other modules perform QAQC on
    them in a pivot-table format. This class melts those pivot tables back into the
    normalized/flat format and writes csv files that match the original GCE output.
    """

    def __init__(self, **kwargs):
        """Initiate the writer; keyword arguments are passed to :class:`ProvisionalDataFormat`."""
        super().__init__(**kwargs)
        self.parameters = []
        # Probe names found in the provisional files; populated by set_expected_probes().
        self.expected_probes = pd.DataFrame()

    def _find_unique_params(self, prov_file, param_col=None):
        """
        Return the unique parameter names found in one provisional file.

        :param prov_file: str. File name relative to ``self.data_dir``.
        :param param_col: list. Columns to read; the first one holds the parameter names.
            Defaults to ['Parameter'].
        :return: list of unique parameter names in order of first appearance.
        """
        param_col = ['Parameter'] if param_col is None else list(param_col)
        prov_fpath = join(self.data_dir, prov_file)
        df = pd.read_csv(prov_fpath, names=self.cols, skiprows=self.skip_nrows, usecols=param_col)
        return list(df[param_col[0]].unique())

    @staticmethod
    def _melt_df(df, site, probe_num, ht, cols=None, **flat_format):
        """
        Rename pivot columns to full GCE parameter names and melt them to a flat format.

        :param df: pd.DataFrame in pivot format. It is not modified.
        :param site: str. Site name.
        :param probe_num: str. Probe number.
        :param ht: str. Probe height used in the parameter name.
        :param cols: dict. Relate df column names to short names like {'adj_precip': 'TOT'};
            required in practice (None raises AttributeError).
        :param flat_format: extra keyword arguments for `pandas.melt()`.
        :return: melted pd.DataFrame (original index kept unless ``ignore_index`` is given).
        """
        # fill out dictionary for flat format to melt to
        flat_format.setdefault('value_name', 'Value')
        flat_format.setdefault('var_name', 'Parameter')
        flat_format.setdefault('ignore_index', False)
        # convert column names to parameter name, e.g. CEN_PRECIP_INST_625_0_04
        val_col = {orig_col: f'{site}_PRECIP_{chg_col}_{ht}_0_{probe_num}'
                   for orig_col, chg_col in cols.items()}
        # Rename a copy: renaming in place would alter the caller's DataFrame.
        renamed = df.rename(columns=val_col)
        flat_format.setdefault('value_vars', list(val_col.values()))
        return pd.melt(renamed, **flat_format)

    def set_expected_probes(self, tests=(0, -1)):
        """
        Get a list of expected output probe names from GCE output files.

        The csv files in ``self.data_dir`` are sorted by name (so, with the year-suffix
        naming convention, chronologically). By default only the first and last file are
        inspected; any number of files can be used by adding additional indices.

        :param tests: sequence of int. Indices (into the sorted file list) of files to inspect.
        :raises FileNotFoundError: if ``self.data_dir`` contains no csv files.
        :return: None. Sets ``self.expected_probes`` to a DataFrame with a 'probes' column.
        """
        # listdir order is arbitrary; sort so "first" and "last" are well defined.
        prov_files = sorted(f for f in listdir(self.data_dir) if f.endswith('.csv'))
        if not prov_files:
            raise FileNotFoundError(f'No .csv files found in data directory:\n{self.data_dir}')
        tests = list(tests)
        if len(prov_files) == 1:
            tests = tests[:1]

        # assume that first and last file samples all probes.
        probes = []
        seen = set()
        for test in tests:
            for param in self._find_unique_params(prov_files[test]):
                if param not in seen:
                    seen.add(param)
                    probes.append(param)
        self.expected_probes = pd.DataFrame({'probes': probes})

    @staticmethod
    def format_str_columns(df, cols=("Flag_Value", "Parameter")):
        """
        Explicitly format string columns by wrapping values in double quotes (in place).

        NaNs in ``cols`` become empty strings (written as ``""``); existing surrounding
        quotes are stripped first so values are never double-quoted.

        :param df: pd.DataFrame of data in flat format. Modified in place.
        :param cols: column names to format.
        :return: None.
        """
        # Replace NaNs with empty strings
        df.fillna({k: '' for k in cols}, inplace=True)
        for c in cols:
            # Replace the whole column so a dtype change (e.g. from categorical) is allowed.
            df[c] = '"' + df[c].str.strip('"') + '"'

    def get_probe_height(self, site, probe_num):
        """
        Get probe height for an input site and probe number.

        Site and probe number must be converted to a GCE style probe name in the output file.
        This is done by first finding the probe in the original GCE file names stored in
        ``self.expected_probes`` (see :meth:`set_expected_probes`) and extracting the height.

        :Example:

        ``Site = UPL, Probe = 02`` needs to become ``'UPL_PRECIP_INST_625_0_02'``,
        whose height is ``'625'``.

        :param site: str. Site name.
        :param probe_num: str. Probe number.
        :return: str. Third-from-last underscore-separated field of the first matching name.
        """
        probes_df = self.expected_probes
        probe_names = self.find_probe(probes_df, search_list=[site, probe_num], search_col='probes')
        return probe_names[0].split('_')[-3]

    def melt_ppt_data(self, data_dict, site, probe_num, flag_col=None, ppt_col=None):
        """
        Convert data from the format of :meth:`qaqc.ApplyFlags` to the GCE flat format.

        1. Relate names like 'tank_height' back to short names like 'INST'
        2. Convert short names to full parameter names like `CEN_PRECIP_INST_625_0_04`
        3. Reverse (or melt) the pivot table back to a flat format
        4. Merge the parameters and flags into a single flat table

        :param data_dict: dict. Dictionary of precip and flags. Keys containing 'flags' are
            melted into 'Flag_Value', keys containing 'ppt' into 'Value'; other keys are
            skipped. ``data_dict['ppt']`` is used to detect tipping buckets.
        :param site: str. Site name.
        :param probe_num: str. Probe number.
        :param flag_col: dict. Relate flag column names to short names.
            Defaults to {'final_flag': 'TOT'}.
        :param ppt_col: dict. Relate precip column names to short names.
            Defaults to {'tank_height': 'INST', 'adj_precip': 'TOT'}.
        :return: pd.DataFrame indexed by 'Date' with a 'Parameter' column and one value
            column per melted entry of ``data_dict``.
        """
        # Work on copies: the tank column is dropped below for tipping buckets, and that must
        # not leak into the defaults (shared across calls) or into the caller's dict.
        flag_col = {'final_flag': 'TOT'} if flag_col is None else dict(flag_col)
        ppt_col = {'tank_height': 'INST', 'adj_precip': 'TOT'} if ppt_col is None else dict(ppt_col)

        ht = self.get_probe_height(site, probe_num)
        tank_col = next((c for c in ppt_col if 'tank' in c.lower() or 'inst' in c.lower()), None)
        if tank_col is not None and data_dict['ppt'][tank_col].isna().all():
            # This is a tipping bucket- No tank
            ppt_col.pop(tank_col)

        # Seed with the merge keys so each melted frame can be outer-merged onto it.
        df_out = pd.DataFrame(columns=['Parameter', 'Date'])
        for key, frame in data_dict.items():
            lowered = key.lower()
            if 'flags' in lowered:
                val_name, val_cols = 'Flag_Value', flag_col
            elif 'ppt' in lowered:
                val_name, val_cols = 'Value', ppt_col
            else:
                # Unknown entry: skip rather than reuse the previous iteration's mapping.
                continue
            df_melted = self._melt_df(frame, site, probe_num, ht, cols=val_cols, value_name=val_name)
            # merge on datetime and parameter name; assumes the frames' index is named 'Date'
            df_melted = df_melted.reset_index()
            df_out = pd.merge(df_out, df_melted, on=['Parameter', 'Date'], how='outer')
        # Value and Flag_Value columns are now merged on each common Date/Parameter pair.
        return df_out.set_index('Date')

    @staticmethod
    def is_exists_file(fpath):
        """
        Return True if file exists.

        :param fpath: str. Valid file path
        :return: Boolean. True if file exists
        """
        return isfile(fpath)

    @staticmethod
    def is_new_file(f_path, max_age='5min'):
        """
        Return True if the file's modification time is more recent than max_age ago.

        :param f_path: str. Valid file path
        :param max_age: str. Must be valid Pandas timedelta.
        :return: Boolean. True if file is newer than max_age.
        """
        t_fpath = datetime.fromtimestamp(getmtime(f_path))
        age_fpath = pd.to_timedelta(datetime.now() - t_fpath)
        return age_fpath < pd.to_timedelta(max_age)

    @staticmethod
    def write_df_to_file(filepath, df):
        """
        Append the dataframe to a csv file, without a header row.

        `DataFrame.to_csv` opens and closes the file itself in append mode. Dates are written
        as '%Y-%m-%d %H:%M:%S', floats with 3 decimals and missing values as 'NaN'.

        :param filepath: str. Valid file path to .csv file
        :param df: pandas dataframe.
        :return: None if successful.
        """
        # None of the quoting modes exactly match the desired format, so we disable
        # quoting and write quotes explicitly in the fields (see format_str_columns).
        df.to_csv(filepath, mode="a", index=True, sep=',', date_format='%Y-%m-%d %H:%M:%S',
                  header=False, float_format='%.3f', na_rep='NaN', quoting=csv.QUOTE_NONE)

    def create_file_header(self, fpath):
        """
        Create (or truncate) a file and write its header.

        Uses the header lines read from config.yaml, one per line.

        :param fpath: str. Valid file path.
        :return: None if successful.
        """
        with open(fpath, 'w') as f:
            f.write("\n".join(self.header))
            f.write("\n")

    def write_file_per_WY(self, df, output_dir):
        """
        Write a dataframe to a csv file for each water year.

        This method is intended to write flat files with the output format specified in the
        header information of config.yaml. That assumes the data has already been flattened
        by :meth:`WriteProvisionalData.melt_ppt_data`.

        :param df: pandas dataframe. Data in flat format.
        :param output_dir: str. Output directory, relative to ``self.data_dir``; created if
            missing.
        :return: None if successful.
        """
        write_dir = join(self.data_dir, output_dir)
        makedirs(write_dir, exist_ok=True)
        for y in range(self.strtyr, self.endyr + 1):
            fname = f'{self.fname_base}out_{y}.csv'
            fpath = join(write_dir, fname)
            # Different probes append to the same WY file from different WriteProvisionalData
            # instances, so we need to check the file age before we overwrite. Older files are
            # overwritten because they are *likely* from a previous run, but this is a very
            # brittle check that can result in truncated files if processing is slow. This should
            # be replaced with something more reliable like a process-wide lock or temporary probe
            # files that are merged at the end of processing.
            if not self.is_exists_file(fpath) or not self.is_new_file(fpath, max_age='10min'):
                self.create_file_header(fpath)
            # Same non-overlapping WY window used when loading the data.
            wystrt = pd.to_datetime(f"10/1/{y - 1} 0005")
            wyend = pd.to_datetime(f"10/1/{y} 0000")
            self.write_df_to_file(fpath, df.loc[(df.index >= wystrt) & (df.index <= wyend)])
class AddNotesdbEvents:
    """
    Record PostGCE events into NotesDB for inclusion in additional processing and creation of
    final data product.

    Take the events table, which is organized by timestep::

        Date                 prov_flag  tank_flag  QaRule_flag  manual_flag  final_flag  event_code  explanation
        2018-10-29 07:10:00  <NA>       <NA>                    U            U           CLOG        ManualFlag: small clog not caught by auto_flag...

    and reorganize it (one row per contiguous run of an event code) for import in NotesDB:

    * [NoteID] [int] IDENTITY(1,1) NOT NULL,
    * [sitecode] [varchar](10) NULL,
    * [meas_code] [varchar](10) NULL,
    * [probe_code] [varchar](10) NULL,
    * [logger_id] [varchar](10) NULL,
    * [note_code] [varchar](10) NOT NULL,
    * [note_taker] [varchar](5) NULL,
    * [note_entry] [varchar](5000) NULL,
    * [event_begin_datetime] [datetime2](7) NULL,
    * [active] [bit] NULL,
    * [log_datetime] [datetime2](7) NULL,
    * [event_end_datetime] [datetime2](7) NULL
    """

    def __init__(self, events, probe):
        """
        :param events: pd.DataFrame with an 'event_code' column, indexed by timestep
            (presumably datetimes). It is not modified.
        :param probe: str. Probe name such as 'UPL_01'; the text before the first '_' is
            the site.
        """
        self.events = events
        self.set_sitecode(probe)
        self.set_probecode(probe)
        col = ['NoteID', 'sitecode', 'meas_code', 'probe_code', 'logger_id', 'note_code',
               'note_taker', 'note_entry', 'event_begin_datetime', 'active', 'log_datetime',
               'event_end_datetime']
        self.notes = pd.DataFrame(columns=col)

    def set_sitecode(self, probe):
        """Set ``self.sitecode`` to '<site>MET', where site precedes the first '_' in probe."""
        site = probe.split('_')[0]
        self.sitecode = f"{site}MET"

    def set_probecode(self, probe):
        """Set ``self.probecode`` to 'PPT' followed by the probe name without underscores."""
        probe_name = probe.replace('_', '')
        self.probecode = f"PPT{probe_name}"

    @staticmethod
    def get_date_start(grp):
        """Return the first index label of a group (the event's begin time)."""
        return grp.index[0]

    @staticmethod
    def get_date_end(grp):
        """Return the last index label of a group (the event's end time)."""
        return grp.index[-1]

    def set_notes(self):
        """
        Append one note per contiguous run of each event code to ``self.notes``.

        An event code matches a timestep when it occurs as a plain substring of that
        timestep's 'event_code' value. Empty / missing codes are ignored.
        """
        # NA values are ambiguous as booleans; fill on a copy so self.events is untouched.
        event_code = self.events['event_code'].fillna('')
        new_notes = []
        for event in event_code.unique():
            if event == '':
                continue
            # PyArrow doesn't support cumsum for boolean, so convert to numpy bool.
            event_happening = event_code.str.contains(event, regex=False).astype('bool')
            # A run starts where the event is happening but was not on the previous step.
            # Numbering the starts gives every contiguous run its own id (the first row can
            # start a run too, unlike with diff(), whose first value is NaN).
            run_start = event_happening & ~event_happening.shift(fill_value=False)
            run_id = run_start.astype('int64').cumsum()
            grouped = event_code[event_happening].groupby(run_id[event_happening])

            event_df = pd.DataFrame({
                'event_begin_datetime': grouped.apply(self.get_date_start),
                'event_end_datetime': grouped.apply(self.get_date_end),
            }).reindex(columns=self.notes.columns)
            event_df['sitecode'] = self.sitecode
            event_df['meas_code'] = 'PPT'
            event_df['probe_code'] = self.probecode
            event_df['note_code'] = event
            event_df['note_taker'] = 'py'
            event_df['note_entry'] = f"{event}: added by fsdb_ppt. https://bitbucket.org/hjandrews/fsdb_ppt/"
            new_notes.append(event_df)

        if new_notes:
            # Skip the empty initial frame when concatenating (pandas deprecates empty entries).
            frames = new_notes if self.notes.empty else [self.notes, *new_notes]
            self.notes = pd.concat(frames, ignore_index=True)
if __name__ == '__main__':
    # Smoke run: load the expected probe names and report one probe's height.
    probes = WriteProvisionalData(file_n='../config.yaml')
    probes.set_expected_probes()
    print(probes.get_probe_height('CS2', '02'))