__authors__ = 'Greg Cohn'
__version__ = '0'
import csv
import pandas as pd
from os.path import join, isfile, getmtime
from os import makedirs, listdir
from datetime import datetime
from post_gce_qc import qaqc
from pyarrow import string, float32
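# NOTE: a "[docs]" Sphinx viewcode link marker was removed here (HTML extraction artifact, not Python source).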
class LoadProvisionalData(ProvisionalDataFormat):
    """
    Load GCE provisional files for several water years and reshape them per probe.

    Files from multiple water years are loaded into a single pd.DataFrame. All parameters for an individual probe
    can then be queried from that data and transformed into a pivot table.

    The file location and naming convention are read from instance attributes (``data_dir``, ``fname_base``,
    ``strtyr``, ``endyr``, ``cols``, ``skip_nrows``); these are expected to be set up by ``ProvisionalDataFormat``,
    which is defined outside this module.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def load_ppt_data(self, **kwargs):
        """
        Load GCE files for multiple water years into ``self.df``.

        One file is read for every year from ``self.strtyr`` through ``self.endyr`` (inclusive) and the results are
        concatenated. All files must be in ``self.data_dir``.

        Any keyword argument accepted by :ref:`pandas:/reference/api/pandas.read_csv.rst` can be supplied to this
        function and will be passed to `pandas.read_csv()`. Values supplied by the caller take precedence over the
        defaults chosen here (``dtype``, ``dtype_backend``, ``skiprows``, ``index_col``, ``names``, ``parse_dates``
        and ``engine``).

        .. Warning::
            Data is filtered by Water Year (WY). Even if a file contains data spanning a different date range, it
            is trimmed to timestamps from ``10/1/{y - 1} 00:05`` through ``10/1/{y} 00:00`` (both inclusive).

        .. Warning::
            Assumes filename is format <fname_base><year>.csv. The method only works with year as suffix.

        :raises ValueError: if ``self.strtyr`` is greater than ``self.endyr`` so that no file is read.
        :return: None. The concatenated table is stored on ``self.df``.
        """
        # Defaults for pd.read_csv(). setdefault() is used for every option so that a caller can override any
        # of them without triggering a duplicate-keyword TypeError.
        kwargs.setdefault('dtype', {'Flag_Value': pd.ArrowDtype(string()),
                                    'Parameter': 'category', 'Value': pd.ArrowDtype(float32())})
        # 50% time reduction using pyarrow types.
        # Output will be equivalent to defining dtypes as 'string[pyarrow]' or pd.StringDtype('pyarrow')
        kwargs.setdefault('dtype_backend', 'pyarrow')
        kwargs.setdefault('skiprows', self.skip_nrows)
        kwargs.setdefault('index_col', 0)
        kwargs.setdefault('names', self.cols)
        kwargs.setdefault('parse_dates', True)
        kwargs.setdefault('engine', 'pyarrow')

        df_list = []
        for y in range(self.strtyr, self.endyr + 1):
            fpath = join(self.data_dir, f"{self.fname_base}{y}.csv")
            dfnew = pd.read_csv(fpath, **kwargs)
            # files start at midnight on first day of WY, but they also end at midnight on the last day of the WY,
            # so midnight is repeated when multiple files are appended.
            # This enforces a non-overlapping water year.
            wystrt = pd.to_datetime(f"10/1/{y - 1} 0005")
            wyend = pd.to_datetime(f"10/1/{y} 0000")
            in_water_year = (dfnew.index >= wystrt) & (dfnew.index <= wyend)
            df_list.append(dfnew.loc[in_water_year, :])

        if not df_list:
            raise ValueError(f"No water years to load: strtyr={self.strtyr} is after endyr={self.endyr}")
        self.df = pd.concat(df_list)

    @classmethod
    def pivot_on_probe(cls, df, site, probe_num, keep_col_name=('Value', 'Flag_Value'), probeid_col='Parameter'):
        """
        Create pivot table of data for a single probe from GCE flat file.

        As of April 2023, GCE precip output is a flat file with separate labels for 3 components. This method finds
        the 3 components for the requested probe and returns a pivot table. Components:

        * INST - The instantaneous measure of tank height
        * TOT - The total precipitation measured since the last timestep
        * ACC - The accumulated precip, a cumulative sum of WY to date.

        Example::

            FlatFormat
            +-----------------------+---------------------------+--------+----------+
            |Date                   |Parameter                  |Value   |Flag_Value|
            +=======================+===========================+========+==========+
            |2018-09-30 23:55:00    |CEN_PRECIP_INST_625_0_02   |44.150  |<NA>      |
            +-----------------------+---------------------------+--------+----------+
            |2018-09-30 23:55:00    |CEN_PRECIP_TOT_625_0_02    |0.000   |<NA>      |
            +-----------------------+---------------------------+--------+----------+
            |2018-09-30 23:55:00    |CEN_PRECIP_ACC_625_0_02    |1739.51 |<NA>      |
            +-----------------------+---------------------------+--------+----------+

            Pivot
            +----------------------+--------+----------+-------+-----------+-------+--------+
            |Date                  |INST    |INST_Flag |TOT    |TOT_Flag   |ACC    |ACC_Flag|
            +======================+========+==========+=======+===========+=======+========+
            |2018-09-30 23:55:00   |44.15   |<NA>      |0.00   |<NA>       |1739.51|<NA>    |
            +----------------------+--------+----------+-------+-----------+-------+--------+

        :param df: Pandas DataFrame of a GCE flat file
        :param site: str containing 3 character site ID
        :param probe_num: str containing 2 character probe num
        :param keep_col_name: sequence of column names to keep in final output. The first entry is treated as the
            value column and the second as the flag column.
        :param probeid_col: str. Column name containing the full parameter name of each row.
        :return: Pandas DataFrame. Pivot table of data and flags for a single probe.
        :raises KeyError: if no 'TOT' component is found for the probe.
        """
        # .loc needs a list here; a tuple would be interpreted as a single (multi-level) key.
        keep_cols = list(keep_col_name)
        value_col, flag_col = keep_cols[0], keep_cols[1]

        # find all parameter names for this probe (find_probe is not defined in this class; presumably inherited)
        params = cls.find_probe(df, search_list=[site, probe_num])

        # create a table with values and flags for all three data components for this probe
        dfs = {}
        for p in params:
            # extract 'ACC', 'TOT', or 'INST' from param name and add to column name
            name = p.split('_')[2]
            # rename() returns a new frame, so the slice of `df` is never modified in place
            dfp = df.loc[df[probeid_col] == p, keep_cols].rename(
                columns={value_col: name, flag_col: f'{name}_Flag'})
            for col_name in dfp.columns:
                dfs[col_name] = dfp[col_name]
        tbl = pd.DataFrame(data=dfs).sort_index()

        # Fill out any missing data before returning table
        ####################################################
        tbl.loc[tbl['TOT'].isna(), 'TOT'] = 0
        for c in ["INST", "ACC"]:
            # Tipping buckets have no INST field, so a missing column is simply skipped
            if c not in tbl.columns:
                continue
            # for QA analyses, large stretches of nan values obscure a lot of analytical efforts
            tbl[c] = tbl[c].ffill()
        return tbl
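# NOTE: a "[docs]" Sphinx viewcode link marker was removed here (HTML extraction artifact, not Python source).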
class WriteProvisionalData(ProvisionalDataFormat):
    """
    Provides methods to create output files that match GCE normalized format.

    This module is designed to import data in the GCE normalized format. Other modules then perform QAQC on the data.
    Then this class provides methods to create output files that match the original GCE normalized format.

    Other modules process data in a pivot table format. This class melts the pivot table back into a normalized/flat
    format and then writes csv files.

    Attributes such as ``data_dir``, ``cols``, ``skip_nrows``, ``fname_base``, ``strtyr`` and ``endyr``, and the
    helpers ``find_probe`` and ``create_file_header``, are not defined here; they are expected to come from
    ``ProvisionalDataFormat``.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.parameters = []
        # filled in by set_expected_probes(); one column named 'probes'
        self.expected_probes = pd.DataFrame()

    def _find_unique_params(self, prov_file, param_col=('Parameter',)):
        """
        Return the unique values of the parameter column of one provisional file.

        :param prov_file: str. File name inside ``self.data_dir``.
        :param param_col: sequence of column names to read; only the first one is inspected.
        :return: list of unique parameter names.
        """
        param_col = list(param_col)
        prov_fpath = join(self.data_dir, prov_file)
        df = pd.read_csv(prov_fpath, names=self.cols, skiprows=self.skip_nrows, usecols=param_col)
        return list(df[param_col[0]].unique())

    @staticmethod
    def _melt_df(df, site, probe_num, ht, cols=None, **flat_format):
        """
        Melt a pivot table back into the flat format, naming each column with its full parameter name.

        :param df: pandas dataframe in pivot format. It is not modified.
        :param site: str. Site name.
        :param probe_num: str. Probe number.
        :param ht: str. Probe height as it appears in the parameter name.
        :param cols: dict. Maps a column of ``df`` to a short name like 'INST'. Required despite the default.
        :param flat_format: extra keyword arguments for :func:`pandas.melt`; they override the defaults set here.
        :return: pandas dataframe in flat format.
        :raises ValueError: if ``cols`` is not supplied.
        """
        if cols is None:
            raise ValueError("cols must map DataFrame column names to short parameter names")

        # fill out dictionary for flat format to melt to
        flat_format.setdefault('value_name', 'Value')
        flat_format.setdefault('var_name', 'Parameter')
        flat_format.setdefault('ignore_index', False)

        # convert column names to parameter name
        val_col = {orig_col: f'{site}_PRECIP_{chg_col}_{ht}_0_{probe_num}' for orig_col, chg_col in cols.items()}
        flat_format.setdefault('value_vars', list(val_col.values()))
        # rename a copy so the caller's DataFrame keeps its original column names
        return pd.melt(df.rename(columns=val_col), **flat_format)

    def set_expected_probes(self, tests=(0, -1)):
        """
        Build the table of expected output probe names from GCE output files.

        By default it only looks at the first and last file, however any number of files can be used by adding
        additional integers to the sequence. Files are the ``.csv`` entries of ``self.data_dir`` in sorted name
        order, so that "first" and "last" do not depend on the order the operating system lists them in.

        :param tests: sequence of int. Indexes of the files to be inspected.
        :return: None. The probe names are stored in ``self.expected_probes`` (column 'probes').
        """
        prov_files = sorted(f for f in listdir(self.data_dir) if f.endswith('.csv'))
        if len(prov_files) == 1:
            # first and last are the same file; read it only once
            tests = [tests[0]]

        # assume that first and last file samples all probes.
        probes = []
        for test in tests:
            probes.extend([p for p in self._find_unique_params(prov_files[test]) if p not in probes])
        self.expected_probes = pd.DataFrame({'probes': probes})

    def get_probe_height(self, site, probe_num):
        """
        Get probe height for an input site and probe number.

        Site and probe number must be converted to a GCE style probe name in the output file. This is done by first
        finding the probe in the original GCE file and extracting the height.

        :Example:

        ``Site = UPL, Probe = 02``

        Needs to become

        ``'UPL_PRECIP_INST_625_0_02'``

        :param site: str. Site name.
        :param probe_num: str. Probe number.
        :return: str. Third-from-last underscore-separated field of the first matching probe name ('625' above).
        :raises IndexError: if no expected probe matches; call :meth:`set_expected_probes` first.
        """
        probe_names = self.find_probe(self.expected_probes, search_list=[site, probe_num], search_col='probes')
        if len(probe_names) == 0:
            raise IndexError(f"No expected probe found for site {site!r} and probe {probe_num!r}")
        return probe_names[0].split('_')[-3]

    def melt_ppt_data(self, data_dict, site, probe_num, flag_col=None, ppt_col=None):
        """
        Convert data from the format of :meth:`qaqc.ApplyFlags` to the GCE flat format.

        1. Relate names like 'tank_height' back to short names like 'INST'
        1. Convert short names to full parameter names like `CEN_PRECIP_INST_625_0_04`
        1. Reverse (or melt) the pivot table back to a flat format
        1. Merge the parameters and flags into a single flat table

        Entries of ``data_dict`` whose key contains 'flags' are melted into 'Flag_Value' and entries whose key
        contains 'ppt' into 'Value' (case-insensitive); any other entry is ignored. Neither the mapping arguments
        nor the tables in ``data_dict`` are modified.

        :param data_dict: dict. Dictionary of precip, and flags. Must contain a 'ppt' table.
        :param site: str. Site name.
        :param probe_num: str. Probe number.
        :param flag_col: dict. Relate pd.DataFrame column name to short names. Defaults to {'final_flag':'TOT'}.
        :param ppt_col: dict. Relate pd.DataFrame column name to short names.
            Defaults to {'tank_height':'INST', 'adj_precip':'TOT'}.
        :return: pandas dataframe in flat format, indexed by 'Date'.
        """
        # Work on private copies: the tank column may be removed below, and that must not leak into later calls.
        flag_col = {'final_flag': 'TOT'} if flag_col is None else dict(flag_col)
        ppt_col = {'tank_height': 'INST', 'adj_precip': 'TOT'} if ppt_col is None else dict(ppt_col)

        ht = self.get_probe_height(site, probe_num)

        tank_cols = [c for c in ppt_col if 'tank' in c.lower() or 'inst' in c.lower()]
        if tank_cols and data_dict['ppt'][tank_cols[0]].isna().all():
            # This is a tipping bucket- No tank
            ppt_col.pop(tank_cols[0])

        df_out = pd.DataFrame(columns=['Parameter', 'Date'])
        for k, v in data_dict.items():
            key = k.lower()
            if 'flags' in key:
                val_name = 'Flag_Value'
                val_cols = flag_col
            elif 'ppt' in key:
                val_name = 'Value'
                val_cols = ppt_col
            else:
                # Not part of the flat output (e.g. an accumulated-precip table).
                continue

            df_melted = self._melt_df(v, site, probe_num, ht, cols=val_cols, value_name=val_name)
            # merge Value and Flag_Value columns using common Date/Parameter pair
            df_melted = df_melted.reset_index()
            df_out = pd.merge(df_out, df_melted, on=['Parameter', 'Date'], how='outer')

        return df_out.set_index('Date')

    @staticmethod
    def is_exists_file(fpath):
        """
        Return True if file exists.

        :param fpath: str. Valid file path
        :return: Boolean. True if file exists
        """
        return isfile(fpath)

    @staticmethod
    def is_new_file(f_path, max_age='5min'):
        """
        Return True if file is newer than max_age.

        Age is measured from the file's modification time to now.

        :param f_path: str. Valid file path
        :param max_age: str. Must be valid Pandas timedelta.
        :return: Boolean. True if file is newer than max_age.
        """
        t_fpath = datetime.fromtimestamp(getmtime(f_path))
        age_fpath = pd.to_timedelta(datetime.now() - t_fpath)
        return age_fpath < pd.to_timedelta(max_age)

    @staticmethod
    def write_df_to_file(filepath, df):
        """
        Append the dataframe to a csv file.

        Rows are appended without header information. The path is handed to :meth:`pandas.DataFrame.to_csv` in
        append mode, so no file handle is managed here.

        :param filepath: str. Valid file path to .csv file
        :param df: pandas dataframe.
        :return: None if successful.
        """
        # None of the quoting modes exactly match the desired format, so we disable
        # quoting and write quotes explicitly in the fields.
        df.to_csv(filepath, mode="a", index=True, sep=',', date_format='%Y-%m-%d %H:%M:%S', header=False,
                  float_format='%.3f', na_rep='NaN', quoting=csv.QUOTE_NONE)

    def write_file_per_WY(self, df, output_dir):
        """
        Write a dataframe to a csv file for each water year.

        This method is intended to write flat files with the output format specified in the header information of
        config.yaml. That assumes the data has already been flattened by :meth:`WriteProvisionalData.melt_ppt_data`.

        One file named ``<fname_base>out_<year>.csv`` is written per year from ``self.strtyr`` through
        ``self.endyr`` into ``<self.data_dir>/<output_dir>``, which is created if needed.

        :param df: pandas dataframe. Data in flat format.
        :param output_dir: str. Output directory, relative to ``self.data_dir``.
        :return: None if successful.
        """
        write_dir = join(self.data_dir, output_dir)
        makedirs(write_dir, exist_ok=True)

        for y in range(self.strtyr, self.endyr + 1):
            fname = f'{self.fname_base}out_{y}.csv'
            fpath = join(write_dir, fname)

            # Different probes append to the same WY file from different WriteProvisionalData
            # instances, so we need to check the file age before we overwrite. Older files are
            # overwritten because they are *likely* from a previous run, but this is a very
            # brittle check that can result in truncated files if processing is slow. This should
            # be replaced with something more reliable like a process-wide lock or temporary probe
            # files that are merged at the end of processing.
            if not self.is_exists_file(fpath) or not self.is_new_file(fpath, max_age='10min'):
                self.create_file_header(fpath)

            # same non-overlapping water-year window that is used when the files are loaded
            wystrt = pd.to_datetime(f"10/1/{y - 1} 0005")
            wyend = pd.to_datetime(f"10/1/{y} 0000")
            self.write_df_to_file(fpath, df.loc[(df.index >= wystrt) & (df.index <= wyend)])
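# NOTE: a "[docs]" Sphinx viewcode link marker was removed here (HTML extraction artifact, not Python source).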
class AddNotesdbEvents:
    """
    Record PostGCE events into NotesDB for inclusion in additional processing and creation of final data product.

    Take events table which is organized by timestep:

    Date | prov_flag|tank_flag| QaRule_flag| manual_flag|final_flag|event_code|explanation
    - | - | - | - | - | - | - | -
    2018-10-29 07:10:00| <NA> | <NA> | |U |U | CLOG |ManualFlag: small clog not caught by auto_flag...

    And reorganize them for import in NotesDB:

    * [NoteID] [int] IDENTITY(1,1) NOT NULL,
    * [sitecode] [varchar](10) NULL,
    * [meas_code] [varchar](10) NULL,
    * [probe_code] [varchar](10) NULL,
    * [logger_id] [varchar](10) NULL,
    * [note_code] [varchar](10) NOT NULL,
    * [note_taker] [varchar](5) NULL,
    * [note_entry] [varchar](5000) NULL,
    * [event_begin_datetime] [datetime2](7) NULL,
    * [active] [bit] NULL,
    * [log_datetime] [datetime2](7) NULL,
    * [event_end_datetime] [datetime2](7) NULL
    """

    def __init__(self, events, probe):
        """
        :param events: pandas dataframe indexed by timestep with an 'event_code' column.
        :param probe: str. Probe name whose first underscore-separated field is the site, e.g. 'CEN_02'.
        """
        self.events = events
        self.set_sitecode(probe)
        self.set_probecode(probe)
        col = ['NoteID', 'sitecode', 'meas_code', 'probe_code', 'logger_id', 'note_code', 'note_taker', 'note_entry',
               'event_begin_datetime', 'active', 'log_datetime', 'event_end_datetime']
        # one row per event occurrence; filled in by set_notes()
        self.notes = pd.DataFrame(columns=col)

    def set_sitecode(self, probe):
        """Set ``self.sitecode`` to the site part of ``probe`` followed by 'MET'."""
        site = probe.split('_')[0]
        self.sitecode = f"{site}MET"

    def set_probecode(self, probe):
        """Set ``self.probecode`` to 'PPT' followed by ``probe`` with its underscores removed."""
        probe_name = probe.replace('_', '')
        self.probecode = f"PPT{probe_name}"

    @staticmethod
    def get_date_start(grp):
        """Return the first index label of ``grp``."""
        return grp.index[0]

    @staticmethod
    def get_date_end(grp):
        """Return the last index label of ``grp``."""
        return grp.index[-1]

    def set_notes(self):
        """
        Append one NotesDB row to ``self.notes`` for every uninterrupted run of each event code.

        For each distinct non-empty value of ``self.events['event_code']``, consecutive timesteps whose code contains
        that value form one event. Its begin and end are the first and last timestep of the run. ``self.events`` is
        not modified. Calling this method again appends the same rows again.

        :return: None. Results are stored in ``self.notes``.
        """
        # NA values are ambiguous as booleans. fillna() returns a new Series, so the events table is untouched.
        event_code = self.events['event_code'].fillna('')

        new_notes = []
        for event in event_code.unique():
            if event == '':
                continue

            # Event codes are plain text, not patterns, so match them literally.
            # PyArrow doesn't support cumsum for boolean
            event_happening = event_code.str.contains(event, regex=False).astype('bool')
            # A new run starts wherever the flag differs from the previous timestep. Comparing against a shifted
            # copy filled with False keeps the very first timestep inside its run.
            run_id = event_happening.ne(event_happening.shift(fill_value=False)).cumsum()
            # Group by position (plain array) so the grouping does not depend on index alignment.
            runs = event_code[event_happening].groupby(run_id[event_happening].to_numpy())

            event_df = pd.DataFrame({
                'event_begin_datetime': runs.apply(self.get_date_start),
                'event_end_datetime': runs.apply(self.get_date_end),
            }).reindex(columns=self.notes.columns)
            event_df['sitecode'] = self.sitecode
            event_df['meas_code'] = 'PPT'
            event_df['probe_code'] = self.probecode
            event_df['note_code'] = event
            event_df['note_taker'] = 'py'
            event_df['note_entry'] = f"{event}: added by fsdb_ppt. https://bitbucket.org/hjandrews/fsdb_ppt/"
            new_notes.append(event_df)

        # Leave empty frames out of the concat; every event frame already carries the full column set.
        frames = [frame for frame in [self.notes, *new_notes] if not frame.empty]
        if frames:
            self.notes = pd.concat(frames, ignore_index=True)
if __name__ == '__main__':
    # Manual smoke check: read the expected probe names from the configured data directory
    # and look up the height of one probe. The result is not printed or stored.
    writer = WriteProvisionalData(file_n='../config.yaml')
    writer.set_expected_probes()
    writer.get_probe_height('CS2', '02')
    # Loading a range of water years was exercised here at one time (strtyr=2018, endyr=2019); currently disabled.