__authors__ = 'Greg Cohn'
__version__ = '0'
import pandas as pd
import yaml
from os.path import join, dirname
from os import makedirs
from numpy import floor, round, zeros, float64, bool_
from datetime import datetime
import matplotlib.pyplot as plt
from pyarrow import float32, string, NA
from numba import jit, objmode
import warnings
from scipy.ndimage import median_filter
# numba is a Just In Time compiler. Cacheing the compiled code in a raw
numba_kwargs = {"nopython": True, "cache": True, "nogil": True}
def _custom_formatwarning(message, category, filename, lineno, file=None, line=None):
"""
Custom format for warning messages. Exclude file paths so that output in reports like Jupyter Notebooks can be
shared without disclosing usernames or sensitive file paths.
Inputs are supplied by :meth:`warnings.warn()`. This format will apply to all warning messages.
"""
# Format the warning message without including the filename
return f'{lineno}: {category.__name__}: {message}\n'
warnings.formatwarning = _custom_formatwarning
# new in python 3.12 warnings.warn()
# _warn_skips = (dirname(__file__), )
# skip_file_prefixes=_warn_skips
def _load_yaml(file_n='./config.yaml'):
"""
Read YAML file :doc:`config.yaml` or :doc:`qa_param.yaml`. These files contain basic information about where data
directories are located, manual flags to be applied, and paramaters for applying QA rules.
:param file_n: string containing filename or path
:return: dictionary assigned to class object
"""
with open(file_n) as f:
yaml_dict = yaml.safe_load(f)
return yaml_dict
[docs]
class ApplyFlags:
"""
Apply flags from multiple sources to create a final set of flagged data.
Take flags from Provisional, :class:`QaRules`, and manual flags from :ref:`../qa_param.yaml` and combine them to
create a single set of data flags. Create documentation why things were flagged and provide tools to evaluate the
final flag set.
"""
def __init__(self, df_index, precision, flags=False):
self.precision = precision
col = ['Q', 'U', 'C', '*', 'SetNA', 'Set0', 'E', 'M']
self.flags = pd.DataFrame(index=df_index, data=flags, columns=col, dtype='boolean[pyarrow]')
col = ['prov_flag', 'tank_flag', 'QaRule_flag', 'manual_flag', 'final_flag', 'event_code', 'explanation']
self.event = pd.DataFrame(index=df_index, data="", columns=col, dtype=pd.ArrowDtype(string()))
col = ['tank_height', 'precip', 'adj_precip']
self.data = pd.DataFrame(index=df_index, columns=col, dtype=pd.ArrowDtype(float32()))
[docs]
def import_provisional_data(self, prov_df, tank_col='INST', ppt_col='TOT', ppt_flag_col='TOT_Flag', tank_flag_col='INST_Flag'):
"""
Populate pd.DataFrames with data from a provisional pivot table.
:param prov_df: pd.DataFrame. A pivot table of provisional data.
:param tank_col: str. Column name containing tank height.
:param ppt_col: str. Column name containing precipitation.
:param ppt_flag_col: str. column name containing precipitation flag.
:return: All values are saved to instance. Returns to None.
"""
self.data['precip'] = self.data['adj_precip'] = prov_df[ppt_col]
self.event['prov_flag'] = prov_df[ppt_flag_col]
if tank_col:
# Empty for tipping buckets
self.data['tank_height'] = prov_df[tank_col]
self.event['tank_flag'] = prov_df[tank_flag_col]
[docs]
@staticmethod
def read_manual_flags(site, qa_yaml='../qa_param.yaml'):
return _load_yaml(qa_yaml)[site]['manual_flags']
[docs]
def apply_manual_flags(self, manual_notes):
"""
A list of manual entered notes is used to change the flags for a specified time period. Each item in the list
is a dictionary containing exact date and time of an event, what flag to assign, and whether or not to delete
the data during that period.
.. code-block:: yaml
# EXAMPLE!!
- start: '11/8/21 1000'
end: '11/8/21 1000'
replace_wNAN: False
flag: Q
explanation: Tank level spikes, then immediately comes back down.
.. Note::
To avoid multiple calls to instance, `self.data` is assigned to a local variable. But `pandas.DataFrame` is
a mutable type, so both `self.data` and `df` refer to the same location in memory, i.e. are identical
:param manual_notes: list. List should contain dictionaries of exact time periods and how to flag them.
"""
flags = self.flags.copy(deep=False)
event = self.event.copy(deep=False)
for it_happened in manual_notes:
strt, end = pd.to_datetime((it_happened['start'], it_happened['end']), format='%m/%d/%y %H%M')
flag_it = it_happened['flag']
# Erase all flags,
# if flag is not None, repopulate one column
# If column exists for this flag, add instances of True, else create column
# if False not specified for new column, non-True values are NAN
# try is faster than assessing if statement in each loop
flags.loc[strt:end, :] = False
# if flag is not '', assign it
if flag_it:
try:
flags.loc[strt:end, flag_it] = True
except KeyError:
flags.loc[:, flag_it] = False
flags.loc[strt:end, flag_it] = True
event.loc[strt:end, 'manual_flag'] = flag_it
flags.loc[strt:end, 'SetNA'] = it_happened['replace_wNAN']
flags.loc[strt:end, 'Set0'] = it_happened['replace_w0']
# intended to reverse adjustments.
# At time of programming (Sep 2025), prorating is the only method that adjusts precip
# All other methods set a Set0 or SetNa flag.
if 'reset_adj_precip' in it_happened:
if it_happened['reset_adj_precip']:
self.data.loc[strt:end, 'adj_precip'] = self.data.loc[strt:end, 'precip']
# add the explanation of this event to any existing explanation
what_happened = it_happened['explanation'].lower()
event.loc[strt:end, 'explanation'] += f"ManualFlag: {what_happened}; "
if 'event_code' in it_happened:
event.loc[strt:end, 'event_code'] = it_happened['event_code']
else:
if 'drain' in what_happened:
event.loc[strt:end, 'event_code'] = 'DRAIN'
if 'repair' in what_happened:
event.loc[strt:end, 'event_code'] = 'MAINTE'
if 'install' in what_happened or 'replace' in what_happened or 'rewire' in what_happened:
event.loc[strt:end, 'event_code'] = 'INSREM'
[docs]
def apply_QaRules_flags(self, auto_qa_event, flags):
"""
Add QaRule flags. Flags are an accumulation of rows that are True.
Flags are accumulated with `|=`. I.e. flags = flags or new_flags. Explanations and event codes are populated
based on the auto_qa_event generated by :meth:`.QaRules`.
:param auto_qa_event: A boolean DataFrame with a column for each event the data was tested for.
:param flags: A boolean DataFrame with a column for each flag.
:return: updates instance variables `self.event` and `self.flags`
"""
event = self.event.copy(deep=False)
self.apply_flags_as_str(flags, 'QaRule_flag')
# add flags. Assumes 1 boolean column for each flag
# note auto flags have a slightly reduced set of columns. If not filtered, extra columns become nan
cols = flags.columns
self.flags[cols] |= flags
event_codes = {'drain_event': 'DRAIN',
'clog': 'CLOG',
'duplicate': 'INTPRO',
'diurnal_flux': 'INTPRO'}
for which_event in auto_qa_event.columns:
# grab boolean data for when this event happened
event_happened = auto_qa_event[which_event]
# append the autoflag for this event
event.loc[event_happened, 'explanation'] += f"QaRule AutoFlag: {which_event}; "
if which_event in event_codes:
event.loc[event_happened, 'event_code'] = event_codes[which_event]
[docs]
def apply_GCE_flags(self):
"""
Add GCE flags to final data.
This will only add flags where no other process has assigned one. If no other flags have been added, all GCE
flags will be applied to the data. This method is intended to be used after other QA processes.
:return: updates instance variable `self.flags`.
"""
prov_flags = self.event[['prov_flag', 'tank_flag']]
event_code = self.event['event_code']
flags = self.flags.copy(deep=False)
already_has_flag = self.flags.any(axis=1)
# clogs are a special case where dry periods are not flagged.
# do not add flags to clog events
already_has_flag |= event_code.str.contains('CLOG')
if not already_has_flag.any():
warnings.warn('No existing flags found. qaqc.ApplyFlags.apply_GCE_flags was designed to fill in where '
'there are not other flags. Consider running qaqc.ApplyFlags.apply_QaRules_flags first.')
convert_flags = {'Q':{'flag':'Q', 'col':['tank_flag', 'prov_flag']},
'U':{'flag':'U','col':['tank_flag']},
'E':{'flag':['M', 'SetNA'],'col':['tank_flag']},
}
has_prov_flg = pd.Series(index=prov_flags.index, data=False)
for flg, converted in convert_flags.items():
for col in converted['col']:
has_prov_flg |= prov_flags[col].str.contains(flg, na=False)
assign_flag = has_prov_flg & ~already_has_flag
flags.loc[assign_flag, converted['flag']] = True
has_prov_flg = False
[docs]
def apply_final_flags(self):
self.affirm_zero_flagged_E()
self.affirm_NaN_flagged_M()
self.affirm_CLOG_flagged_UC()
self.affirm_only_one_flag()
self.apply_flags_as_str(self.flags, 'final_flag')
[docs]
def apply_flags_as_str(self, flags, col):
"""
Convert all accumulated flags to str.
Flags are accumlated through a boolean `pd.DataFrame`, `self.flags` using `|=` (`df[col] = df[col] or RuleTrue`
). So multiple rules can add to a given flag. `SetNA` or `Set0` are applied directly to the data, but each rule
should associate it with a particular flag to explain why the value was changed.
:return: updates instance variable `self.event`.
"""
event = self.event.copy(deep=False)
# don't use Set0 or SetNA flags, they are applied to data directly
flag_val = flags.columns[~flags.columns.isin(['SetNA', 'Set0'])]
for which_flag in flag_val:
flag_happened = flags[which_flag]
event.loc[flag_happened, col] += f'{which_flag}'
[docs]
def apply_NAN_val(self):
"""
Insert NA values into the data.
Where the data has been flagged to change values to NA in the variable `self.flags['SetNA'], insert NA values.
This only changes the adjusted precip values (`self.data['adj_precip']`) and does not alter the raw data values.
This method also adds a missing (M) flag for each NA value.
:return: updates instance variables
"""
# use pyarrow nullable
self.data.loc[self.flags['SetNA'], 'adj_precip'] = NA
self.flags.loc[self.flags['SetNA'], 'M'] = True
#self.event.loc[self.flags['SetNA'], 'final_flag'] = 'M'
[docs]
def apply_0_val(self):
"""
Insert 0 values into the data.
Where the data has been flagged to change values to 0 in the variable `self.flags['Set0'], insert 0 values.
This only changes the adjusted precip values (`self.data['adj_precip']`) and does not alter the raw data values.
:return: updates instance variables
"""
self.data.loc[self.flags['Set0'], 'adj_precip'] = 0
[docs]
def affirm_zero_flagged_E(self):
"""
Check that anywhere the data was set to 0 also contains either an E flag or a manual flag.
If the data was set to 0, but has no E flag or manual flag, then an E flag will be added with a warning.
:return: updatess instance variables
"""
flags = self.flags.copy(deep=False)
event = self.event.copy(deep=False)
# Check zero flags
has_manual_flag = event['manual_flag'] != ''
set0_wo_E = flags['Set0'] & ~flags['E'] & ~has_manual_flag
if set0_wo_E.any():
warnings.warn('Precip set to 0 without E flag or manual flag. E flag added', UserWarning)
flags.loc[set0_wo_E, 'E'] = True
event.loc[set0_wo_E, 'QaRule_flag'] = 'E'
[docs]
def affirm_NaN_flagged_M(self):
"""
Check that any NA values are flagged as missing (M) and all M flags have NAN values.
`self.flags['SetNA']` and `self.flags['M']` will become the same with a warning.
:return: updates instance variables
"""
flags = self.flags.copy(deep=False)
event = self.event.copy(deep=False)
# Check missing flag
val_flag_mismatch = flags['SetNA'] ^ flags['M']
if val_flag_mismatch.any():
warnings.warn('Precip M flags do not match NA values. Both categories updated to be equal.', UserWarning)
flags['M'] |= flags['SetNA']
flags['SetNA'] |= flags['M']
event.loc[flags['M'], 'QaRule_flag'] = 'M'
[docs]
def affirm_only_one_flag(self):
"""
Check that only one flag is assigned at a time.
Final data can only contain one flag per time step. Only one flag will be retained. All others will be cleared
in order of precedence. Flags take precedence in the following order:
#. M - missing flag
#. U - undercatch during a clog
#. C - cumulative estimate of total precip since last measurement following clog
#. E - estimated value
#. Q - questionable value
:return: updates instance variables
"""
flags = self.flags.copy(deep=False)
event = self.event.copy(deep=False)
col = ~flags.columns.isin(['Set0', 'SetNA'])
n_flags = flags.loc[:, col].sum(axis=1)
has_multi_flag = n_flags > 1
if has_multi_flag.any():
warnings.warn('More than one flag assigned at the same time. Only one flag is retained by precedence.', UserWarning)
manual_flags = event.loc[has_multi_flag, 'manual_flag'].unique()
for mf in manual_flags:
if mf != '':
# first, clear anything that isn't the manual flag
# because this draws the manual flags from events, any other clearing does not have an affect
has_manual_flag = event.manual_flag.str.contains(mf, regex=False, na=False)
assign_manual_flag = has_manual_flag & has_multi_flag
self._clear_all_flags_but(assign_manual_flag, [mf])
elif mf == '':
# in ORDER of PRECEDENCE: keep only the following and clear all else
precedence = {'M': {'keep_col': ['M', 'SetNA'], 'has': has_multi_flag},
'U': {'keep_col': ['U'], 'has': has_multi_flag & event.event_code.str.contains('CLOG')},
'C': {'keep_col': ['C'], 'has': has_multi_flag},
'E': {'keep_col': ['E'], 'has': has_multi_flag},
}
for flg, f_param in precedence.items():
# Next, keep only the M flag, all other flags paired with M will be cleared.
# From the remaining flags in the multiclog, keep only clog flags, with
assign_flag = flags[flg] & f_param['has']
self._clear_all_flags_but(assign_flag, f_param['keep_col'])
[docs]
def affirm_CLOG_flagged_UC(self):
"""
During CLOG events, only U and C flags are used.
Sometimes GCE manually adds Q flags to cover clog periods. These Q flags will be overwritten by higher
precedence flags such as U and C. However, dry periods during a CLOG event do not have a flag, allowing them to
be filled with GCE manual flags such as Q.
:return: updatess instance variables
"""
has_clog = self.event.event_code == 'CLOG'
self._clear_all_flags_but(has_clog, ['U', 'C'])
def _clear_all_flags_but(self, rows, keep_cols):
clear_cols = ~self.flags.columns.isin(keep_cols)
self.flags.loc[rows, keep_cols] = True
self.flags.loc[rows, clear_cols] = False
[docs]
def remove_GCE_F_flags(self):
"""
Where provisional processing has placed an F flag immediately following a J flag, the precip value from the
record flagged J is duplicated in the record flagged F. The J flag signifies large precip jumps, so this
artifact added > 200 mm of precip to CENT in WY19 if not removed.
:return: updates instance variables
"""
event = self.event
j = event['prov_flag'].str.contains('J', na=False)
after_j = j.shift(1)
f = event['prov_flag'].str.contains('F', na=False)
duplicate = after_j & f
# is this right? shouldn't it just be 0? or whatever the tank diff was?
self.flags['Set0'] |= duplicate
self.data.loc[duplicate, 'adj_precip'] = 0
event.loc[duplicate, 'explanation'] += 'ApplyFlags AutoFlag: Consecutive prov flags J and F duplicate precip. 2nd ppt reset to 0;'
event.loc[duplicate, 'event_code'] = 'INTPRO'
[docs]
def remove_GCE_E_flags(self):
"""
Where provisional processing has placed an E flag immediately following an R flag, the E flag does NOT represent
an estimated value, but does represent a questionable value. This event identifies a jump in precip following a
reduction in tank values when the current state is NOT_RAINING.
This event should be caught by :meth:`QaRules.drain_recharge_flagging_wrap`. However, if it has not been caught,
the GCE flag is misleading and should be changed. In practice, the only examples of this flagging pattern in
the data are completely bogus precip, but there aren't enough examples to have high confidence that this is
always the case. Therefore, this method assigns flagging based on the underlying GCE logic that applies the flag.
Note, that any other flag assigned will take precedence over the Q flag assigned here, and that manual flags
will clear any other flags for that period.
:return: updates instance variables
"""
event = self.event.copy(deep=False)
r = event['prov_flag'].str.contains('R', na=False)
after_r = r.shift(1)
e = event['prov_flag'].str.contains('E', na=False)
# `simple_pre.m` calls this a "rebound" event and flags it E
# but there is no estimation occurring
rebound = after_r & e
self.flags.loc[rebound, 'Q'] = True
event.loc[rebound, 'explanation'] += 'ApplyFlags AutoFlag: Consecutive prov flags R and E ID a jump in precip following a tank reduction;'
[docs]
def prorate_precip_during_tank_flux(self, tank_col='tank_height', precip_col='adj_precip', max_prorate=15):
"""
Adjust precipitation amounts to match total daily change in tank level.
Total daily tank level is prorated throughout each day based on the proportion of precipitation reported at
each timestep. It is a simple linear interpolation. New, prorated daily totals match the midnight to midnight
change in tank level.
If tank level changes by less than 2 x precision (positive or negative), the daily total is adjusted to zero.
Days where there is no likely rain outside of a fluctuation are not prorated and left with a value of 0.
:param tank_col: str. Name of column containing tank level.
:param precip_col: str. Name of column containing precipitation.
:param max_prorate: float. Max allowable ratio to use in prorating.
"""
precision = self.precision
df = self.data
# when is the tank fluctuating (5 min)
# diurnal_flux = flag_precip_during_tank_flux(df, precision, tank_col=tank_col, ppt_col=precip_col)
# group data by day
day_grp = pd.Grouper(freq='1D')
# what days have tank fluctuations
# these two rows together are ~30% faster than rerunning the whole flag function
diurnal_flux = self.event['explanation'].str.contains('diurnal_flux', na=False)
over_accum_dly = diurnal_flux.groupby(day_grp).transform('max')
# get dly change in tank level and
dly_chg = QaRules.calc_dly_tank_change(df, day_grp, precision, tank_col=tank_col)
# set daily change to zero if near precision limit
dly_chg[dly_chg < 2 * precision] = 0
# get dly precip excluding 5 min fluctuations
likely_dly_ppt = QaRules.calc_dly_precip(df, day_grp, precip_col=precip_col, exclude=diurnal_flux)
# what days have rain (can't divide by 0)
rain_day = likely_dly_ppt > 0
# select days to prorate where it is raining, and accumulated more than the tank
prorate = rain_day & over_accum_dly
# create a ratio of 1 for all precip values
ratio = pd.Series(1.0, name='ratio', index=dly_chg.index, dtype=pd.ArrowDtype(float32()))
# create a different ratio for values to be prorated
ratio[prorate] = dly_chg[prorate] / likely_dly_ppt[prorate]
# this is fixes very specific situations where tank level jumps up
ratio[ratio > max_prorate] = 0
self.data['adj_precip'] *= ratio
# find pre-prorated values that have been adjusted and flag new value as an estimate
changed = prorate & (self.data['precip'] > 0)
self.flags['E'] |= changed
self.event.loc[changed, 'QaRule_flag'] = 'E'
self.event.loc[changed, 'event_code'] = 'INTPRO'
self.event.loc[changed, 'explanation'] = 'ApplyFlags AutoFlag: prorate precip during diurnal tank fluctuations;'
[docs]
def round_precip_to_min_increment(self, increment=None, scrape_remainder_window=6):
"""
Round precip to minimum increment.
Precipitation amounts cannot be smaller than what we can measure. With tipping buckets this is a pretty basic
check, but with tank gauges there is the potential to round off large amounts of precip. To mitigate these
large pools of precip, a moving window is applied to look for consecutive time steps that cumulatively total
additional increments. For example, if 0.15 is rounded down to 0.1 for two consecutive time steps, then 0.1 is
added to the total of the second timestep. Once added to the total, the 0.05 is excluded from all other window
totals.
:param increment: float. The minimum increment by which precip can accumulate. Only "full" bins of this size
persist in the final data.
:param scrape_remainder_window: int. The size moving window to use to look for cumulative precip.
"""
# note: effort made to keep consistent type and avoid DataFrame checks and conversions.
# Below numpy stays numpy until the end (and is 30% faster for it)
# if df was used, floor would convert the df to numpy types
if increment is None:
increment=self.precision
df = self.data['adj_precip']
# use floor rounding, not nearest, to only show full bins
rounded_df = QaRules.round_to_precision(df.to_numpy(), increment, floor)
# look for rounded off values what would have accumulated another full increment over a longer period.
rounded_off = df.to_numpy() - rounded_df
accum_rounded_off = self.rolling_accumulate_increment(rounded_off, scrape_remainder_window, increment)
self.data['adj_precip'] = pd.Series(data=(rounded_df + accum_rounded_off), index=df.index,
dtype=pd.ArrowDtype(float32()))
[docs]
@staticmethod
@jit(cache=True)
def rolling_accumulate_increment(arr, window, increment):
"""
Rolling window that accumulates by increment.
All values within the window are summed. Output is filled when the sum is greater than the increment. Once a
value is used to accumulate output it is not reused in subsequent windows. Any accumulated value greater than a
whole increment is rounded off and added to the next timestep.
This method was designed to take the remainders of values off into full increments and scrape them together
into additional precip over a larger moving window.
This is a JustInTime (jit) compiled function that explicitly uses numpy (not Python).
:param arr: numpy array. Intended for reainders from rounding precip down to full increments.
:param window: int. Size of moving window to be used.
:param increment: float. The increment to accumulate by.
:return: Numpy array of accumulated values in whole increments.
"""
nrow = len(arr)
window_sum = zeros(nrow, dtype=float64)
remainder = zeros(nrow, dtype=float64)
used = zeros(nrow, dtype=bool_) # True = exclude from future sums
# for each row
for row in range(nrow):
# Compute sum of last `window` values, excluding masked ones
start = max(0, row - window + 1)
end = row + 1
tot = 0.0
for i in range(start, end):
if not used[i]:
tot += arr[i]
tot += remainder[i]
#remainder[i] -= remainder[i]
if tot >= increment:
used[start:end] = True
with objmode(prec_tot='float64'):
# function passes the object rounding method as input
prec_tot = QaRules.round_to_precision(tot, increment, floor)
window_sum[row] = prec_tot
remainder[row + 1] = tot - prec_tot
return window_sum
[docs]
def get_flagged_days(self):
flags = self.flags
has_flag = flags.any(axis='columns')
# get rid of time and create one row for each day where something was flagged.
return pd.to_datetime(pd.Series(flags[has_flag].index).dt.date.unique())
# drain_days = pd.Series(cs.loc[drain_events, 'TOT'].index).sort_values().dt.strftime('%m/%d/%Y').unique()
[docs]
def create_flag_log(self, probe, output_dir='./processed_data'):
event = self.event
now = datetime.now()
log_list = [f"""\
FSDB_PPT: QAQC applied to provisional precipitation data from sensors at H.J. Andrews
********************************************************************************************
FSDB_PPT version: {__version__}
run at: {now.strftime('%Y/%m/%d %H:%M:%S')}
Site: {probe}
===============
Data flagged (NOT Accepted):
"""]
pd.set_option("display.max_colwidth", None)
flagged_days = self.get_flagged_days()
for day in flagged_days:
day_end = day + pd.to_timedelta('1D') - pd.to_timedelta('5min')
# if not dropna=False, NA values in provisional flags will cause an empty Series to be returned.
what_happened_today = event.loc[day: day_end].value_counts(dropna=False)
log_list.append(f"\n{day}\n--------\n{what_happened_today}\n")
#log += f"{day}\n{what_happened_today}"
log_str = ''.join(log_list)
log_dir = join(output_dir, 'logs')
makedirs(log_dir, exist_ok=True)
log_name = join(log_dir, f"{probe}_{now.strftime('%Y%m%d_%H%M%S')}.log")
with open(log_name, 'w') as f:
f.write(log_str)
[docs]
def plot_flagged_day(self, day, site, tdelta='1D', **kwargs):
"""
Plot flags over the both instantaneous tank values and total accumulated precipitation to show how the raw data
is acting and how flagging is being applied.
This is an interactive method that allows multiple inputs to increase the descriptive ability of the graph,
while continuing to graph components in the same way to create consistency in review of flagging. Precipitation
is plotted on the left axis, while tank levels are plotted on the right axis. There are several optional
plotting inputs, but flags are always displayed:
* All flags are plotted where the flag letter is a red point over top of the precipitation data.
* Events, if provided, can be plotted on either axis and use predetermined symbology defined by
the dictionary `event_plt_kwargs`. An events DataFrame as formated in :code:`post_gce_qc.QaRules.events` is expected.
* Paired tank, if provided, is plotted as a yellow dotted line on the right axis and is noramlized to the value
of the site's tank level at the beginning of the time period.
* Running values, if provided, are plotted on the left axis as a dotted line for std deviation and a dashed
line for running averages.
:param day: datetime object. This is the start time for the figure. Typically midnight of a given day
:param site: Str. The name of the site plotted
:param tdelta: Str. Defines how much time is plotted after `day`. Must be a valid format for
:func:`pandas.to_timedelta`. Defaults to '1D'
:param auto_qa_event: DataFrame of boolean values as formated in `post_gce_qc.QaRules.events`
:param running_vals: DataFrame of running mean and std deviation
:param paired_tank: a DataFrame of instantaneous tank level for a cross comparison of gauges
:return:
"""
# plot_day_flagged_rev3(day, df, find_drains, neg_delta_tank, running_mean, running_std, q, na):
#tank_drop = neg_delta_tank.loc[day]
#drain_event = find_drains.loc[day]
# accomodate pyarrow conversion to datetime (includes HH:MM:SS)
strt, end = (day, day + pd.to_timedelta(tdelta))
df_day = self.data.loc[strt:end, :]
flag_today = self.flags.loc[strt:end, :]
#run_day = running_mean.loc[day]
#runstd_day = running_std.loc[day]
# plot precip
plt.figure()
ax1 = df_day['precip'].plot(grid=True, marker='x')
lgnd1 = ['precip']
# plot tank height
ax2 = ax1.twinx()
lgnd2 = []
# if provided, add qa events and running avg, etc.
if 'auto_qa_event' in kwargs:
auto_qa_event = kwargs['auto_qa_event'].loc[strt:end]
event_plt_kwargs = {'drain_event': {'ax': ax1, 'linestyle': '', 'marker': 'o', 'color': 'k', 'label':'drain_event'},
'neg_delta_tank': {'ax': ax1, 'linestyle': '', 'marker': 'x', 'color': 'm',
'label':'neg_delta_tank'},
'clog': {'ax': ax2, 'linestyle': '', 'marker': 'o', 'color': 'navy', 'label':'clog'},
'duplicate': {'ax': ax1, 'linestyle': '', 'marker': 'X', 'color': 'lime', 'label':'duplicate'},
'diurnal_flux':{'ax':ax2, 'linestyle': '', 'marker': 's', 'color': 'mediumorchid',
'label':'diurnal_flux'},
}
event_lgnd = {'drain_event': lgnd1,
'neg_delta_tank': lgnd1,
'clog': lgnd2,
'duplicate': lgnd1,
'diurnal_flux': lgnd2,
}
for event in auto_qa_event.columns:
if auto_qa_event[event].any():
line_to_mark = 'precip' if event in ' neg_delta_tank drain_event duplicate' else 'tank_height'
df_day.loc[auto_qa_event[event], line_to_mark].plot(**event_plt_kwargs[event])
event_lgnd[event].append(event.replace('_', ' ').capitalize())
if 'running_ppt' in kwargs:
running_mean = kwargs['running_ppt']['mean'].loc[strt:end]
running_std = kwargs['running_ppt']['std'].loc[strt:end]
# plot running avg
running_mean.plot(ax=ax1, linestyle='--', color=[0, 0.78, 1])
running_std.plot(ax=ax1, linestyle=':', color=[0, 0.78, 1], grid=True)
lgnd1.extend(['TOT RunAvg window', 'TOT RunAvg+SD'])
if 'running_tank' in kwargs:
running_mean = kwargs['running_tank']['mean'].loc[strt:end]
# plot running avg
running_mean.plot(ax=ax2, linestyle='--', color=[0, 0.78, 1])
lgnd2.append('TOT RunAvg window')
if 'paired_tank' in kwargs:
pair = kwargs['paired_tank']
strt_tank_diff = pair[strt] - df_day.loc[strt, 'tank_height']
pair_day = pair[strt:end] - strt_tank_diff
pair_day.plot(ax=ax2, marker='.', color='gold', legend=True)
lgnd2.append('Paired Tank Height')
# plot QA
for col in flag_today.columns:
this_flag = flag_today[col]
if this_flag.any():
df_day.loc[this_flag, 'precip'].plot(ax=ax1, grid=True, linestyle='', marker=f'${col}$', color='crimson'
, markersize=8)
lgnd1.append(f'Set as {col}')
ax1.legend(lgnd1, loc='center left')
# plt tank
df_day['tank_height'].plot(ax=ax2, marker='.', color='orange', legend=True)
ax2.set_ylim([0.97*df_day['tank_height'].min(), 1.03*df_day['tank_height'].max()])
lgnd2.append('Tank Height')
ax2.legend(lgnd2, loc='upper right')
plt.title(f'{site} - {day}')
ax1.set_ylabel('Precip (mm)')
ax2.set_ylabel('Tank Height (mm)')
return ax1, ax2
[docs]
class QaRules:
"""
A set of QA rules to run on a single sensor.
Rules to identify places where there are problems in the data or low confidence in the accuracy of the values and
all the metrics and calculations to evaluate those rules. Raw data is input and 2 outputs are produced:
#. A DataFrame of flags accumulated by the rules applied. This shares a DateTime index with the data
and is in the format of a single boolean column for each flag, identifying rows where flag conditions exist
(flag is True).
#. A boolean DataFrame of events or conditions. E.g. column 'overflow' is true wherever the rain gauge overflowed.
"""
def __init__(self, df, qa_params):
self.precision = qa_params['precision']
self.df_orig = df
col = ['drain_event', 'neg_delta_tank', 'overflow', 'duplicate', 'tank_empty', 'clog', 'diurnal_flux', 'over_intensity']
self.qa_events = pd.DataFrame(index=df.index, data=False, columns=col, dtype='boolean[pyarrow]')
col = ['Q', 'U', 'C', 'SetNA', 'Set0', 'E', 'M']
self.qa_flags = pd.DataFrame(index=df.index, data=False, columns=col, dtype='boolean[pyarrow]')
[docs]
@staticmethod
def find_neg_delta(df, col='INST', threshold=-25):
"""
Return boolean Series True where value dropped more than the threshold
Primarily used for identifying the time stamp where the precip storage tank is drained.
:param df: Pandas DataFrame
:param col: str of column name to search
:param threshold: float. Threshold to use to define a drain.
:return: Pandas Series of boolean values with index of df.
"""
return df[col].diff() < threshold
[docs]
@staticmethod
def find_drops(df, precision, col='INST', wind=3, get_roll=False):
"""
Return boolean Series True where value is below the running average.
Primarily used to identify precip tank drain events and any subsequent recharge of mineral oil or antifreeze.
.. Note::
Whole DataFrame is input instead of a slice with just one column to make it more clear what the function is
doing.
Rolling values are not suitable for average precip: rolling average up to current time step instead of
centering on timestep; no interpolation of nan values will create a gapped record; values are not rounded
to nearest precision of measurement.
:param df: Pandas DataFrame
:param precision: float. minimum precision of measurement
:param col: str. Name of column to assess in DataFrame
:param wind: Size of rolling window to use. Integer interpreted as number of timesteps. Any Pandas DateTime
frequency that evenly divides into multiyear data is also accepted. For example, month is not
accepted because not all months have the same number of days.
:param get_roll: bool: If True, returns the rolling average that drops are assessed against.
:return: Panda Series of boolean values with index of df. Optional additional output get_roll=True.
"""
data = df[col]
# explicitly define defaults: closed both includes numbers at both the left and right edge of the window:
# i.e. all 3 values used.
run_avg = data.rolling(window=wind, closed='both').mean(engine='numba', engine_kwargs=numba_kwargs)
drops = run_avg > (data + precision)
if get_roll:
return drops, run_avg
elif not get_roll:
return drops
[docs]
@staticmethod
def find_duplicate_precip(df, precision, ppt_col='TOT', tank_col='INST', min_ppt_nprecision=1,
flat_tank_nprecision=1, flat_ppt_nprecision=1):
"""
Return boolean value True where the tank is flat, and the precip value is repeating and above a minimum
threshold.
This is used to find both single time steps where large cumulative totals are being doubled and long periods
where identical precip values repeat. In both cases, tank values do not change.
.. Note::
Whole DataFrame is input instead of a slice with just one column to make it more clear what the function is
doing.
:param df: Pandas DataFrame
:param precision: float. minimum precision of measurement
:param ppt_col: str with column name
:param tank_col: str with column name
:param min_ppt_nprecision: float. Multiple of precision
:param flat_tank_nprecision: float. Multiple of precision
:param flat_ppt_nprecision: float. Multiple of precision
:return: Pandas Series of boolean values with index of df.
"""
tank_precision = precision * flat_tank_nprecision
ppt_precision = precision * flat_ppt_nprecision
min_ppt_size = precision * min_ppt_nprecision
rainfall = df[ppt_col]
repeat_val = (abs(rainfall.diff()) <= ppt_precision) & (rainfall > 0)
minimum_ppt = rainfall > min_ppt_size
flat_tank = df[tank_col].diff() <= tank_precision
return minimum_ppt & repeat_val & flat_tank
[docs]
@staticmethod
def find_tank_flux(df, precision, col='INST', window=289, nprecision=5, extend_ahead=2):
"""
Return boolean value True where the tank notably fluctuates beyond the trend, i.e. beyond the median tank level.
This method applies a median filter to the data, calculating the median value within a moving window. When the
tank value is more than `nprecision` levels beyond this running median, it identifies the tank level as fluxing.
Any flux is then extended ahead a number of additional timesteps specifies by `extend_ahead`.
This method was designed to capture daily fluctuations in tank value. It may be used for other purposes by
adjusting its parameters, but has only been trained on these diurnal tank fluctuations.
:param df: Pandas DataFrame containing precipitation data.
:param precision: float. minimum precision of measurement
:param col: str. Name of column containing tank values
:param window: inst. Size of rolling window applied
:param nprecision: float. Multiplied by precision to determine threshold for deviation from trend.
:param extend_ahead: int. Number of additional timesteps to extend flux events by
:return: Pandas Series of boolean values identifying tank fluctuations.
"""
trend = median_filter(df[col], size=window, mode='nearest')
flux = trend - df[col]
is_flux = abs(flux) > nprecision * precision
for extend in range(1, extend_ahead + 1):
is_flux |= is_flux.shift(extend)
return is_flux
[docs]
@classmethod
def find_over_accum(cls, df, precision, nprecision=2, zero_precision=3, window='1D', tank_col='INST', precip_col='TOT'):
"""
Return boolean Series True where precipitation values exceed the change in tank level.
This method was designed to be used with a daily window. It calculates the change in tank values over the
specified window and compares them to the precipitation accumulated over the window. When the precipitation is
greater than the change in tank by at least `nprecision` levels, it assigns a value of True indicating over
accumulation of precip for this window. Run with a daily window calculates midnight to midnight values, NOT
a rolling window.
.. Note::
All drains are excluded. The change in tank is always negative following a drain, so it would always return
an over accumulation. Therefore they have been excluded.Using this function will always ignore the day (or
window) where a drain occurs.
:param df:Pandas DataFrame containing precipitation data.
:param precision: float. minimum precision of measurement
:param nprecision: float. Multiplied by precision to determine threshold for over accumulation.
:param zero_preciion: inst. Multiplied by precision to determine tank threshold for 0 accumulation.
:param window: If int, number of timesteps. If str, must be a valid format for :func:`pandas.to_timedelta`.
:param tank_col: str. Name of column with tank level in DataFrame
:param precip_col: str. Name of column with precipitation amount in DataFrame
:return: Pandas Series of boolean values identifying over accumulation of precipitation.
"""
day_grp = pd.Grouper(freq=window) # df.index.to_series().dt.date #
dly_chg = cls.calc_dly_tank_change(df, day_grp, precision, tank_col=tank_col)
dly_ppt = cls.calc_dly_precip(df, day_grp, precip_col=precip_col)
# any tank change that is within 2 precision levels is considered flat
dly_chg[abs(dly_chg) < zero_precision * precision] = 0
# no negative changes (larger than 2 precision levels)
no_drain = dly_chg >= 0
return (dly_ppt > (dly_chg + nprecision * precision)) & no_drain
[docs]
def set_drain_event(self, tank_col='INST', event_window=3, neg_delta_threshold=-25):
"""
Find the drain events, defined by a running tank average greater than tank level, and any moments where the
tank has a negative change in depth.
set_* functions assign values to instance without return.
:param tank_col: str containing name of the column with tank level.
:param event_window: int. What size window to use to calculate the event running average.
:param neg_delta_threshold: int. What size threshold to use to define negative changes in tank depth
"""
tank_drop_event = self.find_drops(self.df_orig, self.precision, col=tank_col,
wind=event_window)
neg_delta_tank = self.find_neg_delta(self.df_orig, col=tank_col,
threshold=neg_delta_threshold)
neg_delta_during_event = pd.Series(data=False, index=neg_delta_tank.index)
for shift in range(-4,4):
neg_delta_during_event |= neg_delta_tank.shift(shift) & tank_drop_event
self.qa_events['drain_event'] = neg_delta_during_event
self.qa_events['neg_delta_tank'] = neg_delta_tank
[docs]
def calc_run_avg_rainfall(self, ppt_col='TOT', wind=4, nstd=2):
"""
Calculate the average and standard deviation for a rolling window of precipitation amounts.
Drain events are removed from data and running values are filled using linear interpolation. All running values
are also rounded to the minimum precision.
:param rainfall_col: str. Column name for precip accumulated since last time step
:param wind: size of running window. If int, number of timesteps. If str, must be a valid
format for :func:`pandas.to_timedelta`.
:param nstd: number of standard deviations to add to running average.
:return: 2 Pandas Series: 1) running avg; 2) running avg +N std deviation of running average.
"""
df_clean = self.df_orig[ppt_col].copy(deep=True)
# remove values during drain event and then calculate running average
df_clean.loc[self.qa_events['drain_event']] = NA
return self.calc_rolling_mean(df_clean, precision=self.precision, wind=wind, nstd=nstd)
[docs]
@staticmethod
def calc_rolling_mean(df_clean, precision=0.254, wind=4, nstd=2):
"""
Calculate the rolling avg and avg + n*std_deviation for the given pd.Series.
Rolling values are rounded to the nearest precision increment. E.g. 1.23 would round to 1.254.
Rolling window is centered; i.e. an equal number of values are included from ahead and behind of the current
value.
:param df_clean: pd.Series containing values to calculate rolling avg on.
:param precision: float. Precision of base data. Output will be rounded to this increment.
:param wind: int. size of running window. If int, number of timesteps. If str, must be a valid format for
:func:`pandas.to_timedelta`.
:param nstd: float. Number of standard deviations to add to running average.
:return: two pandas Series: rolling avg, and avg + n*std_deviation.
"""
df_roll = df_clean.rolling(window=wind, center=True, min_periods=None)
# interpolate across drain event
df_mean = df_roll.mean(engine='cython').interpolate(method='linear')
df_std = df_mean + (nstd * df_roll.std(engine='cython').interpolate(method='linear'))
# round to full increments.
# returns np.array
arr_mean = QaRules.round_to_precision(df_mean.to_numpy(), precision)
arr_std = QaRules.round_to_precision(df_std.to_numpy(), precision)
# even after conversion to numpy and back to pandas, whole process 70% time reduction (faster)
return pd.DataFrame(arr_mean, index=df_clean.index, dtype='float[pyarrow]'), pd.DataFrame(arr_std,
index=df_clean.index,
dtype='float[pyarrow]')
[docs]
@staticmethod
def calc_dly_tank_change(df, day_keys, precision, tank_col='INST'):
"""
Calculate the change in tank level for each day.
This method takes the first and last tank level of each day, and returns the total change in tank level for
the day, midnight to midnight. This method uses the format `pd.DataFrame().groupby().transform()`. Transform
creates an output DataFrame with the same index as the input DataFrame, `df`, repeating the daily value at each
time step.
Any valid input to `pd.DataFrame().groupby()` will be accepted, including those at intervals other than daily.
A list of valid dates (without time) will create the same output as a `pd.Grouper(freq='1D')`, but is slower.
:param df: DataFrame containing daily tank levels.
:param day_keys: Any valid input to `pd.DataFrame().groupby()` will be accepted. A `pd.Grouper(freq='1D')`
object is expected.
:param precision: float. Precision of tank level measurement.
:param tank_col: str. Column name containing tank level.
:return: DataFrame containing daily tank level change with same index as input df.
"""
start_tank = df[tank_col].groupby(day_keys).transform('first')
end_tank = df[tank_col].groupby(day_keys).transform('last')
dly_chg = end_tank - start_tank
# submitting as numpy array is faster even after conversion back to a Series
# using floor will automatically convert date type
rounded_chg = QaRules.round_to_precision(dly_chg.to_numpy(), precision, round_fnc=floor)
return pd.Series(rounded_chg, index=df.index, dtype='float[pyarrow]')
[docs]
@staticmethod
def calc_dly_precip(df, day_keys, precip_col='TOT', exclude=None):
"""
Calculate the total daily precipitation.
This method totals the precipitation for each day. This method uses the format
`pd.DataFrame().groupby().transform()`. Transform creates an output DataFrame with the same index as the input
DataFrame, `df`, repeating the daily value at each sub-daily time step (5 min).
Any valid input to `pd.DataFrame().groupby()` will be accepted, including those at intervals other than daily.
:param df: DataFrame containing precipitation.
:param day_keys: Any valid input to `pd.DataFrame().groupby()` will be accepted. A `pd.Grouper(freq='1D')`
object is expected.
:param precip_col: str. Column name containing precipitation.
:param exclude: Boolean Series or None. Default is None, including all data. If Series, all True values are
excluded from daily total
:return: DataFrame containing daily precipitation totals with the same index as input df.
"""
df = df[precip_col]
if exclude is None:
# exclude none/ include all
exclude = df < 0.
elif exclude is not None:
df = df.copy(deep=True)
# If exclude is NONE, it is assumed that this will change no data
# if exclude is a series, a deep copy is used to create a new reference in memory and avoid changing values
# of the input dataframe (shallow copy behavior references single memory bank for each shallow copy).
df.loc[exclude] = 0
return df.groupby(day_keys).transform('sum')
[docs]
@staticmethod
@jit(cache=True, forceobj=True)
def round_to_precision(arr, precision, round_fnc=round, digits=4):
"""
Round all values to the nearest precision of the instrument. For example, all tipping bucket measurements
rounded to a number of whole bucket tips (no partial tips).
:param df_col: A single column of data. Array math will be preformed to each row of any input format.
:param precision: a numerical value that is the minimum step of the data.
:param round_fnc: a function such as :meth:`round` or :meth:`np.floor` that will round the input numbers.
:param digits:
:return: A column or array of data that has been rounded to whole steps equal to precision.
"""
# remove any floating point imprecision (15 decimal places out 1.0 can become 0.999999999999999)
df_int = round(arr, decimals=digits) * 10**digits
prec_int = precision * 10**digits
nsteps = round(df_int / prec_int, decimals=digits)
# round in the direction desired (round, ceil, floor)
n_steps = round_fnc(nsteps)
return n_steps * precision
[docs]
def flag_drains(self, runavg_rainfall, ppt_col='TOT'):
"""
Flag data during drain events.
When the tank level is dropping, all precip values are changed to nan unless the value is <= running average of
precipitation, which is flagged Q.
`flag_*` functions assign values to instance without return. Changes value of `self.qa_flags` to True where
a flag's conditions are met.
:param runavg_rainfall: a Pandas Series containing the running average of precipitation.
:meth:`.calc_run_avg_rainfall`
:param ppt_col: str of column containing precipitation data.
"""
'''
returns Boolean Dataframe. True where value should be set nan or flagged based on column description
'''
rainfall = self.df_orig[ppt_col]
drain_events = self.qa_events
tank_dropping = drain_events['drain_event'] & drain_events['neg_delta_tank'] & (rainfall > 0)
self.qa_flags['Q'] |= tank_dropping & (rainfall <= runavg_rainfall)
setna = tank_dropping & (rainfall > runavg_rainfall)
self.qa_flags['SetNA'] |= setna
self.qa_flags['M'] |= setna
[docs]
def flag_recharge(self, runstd_rainfall, max_recharge, ppt_col='TOT'):
"""
Flag any recharge added to tank following a drain.
When some tanks are drained, they must be recharged with mineral oil and/or antifreeze. Mineral oil is used to
prevent evaporation, primarily in summer months, while antifreeze is used to liquify unheated or frozen gauges.
The addition of these liquids should not be counted as rain. The following criteria are applied:
#. Drain events: a probe specific window following a drop in tank level (:meth:`set_drain_event`).
#. NA: impossible/highly-unlikely value is rainfall, meets both of:
#. value > precip running avg +N std deviation of running avg
#. value > probe max recharge
#. Q: questionable whether value is rainfall or recharge. Only meets one of NA criteria, but does not meet both.
`flag_*` functions assign values to instance without return. Changes value of `self.qa_flags` to True where
a flag's conditions are met.
:param runstd_rainfall: a Pandas Series containing precipitation running avg +N std deviation of running average
:meth:`.calc_run_avg_rainfall`
:param max_recharge: float. maximum precip during a drain event. 75% of ppt during drain events recommended.
:param ppt_col: str of column containing precipitation data.
"""
rainfall = self.df_orig[ppt_col]
drain_events = self.qa_events
# discard anything where the tank level actually dropped. i.e. draining was happening
potential_recharge = ~drain_events['neg_delta_tank'] & drain_events['drain_event']
grtr_run_sdev = potential_recharge & (rainfall > runstd_rainfall)
grtr_max_ppt_in_drain = potential_recharge & (rainfall > max_recharge)
# precip total for this timestep is <= both criteria
#not_recharge = ~grtr_run_sdev & ~grtr_max_ppt_in_drain
# precip total for this timestep is > both criteria
setna = grtr_run_sdev & grtr_max_ppt_in_drain
self.qa_flags['SetNA'] |= setna
self.qa_flags['M'] |= setna
# precip total for this timestep is <= one criteria. "^" is logical xor, i.e. either, not both (one or other).
self.qa_flags['Q'] |= grtr_run_sdev ^ grtr_max_ppt_in_drain
[docs]
def drain_recharge_flagging_wrap(self, event_window=3, drain_threshold=-25, runavg_wind=4, runavg_nstd=2,
max_recharge=2.67,
tank_col='INST', ppt_col='TOT',):
"""
Flag all drains and subsequent recharges.
This function wraps together the multi-part approach required to flag drains:
#. identify a drain is occurring
#. calculate the running average of precipitation
#. flag precip during the drain using the running average
#. calculate the running average + n * std deviation
#. flag recharge following a drain using the avg + std.
Flagging only occurs during drain events, so it must be identified first, and both drain and rehcarge flagging
require an auxiliary variable (avg; avg+n*std) calculated using a running window.
:param event_window: See :meth:`set_drain_event`.
:param drain_threshold: see `neg_delta_threshold` :meth:`set_drain_event`.
:param runavg_wind: see `wind` :meth:`calc_run_avg_rainfall`.
:param runavg_nstd: see `nstd` :meth:`calc_run_avg_rainfall`.
:param max_recharge: see :meth:`flag_recharge`.
:param tank_col: str containing name of the column with tank level.
:param ppt_col: str of column containing precipitation data.
:return: None. Changes instance values.
"""
self.set_drain_event(tank_col=tank_col, event_window=event_window, neg_delta_threshold=drain_threshold)
run_avg, run_std = self.calc_run_avg_rainfall(ppt_col=ppt_col, wind=runavg_wind, nstd=runavg_nstd)
self.flag_drains(run_avg.iloc[:,0], ppt_col=ppt_col)
self.flag_recharge(run_std.iloc[:,0], ppt_col=ppt_col, max_recharge=max_recharge)
[docs]
def flag_precip_during_tank_flux(self, tank_col='INST', ppt_col='TOT', fluxprecision=5, accprecision=5,
zero_precision=2, fluxwindow=289, accwindow='1D', extend_ahead=3):
"""
Flag all precipitation when tank values are fluctuating.
Cyclical fluctuations are normal in the rain collection tanks, but can be mistaken for precipitation. This
method identifies excessive accumulation during these fluctuations. First, it selects days where accumulation
exceeds the midnight to midnight change in tank level, then it selects moments within those days where the tank
is fluctuating and rain was recorded.
`flag_*` functions assign values to instance without return. Changes value of `self.qa_flags` to True where
a flag's conditions are met.
:param tank_col: str. Name of the column with tank level.
:param ppt_col: str. Name of the column containing precipitation data.
:param fluxprecision: float. Threshold for identifying fluctuations in intervals of precision. See
:meth:`find_tank_flux`.
:param accprecision: float. Threshold for identifying over accumulation in intervals of precision. See
:meth:`find_over_accum`.
:param zero_precision: int. Multiplied by precision to determine tank threshold for 0 accumulation.
:param fluxwindow: int. Number of time steps in moving window. See :meth:`find_tank_flux`.
:param accwindow: str. Timestep at which accumulation is compared. See :meth:`find_over_accum`.
:param extend_ahead: int. Number of time steps to extend flux by. Especially important in negative fluctuations.
"""
precision = self.precision
df = self.df_orig
flux = self.find_tank_flux(df, precision, col=tank_col, window=fluxwindow, nprecision=fluxprecision,
extend_ahead=extend_ahead)
dly_over_accum = self.find_over_accum(df, precision, nprecision=accprecision, tank_col=tank_col,
zero_precision=zero_precision, precip_col=ppt_col, window=accwindow)
raining = df[ppt_col] > 0.
diurnal_flux = flux & dly_over_accum & raining
self.qa_flags['Set0'] |= diurnal_flux
self.qa_flags['E'] |= diurnal_flux
self.qa_events['diurnal_flux'] |= diurnal_flux
[docs]
def flag_propagate_EM_from_tank(self, tankflag_col='INST_Flag', ppt_col='TOT'):
"""
Makes sure that Estimated and Missing flags that are applied to tank are also applied to precip values.
`flag_*` functions assign values to instance without return. Adds str to flags in `self.df_orig` where a flag's
conditions are met. To avoid repeated calls to instance, a local variable, `df` is created to reference the
memory where `self.df_orig` is stored, but the 2 are identical (mutable type).
:param tankflag_col: str. Name of column containing flags for
:param pptflag_col:
"""
df = self.df_orig
pptflag_col = f'{ppt_col}_Flag'
mflag = df[tankflag_col].str.contains('M', na=False)
# Add to flag column so that GCE flags are included in final
df.loc[mflag, pptflag_col] += 'M'
df.loc[df[tankflag_col].str.contains('E'), pptflag_col] += 'E'
# apply rule to data
self.qa_flags['M'] |= mflag
self.qa_flags['SetNA'] |= mflag
# self.qa_flags['GCE_M'] |= mflag
[docs]
def flag_tank_overflow(self, tank_max=744.69, tank_col='INST'):
"""
Flag values where tank is above maximum fill. Gauge will not be able to record additional precip until the
tank level drops.
`flag_*` functions assign values to instance without return. Changes value of `self.qa_flags` to True where
a flag's conditions are met.
:param max_tank_depth: float. Maximum fill point of tank.
:param tank_col: str with column name
"""
overflow = self.df_orig[tank_col] > tank_max
self.qa_events['overflow'] |= overflow
self.qa_flags['U'] |= overflow
[docs]
def flag_double_precip(self, ppt_col='TOT', tank_col='INST', min_ppt_nprecision=2,
flat_tank_nprecision=1, flat_ppt_nprecision=1):
"""
Large amounts of precip were found duplicated in consecutive records. For example 173.1 and 173.0 in
consecutive 5 min intervals.
This method identifies duplicates by looking for large precip that occurs where the tank level is flat and
nearly duplicates the previous value. This has similar purpose to :meth:`.ApplyFlags.remove_GCE_F_flags`, but
uses a numerical method.
`flag_*` functions assign values to instance without return. Changes value of `self.qa_flags` to True where
a flag's conditions are met.
:param ppt_col: str with column name
:param tank_col: str with column name
:param min_ppt_nprecision: float. Multiple of precision. Filters for the smallest amount of precip considered.
:param flat_tank_nprecision: float. Multiple of precision. Select for close but not identical tank values.
:param flat_ppt_nprecision: float. Multiple of precision. Select for close but not identical precip values.
"""
double = self.find_duplicate_precip(self.df_orig, self.precision, ppt_col=ppt_col, tank_col=tank_col,
min_ppt_nprecision=min_ppt_nprecision,
flat_tank_nprecision=flat_tank_nprecision,
flat_ppt_nprecision=flat_ppt_nprecision)
self.qa_events['duplicate'] |= double
self.qa_flags['Set0'] |= double
self.qa_flags['E'] |= double
[docs]
def flag_repeating_val_precip(self, ppt_col='TOT', tank_col='INST', min_ppt_nprecision=0,
flat_tank_nprecision=0, flat_ppt_nprecision=0, min_number_repeating=5):
"""
Large periods were found where small amounts of precip, 0.05 mm to 0.25 mm, are accumulated in the exact same
amount, every 5 minutes, for as long as 80 hours straight, accumulating to over 200 mm. This rule is in place
to catch these periods where the exact same value is repeated consecutively.
This method identifies duplicates by looking for any precip that occurs where the tank level is flat and
exactly duplicates the previous value for multiple consecutive time steps.
`flag_*` functions assign values to instance without return. Changes value of `self.qa_flags` to True where
a flag's conditions are met.
:param ppt_col: str with column name
:param tank_col: str with column name
:param min_ppt_nprecision: float. Multiple of precision. Filters the smallest amount of precip considered.
:param flat_tank_nprecision: float. Multiple of precision. Should be near 0 for flat or artificial tank values.
:param flat_ppt_nprecision: float. Multiple of precision Should be near 0 for repeating identical values.
:param min_number_repeating: int. Number of consecutive values required to be counted as repeating
"""
repeat = self.find_duplicate_precip(self.df_orig, self.precision, ppt_col=ppt_col, tank_col=tank_col,
min_ppt_nprecision=min_ppt_nprecision,
flat_tank_nprecision=flat_tank_nprecision,
flat_ppt_nprecision=flat_ppt_nprecision).astype('boolean')
# `.diff()` of boolean yields True whenever consecutive rows have different boolean values
# `.cumsum()` then assigns a number at the beginning of each group.
number_continuous_grps = repeat.diff().cumsum()
# this returns the size of each group of consecutive values
how_many_in_group = repeat.groupby(number_continuous_grps).transform('size')
constant_repeating = repeat & (how_many_in_group >= int(min_number_repeating))
self.qa_events['duplicate'] |= constant_repeating
self.qa_flags['Set0'] |= constant_repeating
self.qa_flags['E'] |= constant_repeating
[docs]
def flag_overaccum_precip(self, overaccum_threshold=5, tank_col='INST', ppt_col='TOT'):
"""
The change in tank level should match the amount of precip.
The algorithm that evaluates the tank level for precipitation looks for increases in 3 consecutive measurements
to begin a rain event, so this must be carefully parameterized to prevent overflagging. It is also unclear
what flags should emanate from this metric.
.. Warning::
With a test case of CENT, an over-accumulation-threshold of 5 x precision captures all the events captured
by :meth:`.flag_duplicate_precip` plus one additional event. In that case,
:meth:`.ApplyFlags.remove_GCE_F_flags` captures all the same events as this method. So this is a more
accurate numerical approach to capturing the conditions occurring when a "J" precedes an "F" flag. But the
criteria in this method have the potential to capture other events that are not necessarily duplicates.
I don't want to nebulously flag duplicates as Q when they are an artifact, nor do I want to remove things
captured by this criteria, so as of 5/17/23 this function is not used.
:param overcount_threshold: float. Number multiplied by probe precision defining the threshold for ppt overcount
:param tank_col: col
:param ppt_col:
:return:
"""
df = self.df_orig
ppt_minus_tank = abs(df[ppt_col] - df[tank_col].diff())
raining = df[ppt_col] > 0
overaccum_ppt = ppt_minus_tank > overaccum_threshold * self.precision
return raining & overaccum_ppt
[docs]
def flag_empty_tank(self, tank_col='INST', pause_nsteps=2):
"""
A tank value <0 is not possible and means the sensor float is in a dead zone where it can not be read. This
can be a result of a logger reboot or sensor removal rather than an actual measurement. If the tank value is <0
the next measurement will be falsely counted as precip. Due to the 'F' flag functionality in simple_pre.m
it is necessary to filter for at least 2 timesteps after a tank measurement becomes <0.
:meth:`.flag_duplicate_precip` and :meth:`.ApplyFlags.remove_GCE_F_flags` catch the second timestep of values
resulting from this case, but they leave the first timestep. This method removes both values.
`flag_*` functions assign values to instance without return. Changes value of `self.qa_flags` to True where
a flag's conditions are met.
:param tank_col: str. Name of column with tank level
:param pause_nsteps: int. number of time steps after zero tank to delete.
"""
empty_pause = (self.df_orig[tank_col] <= 0).shift(periods=range(0, pause_nsteps+1, 1)).any(axis=1)
self.qa_flags['Set0'] |= empty_pause
self.qa_flags['E'] |= empty_pause
self.qa_events['tank_empty'] |= empty_pause
[docs]
def flag_over_intensity(self, max_per_hour=25, ppt_col='TOT'):
max_5min = max_per_hour / 12
over_intensity = self.df_orig[ppt_col] > max_5min
self.qa_flags['Q'] |= over_intensity
self.qa_events['over_intensity'] |= over_intensity
if __name__ == '__main__':
# load data
import data_transfer
all_probes = data_transfer.LoadProvisionalData(file_n='../config.yaml')
all_probes.load_ppt_data(strtyr=2018, endyr=2019)
df = all_probes.pivot_on_probe(all_probes.df, 'CS2', '02')
# QC sensor/probe
# initiate instance with param and data for this probe
probe = _load_yaml('../qa_param.yaml')['CS2_02']
qc = QaRules(df, qa_params=probe)
# calculate values to identify events
drn_param = probe['auto_flag']['wrap_drain_recharge_flagging']
qc.wrap_drain_recharge_flagging(tank_col='INST', event_window=drn_param['drain_event_window'],
neg_delta_threshold=-15, ppt_col='TOT', wind=drn_param['ppt_runavg_window'],
nstd=drn_param['drain_event_nstd'],
max_recharge=drn_param['drain_event_max_recharge'])
qc.flag_tank_overflow(probe['auto_flag']['flag_tank_overflow']['tank_max'])
qc.flag_empty_tank(tank_col='INST', pause_nsteps=2)
qc.flag_propagate_EM_from_tank(tankflag_col='INST_Flag', ppt_col='TOT')
qc.flag_double_precip(tank_col='INST', ppt_col='TOT', min_ppt_nprecision=2,
flat_tank_nprecision=1, flat_ppt_nprecision=1)
qc.flag_repeating_val_precip(tank_col='INST', ppt_col='TOT', min_ppt_nprecision=0,
flat_tank_nprecision=0, flat_ppt_nprecision=0, min_number_repeating=5)
# Add manual flags
flag = ApplyFlags(df.index)
flag.apply_QaRules_flags(qc.qa_events, qc.qa_flags)
flag.apply_manual_flags(probe['manual_flags'])
flag.apply_event_code()
flag.import_provisional_data(qc.df_orig, tank_col='INST', ppt_flag_col='TOT_Flag')
flag.remove_GCE_F_flags()
flag.apply_NAN_val()
flag.apply_0_val()
# Evaluate flags applied
flag.create_flag_log('CS2_02', output_dir='../processed_data')
days_flagged = flag.get_flagged_days()
run_avg, run_std = qc.calc_run_avg_rainfall(ppt_col='TOT', wind=4, nstd=2)
running = {'mean':run_avg, 'std':run_std}
flag.plot_flagged_day(days_flagged[-1], 'CS2_02', auto_qa_event=qc.qa_events, running_vals=running)