Source code for cross_probe_qc

__authors__ = 'Greg Cohn'
__version__ = '0'

import pandas as pd
from post_gce_qc.qaqc import QaRules
from pyarrow import NA
import matplotlib.pyplot as plt

[docs] class BuildXTable: """ Takes a dictionary of instances of :meth:`qaqc.ApplyFlags`. The keys to the dictionary are probe codes. """ def __init__(self): pass
[docs] @staticmethod def calc_wy_acc(data_series): """ Reset the water year accumulation (cumulative sum) for adjusted values. To be preformed occasionally if this column of data is needed. .. Note:: This was originally applied by :meth:`assemble_wy_acc` using `df_tot.apply(calc_wy_acc)``. However, this method was 40% slower. As of 11/17/25 this method is no longer used. :param data_series: pandas data series containing precip data :return: pandas data series of cumulative some of precip by water year """ return data_series.groupby(pd.Grouper(freq='YE-SEP')).cumsum()
[docs] @staticmethod def assemble_wy_acc(df_tot): """ Calculate the water year accumulation (cumulative sum) for eac column. This will function on any dataframe, but its column-wise operations make it most suited for pivot table style DataFrames. The water year is defined as 10/1/xxxx 00:00 - 9/30/xxxx 23:55. :param df_tot: pandas DataFrame containing precipitation data :return: pandas DataFrame containing accumulated totals for each water year by column """ return df_tot.groupby(pd.Grouper(freq='YE-SEP')).cumsum()#.ffill()
[docs] @staticmethod def assemble_cross_table(probe_flags, probe_list=None, ppt_col='adj_precip'): """ Create a cross comparison table (like creating a pivot table on probe) where each column is a different probe. Takes a dictionary of instances of :meth:`qaqc.ApplyFlags`. The keys to the dictionary are probe codes. Data is then pulled from each site. :param probe_flags: dict. Dictionary of instances of :meth:`qaqc.ApplyFlags` :param probe_list: list or str. Each probe in list is a column in the output table. If str == 'all' or val == None, list is generated from `probe_flags` keys. :param ppt_col: str. Column name of precipitation data. :return: `pd.DataFrame` of precipitation by probe. Each column is precip from a different probe. """ if not probe_list or 'all' in probe_list: probe_list = probe_flags.keys() series_list=[] for probe in probe_list: # if data isn't already at the 5 min timestep, convert to 5 min and fill missing timesteps with 0 values # simply assigning the data to the final table fills with NA # this method leaves na values for partial water years while keeping na values out of the analysis probe_data = probe_flags[probe].data[ppt_col].asfreq('5min', fill_value=0) probe_data.name = probe probe_data.index = probe_data.index.astype('timestamp[s][pyarrow]') series_list.append(probe_data) return pd.concat(series_list, axis=1)
[docs] class Probe2ProbeXQc: """ Cross compare 2 probes against each other to identify clogs and anomalous accumulations. Each site has its own relationship to the base site, defined in `qa_param.yaml`. This class compares 2 sites at a time. New pairs can be imported into a new instance of the class using :meth:`import_QaRules_data` """ def __init__(self, df_index): """ """ self.ACC = pd.DataFrame(index=df_index, dtype='float[pyarrow]') self.TOT = pd.DataFrame(index=df_index, dtype='float[pyarrow]') self.Ratio = pd.DataFrame(index=df_index, dtype='float[pyarrow]')
[docs] @classmethod def import_QaRules_data(cls, df_2probe_tot, df_2probe_acc, df_2probe_ratio): """ Populate an instance of :meth:`Probe2ProbeXQc` with precip, YTD accumulated precip for each water year, and a ratio of the paired YTD accumulation. :param df_2probe_tot: pandas DataFrame containing precip totals since last time step for a pair of probes. :param df_2probe_acc: pandas DataFrame containing YTD accumulated precip for each water year for a pair of probes. :param df_2probe_ratio: pandas Series containing the ratio of YTD accumulated precip for a pair of probes. :return: instance of class :meth:`Probe2ProbeXQc` """ self = cls(df_2probe_tot.index) self.TOT = df_2probe_tot self.ACC = df_2probe_acc self.Ratio = df_2probe_ratio return self
[docs] def set_clog_event(self, pair=(), min_accum=34, lowest_normal_ratio=0.025, rolling_window='8D', window_precision=0.01, max_ratio=0.4): """ Asses a pair of probes to identify clog events. Identify clogs by periods where the ratio between a pair of probes drops below the running average of the ratio. This effectively identifies periods where the ratio has a dropping trend, signaling that the base probe is accumulating substantially less than the paired probe, which is interpreted as a clog event. :param pair: tuple of str. The base probe name and the paired probe name. :param min_accum: The minimum water year accumulation required before clog analysis can begin. :param lowest_normal_ratio: The lowest allowable ratio between the base probe and probe. Set as clog below this value. :param rolling_window: str. The size of the rolling window applied to ratio for clog analysis. Must be a valid format for :func:`pandas.to_timedelta`. :param window_precision: float. The minimum amount of change when the ratio between probes is dropping (negative trend). """ base, match = pair if type(rolling_window) is str: # find digits num_true = [i.isdigit() for i in rolling_window] # get digit indexes num_index = [i for i, val in enumerate(num_true) if val][-1] + 1 window_length_2x = 2 * int(rolling_window[:num_index]) rolling_window_2x = f'{window_length_2x}{rolling_window[num_index:]}' elif type(rolling_window) is int: rolling_window_2x = 2 * rolling_window ratio = self.Ratio # find clogs above_min_accum = (self.ACC[[base, match]] > min_accum).all(axis=1) ratio[~above_min_accum] = NA below_normal = ratio < lowest_normal_ratio # discount clogs when probe starts after water year. below_max_ratio = ratio < max_ratio # down_trend_ratio # 1. value is < running_avg dropping_ratio, ratio_run_avg = QaRules.find_drops(ratio, window_precision, col=match, wind=rolling_window, get_roll=True) # 2. slope of running_avg (2x large window) is negative # 2.a. calculate slope (1st differential) ratio_run_avg_slope = ratio_run_avg.diff() # 2.b. get size 2x window # 2.c. running avg of slope slope_smoothed = ratio_run_avg_slope.rolling(window=rolling_window_2x, closed='both' ).mean(engine='numba', engine_kwargs={"cache":True, "nopython":True}) # 2.d. slope is negative ratio_neg_slope = slope_smoothed < 0 ratio_down_trend = ratio_neg_slope & dropping_ratio # assign True/Fasle, they evaluate as 1/0 for clog, 0 for no clog # must make them all series (No DataFrames) return (below_normal[match] | ratio_down_trend) & above_min_accum & below_max_ratio[match]
[docs] def flag_clog_undercatch(self, clog, precip_run_avg, pair=()): """ Assess a pair of probes for periods where the base probe is measuring less precip than the paired probe. Find periods where the paired probe indicates that the base probe is clogged and is accumulating substantially less precip than the paired probe. :param clog: pandas Series of booleans that are True when the pair identifies a clog. :param precip_run_avg: pandas Series of running avg + n*std_deviation. :param pair: tuple of str. The base probe name and the paired probe name. :return: """ base, match = pair non0 = self.TOT[match] > 0 for shift in [-2,-1,1,2]: non0 |= non0.shift(shift) return (precip_run_avg[match] >= precip_run_avg[base]) & non0 & clog
[docs] def flag_clog_delayed_accum(self, clog, precip_run_avg, pair=()): """ Assess a pair of probes for periods where the base probe is measuring delayed accumulation from an earlier timstep. A clog can prevent accumulated precip from being measured (undercatch). A sudden unclog, a slow leaking clog, or a slow or sudden melt can all lead to precip accumulation after the preciptation fell, including during periods of low or no precipitation. The amount accumulated represents the cumulative total precipitation since the clog/undercatch began. The amount of precip is often correct (and large), but is recorded at the wrong time. :param clog: pandas Series of booleans that are True when the pair identifies a clog. :param precip_run_avg: pandas Series of running avg + n*std_deviation. :param pair: tuple of str. The base probe name and the paired probe name. :return: pandas Series of booleans that are True when the pair identifies delayed accumulation. """ base, match = pair non0 = self.TOT[base] > 0 return (precip_run_avg[match] < precip_run_avg[base]) & non0 & (clog.shift(3) | clog.shift(1))
[docs] def clog_pair_flagging_wrap(self, base, match, min_accum=34, lowest_normal_ratio=0.025, rolling_window='8D', window_precision=0.01, precision_val=0.254, window='1h', n_std=1.5): """ Compare a base probe to a matched/paired probe to identify periods where the base probe was clogged and flag all clogs with U for undercatch, C for delayed accumulation, or no flag during rain free periods. Flagging clogs requires a multistep approach for each probe pair (base and match). This method wraps together multiple steps into a single function: #. identify periods where the matched probe indicates a clog in the base probe #. calculate the running average of precip + n*std_deviation for both probes #. flag undercatch during the clog #. flag delayed accumulation (Cumulative total) within 3 timesteps of a clog ending :param base: str. The name of the probe being assessed for clogs. :param pair: str. The name of the probe to be compared to the base probe. :param min_accum: see :meth:`set_clog_event` :param lowest_normal_ratio: see :meth:`set_clog_event` :param rolling_window: see :meth:`set_clog_event`. :param window_precision: see :meth:`set_clog_event` :param precision_val: see :meth:`qaqc.QaRules.calc_rolling_val` :param window: see :meth:`qaqc.QaRules.calc_rolling_val` :return: three pandas Series of boolean values; clog_events, U_flags, C_flags """ pair = (base, match) clogs = self.set_clog_event(pair=pair, min_accum=min_accum, lowest_normal_ratio=lowest_normal_ratio, rolling_window=rolling_window, window_precision=window_precision) _, precip_run_std = QaRules.calc_rolling_mean(self.TOT, precision=precision_val, wind=window, nstd=n_std) precip_run_std.columns = self.TOT.columns U = self.flag_clog_undercatch(clogs, precip_run_std, pair=pair) C = self.flag_clog_delayed_accum(clogs, precip_run_std, pair=pair) return clogs, U, C
[docs] class XProbesQc: """ Cross probe quality checks to find anomalous collections. This class works with a pivot table by probe. Takes a base probe, and compares every other probe to that probe. Once all have been compared, site specific weighting is applied to the boolean DataFrames to determine if an event occured and then the flag with the highest weighted number is applied. Uses :meth:`Probe2ProbeXQc` to identify clogs and flags through comparison of individual pairs. """ def __init__(self, df_index, base_probe): self.base_probe = base_probe self.ratio = pd.DataFrame(index=df_index, dtype='float[pyarrow]') self.U = pd.DataFrame(index=df_index, dtype='boolean[pyarrow]') self.C = pd.DataFrame(index=df_index, dtype='boolean[pyarrow]') self.clog = pd.DataFrame(index=df_index, dtype='boolean[pyarrow]') # these remain empty until filled by flag_x_clog, which defines column names # this is an order of magnitude faster # U, C self.flags = pd.DataFrame(index=df_index, dtype='boolean[pyarrow]') # clog self.event = pd.DataFrame(index=df_index, dtype='boolean[pyarrow]')
[docs] @staticmethod def calc_ratio(base, compare): """ Calculate the raio between a base value and a comparison value. :param base: pandas Series of values being investigated. :param compare: pandas Series of values being compared to base values. :return: pandas Series containing raio between base and compare. """ return (base - compare)/base
[docs] def calc_accum_ratio(self, ACC): """ Calculate the ratio of YTD accumulated precip for all probes compared to the base_probe. Base probe is set for the instance. :param ACC: pandas DataFrame where each column is the YTD water year accumulated precip for a probe to be compared to the base_probe. :return: pandas DataFrame where each column is the ratio of YTD water year accumulated precip. """ base_site = ACC[self.base_probe] cross_site = ACC[ACC.columns[~ACC.columns.isin([self.base_probe])]] return cross_site.apply(lambda x: self.calc_ratio(base_site, x), axis=0)
[docs] def set_accum_ratio(self, ACC): """ Set the instance parameter `self.ratio` by calculating the ratio of YTD accumulated precip for all probes compared to the base_probe. :param ACC: pandas DataFrame where each column is the YTD water year accumulated precip for a probe to be compared to the base_probe. """ self.ratio = self.calc_accum_ratio(ACC)
[docs] def get_Probe2ProbeXQc_inst(self, tot, acc, probe=''): """ Initialize and return an instance of Probe2ProbeXQc. Create a paired comparison between the base probe and another probe. :param tot: Pandas DataFrame containing the total accumulated precipitation since last measurement (default 5 min). :param acc: Pandas DataFrame containing the accumulated precipitation for this Water Year. :param probe: str. The name of the probe to be compared to the base probe. :return: Instance of :meth:`Probe2ProbeXQc` comparing probe against base_probe. """ cols = [self.base_probe, probe] ratio = pd.DataFrame(self.ratio[probe]) return Probe2ProbeXQc.import_QaRules_data(tot[cols], acc[cols], ratio)
[docs] def set_x_clogs(self, tot, acc, params): """ For each probe, perform a cross comparison against all other probes to identify clogs. This function creates a pandas DataFrame where each column contains boolean values for a probe paired with a base parameter. :param tot:Pandas DataFrame containing the total accumulated precipitation since last measurement (default 5 min). :param acc: Pandas DataFrame containing the accumulated precipitation for this Water Year. :param params: dict. Keyed by site with kwargs for :meth:`Probe2ProbeXQc.clog_pair_flagging_wrap`. """ for probe, func_params in params.items(): xqc = self.get_Probe2ProbeXQc_inst(tot, acc, probe) kwargs = func_params['clog_pair_flagging_wrap'] clog, U, C = xqc.clog_pair_flagging_wrap(self.base_probe, probe, **kwargs) self.clog[probe] = clog self.U[probe] = U self.C[probe] = C
[docs] def get_weight_x_clog(self, weights): """ Weight each cross comparison. Probes at the same site are weighted 0.; probes at a simlar elevation or along the same ridge are weighted 0.3; and other probes are weighted lower. The total clog value will be determined as the sum of all weights time the clog value. values > 2/3 will be flagged as a clog. or at least 2 out of the probes * uplo- vara and mack each rated 0.334, within site rated 0.6. CENT 0.07 per probe, 0.21 total prim 0.2, and all others 0.17. * vara- uplo and cent 0.17 per probe, 0.51 total, prim/mack 0.2, and all others 0.17. :param weights: dictionary of integers defining the weight to apply to each site. Keyed by site string. :return: Single column of total weighted score for clog, U flag, and C flag """ clogs = self.clog.astype('int8[pyarrow]') U = self.U.astype('int8[pyarrow]') C = self.C.astype('int8[pyarrow]') for site, wt in weights.items(): clogs[site] *= wt U[site] *= wt C[site] *= wt return clogs.sum(axis=1), U.sum(axis=1), C.sum(axis=1)
[docs] def flag_x_clogs(self, clogstot, Utot, Ctot): """ Assign flags based on weighted totals from all probes assessed. :param clogstot: `pd.Series` of integers of weighted clog scores totalled across all probes. :param Utot: `pd.Series` of integers of weighted U-flag scores totalled across all probes. :param Ctot: `pd.Series` of integers of weighted C-flag scores totalled across all probes. """ #clog_event = pd.DataFrame(index=clogstot.index) clog = clogstot > 66 self.event['clog'] = clog #clog_flag = pd.DataFrame(index=Utot.index) # by making >= instead of >, it can have U where the score is too low for a clog self.flags['U'] = (Utot >= 66) & (Utot >= Ctot) & clog # C may be as many as 3 timesteps after the end of a clog self.flags['C'] = (Ctot > 66) & (Ctot > Utot)
#return clog_event, clog_flag
[docs] def plot_x_clogs(self, day, tdelta='10D'): """ Plot clog identification by multiple probes. The ratio of each probe (accumulation compared to the base probe) is plotted for the specified days and periods each probe identifies as clogs are highlighted. This helps visualize the pattern of the ratios during the period and shows which combination of probes signaled a clog. Final clogs are the weighted composite of individual probe clogs. :param day: Pandas Datetime. :param tdelta: Str. Defines how much time is plotted after `day`. Must be a valid format for :func:`pandas.to_timedelta`. Defaults to '10D'. :return: matplotlib axes. """ end = day + pd.to_timedelta(tdelta) plt.figure() ax1 = plt.subplot() for prb in self.clog: self.ratio.loc[day:end, prb].plot(grid=True, legend=True, ax=ax1) color = ax1.get_lines()[-1].get_color() pratio = self.ratio.loc[day:end, prb] pclog = self.clog.loc[day:end, prb] pratio[pclog].plot(grid=True, linestyle='', marker='.', ax=ax1, legend=True, label=f'clog {prb}', color=color) ax1.set_ylabel('Ratio ($\\frac{Base-Pair}{Base}$)') return ax1
[docs] def plot_wy_start(self, acc, comparison_probe): """ Plot the beginning of the water year to determine minimum accumulation before usable/stable ratios exist. :param acc: :param comparison_probe: :return: """ years = acc.groupby(pd.Grouper(freq='YE-SEP')).apply(lambda x: x.index[0].year) for y in years[:-1]: wy_0, wy_1 = pd.to_datetime(f'10/1/{y} 0005'), pd.to_datetime(f'12/31/{y}') plt.figure() ax1 = plt.subplot(211) self.ratio.loc[wy_0:wy_1, comparison_probe].plot(grid=True, ax=ax1) ax2 = plt.subplot(212) acc.loc[wy_0:wy_1, [self.base_probe, comparison_probe]].plot(grid=True, ax=ax2) plt.tight_layout()
[docs] def plot_clog_wind_thresholds(self, probe, xppt, xacc, min_ratio, min_accum=[50], days=[6, 8, 10], prec=[0.02, 0.03, 0.04]): """ Plot clogs identified using moving windows of different sizes and precision thresholds. This is used to determine the ideal parameters for identifying clogs from the ratio of a particular site to the base probe. :param probe: str. Probe being compared to the base probe. :param xppt: pandas DataFrame containing total accumulated precip since last measurement with a column for each probe. :param xacc: pandas DataFrame containing total accumulated precip for the wateryear with a column for each probe. :param min_ratio: float. Lowest normal ratio. All values < min_ratio are flagged as a clog. :param min_accum: iterable (list or tuple) of float. Minimum water year accumulation before ratios are evaluated for clogs. :param days: iterable (list or tuple) of int. Size of the moving window in days. :param prec: iterable (list or tuple) of float. The minimum amount of change to ID a clog when the ratio between probes is dropping (negative trend). """ p2p = self.get_Probe2ProbeXQc_inst(xppt, xacc, probe) plt.figure() ndays = len(days) naccum = len(min_accum) cnt = 0 for d in days: for ma in min_accum: cnt += 1 ax1 = plt.subplot(ndays, naccum, cnt) self.ratio[probe].plot(grid=True) plt.axhline(min_ratio, color='c') for p in prec: clogs = p2p.set_clog_event(pair=(self.base_probe, probe), min_accum=ma, lowest_normal_ratio=min_ratio, rolling_window=f'{d}D', window_precision=p) self.ratio.loc[clogs, probe].plot(ax=ax1, linestyle='', marker='.', grid=True, label=f'{d}D {p}', legend=True)