__authors__ = 'Greg Cohn'
__version__ = '0'
from post_gce_qc import qaqc, data_transfer, cross_probe_qc
import warnings
# Column-name conventions shared by the functions in this module.
# 'tank_col' and 'ppt_col' are the column names looked up in the provisional data (see apply_all_flags);
# 'flag_suffix' is appended to 'ppt_col' to build the name of its flag column;
# write_output_data maps the internal working columns onto 'tank_col', 'ppt_col' and 'acc_col'.
col_names = {'tank_col': 'INST',
             'ppt_col': 'TOT',
             'acc_col': 'ACC',
             'flag_suffix': '_Flag'}
def _apply_fncs(cls_inst, param_dict, output_unused=True):
    """
    Execute the methods of ``cls_inst`` that are named by the keys of ``param_dict``.

    Each key of ``param_dict`` is looked up as an attribute of ``cls_inst``. When it resolves to a callable, it is
    called with the matching value unpacked as keyword arguments, or with no arguments when the value is falsy
    (``None``, ``{}``). Calls are made in the iteration order of ``param_dict``; whatever state the methods change
    lives on ``cls_inst``.

    Keys that do not resolve to a callable attribute (missing names, or names of plain data attributes) are never
    called. They are either collected and returned, or reported with a warning, depending on ``output_unused``.

    :param cls_inst: object whose methods are executed.
    :param param_dict: dict. Maps a method name to a dict of keyword arguments (or a falsy value for no arguments).
    :param output_unused: bool. If True, skipped entries are returned; if False, a warning is issued for each one.
    :return: dict of skipped ``{name: kwargs}`` entries when ``output_unused`` is True, otherwise None.
    """
    unapplied_params = {}
    for fnc_name, fnc_kwarg in param_dict.items():
        fnc = getattr(cls_inst, fnc_name, None)
        # Only callables are executed: a key that happens to match a data attribute must not be "called".
        if callable(fnc):
            if fnc_kwarg:
                fnc(**fnc_kwarg)
            else:
                fnc()
        elif output_unused:
            unapplied_params[fnc_name] = fnc_kwarg
        else:
            warnings.warn(f'QA rule not found in qaqc.QaRules or qaqc.ApplyFlags:\n{fnc_name}')
    return unapplied_params if output_unused else None
def load_data(strtyr, endyr, fname_base='MS043PPT_PPT_L1_5min_', data_path='../config.yaml', **kwargs):
    """
    Load provisional/post-GCE precipitation data.

    Builds a :meth:`data_transfer.LoadProvisionalData` instance and calls its
    :meth:`data_transfer.LoadProvisionalData.load_ppt_data` method.

    :param strtyr: Passed to ``LoadProvisionalData`` as ``strtyr`` (first water year).
    :param endyr: Passed to ``LoadProvisionalData`` as ``endyr`` (last water year).
    :param fname_base: str. Passed to ``LoadProvisionalData`` as ``fname_base``.
    :param data_path: str. Passed to ``LoadProvisionalData`` as ``file_n``.
    :param kwargs: Forwarded unchanged to ``load_ppt_data``.
    :return: instance of :meth:`data_transfer.LoadProvisionalData`
    """
    all_probes = data_transfer.LoadProvisionalData(strtyr=strtyr, endyr=endyr, file_n=data_path, fname_base=fname_base)
    print(f'Loading all PPT data from {data_path}\n')
    all_probes.load_ppt_data(**kwargs)
    return all_probes
def write_output_data(ap_flg_df, acc_df, site, probe, strtyr=2019, endyr=2024, fname_base='MS043PPT_PPT_L1_5min_',
                      file_path='../config.yaml', output_dir='/processed_data'):
    """
    Write data to output file. Intended to be used after QAQC has been performed.

    See :meth:`data_transfer.WriteProvisionalData` and :meth:`data_transfer.WriteProvisionalData.write_file_per_WY`.

    Note that ``ap_flg_df.data`` is modified in place: an ``'ACC'`` column holding ``acc_df`` is added to it.

    :param ap_flg_df: Instance of qaqc.ApplyFlags containing data to be written to output file. Its ``data`` and
        ``event`` attributes are used.
    :param acc_df: Accumulation values stored in the ``'ACC'`` column of ``ap_flg_df.data``.
    :param site: str. Site name to output.
    :param probe: Probe identifier; combined with ``site`` in the progress message and the notes label.
    :param strtyr: int. First water year of data to write.
    :param endyr: int. Last water year of data to write.
    :param fname_base: str. Base name of output file.
    :param file_path: str. Path to config.yaml containing path to data files and header information.
    :param output_dir: str. Output location handed to ``write_file_per_WY``.
        NOTE(review): the default here is the absolute path '/processed_data' while ``main`` defaults to the
        relative 'processed_data' -- confirm which one is intended.
    :return: None
    """
    # add accumulation column to data
    ap_flg_df.data['ACC'] = acc_df

    output = data_transfer.WriteProvisionalData(file_n=file_path, fname_base=fname_base, strtyr=strtyr, endyr=endyr)
    output.set_expected_probes()

    # create a map to convert current column names to final
    ppt_col = {
        'tank_height': col_names['tank_col'],
        'adj_precip': col_names['ppt_col'],
        'ACC': col_names['acc_col'],
    }
    flag_col = {'final_flag': 'TOT'}

    print(f'Writing {site}{probe} data to csv...')
    data_dict = {'ppt': ap_flg_df.data, 'flags': ap_flg_df.event}
    df_melt = output.melt_ppt_data(data_dict, site, probe, ppt_col=ppt_col, flag_col=flag_col)
    output.format_str_columns(df_melt)
    output.write_file_per_WY(df_melt, output_dir)

    notes_db = data_transfer.AddNotesdbEvents(ap_flg_df.event, f"{site}_{probe}")
    notes_db.set_notes()
    # TODO: write additional function here (original placeholder: notes_db.write_table)
def qc_provisional(df, params):
    """
    Do quality checks on provisional/post-GCE data.

    ``params['auto_flag']`` is a dictionary where each key that matches a method name of :meth:`qaqc.QaRules` is
    executed, with the value supplying the keyword arguments necessary to execute the data check.

    :param df: pandas DataFrame. Expects provisional/post-GCE precipitation data
    :param params: dict. Probe parameters; must contain an ``'auto_flag'`` dict of methods and their parameters.
    :return: tuple ``(qa_flags, qa_events)`` taken from the :meth:`qaqc.QaRules` instance after the checks ran.
    """
    qa_rules = qaqc.QaRules(df, params)
    # Entries of params['auto_flag'] that are not QaRules methods are collected by _apply_fncs instead of raising
    # a warning; the returned dict of those entries is not used here.
    _apply_fncs(qa_rules, params['auto_flag'], output_unused=True)
    return qa_rules.qa_flags, qa_rules.qa_events
#@qc.command()
def apply_all_flags(df_prov, qa_flags, qa_events, params):
    """
    Combine flags from multiple sources and quality checks.

    Applies manual flags, automatic post-GCE checks, and evaluates provisional/GCE flagging. The order is:
    QaRules flags, GCE flags, manual flags (only when ``params`` has a ``'manual_flags'`` entry), NaN values,
    zero values, and finally every method of :meth:`qaqc.ApplyFlags` named in ``params['auto_flag']``.

    :param df_prov: pandas DataFrame. Expects provisional/post-GCE precipitation data.
    :param qa_flags: pandas DataFrame. Expects a column of boolean values for each flag.
    :param qa_events: pandas DataFrame. Expects a column of boolean values for each flag.
    :param params: dict. Must contain ``'precision'`` and ``'auto_flag'``; may contain ``'manual_flags'``.
    :return: instance of class :meth:`qaqc.ApplyFlags`.
    """
    flag = qaqc.ApplyFlags(df_prov.index, params['precision'])

    # get column names; a missing tank column means it's a tipping bucket
    tank_name = col_names['tank_col']
    tcol = tank_name if tank_name in df_prov.columns else None
    ppt_col = col_names['ppt_col']
    pptcol_flag = f'{ppt_col}{col_names["flag_suffix"]}'

    flag.import_provisional_data(df_prov, tank_col=tcol, ppt_flag_col=pptcol_flag, ppt_col=ppt_col)
    flag.apply_QaRules_flags(qa_events, qa_flags)
    flag.apply_GCE_flags()
    if 'manual_flags' in params:
        flag.apply_manual_flags(params['manual_flags'])
    flag.apply_NAN_val()
    flag.apply_0_val()
    # Entries of params['auto_flag'] that are not ApplyFlags methods are skipped; the returned dict is not used.
    _apply_fncs(flag, params['auto_flag'])
    return flag
def qc_cross_probe(xacc, xppt, param_auto, probe, flag):
    """
    Run the cross-probe clog check for one probe and add the resulting flags to ``flag``.

    Uses :meth:`cross_probe_qc.XProbesQc` to build accumulation ratios, find clogs, weight them and decide on the
    final cross-probe flags, which are then applied through ``flag.apply_QaRules_flags``.

    :param xacc: Cross-probe accumulation table; its index is used to build the ``XProbesQc`` instance.
    :param xppt: Cross-probe precipitation table.
    :param param_auto: dict. Must contain ``'flag_x_clogs'`` and ``'weight_x_clogs'`` entries.
    :param probe: Name of the probe being checked.
    :param flag: Object providing ``apply_QaRules_flags`` (``main`` passes a :meth:`qaqc.ApplyFlags` instance).
        It is modified in place.
    :return: None
    """
    print(f'Performing cross probe on {probe}\n')
    fnc_params = param_auto['flag_x_clogs']

    # 1. create a ratio of ACC's
    xprobe = cross_probe_qc.XProbesQc(xacc.index, probe)
    xprobe.set_accum_ratio(xacc)

    # 2. find clogs for each site
    xprobe.set_x_clogs(xppt, xacc, fnc_params)

    # 3. decide on final flags.
    eventwt, Uwt, Cwt = xprobe.get_weight_x_clog(param_auto['weight_x_clogs'])
    xprobe.flag_x_clogs(eventwt, Uwt, Cwt)

    # 4. apply these autoflags to our data
    flag.apply_QaRules_flags(xprobe.event, xprobe.flags)
def main(strtyr, endyr, fname_base='MS043PPT_PPT_L1_5min_', data_path='./config.yaml', qa_params='./qa_param.yaml',
         probes={'all_param'}, keep_col_name=['Value', 'Flag_Value'], probeid_col='Parameter',
         output_dir='processed_data', write_csv=True, **kwargs):
    """
    Main process that loads data and runs all qa checks.

    #. load provisional/post-GCE data
    #. select data for specific probe and what QA to run on it
    #. run rules and tests on data
    #. combine all flags from tests: manual flags, rules applied above, GCE, and our Notes DataBase of events/flags.

    Any keyword argument accepted by :ref:`pandas:/reference/api/pandas.read_csv.rst` can be supplied to this function
    and will be passed to `pandas.read_csv()`.

    :param strtyr: int. First water year to be processed
    :param endyr: int. Last water year to be processed
    :param fname_base: str. Prefix for all files containing provisional/post-GCE data
    :param data_path: str. Path to the configuration file (.yaml or .yml) used to locate the data files.
    :param qa_params: Dict of manual and automatic qa rules and probe parameters, keyed by probe name. Will load
        from yaml if it is not a dict.
    :param probes: Set of probe names to run (a single name may also be given as a str). If the first name, in
        sorted order, contains both 'all' and 'param', every probe in ``qa_params`` is run; if it contains both
        'all' and 'data', every probe found in the loaded data is run.
    :param keep_col_name: Column names handed to ``pivot_on_probe`` as ``keep_col_name``.
    :param probeid_col: str. Name of the column of the loaded data that identifies the probe.
    :param output_dir: str. Location for output files and flag logs.
    :param write_csv: bool. If True, output files are written and nothing is returned; if False, the flag objects
        are returned instead.
    :param kwargs: dict. Accepts any valid kwargs for `pd.read_csv`
    :return: dict mapping probe name to its :meth:`qaqc.ApplyFlags` instance when ``write_csv`` is False,
        otherwise None.
    """
    params = qa_params if isinstance(qa_params, dict) else qaqc._load_yaml(qa_params)

    # 1. load data
    all_probes = load_data(strtyr, endyr, fname_base=fname_base, data_path=data_path, **kwargs)

    # A bare string would be split into single characters by sorted(); treat it as one probe name.
    if isinstance(probes, str):
        probes = (probes,)
    # Ensure probes are sorted for deterministic processing order
    probes = tuple(sorted(probes))
    all_param_probes = tuple(sorted(params))

    # assign probes if 'all_params' or 'all_data' is provided
    # flexible as long as both 'all' and 'param' (or 'data') are included
    first_probe = probes[0] if probes else ''
    if 'all' in first_probe:
        if 'param' in first_probe:
            probes = all_param_probes
        elif 'data' in first_probe:
            # in post GCE formatting, INST, ACC, and TOT all have same probe prefix; the set removes duplicates
            # NOTE(review): probes found in the data but absent from params will raise KeyError below -- confirm
            # that every data probe is expected to have a parameter entry.
            probes = tuple(sorted({f"{v.split('_')[0]}{v.split('_')[-1]}"
                                   for v in all_probes.df[probeid_col].unique()}))

    # 2. select data and qa rules to apply
    # This is run over all probes in case they are needed for cross probe qc of the
    # requested probes.
    all_flags = {}
    for probe in all_param_probes:
        site = probe[:3]
        nprobe = probe[-2:]
        print(f'Load data from {probe}\n')
        df = all_probes.pivot_on_probe(all_probes.df, site, nprobe, keep_col_name=keep_col_name,
                                       probeid_col=probeid_col)
        param = params[probe]

        # 3. run rules and tests on data
        qa_flags, qa_events = qc_provisional(df, param)

        # 4. combine all flags
        flags = apply_all_flags(df, qa_flags, qa_events, param)
        print(f'{probe}: All quality checks and quality assurance rules applied\n------------------\n')
        all_flags[probe] = flags

    print('Generating cross probe tables\n')
    # a split here with one func handling single site QA another handling cross site QA would allow for garbage
    # collection
    # 5. build pivot table of cleaned data for cross site comparison
    xppt = cross_probe_qc.BuildXTable.assemble_cross_table(all_flags, ppt_col='adj_precip')
    xacc = cross_probe_qc.BuildXTable.assemble_wy_acc(xppt)

    for probe in probes:
        param_auto = params[probe]['auto_flag']

        # 6. perform cross site comparison
        if 'flag_x_clogs' in param_auto:
            qc_cross_probe(xacc, xppt, param_auto, probe, all_flags[probe])

        # 7. reapply manual flags
        print(f'Checking for flagging consistency on {probe}\n')
        if 'manual_flags' in params[probe]:
            # adjust any autoflags by manual flags
            all_flags[probe].apply_manual_flags(params[probe]['manual_flags'])

        # 8. apply flags to final state
        all_flags[probe].apply_final_flags()

        # 9. write final data to output files
        if write_csv:
            site = probe[:3]
            nprobe = probe[-2:]
            write_output_data(all_flags[probe], xacc[probe], site, nprobe, strtyr=strtyr, endyr=endyr,
                              fname_base=fname_base, file_path=data_path, output_dir=output_dir)
            # NOTE(review): the flag log is written only together with the csv output -- confirm.
            all_flags[probe].create_flag_log(probe, output_dir=output_dir)

    # 9(alt). instead of creating output files, return all data
    if not write_csv:
        return all_flags
# Script entry point: process water years 2018 through 2022. The probe keyword 'all_params' contains both
# 'all' and 'param', so main() runs every probe listed in the QA parameter file.
if __name__ == "__main__":
    main(2018, 2022, probes={'all_params'})