PK
W
time_shifters.RUT keke### Script containing time-shift specific functions
# Root directory of the shared gamlss_cleaner scripts; helpers.R presumably
# provides calc_rat(), update_rat(), get_stations() and smeans() used below
# -- TODO confirm against helpers.R
scriptsroot = "~/Documents/Academic/PhD/Scripts/R/pptClean/share/gamlss_cleaner/"
source(paste0(scriptsroot, "helpers.R"))
library(tidyverse)
library(zeallot) # direct assignment for multiple return values from helper functions
#####################
# SUBTASK FUNCTIONS #
#####################
ms_shift = function(dfwide, prediction, rat, raty, ratt, sourcelist, mus,
                    remcols=c("date", "year", "month"), nt = nrow(dfwide),
                    stations = colnames(rat), r_thres = 0.2, thres_ms = 1.5, verbose = F)
{
  ### This is a function to perform time shifts where multiple sources are available
  ### for some stations. `dfwide` is the default original dataset; see shift_time() for
  ### full description. `rat`, `raty`, and `ratt` are, respectively, the ratios
  ### between observed and predicted data for the default case, if today were replaced
  ### by yesterday (fwd-shifting) or by tomorrow (backward shifting). `sourcelist` is
  ### the list of `data.frame`s with data by station from alternative sources;
  ### (see shift_time for further details). `nt` is the number of timesteps considered.
  ### `mus` is a vector with the mean values of the stations in dfwide.
  ### `stations` is a character vector with the names of the stations to cycle through.
  ### `r_thres` is a rainfall threshold that must be exceeded
  ### for a day to be considered for time-shifting. `thres_ms` determines how much better
  ### the shifted series must be for it to be preferred to the original.
  ### Returns an unnamed list of: the updated `dfwide`, `ts_list` (per-station vector
  ### of timesteps already dealt with here) and `ms_shifts` (a record of all
  ### multi-source shifts performed).
  # list of days not to be considered again later for source selection as they have
  # been dealt with here
  ts_list = list()
  # data frame with a record of all Multi-Source shifts conducted; initialise with
  # 0-rows (filter(F) keeps the id-column structure but drops every row)
  ms_shifts = dplyr::bind_cols(dfwide %>%
                                 dplyr::select(dplyr::any_of(remcols)) %>%
                                 dplyr::filter(F),
                               vec = numeric(), pred = numeric(),
                               station = character(), source = character(),
                               rat = numeric(), raty = numeric(), ratt = numeric())
  # add a column for each of the possible sources
  for (source in names(sourcelist))
  {
    ms_shifts %<>%
      dplyr::bind_cols(tibble(xx = numeric()) %>%
                         dplyr::rename_with(~source, "xx"))
  }
  for (S in stations)
  {
    # default observations and reference prediction for this station
    vec = dfwide %>%
      dplyr::pull(S)
    pred = prediction %>%
      dplyr::pull(S)
    # initialise a tibble with correct
    # rows to be col_bound later
    st_sources = tibble(.rows = nt)
    # collect this station's series from every alternative source that reports it
    for (source in names(sourcelist))
    {
      sourcedf = sourcelist[[source]]
      if (S %in% colnames(sourcedf))
      {
        st_sources %<>%
          dplyr::bind_cols(sourcedf %>%
                             dplyr::select(dplyr::all_of(S)) %>%
                             dplyr::rename_with(~source, dplyr::all_of(S)))
      }
    }
    nds = length(st_sources)
    # more than one source:
    if (nds > 1)
    {
      message("MultiSource for station ", S)
      rat_s = rat[, S]
      ratt_s = ratt[, S]
      raty_s = raty[, S]
      # template of per-day metadata rows; subsets of it are appended to ms_shifts
      printdf = dplyr::bind_cols(dfwide %>%
                                   dplyr::select(any_of(remcols)),
                                 station = S, vec = vec, pred = pred, st_sources,
                                 rat = rat_s, raty = raty_s, ratt = ratt_s)
      # if day has already been considered don't assess it again
      completef = NULL # fwd-shift
      completeb = NULL # back-shift
      # only consider time steps with sufficiently large values as first time step
      # of sequence; remove first entry to avoid problems for checking against
      # previous day
      t_rain = which(vec[-1] > r_thres) + 1
      for (source in colnames(st_sources))
      {
        # current source under consideration
        alt = st_sources[[source]]
        # time steps to consider for this source; ignore those where source
        # equals default
        tl = t_rain[alt[t_rain] != vec[t_rain] & !is.na(alt[t_rain])]
        # no more differences to consider (fwd or bck); next source
        if (length(tl) == 0)
        {
          if (verbose)
          {
            message("nothing to do for source: ", source)
          }
          next
        }
        printdf$source = source
        ### FORWARD-SHIFTING
        # ignore days previously considered;
        t_i = tl[!(tl %in% completef)]
        # no forward shifts to consider, go straight to backshifting
        # NOTE(review): this `d = nt` is redundant — the re-filtering of t_i just
        # below yields length 0 again and d is reassigned either way
        if (length(t_i) == 0)
        {
          d = nt
        }
        # time steps for which the current source equals the original a day later
        t_i = t_i[vec[t_i - 1] == alt[t_i] & !is.na(vec[t_i - 1])]
        if (length(t_i) == 0)
        {
          d = nt
        } else
        {
          # cycle through the potential sequence starts
          d = t_i[1]
        }
        # fails when tomorrow is no longer in the dataset
        # (last 3 timesteps are deliberately never sequence starts)
        while (d < (nt - 3))
        {
          orig = rat_s[d] # ratio to prediction for default data series
          rival = raty_s[d] # if shifted forward
          # would backshifting be better? don't shift NAs unnecessarily
          rivalback = ifelse(!is.na(ratt_s[d]), ratt_s[d], 1)
          completef = c(completef, d) # days that need not be considered again
          l = 1 # length of current sequence
          # while previous day at reference is equal to today at alt or previous
          # day in original is missing; end if the alt is missing as this would
          # imply replacing a valid data point with an NA
          while (((is.na(vec[d + l - 1])) |
                  (vec[d + l - 1] == alt[d + l] &
                   !(is.na(alt[d + l])))) &
                 (d + l) < (nt - 3))
          {
            # accumulate running products of the ratios over the sequence
            update = ifelse(!is.na(rat_s[d + l]), rat_s[d + l], 1)
            orig = orig * update
            # where there are NAs do not advantage/disadvantage any option
            rival = rival * ifelse(!is.na(raty_s[d + l]),
                                   raty_s[d + l], update)
            rivalback = rivalback * ifelse(!is.na(ratt_s[d + l]),
                                           ratt_s[d + l], update)
            completef = c(completef, d + l)
            l = l + 1
          }
          # minimum of 2 days in the sequence; fwd-shift must beat both the
          # default series (by factor thres_ms) and the back-shift alternative
          if (orig/rival > max(thres_ms, orig/rivalback) & l > 1)
          {
            shift = TRUE
            # don't want to overwrite a large rainfall day at the end of the
            # sequence because it would then be lost; continue looking for
            # an end point that would not result in such a data loss
            while (vec[d + l - 1] > r_thres & is.finite(vec[d + l - 1]))
            {
              update = ifelse(!is.na(rat_s[d + l]), rat_s[d + l], 1)
              orig = orig * update
              rival = rival * ifelse(!is.na(raty_s[d + l]),
                                     raty_s[d + l], update)
              rivalback = rivalback * ifelse(!is.na(ratt_s[d + l]),
                                             ratt_s[d + l], update)
              completef = c(completef, d + l)
              l = l + 1
              # if it's no longer good enough
              if (orig/rival < max(thres_ms, orig/rivalback))
              {
                shift = FALSE
                break
              }
            }
            # perform the shift if still good enough after extension:
            # each day d..d+l-1 receives yesterday's default value
            if (shift)
            {
              dfwide[d:(d + l - 1), S] = vec[(d - 1):(d + l - 2)]
              ms_shifts %<>%
                bind_rows(printdf[d:(d + l - 1), ])
            }
          }
          # move onto the next potential start point after this sequence
          # Inf returned with warning when last element is passed
          d = min(t_i[t_i > d + l])
        }
        # update list with data resolved for current station.
        # NAs in shift should be explicitly reconsidered
        ### BACK-SHIFTING
        # ignore days previously considered;
        t_i = tl[!(tl %in% completeb)]
        # no differences here, go to next source
        if (length(t_i) == 0)
        {
          next
        }
        # time steps for which the current source equals the original a day
        # earlier;
        # ignore those where source equals default
        t_i = t_i[vec[t_i] == alt[t_i - 1] & !is.na(alt[t_i - 1])]
        # no back shifts to consider for this source
        if (length(t_i) == 0)
        {
          d = nt
        } else
        {
          # cycle through the potential sequence starts
          d = t_i[1] - 1 # the first replaced day is a day earlier
        }
        # fails when tomorrow is no longer in the dataset
        while (d < (nt - 3))
        {
          # Avoid overwriting large rainfall events
          if (vec[d] > r_thres & !is.na(vec[d]))
          {
            d = min(t_i[t_i > d + 1]) - 1
            next
          }
          # do not advantage either possibility if the start of the sequence
          # considered for replacement is NA.
          orig = ifelse(is.na(vec[d]), ratt_s[d], rat_s[d])
          rival = ratt_s[d]
          completeb = c(completeb, d) #days that need not be considered again
          l = 1
          # while previous day at reference is equal to today at alt
          # NOTE(review): indices here (d + l + 1 on vec vs d + l on alt) are
          # asymmetric with the fwd-shift loop above; presumed intentional for the
          # backward direction -- confirm
          while (((is.na(vec[d + l + 1])) |
                  (vec[d + l + 1] == alt[d + l] &
                   !(is.na(alt[d + l])))) &
                 (d + l) < (nt - 3))
          {
            update = ifelse(!is.na(rat_s[d + l]),
                            rat_s[d + l], 1)
            orig = orig * update
            rival = rival * ifelse(!is.na(ratt_s[d + l]),
                                   ratt_s[d + l], update)
            completeb = c(completeb, d + l)
            l = l + 1
          }
          # perform the back shift if good enough: each day receives tomorrow's value
          if (orig/rival > thres_ms & l > 1)
          {
            dfwide[d:(d + l - 1), S] = vec[(d + 1):(d + l)]
            ms_shifts %<>%
              bind_rows(printdf[d:(d + l - 1), ])
          }
          d = min(t_i[t_i > d + l]) - 1
          # Inf returned with warning when last element is passed
        }
      }
      # update list with data resolved for current station
      # (NA days are excluded so they can be reconsidered later)
      ts_list[[S]] = completef[!(is.na(vec[completef]))]
      ts_list[[S]] = c(ts_list[[S]], completeb[!(is.na(vec[completeb]))])
    }
  }
  return(list(dfwide, ts_list, ms_shifts))
}
### GENERAL HELPER FNs
good_enough = function(ror, thres_p, thres_max)
{
  ### Returns a logical: whether the sequence represented by the ratio-of-ratios
  ### vector `ror` improves enough under shifting to justify performing it.
  ### The overall (product) improvement must beat 1/thres_p AND the single best
  ### day must beat 1/thres_max. NAs in `ror` are ignored; an all-NA `ror`
  ### yields prod = 1 and min = Inf (with a warning), i.e. FALSE.
  # idiom fix: TRUE instead of the reassignable shorthand T
  return(prod(ror, na.rm = TRUE) < (1/thres_p) & min(ror, na.rm = TRUE) < (1/thres_max))
}
good_enough_l = function(ror, thres_p, thres_max)
{
  ### As with good_enough, but to be used on an entire sequence, including an extra
  ### check that at least two separate days (ror entries < 1) support the shift.
  ### NAs in `ror` are ignored throughout.
  # idiom fix: TRUE instead of the reassignable shorthand T
  return(prod(ror, na.rm = TRUE) < (1/thres_p) &
           min(ror, na.rm = TRUE) < (1/thres_max) &
           sum(ror < 1, na.rm = TRUE) > 1)
}
### FWD-SHIFT HELPERS
NA_rep_fwd = function(vec, rat, raty, ts)
{
  ### Checks whether an NA replacement at timestep `ts` for a forward shift is NOT
  ### justified because the replacement number is better suited to its original
  ### position. Returns a logical.
  # exactly one of the two consecutive observations is missing
  one_na = sum(is.na(vec[(ts - 1):(ts)])) == 1
  # original is NA at ts and the non-NA value fits better where it already is
  keep_at_prev = is.na(rat[ts]) & (raty[ts] > rat[ts - 1])
  # previous day is NA and the non-NA value at ts fits better at ts than at ts-1
  keep_at_ts = is.na(raty[ts]) & (rat[ts] < raty[ts + 1])
  return(one_na & (keep_at_prev | keep_at_ts))
}
NA_check_fwd = function(update, d_check, rat, raty, ts)
{
  ### Records timestep `ts` for later re-examination because of an NA-replacement
  ### concern; the associated ratio-of-ratios is appended to `update`.
  ### Returns list(update, d_check) with the new entries appended.
  # best candidate ror if the value were used at either adjacent position
  candidate = max(raty[ts]/rat[ts - 1], raty[ts + 1]/rat[ts], na.rm = TRUE)
  return(list(c(update, candidate), c(d_check, ts)))
}
update_ror_fwd = function(ror, ror_alt, l, rat, raty, ratt, ts)
{
  ### Appends the fwd-shift ratio-of-ratios (raty/rat) to `ror` and the back-shift
  ### equivalent (ratt/rat) to `ror_alt` for timestep(s) `ts`, advancing the
  ### sequence length `l` by one per timestep. Returns list(ror, ror_alt, l).
  # vectors rather than running products are kept because min/max of entries matter
  grown_ror = c(ror, raty[ts]/rat[ts])
  grown_alt = c(ror_alt, ratt[ts]/rat[ts])
  return(list(grown_ror, grown_alt, l + length(ts)))
}
fwd_better = function(rat, raty, ts)
{
  ### Check whether yesterday's ratio is better than or equal to today's at
  ### timestep `ts`, or whether the comparison is indeterminate (either is NA).
  indeterminate = is.na(raty[ts]) | is.na(rat[ts])
  return(indeterminate | (raty[ts] <= rat[ts]))
}
fwd_search = function(ror, ror_alt, l, update, d_check, vec, rat, raty, ratt, d,
                      nt = length(rat))
{
  ### Searches forward from start day `d` for the end point of a potential
  ### fwd-shifting sequence; returns updated ror, ror_alt, l, update and d_check
  ### as a list.
  # keep extending while yesterday's ratio is at least as good as today's
  while (fwd_better(rat, raty, d + l))
  {
    # an NA replacement better suited to its original position ends the sequence;
    # record the day for later re-examination
    if (NA_rep_fwd(vec, rat, raty, d + l))
    {
      flagged = NA_check_fwd(update, d_check, rat, raty, d + l)
      update = flagged[[1]]
      d_check = flagged[[2]]
      break
    }
    stepped = update_ror_fwd(ror, ror_alt, l, rat, raty, ratt, d + l)
    ror = stepped[[1]]
    ror_alt = stepped[[2]]
    l = stepped[[3]]
    # stop when too close to the end of the record
    if ((l + d) > (nt - 2))
    {
      break
    }
  }
  return(list(ror, ror_alt, l, update, d_check))
}
avoid_0end_fwd = function(ror, ror_alt, lf, vec, d, l, l1, l2, l3, rat, raty, ratt,
                          r_thres, nt = length(vec))
{
  ### This function attempts to ensure that a fwd-shifting sequence ends on an NA,
  ### 0, or insignificant rainfall amount, to avoid losing an important recorded
  ### data pnt. First attempts to find a suitable end day immediately before the
  ### current end; if this fails, looks forward.
  ### Returns list(ror, ror_alt, lf) with the (possibly trimmed/extended)
  ### ratio-of-ratios vectors and the new end point.
  # if ending a day earlier would mean ending on a 0 or NA, do so as long as it
  # doesn't mean the sequence is of length < 2
  if ((vec[lf - 1] <= r_thres | is.na(vec[lf - 1])) & (lf > (d + 2)))
  {
    # if we've removed the last day of an added sequence and this sequence now makes
    # the situation worse
    if ((l2 == (length(ror) + 1)) && !(good_enough(ror[l1:(l2 - 2)], 1, 1)))
    {
      l0 = lf # problem end point
      lf = l3 # revert to end point before
      ror = ror[1:(l1 - 1)]
      ror_alt = ror_alt[1:(l1 - 1)]
      # first try extending fwd to a non-zero, but ensure one doesn't extend to the
      # point that has just been rejected. This could produce an infinite loop
      while (is.finite(vec[lf]) & vec[lf] > r_thres)
      {
        lf = lf + 1
        c(ror, ror_alt) %<-% update_ror_fwd(ror, ror_alt, l, rat,
                                            raty, ratt, lf)[1:2]
        # back at the problem point
        if (lf == l0)
        {
          lf = l3 # start again, try backwards
          ror = ror[1:(l1 - 1)]
          ror_alt = ror_alt[1:(l1 - 1)]
          break
        }
      }
      # if the above failed, go backward instead
      while (lf <= l3 & is.finite(vec[lf]) & vec[lf] > r_thres & lf > d)
      {
        lf = lf - 1
        ror = ror[-length(ror)] # remove last data point from ror
        # BUG FIX: index ror_alt by its own length; the original used length(ror)
        # (already shortened above), which dropped ror_alt's second-to-last
        # element instead of its last, desynchronising the two vectors
        ror_alt = ror_alt[-length(ror_alt)]
      }
    } else
    {
      lf = lf - 1
      ror = ror[-length(ror)]
      # BUG FIX: as above, drop the LAST element of ror_alt, not the element at
      # position length(ror)
      ror_alt = ror_alt[-length(ror_alt)]
    }
  } else
  {
    # look for a later non-zero day to end on
    l = lf - d + 1 # first day after the period (day initially leading to end of seq)
    ror_alt = c(ror_alt, ratt[d + l]/rat[d + l])
    # forward shifting onto the next day replaces an NA with a number in a worse
    # position
    if (is.na(vec[d + l]) & (raty[d + l]/rat[d + l - 1] > 1))
    {
      ror = c(ror, raty[d + l]/rat[d + l - 1])
    } else
    {
      ror = c(ror, raty[l + d]/rat[l + d])
      while (vec[d + l] > r_thres & is.finite(vec[d + l]) & ((l + d) < (nt - 2)))
      {
        l = l + 1
        ror_alt = c(ror_alt, ratt[d + l]/rat[d + l])
        if (is.na(vec[d + l]) & (raty[d + l]/rat[d + l - 1] > 1))
        {
          ror = c(ror, raty[d + l]/rat[d + l - 1])
          break
        }
        # rainfall at this pnt is 0 and will be over-written so needs to be added
        # to RoR
        ror = c(ror, raty[l + d]/rat[l + d])
      }
    }
    lf = d + l # last day is the 0-rainfall day
  }
  return(list(ror, ror_alt, lf))
}
# FWD-SHIFT MAIN FUNCTION
fwd_shift = function(vec, s, fwd, rat, ratt, raty, n_diffs, nt = length(vec),
                     r_thres = 0.2, thres_p = 10/3, thres_max = 2.5,
                     prediction = rep(NA, length(vec)),
                     datedf = data.frame(1:length(vec)),
                     verbose = F)
{
  ### This function is a helper function to shift_time which performs the general
  ### forward shifting for a specific station (name specified as character by the `s`
  ### parameter), the default data for which is supplied with the `vec` parameter and
  ### for which `rat`, `ratt`, and `raty` are supplied. `fwd` is the time steps
  ### selected to consider for fwd-shifting for this station. For a detailed
  ### description of other input parameters, see shift_time.
  ### Returns list(ret_v, n_diffs, shift_df): the (possibly) shifted series, the
  ### updated count of differences, and a tibble recording the shifts performed.
  n_fwd = 0 # initialise number of fwd shifts
  days_fwd = 0 # corresponding number of days
  d = fwd[1] # first case to consider
  ret_v = vec # vector to return
  shift_df = tibble() # store shifts in here
  # NOTE(review): vec[d - 1] below assumes d >= 2; presumably guaranteed because
  # raty[1] is NA so day 1 never satisfies raty < rat in shift_time -- confirm
  while (d < nt - 3)
  {
    # avoid timeshifts causing duplicate non-zeros
    if (is.finite(vec[d - 1]) & vec[d - 1] > r_thres)
    {
      d = min(fwd[fwd > d]) # next value in fwd
      next
    }
    l = 1
    d_check = NULL # initialise day to check
    update = NULL # initialise RoR replacement for problem NA days
    ror = raty[d]/rat[d] # ratio of the ratios; check whether it's ever big enough
    ror_alt = ratt[d]/rat[d] # for back shifting
    # search for the end point when yesterday is no longer better than today
    c(ror, ror_alt, l, update, d_check) %<-% fwd_search(ror, ror_alt, l, update,
                                                        d_check, vec, rat, raty,
                                                        ratt, d, nt)
    # Need at least a 2 day sequence
    if (l == 1)
    {
      d = min(fwd[fwd > (d + 1)]) # to at least past the end of this section
      next
    }
    # check whether a significant improvement occurrs with shifting
    if (good_enough(ror, thres_p, thres_max))
    {
      # improvement big enough
      lf = d + l - 1 # end point of shift
    } else
    {
      # reject if not good enough
      d = min(fwd[fwd > (d + l)])
      next
    }
    # Check whether to continue
    l = l + 1
    l2 = Inf # length of ror after previous sequence;
    # initialised as Inf to be ignored by min()
    # Is next time ratio of ratios != 1 also favouring raty? if it's too close to
    # the end, don't bother searching for an extension
    while (fwd_better(rat, raty, d + l) & (d + l) < (nt - 3))
    {
      if (NA_rep_fwd(vec, rat, raty, d + l))
      {
        c(update, d_check) %<-% NA_check_fwd(update, d_check, rat, raty, d + l)
        break
      }
      # the next non equal ratio favours shifting so we continue
      if ((raty[d + l] < rat[d + l]) &
          (sum(is.finite(vec[(d + l - 1):(d + l)])) > 1))
      {
        # include the days with non-1 ratios; i.e. the day prompting the initial
        # stop (day after current lf) and the last one prompting the continued
        # search for end pnt
        c(ror, ror_alt, l) %<-% update_ror_fwd(ror, ror_alt, l,
                                               rat, raty, ratt,
                                               c(lf + 1, l + d))
        l = l - 1 # only one of the updates wasn't appended to l
        # if the stop day was because of an inadvisable NA replacement, that
        # should be considered in RoR
        if ((lf + 1) %in% d_check)
        {
          check_i = which(d_check == lf + 1)
          l_ror = length(ror)
          ror[l_ror - 1] = update[check_i]
        }
      } else
      {
        # not yet at the next non-1 ratio
        l = l + 1
        next
      }
      # continue to search for the end pnt of the next section
      c(ror, ror_alt, l, update, d_check) %<-% fwd_search(ror, ror_alt, l, update,
                                                          d_check, vec, rat, raty,
                                                          ratt, d, nt)
      ## Check whether it is worth adding this sequence to the previous one
      # first day after previous end point; one RoR entry per day up to that point
      l1 = min(l2, lf - d + 2)
      l2 = length(ror)
      # improvement big enough; don't need to be quite as strict here
      if (good_enough(ror[l1:l2], thres_p * 3/5, thres_max/1.75))
      {
        l3 = lf # previous end point; in case this update must be discarded later
        lf = d + l - 1 # new end pnt
        l2 = l2 + 1 # start one index further in the array next time
      } else
      {
        l0 = lf - d + 1 # length of shift period prior to last ignored sequence
        # reassign RatOfRat to apply to just the previous period (s).
        ror = ror[1:l0]
        ror_alt = ror_alt[1:l0]
        break
      }
      # should we go on for a third (or (n>3)^th) time? Update `l` and check while
      # condition again
      l = l + 1
    }
    # Once we're out of the above loop, the next ratio confirms the end of the
    # shifting sequence; preferably end on 0 and don't overwrite non-zero data that
    # would be lost
    # NOTE(review): l1 and l3 are only assigned inside the while loop above. The
    # call below still works when the loop never ran because R evaluates arguments
    # lazily and avoid_0end_fwd only touches l1/l3 when l2 == length(ror) + 1,
    # which is impossible while l2 is still Inf -- fragile, worth confirming
    if (vec[lf] > r_thres & is.finite(vec[lf]))
    {
      c(ror, ror_alt, lf) %<-% avoid_0end_fwd(ror, ror_alt, lf, vec, d, l,
                                              l1, l2, l3, rat, raty, ratt,
                                              r_thres, nt)
    }
    # remove infinites from product
    ror[!is.finite(ror)] = NA
    ror_alt[!is.finite(ror_alt)] = NA
    # effect not big enough or only one problem member in sequence;
    # strict condition for seq as a whole
    if (!good_enough_l(ror, thres_p * 1.2, thres_max))
    {
      d = min(fwd[fwd > lf]) # to at least past the end of this section
      next
    } else if (prod(ror_alt, na.rm = T) < prod(ror, na.rm = T))
    {
      # back shifting would be better
      d = min(fwd[fwd > lf])
      next
    } else
    {
      # do the shifting!
      n_fwd = n_fwd + 1 # number of shifts
      days_fwd = days_fwd + (lf - d + 1)
      ret_v[d:lf] = vec[(d - 1):(lf - 1)]
      # record the shifted days together with the ratios that justified them
      printdf = dplyr::bind_cols(datedf[d:lf, ], obs = vec[d:lf],
                                 rep = ret_v[d:lf], pred = prediction[d:lf],
                                 ror = ror[1:(lf - d + 1)],
                                 raty = raty[d:lf], rat = rat[d:lf],
                                 ror_alt = ror_alt[1:(lf - d + 1)],
                                 ratt = ratt[d:lf])
      shift_df %<>%
        bind_rows(printdf)
      d = min(fwd[fwd > lf])
      next
    }
  }
  if (verbose)
  {
    message("Station is ", s, ", number fwd of shifts = ", n_fwd,
            ", number of fwd shifted days = ", days_fwd)
  }
  # differences between original and replacement (NA comparisons are dropped)
  diff_i = sum(ret_v != vec, na.rm = T)
  n_diffs = n_diffs + diff_i
  shift_df$station = s
  shift_df$dir = "fwd"
  return(list(ret_v, n_diffs, shift_df))
}
### BCK-SHIFT HELPERS
bck_better = function(rat, ratt, ts)
{
  ### Check whether tomorrow's ratio is better than or equal to today's at
  ### timestep `ts`, or whether the comparison is indeterminate (either is NA).
  indeterminate = is.na(ratt[ts]) | is.na(rat[ts])
  return(indeterminate | (ratt[ts] <= rat[ts]))
}
bck_search = function(ror, l, update, d_check, vec, rat, ratt, d, nt = length(rat))
{
  ### Searches forward from start day `d` for the end point of a potential
  ### backward-shifting sequence; returns updated ror, l, update and d_check
  ### as a list.
  # keep extending while tomorrow's ratio is at least as good as today's
  while (bck_better(rat, ratt, d + l))
  {
    # an NA replacement better suited to its original position ends the sequence;
    # record the day for later re-examination
    if (NA_rep_bck(vec, rat, ratt, d + l))
    {
      flagged = NA_check_bck(update, d_check, rat, ratt, d + l)
      update = flagged[[1]]
      d_check = flagged[[2]]
      break
    }
    stepped = update_ror_bck(ror, l, rat, ratt, d + l)
    ror = stepped[[1]]
    l = stepped[[2]]
    # stop when too close to the end of the record
    if ((l + d) > (nt - 2))
    {
      break
    }
  }
  return(list(ror, l, update, d_check))
}
NA_rep_bck = function(vec, rat, ratt, ts)
{
  ### Checks whether an NA replacement at timestep `ts` for a backward shift is NOT
  ### justified because the replacement value is better suited to its original
  ### position. Returns a logical.
  # exactly one of the two consecutive observations is missing
  one_na = sum(is.na(vec[(ts):(ts + 1)])) == 1
  # original is NA at ts and the non-NA value at ts+1 works better where it is
  keep_at_next = is.na(rat[ts]) & (ratt[ts] > rat[ts + 1])
  # next day is NA and the non-NA value at ts works better at ts than at ts-1
  keep_at_ts = is.na(ratt[ts]) & (rat[ts] < ratt[ts - 1])
  return(one_na & (keep_at_next | keep_at_ts))
}
NA_check_bck = function(update, d_check, rat, ratt, ts)
{
  ### Records timestep `ts` for later re-examination because of an NA-replacement
  ### concern; the associated ratio-of-ratios is appended to `update`.
  ### Returns list(update, d_check) with the new entries appended.
  # best candidate ror if the value were used at either adjacent position
  candidate = max(ratt[ts]/rat[ts + 1], ratt[ts - 1]/rat[ts], na.rm = TRUE)
  return(list(c(update, candidate), c(d_check, ts)))
}
update_ror_bck = function(ror, l, rat, ratt, ts)
{
  ### Appends the back-shift ratio-of-ratios (ratt/rat) to `ror` for timestep(s)
  ### `ts`, advancing the sequence length `l` by one per timestep.
  ### Returns list(ror, l).
  # a vector rather than a running product is kept because min/max of entries matter
  grown_ror = c(ror, ratt[ts]/rat[ts])
  return(list(grown_ror, l + length(ts)))
}
avoid_0end_bck = function(ror, lf, vec, d, l, l1, l2, l3, rat, ratt, r_thres,
                          nt = length(vec))
{
  ### This function attempts to ensure that a backward-shifting sequence ends on an
  ### NA, 0, or insignificant rainfall amount, to avoid losing an important recorded
  ### data pnt. First attempts to find a suitable end day immediately before the
  ### current end; if this fails, looks forward.
  ### Returns list(ror, lf) with the (possibly trimmed/extended) ratio-of-ratios
  ### vector and the new end point.
  # if ending a day earlier would mean ending on a 0 or NA, do so as long as it
  # doesn't mean the sequence is of length < 2
  if ((vec[lf] <= r_thres | is.na(vec[lf])) & lf > (d + 2))
  {
    # if we've removed the last day of an added sequence and this sequence now makes
    # the situation worse
    if ((l2 == (length(ror) + 1)) && !(good_enough(ror[l1:(l2 - 2)], 1, 1)))
    {
      l0 = lf # problem end point
      lf = l3 # end point before
      ror = ror[1:(l1 - 1)]
      # first try extending fwd to a non-zero, but ensure one doesn't extend to the
      # point that has just been rejected. This could produce an infinite loop
      while (is.finite(vec[lf + 1]) & vec[lf + 1] > r_thres)
      {
        lf = lf + 1
        ror = update_ror_bck(ror, l, rat, ratt, lf)[[1]]
        # back at the problem point
        if (lf == l0)
        {
          lf = l3 # start again, try backwards
          ror = ror[1:(l1 - 1)]
          # BUG FIX: removed the line `ror_alt = ror_alt[1:(l1 - 1)]` which was
          # copy-pasted from avoid_0end_fwd; ror_alt does not exist in this
          # function and reaching this branch threw "object 'ror_alt' not found"
          break
        }
      }
      # if the above failed, go backward instead
      while (lf <= l3 & is.finite(vec[lf + 1]) & vec[lf + 1] > r_thres & lf > d)
      {
        lf = lf - 1
        ror = ror[-length(ror)] # remove last data point from ror
      }
    } else
    {
      lf = lf - 1
      ror = ror[-length(ror)]
    }
  } else
  {
    l = lf - d + 1 # first day after the period
    # back shifting onto the previous day replaces an NA with a number in a worse
    # position
    if (is.na(vec[d + l]) & (ratt[d + l]/rat[d + l + 1] > 1))
    {
      ror = c(ror, ratt[d + l]/rat[d + l + 1])
    } else
    {
      ror = c(ror, ratt[d + l]/rat[l + d])
      # NOTE(review): this uses `> 0` where avoid_0end_fwd uses `> r_thres`;
      # presumed intentional (stricter for back-shifts) -- confirm
      while (vec[d + l + 1] > 0 & is.finite(vec[d + l + 1]) & ((l + d) < (nt - 2)))
      {
        l = l + 1
        if (is.na(vec[d + l]) & (ratt[d + l]/rat[d + l + 1] > 1))
        {
          ror = c(ror, ratt[d + l]/rat[d + l + 1])
          break
        }
        ror = c(ror, ratt[d + l]/rat[d + l])
      }
    }
    lf = d + l # last day is the 0-rainfall day
  }
  return(list(ror, lf))
}
### BCK-SHIFT MAIN FUNCTION
bck_shift = function(vec, s, bck, rat, ratt, n_diffs, nt = length(vec), r_thres = 0.2,
                     thres_p = 10/3, thres_max = 2.5, prediction = rep(NA, length(vec)),
                     datedf = data.frame(1:length(vec)), verbose = F)
{
  ### This function is a helper function to shift_time which performs the general
  ### backward shifting for a specific station (name specified as character by the
  ### `s` parameter), the default data for which is supplied with the `vec`
  ### parameter and for which `rat` and `ratt` are supplied. `bck` is the time steps
  ### selected to consider for back-shifting for this station. For a detailed
  ### description of other input parameters, see shift_time.
  ### Returns list(ret_v, n_diffs, shift_df): the (possibly) shifted series, the
  ### updated count of differences, and a tibble recording the shifts performed.
  n_bck = 0 # initialise number of bck shifts
  days_bck = 0 # corresponding number of days
  d = bck[1] # first case to consider
  ret_v = vec # vector to return
  shift_df = tibble() # store shift data here
  while (d < nt - 3)
  {
    # avoid timeshifts causing duplicate non-zeros
    if ((is.finite(vec[d])) & (vec[d] > r_thres))
    {
      # Because d is in bck, we know vec[d] and vec[d+1] are both not NA; can't
      # shift back from too close to the start (hence d > 2). If yesterday is 0 and
      # the net effect of a back shift from d to d-1 is an improvement, or if
      # yesterday is NA and today's value is better suited for yesterday: start the
      # sequence from yesterday instead and do the shift
      if ((((!is.na(vec[d - 1]) & vec[d - 1] == 0) &
            (prod(ratt[(d - 1):d]) < prod(rat[(d - 1):d]))) |
           (is.na(vec[d - 1]) & (ratt[d - 1] < rat[d])))
          && (d > 2))
      {
        # remove the current day from list of potentials, to ensure we don't get
        # caught in an infinite loop
        bck = bck[-which(bck == d)]
        d = d - 1
      } else
      {
        d = min(bck[bck > d]) # next value in bck
        next
      }
    }
    # sequence initialisation
    l = 1
    # NOTE(review): initialised to 0 here but to NULL in fwd_shift; harmless since
    # d_check is only used via %in% / which(), but inconsistent -- confirm
    d_check = 0
    update = NULL
    # only need ror and not ror_alt here as fwd shifting has already been performed
    ror = ratt[d]/rat[d] # ratio of the ratios
    # Stop when today is better than tomorrow again
    c(ror, l, update, d_check) %<-% bck_search(ror, l, update, d_check, vec,
                                               rat, ratt, d, nt)
    # Need at least a 2 day sequence
    if (l == 1)
    {
      d = min(bck[bck > (d + 1)]) # to at least past the end of this section
      next
    }
    # check whether a significant improvement occurrs with shifting
    if (good_enough(ror, thres_p, thres_max))
    {
      lf = d + l - 1 # end point of shift
    } else
    {
      d = min(bck[bck > (d + l)])
      next
    }
    # Check whether to continue
    l = l + 1
    l2 = Inf # length of RoR after previous sequence
    # Is next time ratio of ratios != 1 also favouring ratt?
    while (bck_better(rat, ratt, d + l) & ((l + d) < (nt - 3)))
    {
      if (NA_rep_bck(vec, rat, ratt, d + l))
      {
        c(update, d_check) %<-% NA_check_bck(update, d_check, rat, ratt, d + l)
        break
      }
      # the next non equal well-defined ratio favours shifting so we continue
      # (the descending range (d+l+1):(d+l) covers exactly the two relevant days)
      if (ratt[d + l] < rat[d + l] & sum(is.finite(vec[(d + l + 1):(d + l)])) > 1)
      {
        # include the days with non-1 ratios; i.e. the day prompting the initial
        # stop (day after current lf) and the last one prompting the continued
        # search
        if ((lf + 1) %in% d_check)
        {
          d_i = which(d_check == lf + 1)
          ror = c(ror, update[d_i], ratt[l + d]/rat[l + d])
        } else
        {
          ror = c(ror, ratt[lf + 1]/rat[lf + 1], ratt[l + d]/rat[l + d])
        }
        l = l + 1
      } else
      {
        # if the ratios are still equal or indeterminate, continue with while
        l = l + 1
        next
      }
      # continue to search for the end pnt of the next section
      c(ror, l, update, d_check) %<-% bck_search(ror, l, update, d_check, vec,
                                                 rat, ratt, d, nt)
      # Check whether it is worth adding this sequence to the previous one
      l1 = min(l2, lf - d + 2) # first day after previous end point
      # one RoR entry per day up to that point
      l2 = length(ror)
      # improvement for update big enough; don't need to be quite as strict here
      if (good_enough(ror[l1:l2], thres_p * 3/5, thres_max/1.75))
      {
        l3 = lf # previous end pnt; in case this update must be discarded later
        lf = d + l - 1
        l2 = l2 + 1 # start one index further in the array next time
      } else
      {
        l0 = lf - d + 1 # length to the old end point
        # reassign RatOfRat to apply to just the previous period (s).
        ror = ror[1:l0]
        break
      }
      # should we go on for a third (or (n>3)^th) time? Update `l` and check while
      # condition again
      l = l + 1
    }
    # Once we're out of the above loop, the next ratio confirms the end of the
    # shifting sequence;
    # preferably end on 0 and don't overwrite non-zero data that would be lost
    # NOTE(review): as in fwd_shift, l1/l3 may be unassigned if the loop above
    # never ran; saved only by R's lazy argument evaluation -- confirm
    if (vec[lf + 1] > r_thres & is.finite(vec[lf + 1]))
    {
      c(ror, lf) %<-% avoid_0end_bck(ror, lf, vec, d, l, l1, l2, l3, rat, ratt,
                                     r_thres, nt)
    }
    # remove infinites from product
    ror[!is.finite(ror)] = NA
    # effect not big enough or only one problem member in sequence
    if (!good_enough_l(ror, thres_p * 1.2, thres_max))
    {
      d = min(bck[bck > lf]) # to at least past the end of this section
      next
    } else
    {
      # do the shifting! each day d..lf receives tomorrow's value
      ret_v[d:lf] = vec[(d + 1):(lf + 1)]
      n_bck = n_bck + 1 # number of shifts
      days_bck = days_bck + (lf - d + 1)
      # record the shifted days together with the ratios that justified them
      printdf = dplyr::bind_cols(datedf[d:lf, ], obs = vec[d:lf],
                                 rep = ret_v[d:lf], pred = prediction[d:lf],
                                 ror = ror[1:(lf - d + 1)],
                                 ratt = ratt[d:lf], rat = rat[d:lf],
                                 ror_alt = NA_real_, raty = NA_real_)
      shift_df %<>% bind_rows(printdf)
      d = min(bck[bck > lf])
      next
    }
  }
  if (verbose)
  {
    message("Station is ", s, ", number of backward shifts = ", n_bck,
            ", number of backward shifted days = ", days_bck)
  }
  shift_df$station = s
  shift_df$dir = "bck"
  # differences between original and replacement (NA comparisons are dropped)
  diff_i = sum(ret_v != vec, na.rm = T)
  n_diffs = n_diffs + diff_i
  return(list(ret_v, n_diffs, shift_df))
}
###############################
## MAIN TIME-SHIFTER FUNCTION #
###############################
shift_time = function(dfwide, prediction, mus = NULL, sourceslist = NULL,
                      remcols = c("date", "year", "month"), r_thres = 0.2,
                      thres_ms = 1.5, thres_p = 10/3, thres_max = 2.5, verbose = F)
{
  ### This function performs time-shifing on a wide-format rainfall data.frame `dfwide`.
  ### It checks whether it would be better to use yesterday/tomorrow than today for a
  ### period at a station in `dfwide` (all columns not listed in `remcols` are
  ### stations), relative to the reference series contained in the `prediction`
  ### data.frame, whose structure should be equivalent to `dfwide` (same stations and
  ### time steps to be used and id columns must be named from `remcols`).
  ### `sourceslist` is a list of data.frames, each with the same structure as `dfwide`
  ### (and `prediction`). It contains alternative institutional sources for rainfall.
  ### The first listed source is assumed to be the default source. Each data.frame
  ### should be named according to the institutional source.
  ### `r_thres` is a rainfall threshold that must be exceeded for a day to be considered
  ### for time-shifting.
  ### `thres_ms` determines how much better the shifted series must be for it to be
  ### preferred to the original in the case of multisource shifting.
  ### `thres_p` is the equivalent for normal shifting.
  ### `thres_max` is the minimum threshold to be used for the largest improvement in the
  ### series; i.e. if there is no single day improved by more than thres_max, the shift
  ### will not be performed.
  ### The function returns a named list containing the cleaned data.frame `dfret`, an
  ### integer `ti_shifts` containing a count of the shifts performed, a metadata tibble
  ### containing information regarding the shifts performed (`shift_df`), an integer
  ### parameter noting the number of differences between the original and shifted series
  ### and a tibble `ms_df` which is the equivalent of `shift_df` for multisource shifts.
  ### It will be empty if no multi-source shifting is performed.
  nt = nrow(dfwide) # number of timesteps
  stations = dfwide %>%
    get_stations(remcols)
  # station means default to those of the supplied data
  if (is.null(mus))
  {
    mus = dfwide %>%
      smeans(remcols = remcols)
  }
  # observed/predicted ratios for today (calc_rat comes from helpers.R)
  rat = calc_rat(dfwide, prediction, mus = mus)
  # for yesterday:
  # NOTE(review): the prepended NA row aligns dfwide row t-1 with prediction row
  # t+1; the pairing looks asymmetric with ratt below -- confirm intended
  raty = calc_rat(dfwide[1:(nt - 1), ] %>%
                    add_row(.before = 1), prediction[2:nt, ] %>%
                    add_row(.before = 1), mus = mus)
  # for tomorrow:
  # NOTE(review): `1:nt - 1` parses as (1:nt) - 1, i.e. 0:(nt-1); the 0 index is
  # silently dropped so this selects rows 1:(nt-1) -- works, but parenthesising
  # as 1:(nt - 1) would be clearer
  ratt = calc_rat(dfwide[2:nt, ] %>%
                    add_row(), prediction[1:nt - 1, ] %>%
                    add_row(), mus = mus)
  ### First try to shift using different station sources; we look at stations with
  ### multiple sources
  ###################################################################
  # ==== MULTI-SOURCE SHIFTING #
  ###################################################################
  # more than one source supplied
  if (length(sourceslist) > 1)
  {
    c(dfwide, ts_list, ms_shifts) %<-% ms_shift(dfwide, prediction, rat, raty, ratt,
                                                sourceslist, mus, remcols, nt,
                                                stations, r_thres, thres_ms,
                                                verbose)
    if (verbose)
    {
      message("MultiSource Complete")
    }
  } else
  {
    # list of days not to be considered again later for source selection as they
    # have been dealt with here
    if (verbose)
    {
      message("No MultiSource Shifting to perform")
    }
    ts_list = list()
    ms_shifts = data.frame()
  }
  # reinitialise the ratios: multi-source shifting may have changed dfwide
  rat = calc_rat(dfwide, prediction, mus = mus)
  raty = calc_rat(dfwide[1:(nt - 1), ] %>%
                    add_row(.before = 1), prediction[2:nt, ] %>%
                    add_row(.before = 1), mus = mus)
  ratt = calc_rat(dfwide[2:nt, ] %>%
                    add_row(), prediction[1:nt - 1, ] %>%
                    add_row(), mus = mus)
  ###################################################################
  # ==== GENERAL SHIFTING #
  ###################################################################
  fwd_diffs = 0
  bck_diffs = 0
  shift_df = tibble() # record shifts in here
  # all columns to consider
  for (s in stations)
  {
    vec = dfwide %>%
      pull(s)
    # time steps to consider at this station: when is yesterday better?
    fwd = which(raty[, s] < rat[, s])
    if (length(fwd) > 0)
    {
      ###########################################################
      # ==== FORWARD SHIFTING #
      ###########################################################
      c(vec, fwd_diffs, shifts) %<-% fwd_shift(vec, s, fwd, rat[, s], ratt[, s],
                                               raty[, s], fwd_diffs, nt, r_thres,
                                               thres_p, thres_max,
                                               prediction[[s]],
                                               dfwide %>%
                                                 dplyr::select(any_of(remcols)),
                                               verbose)
      shift_df %<>%
        bind_rows(shifts)
    }
    bck = which(ratt[, s] < rat[, s]) # when is tomorrow better?
    if (length(bck) > 0)
    {
      ###########################################################
      # ==== BACKWARD SHIFTING #
      ###########################################################
      # update for this station only, since fwd shifting may have altered vec
      # (update_rat comes from helpers.R)
      rat_s = update_rat(vec, prediction[[s]], mus[s])
      ratt_s = update_rat(c(vec[2:nt], NA), c(prediction[[s]][1:(nt - 1)], NA),
                          mus[s])
      c(dfwide[, s],
        bck_diffs,
        shifts) %<-% bck_shift(vec, s, bck, rat_s, ratt_s, bck_diffs,
                               nt, r_thres, thres_p, thres_max,
                               prediction[[s]],
                               dfwide %>%
                                 dplyr::select(any_of(remcols)),
                               verbose)
      shift_df %<>%
        bind_rows(shifts)
    } else
    {
      dfwide[, s] = vec # ensure fwd shifting is performed regardless
    }
  }
  n_diffs = bck_diffs + fwd_diffs
  if (verbose)
  {
    message("total fwd difference is ", fwd_diffs,
            ", total bck difference is ", bck_diffs)
    message("total shift difference is ", n_diffs)
  }
  return(list(dfret = dfwide, ti_shift = ts_list, shift_df = shift_df,
              shift_diffs = n_diffs, ms_df = ms_shifts))
}
PKIPK
W?`?`
helpers.RUT keke### This script contains functions used by cleaning functions
library(lubridate)
library(tidyverse)
library(zoo)
add_start = function(dfaccum, length = 10, endcol = "date")
{
### Given a dataframe of accumulated totals for periods of a fixed `length`, with
### the period end date held in the column named by `endcol`, return the same
### dataframe with that column renamed to 'end' and a computed 'start' column
### (first day of each period), placing 'start' and 'end' as the leading columns.
  other_cols = setdiff(colnames(dfaccum), endcol)
  dfaccum %>%
    rename(end = all_of(endcol)) %>%
    mutate(start = end - length + 1) %>%
    dplyr::select(c("start", "end", other_cols))
}
stationwide_to_long = function(dfwide,
ids = c("year", "month", "date", "start", "end", "source"),
varname = "atot")
{
### Converts a wide-form station dataframe to long form: station column names are
### moved into a 'station' column and values into a column named by `varname`.
### Columns listed in `ids` are kept as identifiers. Only rows with valid
### (finite, non-NA) values in the `varname` column are returned.
  return(dfwide %>%
    pivot_longer(-any_of(ids), names_to = "station", values_to = varname) %>%
    # if_all() replaces the deprecated filter(across()) pattern (dplyr >= 1.0.4);
    # behaviour is identical for a single selected column
    dplyr::filter(if_all(all_of(varname), is.finite)))
}
long_to_stationwide = function(dflong, ids = c("date", "year", "month"), svar = "station",
rvar = "rain")
{
### Inverse of stationwide_to_long(): spread a long-form station dataframe back
### into wide form, one column per station, keyed by the `ids` columns.
  # names_sort = TRUE gives a deterministic (alphabetical) station column order,
  # since stations whose earliest records are missing would otherwise appear in
  # first-seen order (missing values were removed from dflong)
  dflong %>%
    pivot_wider(id_cols = any_of(ids),
                names_from = all_of(svar),
                values_from = all_of(rvar),
                names_sort = TRUE)
}
smeans = function(dfwide, remcols = c("date", "year", "month"))
{
### Return the climatological mean daily rainfall for each station column of the
### wide-format dataframe `dfwide`, ignoring missing values. Non-station columns
### are named by `remcols`.
  station_data = dfwide %>%
    dplyr::select(-any_of(remcols))
  colMeans(station_data, na.rm = TRUE)
}
corrected_smeans = function(dfwide, dfcomp = dfwide, thres_cov = 0.95,
remcols = c("date", "year", "month"))
{
### Given a df in wide-format (dfwide) with values in columns by station (and remcols
### as other columns) returns a vector with mean daily rainfall by station, corrected
### according to the mean rainfall at surrounding stations in dfcomp over the period
### for which data are available. Only neighbouring stations where the proportion of
### days with valid data is greater than thres_cov are used as comparators, and
### only stations not satisfying this condition will have their means corrected.
# first guess: raw climatological means of all stations
mus = smeans(dfwide, remcols = remcols)
nt = length(dfcomp[[1]]) # number of time steps
# proportion of non-missing days per station (a 1-row data frame)
coverage = dfcomp %>%
summarise(across(-any_of(remcols), ~sum(!is.na(.))))/nt
# well-covered stations form the comparison pool
dfcomp %<>% dplyr::select(all_of(names(coverage)[coverage > thres_cov]))
# poorly-covered stations are the ones whose means get corrected
dfwide %<>% dplyr::select(any_of(names(coverage)[coverage <= thres_cov]))
stations = colnames(dfwide)
# best-correlated neighbours for each station to correct; each list element is a
# 1-row data frame of correlation weights (see neighbour_select)
neighbours = dfwide %>%
neighbour_select(dfcomp, remcols = remcols, nmin = 8, nmax = 12)
for (S in stations)
{
s_per = which(!is.na(dfwide[[S]])) # period of valid data for this station
# neighbours' means restricted to this station's valid period
mu_adj = (dfcomp %>%
dplyr::select(colnames(neighbours[[S]])))[s_per, ] %>%
colMeans(na.rm = T) # this station's neigbours
mu_adj = mu_adj / mus[names(mu_adj)] # ratio to overall mean
mu_adj = exp(mean(log(mu_adj))) # geometric mean of the period/overall ratios
# divide out the period's wetness anomaly to approximate a full-period mean
mus[S] = mus[S] / mu_adj
}
return(mus)
}
get_stations = function(dfwide, remcols = c("date", "year", "month"))
{
### Return the names of the station columns in a wide-format dataframe, i.e. all
### columns except those listed in the `remcols` character vector.
  dfwide %>%
    dplyr::select(-any_of(remcols)) %>%
    colnames()
}
neighbour_select = function(dfwide, dfcomp = dfwide, remcols = c("date", "year", "month"),
thresi = 0.8, nmin = 8, nmax = Inf, thresd = 0.01,
equivs = NULL, verbose = TRUE)
{
### This function cycles through the stations in the wide-format data frame dfwide
### (station columns are assumed to be all columns other than remcols) and selects
### stations to use as neighbours for CCW or similar filling procedure based on
### correlations computed with the cormat function. The selection is done from the
### stations in `dfcomp`, which, if not supplied, is assumed to be `dfwide`. The
### function returns a list (one element per station) of correlations^4 of the
### selected stations.
### `thresi` is the initial correlation threshold; it is relaxed in steps of
### `thresd` until at least `nmin` neighbours qualify, or tightened until at most
### `nmax` do.
### `equivs` is a list of character vectors of stations to be considered equivalent,
### i.e. such that only one of them at a time should be selected as a neighbour.
thres1 = thresi # threshold correlation level for station selection
stations = dfwide %>%
get_stations(remcols)
CorMat = cormat(dfwide, dfcomp, remcols)
corlist = list() # list of stations to include and corre. weights
for (S in stations)
{
# do not select the station itself
corvec = CorMat %>%
as.data.frame() %>%
dplyr::select(-any_of(S)) %>%
.[S, ] # this returns a (1-row) data frame, so names() gives station names
thres1 = thresi # reset the threshold for each station
selected = names(corvec)[which(corvec > thres1)]
# relax the threshold until at least nmin neighbours qualify (or it hits 0)
while (length(selected) < nmin & thres1 > 0)
{
thres1 = thres1 - thresd
selected = names(corvec)[which(corvec > thres1)]
# if more than the maximum number selected, select only the first nmax,
# (alphabetically)
if (length(selected) > nmax)
{
selected = selected[1:nmax]
}
}
# conversely, tighten the threshold while more than nmax are selected
while (length(selected) > nmax & thres1 < 1)
{
thres1 = thres1 + thresd
selected = names(corvec)[which(corvec > thres1)]
# if fewer than the minimum are now selected, go back and choose the first
# nmax
if (length(selected) < nmin)
{
thres1 = thres1 - thresd
selected = names(corvec)[which(corvec > thres1)]
selected = selected[1:nmax]
}
}
# Stations that should not be considered together as they are too similar
if (!is.null(equivs))
{
for (eq in equivs)
{
# more than one member of this equivalence group was selected
if (sum(selected %in% eq) > 1)
{
len_eqs = colSums(!is.na(dfcomp[, selected[selected %in% eq]]))
# which one of the equivalents has the longest seq; retain that one
# only there may be more than 1; don't just take 1st one, so don't use
# which.max
retain = names(len_eqs[len_eqs == max(len_eqs)])
# if equal length at maximum; choose the highest correlation
if (length(retain) > 1)
{
# in case correlations also equal, just take the first one
cor_eqs = corvec[retain]
retain = names(which.max(cor_eqs))
# print(paste('secondary retention', retain))
}
# drop the non-retained equivalents from both the selection and the
# candidate pool
removeq = setdiff(eq, retain)
selected = setdiff(selected, removeq)
corvec = corvec[setdiff(names(corvec), removeq)]
}
}
# removing equivalents may have left fewer than nmin selected; relax again
while (length(selected) < nmin & thres1 > 0)
{
thres1 = thres1 - thresd
selected = names(corvec)[which(corvec > thres1)]
# if more than the maximum number selected, select only the first nmax,
# (alphabetically)
if (length(selected) > nmax)
{
selected = selected[1:nmax]
}
}
}
corlist[[S]] = corvec[selected]^4 # ~(R2)^2; i.e. D = 1/R^2 weighting
if (verbose)
{
message("Station is: ", S, " threshold used: ", thres1,
" neigbours selected and correlations:")
print(corlist[[S]])
}
}
return(corlist) # a list of 1-row data frames with named elements
}
cormat = function(df1, df2, remcols = c("date", "year", "month"))
{
### Spearman correlation matrix between the station columns of two wide-format
### dataframes; non-station columns (those named in `remcols`) are dropped first.
  stations1 = df1 %>% dplyr::select(-any_of(remcols))
  stations2 = df2 %>% dplyr::select(-any_of(remcols))
  # pairwise.complete.obs: each station pair uses every time step where both
  # stations have valid data
  cor(stations1, stations2, method = "spearman", use = "pairwise.complete.obs")
}
est_pdry = function(dfwide, remcols = c("date", "year", "month"), thres = 0.2)
{
### Naive estimate of the dry-day probability at each station in `dfwide`: the
### proportion of valid (non-NA) days with rainfall below `thres`.
### Returns a one-row data.frame with one column per station.
  return(dfwide %>%
    # any_of() makes the column exclusion explicit; a bare external character
    # vector inside across() is deprecated tidyselect usage and is inconsistent
    # with the any_of()/all_of() style used throughout this file
    summarise(across(-any_of(remcols), ~sum(.x < thres, na.rm = TRUE) /
    sum(!is.na(.x)))) %>%
    data.frame())
}
pdry_mon = function(dfwide, remcols = c("date", "year", "month"), grouper = month,
thres = 0.2, expand = T)
{
### As with est_pdry, but with climatological dry day probability computed by
### 'grouper' (a data-masked column promise, default `month`) and returning either:
### (a) a dataframe with one row per group if expand = FALSE;
### or (b) a dataframe with the same number of rows as dfwide if expand = TRUE
### (default)
p0 = dfwide %>%
group_by({{ grouper }}) %>%
summarise(across(-any_of(remcols), ~sum(.x < thres, na.rm = T) /
sum(!is.na(.x))))
if (!(expand))
{
return(p0)
}
stations = p0 %>%
dplyr::select(-{{ grouper }}) %>%
colnames()
# expand back to one row per time step by indexing p0's rows with the grouper
# value; NOTE(review): this assumes grouper values are consecutive integers
# starting at 1 (e.g. months 1-12) matching p0's sorted row order -- confirm
# before using a different grouper
dfwide[, stations] = p0[dfwide %>% pull({{ grouper }}), stations]
return(dfwide)
}
calc_rat = function(dfwide, prediction, remcols = c("date", "year", "month"), mus = NULL)
{
### This function returns a mean-adjusted ratio between the observed and predicted
### rainfall at stations included as columns in dfwide and prediction (should be the
### same). Other columns in dfwide and prediction should be passed as remcols. mus
### is the mean rainfall at each of these stations (computed if not provided). The
### returned ratios are formatted to ensure they are >= 1 so that larger values imply
### greater inconsistencies.
dfwide %<>% dplyr::select(-any_of(remcols))
# align the prediction columns to the (reduced) observation columns
prediction %<>% dplyr::select(all_of(colnames(dfwide)))
if (is.null(mus))
{
mus = dfwide %>% smeans()
} else
{
# ensure that the column order of mus and dfwide are equivalent
mus = mus[colnames(dfwide)]
}
# use mus in the denominator to avoid div by 0 and in numerator for consistency;
# the double transpose recycles the per-station mus along rows (R recycles
# column-wise, so the addition is done on the transposed matrices)
rat = (t(mus + t(dfwide)) / t(mus + t(prediction)))
# fold ratios below 1 to their reciprocals so all rats are >= 1 for comparability
rat[!is.na(rat) & rat < 1] = 1 / rat[!is.na(rat) & rat < 1]
return(rat)
}
update_rat = function(vec, pred, mu = mean(vec, na.rm = T))
{
### Vector version of calc_rat(): mean-adjusted ratio between an observed series
### `vec` and a predicted series `pred`, used to refresh the ratios for a single
### station at a time after forward shifts. Adding `mu` to both numerator and
### denominator avoids division by zero; ratios below 1 are folded to their
### reciprocals so larger values always mean larger mismatch.
  rat = (vec + mu) / (pred + mu)
  below_one = which(rat < 1)  # which() drops NAs, so no separate NA mask is needed
  rat[below_one] = 1 / rat[below_one]
  return(rat)
}
ldm = function(date = NULL, year = NULL, month = NULL)
{
## Return the last day of a month as a Date, defined either by a date within the
## month or by a year and month pair. If a date is provided it takes precedence.
  if (is.null(date) && is.null(year) && is.null(month))
  {
    stop("Please supply either a date or a year and a month")
  }
  if (!is.null(date))
  {
    # ceiling to the first day of the NEXT month, then step back one day;
    # change_on_boundary = T ensures a first-of-month input still maps to the
    # end of its own month rather than staying on the boundary
    return((ceiling_date(as.Date(date), unit = "month",
                         change_on_boundary = T) - days(1)))
  } else if (is.null(year) || is.null(month))
  {
    stop("Please provide both a year and month; otherwise provide a date")
  } else
  {
    # as.Date(as.yearmon(...)) yields the first of the month, which lies ON the
    # month boundary: without change_on_boundary = T, ceiling_date would return
    # the same date and the result would be the last day of the PREVIOUS month
    return((paste(year, month, sep = "-") %>%
              as.yearmon() %>%
              as.Date() %>%
              ceiling_date(unit = "month", change_on_boundary = T) - days(1)))
  }
}
fdm = function(date = NULL, year = NULL, month = NULL)
{
## Return the first day of a month as a Date, defined either by a date within the
## month or by a year and month pair. If a date is provided it takes precedence.
  if (is.null(date) && is.null(year) && is.null(month))
  {
    stop("Please supply either a date or a year and a month")
  }
  if (!is.null(date))
  {
    return(floor_date(as.Date(date), unit = "month"))
  } else if (is.null(year) || is.null(month))
  {
    # || (scalar, short-circuiting) is the correct operator for an if() condition
    stop("Please provide both a year and month; otherwise provide a date")
  } else
  {
    # "YYYY-M" parses via zoo::as.yearmon; as.Date() of a yearmon is the month start
    return(paste(year, month, sep = "-") %>%
             as.yearmon() %>%
             as.Date())
  }
}
long_monthly = function(dframe, varname = rain, stationname = station, date = date)
{
## This function takes a wide dataframe at monthly or higher frequency with a
## date/datetime variable (which can be set with the date parameter (promise)), and
## converts it to long format with monthly totals in a column whose name is specified
## by 'varname'. The old column names are moved to a column labelled by
## 'stationname'.
dframe %>%
mutate({{ date }} := as.POSIXct({{ date }})) %>%
# derive integer year/month keys for monthly grouping
mutate(year = as.integer(format({{ date }}, "%Y")),
month = as.integer(format({{ date }}, "%m"))) %>%
pivot_longer(-c({{ date }}, year, month), names_to = "station",
values_to = "rain") %>%
group_by(year, month, station) %>%
# mean over valid days scaled by the number of days tolerates missing values
summarise(rain = mean(rain, na.rm = T) * n()) %>%
ungroup() %>%
rename({{ stationname }} := station, {{ varname }} := rain)
# summarise step estimates total; produces NaN if no valid days
}
mon_aggr = function(dfwide, funname = sum, monname = "month", yrname = "year",
ids = "date")
{
### Aggregates a wide-format dataframe to monthly values per station by applying
### `funname` (default: sum) to each station-month, returning the result in long
### format with 'station' and 'rain' columns plus the month/year identifiers.
ids = c(monname, yrname, ids)
return(dfwide %>%
pivot_longer(cols = -any_of(ids), names_to = "station",
values_to = "rain") %>%
group_by(across(all_of(c("station", monname, yrname)))) %>%
# {{ funname }} injects the supplied function into across()
summarise(across(rain, {{ funname }})) %>%
ungroup())
}
monclim = function(dflong_mon, monname = "month", varname = rain)
{
### Computes the monthly climatology per station from a long-format monthly
### dataframe: mean ('clim'), standard deviation ('ssd') and number of valid
### months ('valid') of the column given by the `varname` promise.
return(dflong_mon %>%
group_by(across(all_of(c("station", monname)))) %>%
summarise(clim = mean({{ varname }}, na.rm = T),
ssd = sd({{ varname }}, na.rm = T),
valid = sum(is.finite({{ varname }}))) %>%
ungroup())
}
mon_pred = function(dfwide, stations = NULL, n_select = 4, monname = "month",
yrname = "year", ids = "date")
{
### Predict monthly rainfall totals for missing station-months from the best
### correlated neighbouring stations. Only complete months are considered.
### Returns a long tibble with year, month, station and the predicted total
### 'pred' (NA wherever the observation was present, so no prediction was needed).
dfret = tibble()
# monthly totals per station, long format
dfmon = dfwide %>%
mon_aggr(funname = sum, monname = monname, yrname = yrname, ids = ids)
# monthly climatology
dfclim = dfmon %>% monclim()
# stations with insufficient valid data for at least one month
incompletes = dfclim %>%
dplyr::filter(valid < 3) %>%
pull(station) %>%
unique()
if (length(incompletes) > 0)
{
warning(paste("The following stations have been removed,",
"due to insufficient complete monthly data",
paste(incompletes, collapse = ", ")))
}
# attach the climatology and compute anomalies from it
dfmon %<>% dplyr::filter(!(station %in% incompletes)) %>%
left_join(dfclim, by = c("station", monname)) %>%
mutate(anom = rain - clim)
# correlations on anomalies
monwide = dfmon %>%
pivot_wider(id_cols = all_of(c(monname, yrname)), names_from = "station",
values_from = "anom")
# use Pearson's correlations for monthly anomalies that are near-Gaussian;
# raised to the 4th power for use as weights
cormat4 = monwide %>%
dplyr::select(-all_of(c(monname, yrname))) %>%
cor(use = "pairwise.complete") %>%
.^4
corstations = colnames(cormat4)
# if not explicitly supplied, use all stations in the original df except those in
# incompletes
if (length(stations) == 0)
{
stations = dfwide %>%
get_stations(remcols = c(monname, yrname, ids))
}
stations = setdiff(stations, incompletes)
for (S in stations)
{
obs = monwide %>%
pull(S)
preds = tibble() # initialise tibble to assign preds to
scor = cormat4[S, setdiff(corstations, S)] # all other stations
for (k in as.integer(rownames(monwide)))
{
yr0 = monwide[[yrname]][k]
mon0 = monwide[[monname]][k]
# only relevant where at least one data point is missing
if (!is.na(obs[k]))
{
# observation present: record an NA prediction to keep rows aligned
preds %<>% bind_rows(tibble(year = yr0, month = mon0, pred = NA))
next
}
# stations with valid data for the given month
df0 = dfmon %>%
dplyr::filter(year == yr0 & month == mon0) %>%
dplyr::filter(!is.na(rain))
valids = df0 %>%
pull("station") %>%
unique()
scor0 = scor[valids]
# print('selected correlations:') print(scor0)
# selected nearest neigbour stations for each time step
if (length(scor0) < 1)
{
stop(paste("No valid data to use for", S, "in", month.abb[mon0], yr0))
}
n_use = Inf # initialse to ignore
if (length(scor0) < n_select)
{
n_use = length(scor0)
warning(paste("Fewer than", n_select, "valid data to use for", S, "in",
month.abb[mon0], yr0, "; using only available ones"))
}
# the n_select (or fewer) most strongly correlated valid neighbours
nn = names(scor0)[scor0 %>% order(decreasing = T)][1:min(n_select, n_use)]
sdev = dfclim %>%
dplyr::filter(station == S & month == mon0) %>%
pull(ssd)
sclim = dfclim %>%
dplyr::filter(station == S & month == mon0) %>%
pull(clim)
nndev = dfclim %>%
dplyr::filter(station %in% nn & month == mon0) %>%
pull(ssd, name = station)
# anomaly expectation from ratio of standard deviations, weighted by each
# neighbour's normalised correlation^4
weighting = (sdev/nndev)[nn] * scor[nn]/sum(scor[nn])
df0 %<>% dplyr::filter(station %in% nn)
pred0 = df0 %>% pull(anom, name = station)
pred0 = sclim + sum(pred0 * weighting[names(pred0)]) # ensure correct order
preds %<>% bind_rows(tibble(year = yr0, month = mon0, pred = pred0))
}
preds$station = S
dfret %<>% bind_rows(preds)
}
return(dfret)
}
choose_mon_source = function(dfwide, dfmon, yrname = "year", monname = "month",
sourcevar = "source", ids = "date", n_select = 4)
{
## This function is an alternative to mon_choose_source. dfwide is a dataframe at
## the original temporal resolution in station-wide form; it is used to select
## stations to use as nearest neighbours for monthly 'prediction'. dfmon is a data
## frame with monthly cumulates ('atot'), `yrname`, `monname`, `sourcevar`, and
## 'station' columns. All duplicate rows in dfmon (i.e. all columns except source
## are equal), as well as all rows with missing data, should be removed prior to
## being passed to this function. Returns a data frame with only the 'best'
## (relative to nn stations with complete data) estimates by month, year & station
## retained.
# cases for which decisions need to be made: station-months recorded by more than
# one source, widened so each source becomes a column
decide = dfmon %>%
group_by(across(all_of(c(yrname, monname, "station")))) %>%
mutate(n = n()) %>%
ungroup() %>%
dplyr::filter(n > 1) %>%
pivot_wider(values_from = "atot", names_from = all_of(sourcevar))
# stations to get monthly predictions for
station_dec = decide %>%
pull(station) %>%
unique()
# no stations to consider
if (length(station_dec) == 0)
{
return(tibble())
}
predf = mon_pred(dfwide, stations = station_dec, ids = ids)
# cases skipped by mon_pred will be skipped by known_accum too;
# pivot back to long and keep, per station-month, the source whose total is
# closest to the prediction
return(decide %>%
left_join(predf, by = c(yrname, monname, "station")) %>%
dplyr::filter(is.finite(pred)) %>%
pivot_longer(names_to = all_of(sourcevar), values_to = "atot",
cols = any_of(unique(dfmon[[sourcevar]]))) %>%
mutate(diff = abs(atot - pred)) %>%
group_by(across(all_of(c(yrname, monname, "station")))) %>%
slice_min(diff) %>%
ungroup() %>%
dplyr::select(-all_of("n")))
}
prep_mon_accum = function(dfmon, dfwide, sourcecol = NULL, monname = "month",
yrname = "year", ids = "date", n_select = 4)
{
### Prepare a dataframe of monthly accumulates, possibly with source overlap, for
### use as (or as part of) a cumulates data frame by `distr_accum_known()`.
### `sourcecol` names the column identifying the source of each accumulate; if it
### is NULL, each station is assumed to have a unique source and no selection is
### performed for any station.
  # use distinct to keep only 1 row of those with all columns except source equal
  dfmon %<>% dplyr::filter(is.finite(atot)) %>%
    distinct(across(-any_of(sourcecol)), .keep_all = TRUE)
  # && (not &) so the source-column lookup is short-circuited when sourcecol is
  # NULL: dfmon[[NULL]] would otherwise error with "subscript of length 0",
  # breaking the documented default
  if (length(sourcecol) == 1 && length(unique(dfmon[[sourcecol]])) > 1)
  {
    decide = choose_mon_source(dfwide, dfmon, yrname = yrname, monname = monname,
                               sourcevar = sourcecol, ids = ids, n_select = n_select)
    # if there are stations for which a selection was made
    if (length(decide) > 0)
    {
      # drop all rows for decided (year, month, station) combinations, keep only
      # combinations that were never duplicated (n == 1), then append the winning
      # rows; n > 1 cases absent from `decide` mean monthly data were complete,
      # so no accumulation needs to be performed and they are skipped
      dfmon %<>% anti_join(decide, by = c(yrname, monname, "station")) %>%
        group_by(across(c("station", all_of(c(yrname, monname))))) %>%
        mutate(n = n()) %>%
        ungroup() %>%
        dplyr::filter(n == 1) %>%
        bind_rows(decide) %>%
        dplyr::select(-all_of(c("n", "pred", "diff")))
    }
  }
  return(dfmon %>%
    rename(year = all_of(yrname), month = all_of(monname)) %>%
    mon2accum())
}
mon2accum = function(dfmon, yrvar = year, monvar = month)
{
### Takes a dataframe with monthly data identified by month (`monvar`) and year
### (`yrvar`) columns and converts it into the cumulative-total dataframe with
### 'start' and 'end' Date columns covering each month, dropping the original
### year/month columns.
  # the select() must sit INSIDE return(): previously it was appended after
  # return(...), so it was dead code and the year/month columns leaked through
  return(dfmon %>%
    mutate(start = fdm(year = {{ yrvar }}, month = {{ monvar }}),
           end = ldm(year = {{ yrvar }}, month = {{ monvar }})) %>%
    dplyr::select(-c({{ yrvar }}, {{ monvar }})))
}
shorter2top = function(dfaccum)
{
### Order accumulated totals by station and by the length of the accumulation
### period, so that within each station the shortest periods are listed first.
  dfaccum %>%
    mutate(length = as.integer(end - start) + 1) %>%
    arrange(station, length)
}
PKl~?`?`PK
WBB
cleaners.RUT keke### This script contains functions used to clean the dataset, by detecting and corrected
### for common errors in South African rainfall data.
scriptsroot = "~/Documents/Academic/PhD/Scripts/R/pptClean/share/gamlss_cleaner/"
source(paste0(scriptsroot, "helpers.R"))
library(comprehenr) # for list comprehension
dec_error = function(dfwide, predf, predNN, dec_thres = 10, mult_thres = 9, r_thres = 0.2,
mus = NULL, remcols = c("date", "year", "month"))
{
### This function identifies likely decimal errors (or other related errors that
### would likely be adequately addressed by dividing the specified value by 10) and
### divides them by 10.
### It takes 3 dataframes: `dfwide`: a wide-form station rainfall dataframe with
### observed rainfall in all columns other than those specified as `remcols`;
### `predf`: a corresponding dataframe with expected rainfall based on surrounding
### stations; `predNN`: as above, using nearest neighbour data where available.
### `dec_thres` scales the per-station minimum rainfall for division-by-ten
### consideration: the threshold is `dec_thres * (1 + mu)` where `mu` is the mean
### daily rainfall at the station from `mus`.
### `mult_thres` determines which values are sufficiently large relative to the
### expectations from surrounding stations: rainfall must exceed mult_thres * both
### predictions for the same day, and (mult_thres / 3) * both predictions for the
### previous AND following days (so that rain occurring a day early/late at the
### surroundings, or early/late reading times, are not flagged).
### `r_thres` is the rainfall threshold below which a day is considered 'dry'.
### `mus` is a named vector with mean rainfall for each station in dfwide; if not
### supplied (NULL), it is computed using `corrected_smeans()`.
### Returns a list of two dataframes: the corrected version of `dfwide`, and a
### record of the indices where an adjustment was performed ('rep' column says
### whether the value was divided by 10 or set to NA).
  nt = nrow(dfwide)
  if (length(mus) == 0)
  {
    mus = dfwide %>%
      corrected_smeans(dfwide, remcols = remcols)
  }
  stations = dfwide %>%
    get_stations(remcols = remcols)
  mus %<>% .[stations]
  # per-station minimum rainfall for decimal-error consideration
  thres = dec_thres * (1 + mus)
  # index windows: today / yesterday / tomorrow, aligned over steps 2..(nt - 1)
  tday = 2:(nt - 1)
  yest = 1:(nt - 2)
  tom = 3:nt
  # ignore columns not containing rainfall data
  stationdf = dfwide %>%
    dplyr::select(-any_of(remcols))
  predf %<>% dplyr::select(-any_of(remcols))
  predNN %<>% dplyr::select(-any_of(remcols))
  # days to consider: observed value far exceeds both expectations today and
  # comfortably exceeds them on the adjacent days, and exceeds the station threshold
  # (the double transpose recycles the per-station thres across rows)
  ldays = which(!is.na(stationdf[tday, ]) &
                (stationdf[tday, ] > mult_thres * predf[tday, ]) &
                (stationdf[tday, ] > mult_thres * predNN[tday, ]) &
                (stationdf[tday, ] > (mult_thres / 3) * predf[yest, ]) &
                (stationdf[tday, ] > (mult_thres / 3) * predNN[yest, ]) &
                (stationdf[tday, ] > (mult_thres / 3) * predf[tom, ]) &
                (stationdf[tday, ] > (mult_thres / 3) * predNN[tom, ]) &
                (t(t(stationdf[tday, ]) > thres)) &
                (predNN[tday, ] > 0) &
                (predf[tday, ] > 0),
                arr.ind = TRUE)
  # which(..., arr.ind = TRUE) returns a matrix where rows are time steps (from
  # step 2) and cols are station indices;
  # convert to station names and absolute row numbers (windows start at step 2)
  ldays %<>% as_tibble() %>%
    mutate(col = stations[col], row = row + 1)
  for (k in seq_len(nrow(ldays)))
  {
    dfwide[ldays$row[k], ldays$col[k]] = dfwide[ldays$row[k], ldays$col[k]] / 10
  }
  # Suspiciously large rainfall values in the absence of rainfall at predictor
  # stations; we can use a less strict magnitude threshold here
  thres = dec_thres * (0.5 + mus)
  l0days = which(!is.na(stationdf[tday, ]) &
                 (t(t(stationdf[tday, ]) > thres)) &
                 (predf[tday, ] < 0.01) &
                 (predNN[tday, ] < 0.01) &
                 (predf[tom, ] < (5 * r_thres)) &
                 (predNN[tom, ] < (5 * r_thres)) &
                 (predf[yest, ] < (5 * r_thres)) &
                 (predNN[yest, ] < (5 * r_thres)),
                 arr.ind = TRUE)
  # < 0.01 => effectively == 0
  l0days %<>% as_tibble() %>%
    mutate(col = stations[col], row = row + 1)
  # These suspiciously large values are less likely to be decimal errors, but are
  # very unusual. Hence set them to NA
  for (k in seq_len(nrow(l0days)))
  {
    dfwide[l0days$row[k], l0days$col[k]] = NA
  }
  ldays$rep = "div10"
  l0days$rep = "NA"
  # append the NA-masked records to the div10 records; the previous
  # `ldays %<>% bind_rows(ldays, l0days)` expanded to bind_rows(ldays, ldays,
  # l0days), duplicating every div10 record in the returned log
  ldays %<>% bind_rows(l0days)
  return(list(dfwide, ldays))
}
dec_error_prob = function(dfwide, df_p0, df_mu, df_sigma,
remcols = c("date", "year", "month"), datec = remcols[1],
p_thres = 1e-06, p_max = 0.95, r_thres = 0.2)
{
### Using estimates of the probability of rainfall exceedance derived from the GAMLSS
### models run for stations, this function performs the same task as `dec_error()`,
### but using explicit assessments of the probability of observed rainfall values
### occurring.
### It should be noted that these estimates are derived from the same data
### used for fitting the models, which is not optimal, but will in general lead to
### conservative decisions regarding excessive rainfall. Future versions of these
### functions could consider using different models for different sub-periods so that
### they are fitted and applied to disjoint subsets of the full period.
### In addition to identifying decimal errors occurring where observed data are
### suspiciously large, as in `dec_error()`, this function also considers cases where
### rainfall values are suspiciously low, and may need to be multiplied by 10.
### The function takes in 4 dataframes:
### 1. `dfwide`: Reference station-wide format dataset to operate on with all columns
### containing rainfall data for the station indicated by the column header, except
### for the columns listed in the remcols character vector.
### 2. `df_p0`: A corresponding dataset with dry day probability.
### 3. `df_mu`: As for `df_p0`, for best-estimate of rainfall
### 4. `df_sigma`: As for `df_mu`, for sigma parameter of the ZAG distribution
### `r_thres` is the minimum daily rainfall required for a day to not be considered
### 'dry'.
### `p_thres` is the maximum probability of a rainfall exceedance that is
### considered sufficiently low that a decimal shift is desired.
### Because the ZAG distribution tends to underestimate exceedance probabilities for
### very large values, this threshold is adjusted by set factors by case.
### `p_max` is the probability above which, if the likelihood of observing less than
### 1/10*observed or more than 10*observed, it is assumed that the error is not a
### decimal error but rather a missed time-shift, accumulated total, or other error
### that is not likely to be solved either by decimal correction or by NA masking
### (which may adversely affect aggregates) and hence the suspect recordings are
### retained in this case. To disable this check, set ``p_max = 1``.
### Returns a list: the corrected wide-format dataframe and a record of changes.
df_fixed = tibble() # record of changes made
stations = dfwide %>%
get_stations(remcols = remcols)
# transform to long-form for easier correction; stationwide_to_long() returns only
# valid data points => no NAs to worry about.
# Join the three ZAG parameter frames by date and station, then evaluate the
# relevant tail probabilities for each observation
dflong = dfwide %>%
stationwide_to_long(varname = "rain", ids = remcols) %>%
left_join(df_p0 %>%
stationwide_to_long(varname = "p0", ids = remcols),
by = c(datec, "station")) %>%
left_join(df_mu %>%
stationwide_to_long(varname = "mu", ids = remcols),
by = c(datec, "station")) %>%
left_join(df_sigma %>%
stationwide_to_long(varname = "sigma", ids = remcols),
by = c(datec, "station")) %>%
mutate(plxs = pZAGA(rain * 0.1, mu = mu, sigma = sigma, nu = p0,
lower.tail = TRUE),
plx = pZAGA(rain, mu = mu, sigma = sigma, nu = p0, lower.tail = TRUE),
pgxl = pZAGA(rain * 10, mu = mu, sigma = sigma, nu = p0,
lower.tail = FALSE))
# plxs = probability x is less than 0.1*observed;
# plx = probability x is less than observed;
# pgxl = probability x is greater than 10*observed
# Case 1 (div10): 1/10*rainfall is more likely than not (but not so likely that a
# decimal shift cannot explain it) and getting at least the current value is
# extremely unlikely
thres_h = 5 * p_thres
dflong %<>% mutate(rain_old = rain, rain = case_when((rain > r_thres & plxs > 0.5 &
plxs < p_max &
((1 - plx) < thres_h)) ~
0.1 * rain, TRUE ~ rain))
# keep a record of changes made
df_fixed %<>% bind_rows(bind_cols(dflong %>%
dplyr::filter(rain != rain_old),
operation = "div10"))
# Case 2 (setNA): very unlikely values that don't appear to be decimal errors, but
# are highly suspect and the predicted value is likely a reasonable replacement
dflong %<>% mutate(rain_old = rain, rain = case_when((rain > r_thres &
(1 - plx) < p_thres &
plxs < 0.5) ~ NA_real_,
TRUE ~ rain))
df_fixed %<>% bind_rows(bind_cols(dflong %>%
dplyr::filter(is.na(rain) & !is.na(rain_old)),
operation = "setNA"))
# Case 3 (mul10): likelihood of randomly getting less rain than observed is very
# small; a relaxed lower limit is required because the model is overly positively
# skewed
l_thres = 3 * sqrt(p_thres)
dflong %<>% mutate(rain_old = rain, rain = case_when((rain > r_thres & pgxl > 0.5 &
pgxl < p_max &
(plx < l_thres)) ~ 10 * rain,
TRUE ~ rain))
df_fixed %<>% bind_rows(bind_cols(dflong %>%
dplyr::filter(rain != rain_old),
operation = "mul10"))
return(list(dflong %>%
long_to_stationwide(ids = remcols), df_fixed))
}
rem_rep_nz = function(dfwide, remcols = c("date", "year", "month"))
{
### Removes sequences of 3 or more identical non-zero rainfall figures, which are
### likely to be spurious.
### `dfwide` is a wide-format data.frame with columns for stations and `remcols`
### columns for date/time.
### Returns a list: the cleaned data.frame and a tibble containing metadata
### regarding the sequences removed (station, start/end identifiers, value).
  # only retain those remcols actually present in the df (for dfrm)
  remcols = intersect(remcols, colnames(dfwide))
  stations = dfwide %>%
    get_stations(remcols) # names of stations
  nt = length(dfwide[[1]])
  dfrm = tibble() # initialise tibble to store metadata regarding removals
  # Loop through stations
  for (S in stations)
  {
    vec = dfwide %>% pull(S)
    i = 2
    # Cycle through timesteps
    while (i < (nt - 1))
    {
      # skip unless i is the middle of 3 equal, finite, non-zero values
      if (sum(is.finite(vec[(i - 1):(i + 1)])) < 3 |
      vec[i] < 0.1 |
      vec[i] != vec[i - 1] |
      vec[i] != vec[i + 1])
      {
        i = i + 1
        next
      } else
      {
        a = i - 1 # start of the problem seq
        # extend to the end of the run; guard against NA and running past the end
        # of the series (vec[nt + 1] is NA) -- the bare `vec[i + 1] == vec[i]`
        # condition previously crashed with "missing value where TRUE/FALSE
        # needed" in either situation
        while (i < nt && !is.na(vec[i + 1]) && vec[i + 1] == vec[i])
        {
          i = i + 1
        }
        dfrm %<>% bind_rows(tibble(station = S, start = dfwide[a, remcols],
                                   end = dfwide[i, remcols], rain = vec[a]))
        dfwide[a:i, S] = NA # set sequence to NA
        i = i + 2 # skip 2 days, as we know vec[i] != vec[i + 1]
      }
    }
  }
  return(list(dfwide, dfrm))
}
rep_nz_cum = function(dfwide, remcols = c("date", "year", "month"))
{
### As with rem_rep_nz(), but additionally returns the sum of rainfall over each
### removed period, with start and end dates and station, in a long-format
### `data.frame` to be used for possible disaggregation later.
### The first entry in the `remcols` input vector must be the date column.
### Returns a list of three elements: the cleaned input `data.frame`, the presumed
### cumulative totals, and a `tibble` of metadata regarding the sequences removed.
  # initialise cumulative totals tibble
  dfcum = tibble(start = ymd(), end = ymd(), station = character(), atot = numeric())
  stations = dfwide %>%
    get_stations(remcols) # names of stations
  nt = length(dfwide[[1]])
  # Loop through stations
  dfrm = tibble() # initialise tibble to store metadata regarding removals
  for (S in stations)
  {
    vec = dfwide %>% pull(S)
    i = 2
    # Cycle through timesteps
    while (i < (nt - 1))
    {
      # skip unless i is the middle of 3 equal, finite, non-zero values
      if (sum(is.finite(vec[(i - 1):(i + 1)])) < 3 |
      vec[i] < 0.1 |
      vec[i] != vec[i - 1] |
      vec[i] != vec[i + 1])
      {
        i = i + 1
        next
      } else
      {
        a = i - 1 # start of the problem seq
        # extend to the end of the run; guard against NA and running past the end
        # of the series (vec[nt + 1] is NA) -- the bare `vec[i + 1] == vec[i]`
        # condition previously crashed with "missing value where TRUE/FALSE
        # needed" in either situation
        while (i < nt && !is.na(vec[i + 1]) && vec[i + 1] == vec[i])
        {
          i = i + 1
        }
        dfrm %<>% bind_rows(tibble(station = S, start = dfwide[a, remcols],
                                   end = dfwide[i, remcols], rain = vec[a]))
        # record the run's total for later disaggregation
        dfcum %<>% bind_rows(tibble(start = dfwide[[remcols[1]]][a],
                                    end = dfwide[[remcols[1]]][i],
                                    station = S, atot = sum(vec[a:i])))
        dfwide[a:i, S] = NA # set sequence to NA (the old comment said "to 0")
        i = i + 2 # skip 2 days, as we know vec[i] != vec[i + 1]
      }
    }
  }
  return(list(dfwide, dfcum, dfrm))
}
rem0 = function(dfwide, dfcomp = NULL, remcols = c("date", "year", "month"),
                datec = remcols[1], monthc = remcols[3], thresi = 0.8, thresdry = 0.2,
                nmin = 8, thresd = 0.01, exponent = 0.5,
                pthres = 0.005, pignore = 1, preliable = 0.93, pmin = 0.01)
{
  ### Initial detection and masking of suspicious sequences of dry days.
  ### Dry day probabilities at a given station are estimated based on rainfall
  ### occurrence at selected neighbouring stations, rather than from an input data.frame
  ### of probabilities used in `rem0seq()`. The final probability estimate is computed
  ### as a combination of climatological and neighbour-based estimates, and, if
  ### exponent < 1, a dependency correction. Sequences are set to `NA` iff the estimated
  ### probability of the sequence is lower than `pthres`.
  ### `dfwide` is a data.frame with columns for date/time specified by `remcols`.
  ### `datec` (string) specifies a column name containing dates;
  ### `monthc` (string or NULL) is the equivalent for months. If `monthc` is NULL, a
  ### month column is derived from the date column.
  ### `dfcomp` is an optional data.frame to draw neighbour stations for reference series
  ### construction from. By default `dfwide` is used for this purpose. If `dfcomp` is
  ### provided, `dfwide` should be a (possibly proper) subset of dfcomp, ignoring
  ### remcols.
  ### `nmin`, `thresi` and `thresd` are passed to the corresponding parameters in
  ### `neighbour_select()`.
  ### `thresdry` is a threshold value below which daily rainfall values are regarded as
  ### insignificant and hence the corresponding days are counted as `dry`.
  ### `exponent` (`dep_exp` in `cleanfill()`) is a numerical parameter determining the
  ### strength of the imposed dependency correction. It must be in the range (0, 1] for
  ### sensible results. A lower number results in a stronger dependency correction;
  ### no dependency correction is implemented if ``exponent == 1``. The correction is
  ### imposed by first taking the day with lowest observed dry day probability, which
  ### is a lower bound for the total probability of the sequence and then raising the
  ### product of the remaining probabilities in the sequence to the power specified by
  ### the `exponent` parameter.
  ### `pignore` is a probability threshold, such that if the dry day probability exceeds
  ### `pignore`, the wet day probability is considered negligible and hence set to 0.
  ### `preliable` is a probability threshold value, such that at stations where the
  ### climatological dry day probability falls above this value, the estimated rain day
  ### probability based on surrounding stations is less robust and hence the weighting
  ### of the climatological dry day probability is doubled.
  ### `pmin` is a probability threshold specifying the minimum reasonable probability
  ### of a dry day occurring; probabilities below this threshold will be set to `pmin`.
  ### Returns the cleaned data.frame and a data.frame containing metadata regarding the
  ### cleaning that was performed.
  if (is.null(dfcomp))
  {
    dfcomp = dfwide
  }
  ch = 0 # number of corrections
  # NOTE(review): `ch` is initialised here but never updated or returned in this
  # function; per-station removal counts are reported by rem0v() instead
  # DFc = data.frame(DATE = as.POSIXct(DF[,1])) #Cleaned DF; initialise with date
  # vector
  if (is.null(monthc))
  {
    monthc = "month"
    # vector of month at every ts, derived from the date column
    dfwide$month = as.integer(format(dfwide[[datec]], "%m"))
  }
  stations = dfwide %>% get_stations(remcols) # names of stations
  nt = length(dfwide[[1]])
  # climatological monthly dry-day probabilities for every station in dfcomp
  p0 = dfcomp %>% pdry_mon(remcols = remcols, thres = thresdry, expand = F)
  thres1 = thresi # threshold correlation level for station selection
  # NOTE(review): `thres1` appears unused below; `thresi` is passed on directly
  # for this purpose, ignore equivs and use all available info from nearby stations
  neighbours = dfwide %>%
    neighbour_select(dfcomp, remcols = remcols, nmin = nmin, thresi = thresi,
                     thresd = thresd)
  remls = list() # initialise list of data frames to store data regarding removals
  # will be bind_rowed into a single data frame later
  for (S in stations)
  {
    vec = dfwide %>% pull(S)
    corvec = as.matrix(neighbours[[S]])[1, ] # get a named vector out
    selected = names(corvec) # the selected neighbour station names
    dfn = dfcomp %>%
      dplyr::select(all_of(selected)) # neigbours df
    pdry = p0 %>%
      dplyr::select(all_of(selected)) # neighbours' monthly dry-day climatology
    pdry_S = p0[[S]] # target station's monthly dry-day climatology
    # Weight by correlation and 1/probability of rain, so that rain at dry stations
    # counts more
    n = rowSums(t(t(as_tibble(!is.na(dfn))) * corvec)) # effective number of stations;
    # conversion to tibble necessary as is.na returns a matrix, which cannot be
    # multiplied by another matrix of a different size
    # too dry; method not as trustworthy
    if (mean(pdry_S) > preliable)
    {
      n[n < 0.5] = 0.5 # ensure 1/(2n)<1
      # weighting of climatological probability, tends to 0 as number of comp
      # stations tends to inf, 1 when there are no stations if n<0.5 => climw=1,
      # imply incorrect divisor irrelevant for neighbour based estimate
      climw = 1 / (2 * n)
      pthresi = pthres / 5 # threshold level at which to NA seq
    } else
    {
      n[n < 0.25] = 0.25 # ensure 1/(4n)<1
      climw = 1 / (4 * n) # as above
      pthresi = pthres
    }
    # probability of dry at station and comparison (intersection)
    dryn = (dfn < thresdry) %>%
      data.frame() %>%
      as_tibble() # whether neighbours are dry
    # test converts to logical matrix, but df required
    dry = vec < thresdry # whether the target station is dry at each timestep
    # empirical joint probability (by calendar month) that the target AND each
    # neighbour are dry on the same day
    pDD = bind_cols(dfwide %>%
                      dplyr::select(all_of(monthc)), (dryn) * (dry)) %>%
      group_by(across(all_of(monthc))) %>%
      summarise(across(-any_of(remcols), ~mean(.x, na.rm = T))) %>%
      ungroup() %>%
      dplyr::select(-any_of(remcols))
    # nonsense values introduced by NAs: intersection can't be more likely than one
    # of the events
    pDD %<>% mutate(across(-any_of(remcols), ~if_else(.x > pdry_S, pdry_S, .x)))
    # probability dry at the target station, based on predictor stations:
    # effectively P(target dry | neighbour dry) on neighbour-dry days and
    # P(target dry | neighbour wet) on neighbour-wet days
    monv = dfwide %>% pull(monthc)
    pD = (pDD / pdry)[monv, ] * dryn + ((pdry_S - pDD) / (1 - pdry))[monv, ] *
      (1 - dryn)
    # rough probability of 0 rain for i^th time step
    pD[pD < pmin] = pmin # minimum dry day probability
    pD[pD > 1] = 1 # cap at 1 (valid probability)
    # probability of target station being dry, based on combined assessment of
    # predictor stations and monthly station climatology (correlation-weighted
    # mean over neighbours, blended with climatology via climw)
    pi = pdry_S[monv] * (climw) + (1 - climw) * rowSums(t(t(pD) * as.vector(corvec)),
                                                       na.rm = T) / n
    pi[pi < pmin] = pmin
    pi[pi > 1] = 1 # prob <= 1
    # zeallot unpacking: rem0v() returns the cleaned vector and a metadata tibble
    c(dfwide[, S], remls[[S]]) %<-% rem0v(vec, pi, dfwide %>%
                                            dplyr::pull(all_of(datec)), S, nt = nt,
                                          exponent = exponent, pignore = pignore,
                                          pthres = pthresi, thresdry = thresdry)
  }
  remls = bind_rows(remls) # convert list of tibbles to tibble
  return(list(dfwide, remls))
}
rem0v = function(vec, p0, dates, stationname, nt = length(vec), exponent = 0.5,
                 pignore = 1, pthres = 0.0025, thresdry = 0.2, verbose = T)
{
  ### Masks (sets to NA) runs of dry days in `vec` whose joint probability,
  ### derived from the per-timestep dry-day probabilities `p0`, falls below
  ### `pthres`.
  ### The joint probability uses a dependency correction: the smallest dry-day
  ### probability in the run is taken at face value (a lower bound on the run's
  ### probability) and the product of the remaining probabilities is raised to
  ### the power `exponent`.
  ### Days with dry-day probability >= `pignore` are treated as certainly dry
  ### and contribute a factor of 1 (with the default ``pignore = 1`` this has
  ### no effect).
  ### `dates` is the vector of dates corresponding to the timesteps of `vec`,
  ### used only to record which dates each masked run covered.
  ### `stationname` labels the metadata records (and the verbose message).
  ### Only values >= `thresdry` count as wet days; values below it (or NA)
  ### extend a dry run.
  ### Returns a list: the cleaned vector and a tibble of metadata (start/end
  ### date, counted length, estimated probability, station) for every run that
  ### was masked.
  removed = tibble() # metadata for each run that gets masked
  n_masked = 0       # how many runs were masked (reported when verbose)
  t = 1
  # Scan the series left to right
  while (t < nt)
  {
    # a run can only start on a day with a valid, sub-threshold (dry) value
    if (!is.na(vec[t]) && vec[t] < thresdry)
    {
      run_start = t       # remember where the run began
      probs = p0[t]       # accumulate per-day dry probabilities for the run
      n_counted = 1       # days actually counted towards the run
      t = t + 1
      # keep absorbing days until the next recorded rainfall event
      while ((t < nt) & (is.na(vec[t]) | vec[t] < thresdry))
      {
        # days that are NA, or near-certainly dry anyway, are not counted
        counted = is.finite(vec[t]) & (p0[t] < pignore)
        n_counted = n_counted + counted
        probs = c(probs, if_else(counted, p0[t], 1))
        t = t + 1
      }
      # dependency-corrected joint probability: never above the single
      # smallest probability in the run
      run_prob = min(probs) * (prod(probs[-which.min(probs)]))^exponent
      # mask the run if it is sufficiently improbable
      if (run_prob < pthres)
      {
        vec[run_start:(t - 1)] = NA
        removed %<>% bind_rows(tibble(start = dates[run_start],
                                      end = dates[t - 1],
                                      length = n_counted, prob = run_prob,
                                      station = stationname))
        n_masked = n_masked + 1
      }
      t = t + 1
    } else
    {
      t = t + 1
    }
  }
  if (verbose)
  {
    message("sequences NA'd for ", stationname, " = ", n_masked)
  }
  return(list(vec, removed))
}
rem0seq = function(dfwide, pdry = NULL, pthres = 0.0025, pignore = 0.975, exponent = 1,
                   r_thres = 0.2, remcols = c("date", "year", "month"),
                   datec = remcols[1], verbose = F)
{
  ### This function replaces sequences of consecutive zeroes (dry days) with NAs when
  ### the probability of such an extensive zero sequence is estimated to be less than
  ### `pthres`.
  ### An adaptation of `rem0`, it is to be used primarily on probability series obtained
  ### from GAMLSS model fits. For a full description of all parameters not discussed
  ### below, see `rem0()`.
  ### This function uses a named vector/tibble/data.frame for `pdry`. The names should
  ### be the same as the stations included in the input data.frame `dfwide` (columns in
  ### `dfwide` other than those in `remcols` are assumed to be station names). Either a
  ### single climatological probability per station should be provided or (preferably)
  ### an estimated dry day probability for each time step.
  ### `dfwide` should have columns for different stations and each row should represent
  ### a timestep.
  ### A very simple correction for autocorrelation can be employed, following that in
  ### `rem0()`. By default, no correction is applied (exponent = 1), since it is assumed
  ### that where there is dependency between rain day estimates this is accounted for
  ### by the corresponding GAMLSS fit. See `rem0()` for more details on the dependency
  ### correction and the meaning of `exponent`.
  ### pignore is the dry day probability which, if exceeded is set to 1, as the
  ### procedure is found to be inaccurate where dry days are extremely likely.
  ### Returns a list: the cleaned data.frame and a tibble of removal metadata.
  remdf = tibble() # record of removed sequences across all stations
  ch = 0 # number of corrections
  # NOTE(review): `ch` is not updated here; counts are reported by rem0v()
  stations = dfwide %>% get_stations(remcols) # names of stations
  nt = length(dfwide[[1]])
  # Loop through stations
  for (S in stations)
  {
    vec = dfwide %>% pull(S)
    p0 = pdry[[S]] # approx probability of 0
    # bug fix: this previously read `length(p0 == 1)`, i.e. the length of a
    # logical vector, which is truthy for any non-empty p0 — so a full-length
    # probability series was needlessly replicated nt-fold. Only a single
    # climatological probability should be expanded to all timesteps.
    if (length(p0) == 1)
    {
      p0 = rep(p0, nt)
    }
    # zeallot unpacking: cleaned vector into dfwide, metadata into s_remdf
    c(dfwide[, S], s_remdf) %<-% rem0v(vec, p0, dfwide %>% pull(datec), S, nt = nt,
                                       exponent = exponent, pignore = pignore,
                                       pthres = pthres, thresdry = r_thres,
                                       verbose = verbose)
    remdf %<>% bind_rows(s_remdf) # record of removed data
  }
  return(list(dfwide, remdf))
}
source_select = function(dfwide, sourceslist, rat, source_rats, excl,
                         remcols = c("date", "year", "month"), verbose = FALSE,
                         ret = TRUE)
{
  ### For all stations and all days when multiple institutional sources for rainfall
  ### records are available and at least two of these provide different values, the
  ### recorded value from the source with the smallest ratio statistic (as supplied
  ### in `source_rats`, the ratio against the CCW reference series) is selected for
  ### that day.
  ### `dfwide` is a wide-format data.frame with station rainfall data in all columns
  ### not specified as id columns in `remcols`.
  ### `sourceslist` a list of data.frames of the same format and with equal numbers of
  ### rows as `dfwide`.
  ### `rat` is a matrix of the ratio between observed and predicted rainfall by timestep
  ### `source_rats` is a list of matrices corresponding to `sourceslist`, indicating the
  ### ratio between the rainfall expected based on surrounding observations to that
  ### actually observed by that source.
  ### `excl` is a list of vectors, containing time steps that have already been
  ### considered by the multi-source time-shifting procedure and therefore need not be
  ### considered here again.
  ### Returns `dfwide` with selections applied and, if `ret`, a metadata record.
  # stations occurring in more than one source; first count the number of times
  # stations occur across all source data.frames
  multi_stations = table(to_vec(for (dsource in sourceslist) colnames(dsource)))
  multi_stations = setdiff(names(multi_stations[multi_stations > 1]), remcols)
  nt = length(dfwide[[1]]) # time steps
  rat = rat[, multi_stations] # restrict to the stations handled here
  # dataframe to store record of selections made
  dfret = tibble()
  for (S in multi_stations)
  {
    if (verbose)
    {
      print(paste("performing source selection for", S))
    }
    obs = dfwide %>% pull(S) # current (default) observations for this station
    st_sources = tibble(.rows = nt) # sources for current station
    s_rat = rat[, S] # ratio for current station
    # NOTE(review): `s_rat` appears unused below; only `s_rat_sources` feeds
    # the selection
    s_rat_sources = tibble(.rows = nt) # ratios from sources for this station
    # gather, per alternative source that covers S, its series and ratio vector
    for (dsource in names(sourceslist))
    {
      sourcedf = sourceslist[[dsource]]
      if (S %in% colnames(sourcedf))
      {
        st_sources %<>% bind_cols(sourcedf %>%
                                    dplyr::select(all_of(S)) %>%
                                    rename_with(~dsource, all_of(S)))
        rat_vec = source_rats[[dsource]][, S]
        # bind under a temporary name, then rename to the source's name
        s_rat_sources %<>% bind_cols(xx = rat_vec) %>%
          rename_with(~dsource, "xx")
      }
    }
    diffs = rowSums(abs(st_sources - obs), na.rm = T) # differences from observation
    # time steps to consider: either a difference between sources OR the original is
    # NA and at least one of the others have data
    t_diff = which((!is.na(diffs) & diffs > 0) |
                     (is.na(obs) & rowSums(!is.na(st_sources)) > 0))
    # exclude time steps adjusted by multi-source time-shifting algorithm already
    t_diff = t_diff[!(t_diff %in% excl[[S]])]
    # extract the name of the best source to use for each t_diff: per row, the
    # source column with the smallest ratio value
    s_rat_sources %<>% .[t_diff, ] %>%
      rowwise() %>%
      mutate(pref = colnames(.)[which.min(c_across(everything()))])
    chosen = s_rat_sources %>% pull() # pull() with no args: last column (`pref`)
    # row and columns to select: a (row index, column name) matrix
    chosen = cbind(t_diff, chosen)
    # rownames.force allows one to use a character matrix for subsetting
    dfwide[t_diff, S] = as.matrix(st_sources, rownames.force = T)[chosen]
    if (verbose)
    {
      print("selections made:")
      print(chosen)
    }
    if (ret)
    {
      # record the candidate values and their ratios for the adjusted time steps
      dfret %<>% bind_rows(bind_cols(dfwide[t_diff, remcols], st_sources[t_diff, ],
                                     station = S) %>%
                             left_join(bind_cols(dfwide[t_diff, remcols],
                                                 s_rat_sources),
                                       suffix = c("", "_rat"), by = remcols))
    }
  }
  if (ret)
  {
    return(list(dfwide, dfret))
  } else
  {
    return(dfwide)
  }
}
distr_accum_known = function(dfwide, predf, accumdf, remcols = c("date", "year", "month"),
                             datec = remcols[1], minrain = 0.2)
{
  ### This function disaggregates precipitation amounts known to be accumulations
  ### across the period over which they are known to have occurred.
  ### `dfwide` is the original data.frame with recorded rainfall values by station.
  ### `predf` is a corresponding dataframe of predicted rainfall from CCW reference
  ### series; both `dfwide` and `predf` are wide-format with id-columns among remcols.
  ### `accumdf` is a data.frame with station names in a "station" column,
  ### an "atot" column which contains the accumulated totals and
  ### "start" and "end" columns containing the start and end dates for each known
  ### accumulation period.
  ### Data in accumdf should be arranged from shortest to longest period, as data aren't
  ### overwritten once they are included, so data over longer periods are ignored where
  ### shorter period data exist.
  ### Deduced rainfall values below `minrain` are set to 0.
  ### `datec` specifies the string name of the column containing dates.
  ### Returns `dfwide` with the known accumulations distributed across their periods.
  stations = accumdf %>% pull(station) %>% unique()
  for (S in stations)
  {
    accums = accumdf %>% dplyr::filter(station == S)
    obs = dfwide %>% pull(S)
    pred = predf %>% pull(S)
    # iterate by row position rather than via as.integer(rownames(accums)),
    # which is fragile for filtered data.frames
    for (k in seq_len(nrow(accums)))
    {
      # NOTE(review): assumes dates in `dfwide[[datec]]` are unique; duplicated
      # dates would make t0/tN length > 1 and break the scalar checks below
      t0 = which(dfwide[[datec]] == accums[["start"]][k])
      tN = which(dfwide[[datec]] == accums[["end"]][k])
      # monthly data are for a period not---or only partially---covered in the
      # daily dataset
      if (length(t0) == 0 || length(tN) == 0)
      {
        next
      }
      if (tN < t0)
      {
        stop(paste("For station", S, "not all `start` values are earlier",
                   "than the corresponding `end` values."))
      }
      ## check first that there are missing data in dfwide for the period in
      ## question, if not skip.
      nNA = sum(is.na(obs[t0:tN]))
      if (nNA == 0) # the entire sequence already contains valid data
      {
        next
      }
      ## If there are missing data, calculate the sum of the non-missing values for
      ## the period. If the difference between the supplied accumulation for the
      ## period is negative, skip (i.e. preferentially use the highest temporal
      ## resolution data)
      obs_total = sum(obs[t0:tN], na.rm = T)
      if (obs_total > accums[["atot"]][k]) # observed total exceeds supplied total
      {
        next
      }
      ## Otherwise, consider the difference between the supplied total and
      ## observations and distribute it across the NA data points in the period
      obs_diff = accums[["atot"]][k] - obs_total
      # the time steps during the period that are NA
      tNA = (t0:tN)[which(is.na(obs[t0:tN]))]
      pred_total = sum(pred[tNA])
      if ((pred_total == obs_diff) && (pred_total == 0)) # rat would be NaN
      {
        dfwide[tNA, S] = 0
        next
      }
      rat = obs_diff / pred_total
      # if no rainfall predicted for the whole period from surroundings
      # (pred_total == 0 but obs_diff > 0), default to placing all the
      # undistributed rainfall on the last missing day of the period
      if (rat == Inf)
      {
        # bug fix: the last day was previously assigned `pred_total`, which is
        # necessarily 0 on this branch, silently discarding the accumulated
        # rainfall; assign the remainder `obs_diff` instead, as the comment
        # above intends
        dfwide[tNA, S] = c(pred[tNA[-length(tNA)]], obs_diff)
      }
      else
      {
        fills = round(pred[tNA] * rat, 2)
        dfwide[tNA, S] = if_else(fills >= minrain, fills, 0)
      }
    }
  }
  return(dfwide)
}
# A potential future enhancement to consider here is adding checks on the days before
# and after the distribution period, to verify that the suspected accumulation is not
# more likely to have been a missed time shift
distr_accum = function(dfwide, predf, min_rain = 0.2, min_accum = 10, min_rat = 5,
                       min_back_rat = 2, min_cum_rat = 0.5, max_cum_rat = 5, min_repl = 2,
                       remcols = c("date", "year", "month"), verbose = F)
{
  ### This function identifies values that are suspected to be accumulated values
  ### and disaggregates them over the period after the previous recorded rainfall event,
  ### proportional to expected rainfall amounts from surrounding stations.
  ### `dfwide` is the station data.frame in wide form whose station rainfall data
  ### (all columns not in remcols) are to be considered for distribution
  ### `predf` is a corresponding data.frame containing expected rainfall amounts by
  ### station based on rainfall observed at surrounding stations (reference series).
  ### Columns in these two data.frames should either be id columns, whose names are
  ### in `remcols`, or should be station names, exactly the same set of which should
  ### be included in both. Exactly the same time steps should also be included in both
  ### data.frames.
  ### Proposed rainfall amounts below the threshold min_rain are set to 0.
  ### In order for a rainfall value to be considered as a likely cumulatative value,
  ### it must
  ### (a) be larger than the absolute threshold `min_accum`
  ### (b) at least `min_rat` times larger than the predicted value for that day
  ### (c) be preceded by a sequence of at least one day with 0 or NA rainfall
  ### Once a potential sequence to distribute backwards across has been identified a
  ### further 4 conditions need to be satisfied for the distribution to be implemented:
  ### (1) the expected rainfall over the period to be distributed is sufficiently
  ### large compared to the expected rainfall on the day rain is to be distributed
  ### away from; this is assumed to indicate that the probability of rainfall on the
  ### days during the distribution sequence is sufficiently large compared to the
  ### day to be distributed from. Specifically, the expected cumulative rainfall
  ### over the duration of the suspected sequence must be at least `min_back_rat`
  ### times the rainfall expected on the day to be distributed from.
  ### (2) The result of the distribution should not be creating several rainfall values
  ### much smaller than would be expected. Specifically, the total to be distributed
  ### should be at least `min_cum_rat` of the predicted total. If the predicted
  ### total is too large, this is assumed to indicate that additional rain
  ### occurred during the period where data are missing or have been incorrectly
  ### set to 0.
  ### (3) The total to be distributed shouldn't be too much (> `max_cum_rat` times)
  ### greater than the rainfall expected over the period to be distributed across.
  ### Otherwise, unrealistically large rainfall totals would be produced.
  ### (4) At least `min_repl` zeroes or NAs should be replaced by non-zeros. Otherwise
  ### this is effectively a poorly-checked (possibly long-distance) time-shift
  ### The function returns a list of two dataframes: one the original dataframe with
  ### cumulates filled in, the other a record of the distributions that were performed
  stations = dfwide %>% get_stations(remcols = remcols)
  nt = nrow(dfwide) # number of days
  accums_df = tibble() # record of all distributions performed
  for (S in stations)
  {
    obs = dfwide %>% pull(S)
    pred = predf %>% pull(S)
    obsy = c(NA, obs[1:(nt - 1)]) # shifted one-day back
    # days to consider: today got a lot of rain, much more than expected and no valid
    # rainfall was recorded yesterday (detection conditions (a)-(c) above)
    days = which(obs >= min_accum & obs >= min_rat * pred &
                   (is.na(obsy) | obsy < min_rain))
    if (verbose)
    {
      message("Station is: ", S)
    }
    # ignore cases occurring too close to the start of the series
    for (d in days[days > 3])
    {
      d_start = d - 2 # find the sequence origin
      # walk back over the run of 0/NA days preceding the suspect day
      while((d_start > 0) && (obs[d_start] == 0 | is.na(obs[d_start])))
      {
        d_start = d_start - 1
      }
      d_start = d_start + 1 # first day of the 0/NA run
      dd = d_start:(d - 1) # sequence of days to consider for distribution
      ddd = c(dd, d) # sequence including end point
      # scaling ratio: suspected cumulate over predicted total on predicted-wet days
      rat = obs[d] / sum(pred[ddd][pred[ddd] >= min_rain])
      # replacement conditions (1)-(4) from the description above
      # NOTE(review): condition (4) uses a hard-coded threshold of 1 (not
      # min_rain, which governs the actual replacement values below) and `>`
      # rather than `>=`, i.e. strictly MORE than min_repl non-zeros — confirm
      # against the "At least `min_repl`" wording in the description
      if ((sum(pred[dd]) >= min_back_rat * pred[d]) &
          (rat >= min_cum_rat) &
          (rat <= max_cum_rat) &
          (sum((pred[ddd] * rat) > 1) > min_repl))
      {
        # time steps for distribution sequence, padded with one day either side
        # for the metadata record
        dseq = c(max(1, d_start-1), ddd, d + 1)
        # proposed replacement values (note: `rep` shadows base::rep here)
        rep = if_else(pred[ddd] >= min_rain, round(rat * pred[ddd], 2), 0)
        accums_df %<>% bind_rows(tibble(station=S,
                                        dfwide[dseq, c(remcols)],
                                        obs=obs[dseq],
                                        pred=pred[dseq],
                                        rep=c(NA, rep, NA)))
        dfwide[ddd, S] = rep
      } else # not of interest, move to next potential day
      {
        next
      }
    }
  }
  return(list(dfwide, accums_df))
}
distr_accum_prob = function(dfwide, predf, df_p0, df_mu, df_sigma, min_rain = 0.2,
min_accum = 7.5, min_rat = 1.5, max_prob = 0.01,
min_improve = 50, max_cum_rat = 5,
remcols = c("date", "month", "year"),
datec = remcols[1], verbose = F)
{
### This is an adaptation of `distr_accum()` for use with the output of a GAMLSS fit.
### The function considers three cases: when all values preceding the suspected
### cumulate are 0, all those for which non-zero rainfall is expected from predf are
### missing, or when there is a mix. The specific conditions used in each case are
### explained in detail where each case is considered and collected at the bottom
### of this description.
### `dfwide` and `predf` are as described in `distr_accum()`.
### `df_p0`, `df_mu` and `df_sigma` should satisfy the same conditions and structure
### as `predf`, but contain respectively the `nu`, `mu` and `sigma` parameters of a
### GAMLSS ZIG/ZAGA fit.
### Threshold parameters are as in `distr_accum()`, except for:
### `max_prob`: a probability threshold which if exceeded by a modelled probability of
### observing a value at least as large as the recorded suspected cumulate
### disqualifies that value from consideration as a cumulate; and
### `min_improve`: the minimum ratio between the product of CDF deviations from the
### median for a proposed disaggregation sequence relative to the original sequence,
### in the case when the sequence preceding the suspected cumulate is all 0s,
### with no missing values.
### The function returns a list with the cleaned data.frame and a metadata data.frame
### detailing the changes made.
### Conditions used for detection:
### Days considered as potential cumulates
### 1. Only consider days with sufficiently large rainfall values
### 2. When the prediction is sufficiently greater than the recorded value
### 3. The day is preceded by dry or NA days
### 4. The value is sufficiently unlikely to have occurred at random:
### i.e. if the probability of this large a deviation is less than max_prob
### The sequence is ignored immediately if the proposed disaggregation replacement on
### the suspect day is too improbable.
### Conditions for replacement (for replacing zeroes only):
### 1. Don't produce unrealistically large replacement values. (This cond.
### is imposed earlier where `rat` is calculated).
### 2. The sequence of zeros preceding the suspected cumulate
### should be sufficiently unlikely.
### 3. The joint probability of deviations as large as those observed
### should be much (default: 50x) smaller for the disaggregated seq.
### than the original.
### 4. The proposed disaggregation total on the suspect day should be
### reasonably consistent with the modelled distribution (applies
### to all cases so this condition is imposed earlier)
### Conditions for replacement with NAs:
### 1. Don't produce unrealistically large replacement values. (This cond.
### is imposed earlier where `rat` is calculated).
### 2. Ensure that the over all consistency with surrounding stations
### is still superior after disaggregation, in effect requiring that
### the advantage the reference series almost certainly has as relative
### to itself, is more than offset by the advantage the disaggregation
### has on the suspected cumulate day.
### 3. Ensure that the rainfall total from the disaggregation is closer to
### to the total predicted for the disaggregation period than what would
### result from leaving the suspected cumulate intact.
### 4. The proposed disaggregation total on the suspect day should be
### reasonably consistent with the modelled distribution (applies
### to all cases so this condition is imposed in the outer loop)
### Conditions for replacement with NAs & 0s:
### 1. Don't produce unrealistically large replacement values. (This cond.
### is imposed earlier where `rat` is calculated).
### 2. Across days with zero rainfall observed, the disaggregation should
### be more consistent with surrounding observations
### than the alternative.
### 3. The total that would result from disaggregation is closer to the
### predicted cumulative total over that period than the total that would
### result from direct infilling.
### 4. An overall improvement in consistency of the rainfall distribution
### is required.
### 5. The proposed disaggregation total on the suspect day should be
### reasonably consistent with the modelled distribution (applies
### to all cases so this condition is imposed in the outer loop)
stations = dfwide %>% get_stations(remcols = remcols)
nt = nrow(dfwide) # number of days
accums_df = tibble()
for (S in stations)
{
obs = dfwide %>% pull(S)
pred = predf %>% pull(S)
mu = df_mu %>% pull(S)
p0 = df_p0 %>% pull(S)
sigma = df_sigma %>% pull(S)
obsy = c(NA, obs[1:(nt - 1)]) # shifted one-day back
# deviations from median in CDF; use F(0) = p0/2
cdf_devs = to_vec(for (`i, xi` in numerate(obs))
if(is.na(xi)) xi else pZAGA(xi, mu = mu[i],
sigma = sigma[i], nu = p0[i]))
zdays = !is.na(obs) & obs == 0 # days with 0 rainfall recorded
cdf_devs[zdays] = p0[zdays] / 2
cdf_devs = if_else(cdf_devs < 0.5, cdf_devs, 1 - cdf_devs, missing = NA_real_)
# Days considered as potential cumulates
# 1. Only consider days with sufficiently large rainfall values
# 2. When the prediction is sufficiently greater than the recorded value
# 3. The day is preceded by dry or NA days
# 4. The value is sufficiently unlikely to have occurred at random:
# i.e. if the probability of this large a deviation is less than max_prob
days = which((obs > min_accum) &
(obs > min_rat * pred) &
(is.na(obsy) | obsy < min_rain) &
(cdf_devs < max_prob / 2))
if (verbose)
{
message(paste("Station is:", S))
message("cases to consider")
print(bind_cols(dfwide[days, c(datec, S)], d = days), n = length(days))
}
# ignore cases occurring too close to the start of the series
for (d in days[days > 3])
{
d_start = d - 2 # find the sequence origin
while ((d_start > 0) && (obs[d_start] == 0 | is.na(obs[d_start])))
{
d_start = d_start - 1
}
d_start = d_start + 1
dd = d_start:(d - 1) # sequence of days to consider for distribution
ddd = c(dd, d)
pred_sum = sum(pred[ddd])
# no use trying to disaggregate if no rain was expected for the disaggregation
# period:
if (pred_sum < min_rain) {next}
# use if_else in case no rainfall expected
rat = obs[d] / pred_sum
# In case the disaggergation would yield multiple unrealistically large
# data points, skip immediately
if (rat > max_cum_rat) {next}
rd = pred[dd] > min_rain # rain days
n_wet = sum(rd) # number of predicted wet days during sequence
nwNA = sum(is.na(obs[dd][rd])) # number of NA days with rain predicted
na_rep = obs[ddd] # initialise sequence with NAs replaced
rep = which(is.na(na_rep))
nNA = length(rep)
disaggr = rat * pred[ddd] # potential disaggregation replacement
disaggr = if_else(disaggr < min_rain, 0, round(disaggr, 2))
nd = length(dd) + 1 # number of days in the seq
# compare the product of cdf deviations from median for the two possibilities:
# the redistribution of the suspected cumulate vs the original 1-day total
# with all preceding NAs infilled based on pred
# we want both the probability of exceeding the rainfall amount recorded and
# being below it to be small; i.e. the optimal cumulative density is 0.5
# hence, map all cdf estimates from [0, 1] to [0, 0.5]: distance
# from median
# CDF values for the proposed disaggregation series
disaggr_dens = pZAGA(disaggr, mu[ddd], sigma[ddd], p0[ddd])
# if the suggested replacement is 0, use F(0) = Pr[0] / 2
disaggr_dens[disaggr == 0] = p0[ddd][disaggr == 0] / 2
# map others to [0, 0.5] as well; probability of being at least as far away
# from the median
disaggr_dens = if_else(disaggr_dens > 0.5, 1 - disaggr_dens, disaggr_dens)
# ignore this sequence if:
# the proposed disaggregation replacement on the suspect day is too improbable
if (disaggr_dens[nd] < (2.5 * max_prob))
{
message("Disaggregation replacement would be suspect @ ", S, " on ", d)
next
}
# Consider the case with NAs and without them separately.
# First look @ cases where all days in observations are 0
if (nNA == 0)
{
# density for alternative to disaggregation
na_dens = cdf_devs[ddd]
# product of probabilities for preceding seq.
p_ps = prod(2 * na_dens[1:(nd - 1)])
# ratio of product of probabilities
# use sqrt to reduce chance of getting 0 in denominator
if (p_ps != 0 & na_dens[nd] != 0)
{
rpp = prod(sqrt(2 * disaggr_dens)) / sqrt(2 * na_dens[nd] * p_ps)
} else if (prod(sqrt(2 * disaggr_dens)) == 0)
{
next # both sequences too unlikely to consider
}
else
{
# prob(na infilling = 0; disaggr > 0)
message("Disaggregate CDF product finite, alternative not")
message("station is ", S, " days from: ", d_start)
rpp = min_improve + 1 # in this case do disaggregation
}
# Conditions for replacement (for replacing zeroes only):
# 1. Don't produce unrealistically large replacement values. (This cond.
# is imposed earlier where `rat` is calculated).
# 2. The sequence of zeros preceding the suspected cumulate
# should be sufficiently unlikely.
# 3. The joint probability of deviations as large as those observed
# should be much (default: 50x) smaller for the disaggregated seq.
# than the original.
# 4. The proposed disaggregation total on the suspect day should be
# reasonably consistent with the modelled distribution (applies
# to all cases so this condition is imposed earlier)
if ((p_ps < (5 * max_prob)) &
(rpp > sqrt(min_improve)))
{
dfwide[ddd, S] = disaggr
accums_df %<>% bind_rows(tibble(date = dfwide[ddd, ] %>% pull(datec),
original = obs[ddd], pred = pred[ddd],
p0 = p0[ddd],
replacement = disaggr,
dev_prob = disaggr_dens,
alternative = na_rep,
alt_dev_prob = na_dens, station = S))
if (verbose)
{
message("0-disaggregation @ ", S, " from day: ", d_start,
" with rpp = ", rpp)
}
} else if (verbose)
{
message("not worth replacing 0 case; rpp = ", rpp, " day ", d)
printdf = tibble(date = dfwide[ddd, ] %>% pull(datec),
original = obs[ddd], pred = pred[ddd],
replacement = disaggr, cumdens = disaggr_dens,
alternative = na_rep, alt_cumdens = na_dens,
station = S)
printdf %>% print(n = nd)
}
} else if (nwNA == n_wet)
{
# now consider the case where all days which are wet in the reference
# series are NA in observations.
na_rep = c(pred[dd], obs[d])
na_rep = if_else(na_rep < min_rain, 0, round(na_rep, 2))
# CDF values for the original series, with NAs directly infilled
na_dens = pZAGA(na_rep, mu[ddd], sigma[ddd], p0[ddd])
na_dens[na_rep == 0] = p0[ddd][na_rep == 0] / 2
na_dens = if_else(na_dens > 0.5, 1 - na_dens, na_dens)
if (prod(sqrt(na_dens)) != 0)
{
rpp = prod(sqrt(disaggr_dens)) / prod(sqrt(na_dens))
} else if (prod(sqrt(disaggr_dens)) == 0)
{
next # both sequences too unlikely to consider
}
else
{
# prob(na infilling) = 0; prob(disaggr) > 0
message("Disaggregate CDF product finite, alternative not")
message("station is ", S, " days from: ", d_start)
rpp = min_improve + 1 # in this case do disaggregation
}
# Conditions for replacement with NAs:
# 1. Don't produce unrealistically large replacement values. (This cond.
# is imposed earlier where `rat` is calculated).
# 2. Ensure that the over all consistency with surrounding stations
# is still superior after disaggregation, in effect requiring that
# the advantage the reference series almost certainly has as relative
# to itself, is more than offset by the advantage the disaggregation
# has on the suspected cumulate day.
# 3. Ensure that the rainfall total from the disaggregation is closer to
# the total predicted for the disaggregation period than what would
# result from leaving the suspected cumulate intact.
# 4. The proposed disaggregation total on the suspect day should be
# reasonably consistent with the modelled distribution (applies
# to all cases so this condition is imposed in the outer loop)
if ((rpp > (sqrt(min_improve) / 2)) &
(abs(pred_sum - obs[d]) < abs(sum(na_rep) - pred_sum)))
{
dfwide[ddd, S] = disaggr
accums_df %<>% bind_rows(tibble(date = dfwide[ddd, ] %>% pull(datec),
original = obs[ddd], pred = pred[ddd],
p0 = p0[ddd],
replacement = disaggr,
dev_prob = disaggr_dens,
alternative = na_rep,
alt_dev_prob = na_dens, station = S))
if (verbose)
{
message("NA-disaggregation @ ", S, " start day: ", d_start,
" with rpp = ", rpp)
}
} else if (verbose)
{
message("not worth replacing NA case; rpp = ", rpp, " day ", d)
# only print output if difference would occur between the two
# options before the suspected aggregate total day
if (rpp > 1)
{
printdf = tibble(date = dfwide[ddd, ] %>% pull(datec),
original = obs[ddd], pred = pred[ddd],
replacement = disaggr, cumdens = disaggr_dens,
alternative = na_rep, alt_cumdens = na_dens,
station = S)
printdf %>% print(n = nd)
}
}
} else
{
# With both NA and 0 days, consider them separately
na_rep[rep] = pred[dd][rep]
zeros = setdiff(dd - d_start + 1, rep) # days recorded as dry
# CDF values for the original series, with NAs directly infilled
na_dens = pZAGA(na_rep, mu[ddd], sigma[ddd], p0[ddd])
na_dens[na_rep == 0] = p0[ddd][na_rep == 0] / 2
na_dens = if_else(na_dens > 0.5, 1 - na_dens, na_dens)
cum_sum = sum(na_rep[rep], obs[d])
if (prod(sqrt(na_dens)) != 0)
{
rpp = prod(sqrt(disaggr_dens)) / prod(sqrt(na_dens))
} else if (prod(sqrt(disaggr_dens)) == 0)
{
next # both sequences too unlikely to consider
}
else
{
# prob(na infilling) = 0; prob(disaggr) > 0
message("Disaggregate CDF product finite, alternative not")
message("station is ", S, " days are: ", ddd)
rpp = min_improve + 1 # in this case do disaggregation
}
# Conditions for replacement with NAs & 0s:
# 1. Don't produce unrealistically large replacement values. (This cond.
# is imposed earlier where `rat` is calculated).
# 2. Across days with zero rainfall observed, the disaggregation should
# be more consistent with surrounding observations
# than the alternative.
# 3. The total that would result from disaggregation is closer to the
# predicted cumulative total over that period than the total that would
# result from direct infilling.
# 4. An overall improvement in consistency of the rainfall distribution
# is required.
# 5. The proposed disaggregation total on the suspect day should be
# reasonably consistent with the modelled distribution (applies
# to all cases so this condition is imposed in the outer loop)
if ((rpp > (sqrt(min_improve) / 2)) &
(prod(disaggr_dens[zeros]) >= prod(na_dens[zeros])) &
(abs(pred_sum - obs[d]) < abs(pred_sum - cum_sum)))
{
dfwide[ddd, S] = disaggr
accums_df %<>% bind_rows(tibble(date = dfwide[ddd, ] %>% pull(datec),
original = obs[ddd], pred = pred[ddd],
p0 = p0[ddd],
replacement = disaggr,
dev_prob = disaggr_dens,
alternative = na_rep,
alt_dev_prob = na_dens, station = S))
if (verbose)
{
message("mixed-disaggregation @ ", S, " start day: ", d_start,
" with rpp = ", rpp)
}
} else if (verbose)
{
message("not worth replacing mixed case; rpp = ", rpp, " day ", d)
# only print output if difference would occur between the two
# options before the suspected aggregate total day
if (rpp > 1)
{
printdf = tibble(date = dfwide[ddd, ] %>% pull(datec),
original = obs[ddd], pred = pred[ddd],
replacement = disaggr, cumdens = disaggr_dens,
alternative = na_rep, alt_cumdens = na_dens,
station = S)
printdf %>% print(n = nd)
}
}
}
}
}
return(list(dfwide, accums_df))
}
PKBBPK
WLL
cleanfill_fn.RUT keke### These are the primary functions used for cleaning and gap-filling. The full procedure
### is run using `cleanfill()`, which calls all other requisite functions
library(tidyverse)
library(magrittr)
library(zeallot)
# adjust according to where scripts and data are kept
scriptsroot = "./gamlss_cleaner/" # assume wd is parent directory
source(paste0(scriptsroot, "helpers.R"))
source(paste0(scriptsroot, "gamlss_fit.R"))
source(paste0(scriptsroot, "cleaners.R"))
source(paste0(scriptsroot, "estimators.R"))
source(paste0(scriptsroot, "time_shifters.R"))
# directory to output data to; adjust according to where scripts and data are kept
outdir = "./data/"
### FUNCTIONS
infill = function(dfwide, dfexpected, remcols = c("date", "year", "month"), rmin = 0.2,
                  rounding = 2, datec = remcols[1])
{
  ### Function that fills missing values in a wide-format observed rainfall data.frame
  ### from a corresponding set of reference time series.
  ### `dfwide` is a data.frame including date/time columns listed in `remcols` and
  ### columns for stations whose missing values are to be filled in from reference time
  ### series contained in `dfexpected`. Columns in `dfwide` and `dfexpected` should be
  ### the same once `remcols` have been removed, although the orders can be different.
  ### Column names should be in `remcols` or should be station names.
  ### `rmin` is the threshold below which an expected quantity of rainfall is
  ### disregarded (effectively the minimum quantity of rainfall recorded).
  ### `rounding` is the number of decimal places used for non-zero imputed values.
  ### `datec` is the name of the date column used to order the returned rows;
  ### it defaults to the first entry of `remcols`.
  # reshape observations to long format, retaining only valid (finite) values;
  # `any_of(remcols)` takes the character vector directly (no embracing needed)
  dfwide %<>% pivot_longer(-any_of(remcols), names_to = "station",
                           values_to = "rain") %>%
    dplyr::filter(is.finite(rain))
  # keep only reference values for station-days absent from the valid observations
  dfexpected %<>% pivot_longer(-any_of(remcols), names_to = "station",
                               values_to = "rain") %>%
    anti_join(dfwide, by = c("station", remcols))
  return(dfwide %>%
           bind_rows(dfexpected) %>%
           # apply the detection-limit threshold, then round
           mutate(rain = if_else(rain < rmin, 0, rain)) %>%
           mutate(rain = round(rain, rounding)) %>%
           pivot_wider(names_from = "station", values_from = "rain",
                       names_sort = TRUE) %>%
           # sort by the column named in `datec`; a bare `arrange(datec)` would
           # sort by the constant string itself, leaving rows unordered
           arrange(.data[[datec]]))
}
cleanfill = function(dfwide, sourceslist, NN, accumdf, mon_accum, meta, NNlist, NNstrict,
ids = c("date", "year", "month"), monthc = NULL, equivs = NULL,
cores = 3, stepRDS = FALSE, RDSname = NULL, dep_exp = 0.5,
nmin = 4, nmax = 8, nmin_day = 3, nmin_ts = 6, nmax_ts = 10,
thresi = 0.8, thresd = 0.01, preliable = 0.93, pignore = 0.975,
r_thres = 0.2, thres_ms = 1.5, thres_p = 10/3,
thres_max = 2.5, max_ts_iter = 5, min_shift_frac = 2e-04,
thres_cov = 0.925, nmon = 4, min_rat = 5, min_back_rat = 2,
min_cum_rat = 0.5, max_cum_rat = 5, min_accum = 10, min_repl = 2,
dec_thres = 10, mult_thres = 9, sbc_thres = 10, max_stations = 9,
max_excl = 9, pthres = 0.0025, min_accum_zg = 7.5, min_rat_zg = 1.5,
max_prob = 0.01, min_improve = 50, min_prob = 1e-6, p_max = 0.95,
savemodel = TRUE, verbose = FALSE)
{
## This function performs a series of quality control steps on a wide-format daily
## rainfall data.frame (a tibble is recommended), `dfwide` and then fills in missing
## values. The quality control is informed by known shortcomings of South African
## rainfall data. The procedure involves the use of GAMLSS model fits for each gauge
## location, which are used both for infilling and quality control (QC). Since the
## GAMLSS fits require complete series, an initial QC & infilling is done using
## mean-adjusted correlation co-efficient weighted (CCW) reference series.
## For a subset of stations a pre-determined set of nearby neighbours can be
## specified, which will be used to construct CCW reference series, which will be
## preferred as infilling values. These stations and their neighbours are specified
## by the list of character vectors `NNstrict`. Additional neighbour stations not to
## be considered part of the dataset can be specified in the `NN` data.frame.
## Where pre-selected stations are desired for the initial CCW-based QC and infilling
## these should be supplied in the list of character vectors `NNlist`.
## cleanfill() requires as input 4 data frames (dfwide, NN, accumdf, mon_accum),
## a list of dataframes (sourceslist), a character string (meta) and
## 2 lists of character vectors (NNlist, NNstrict).
## If only a single source of rainfall data is available, pass a length-1 list
## with the full dataset to sourceslist.
## cleanfill() returns either 2 tibbles (if stepRDS is TRUE) or 4 tibbles (default;
## if stepRDS is FALSE). All intermediate cleaned and gap-filled datasets and corre-
## sponding reference series are written to file. The 2 tibbles returned if stepRDS
## is FALSE are the cleaned and gap-filled series after the first GAMLSS iteration.
## The 4 tibbles returned if stepRDS is FALSE are the final CCW cleaned and gap-filled
## datasets and the final cleaned and gap-filled datasets after the second GAMLSS
## iteration.
## Optional input parameters are: a vector of character strings (ids),
## a list of character vectors (equivs), 37 numeric parameters (description below),
## 3 boolean parameters (stepRDS, savemodel, verbose) and a string (RDSname).
##
## The full list of parameters are described below. DF refers to a data.frame:
##
## 1. dfwide (DF) = Dataframe with date-related information in columns whose names
## appear in the `ids` character vector, followed by raw daily station data in mm
## in columns labelled by station names.
## 2. sourceslist (list of DFs) = Dataframes with the same structure as dfwide, 1
## each for each data source used (e.g. SAWS, DWS, etc.). Element names
## correspond to the source institution. Station columns should be included
## only for those stations for which the given data source provides data.
## 3. NN (DF) = Dataframe with the same structure as dfwide, with station columns for
## stations to be used as nearest neighbours for other stations only (only
## time-shifting performed, not other quality control (QC)).
## 4. accumdf (DF) = Dataframe containing accumulated data for periods over which
## accumulations are known, to be used by the `distr_accum_known` function. The
## `accumdf` must have station names in a 'station' column, an 'atot' column which
## contains the accumulated totals and 'start' and 'end' columns containing the start
## and end dates for each known accumulation.
## 5. mon_accum (DF) = Dataframe like accumdf, but for monthly data only, and
## containing 'year' and 'month' columns, rather than 'start' and 'end' ones.
## Multiple cumulates from different sources can be supplied for the same station and
## month; in these cases, the data most consistent with surrounding stations is
## selected. In this case a 'source' column should be included to distinguish rows.
## Months for which complete daily data are available (after step 4. below) are
## ignored. This DF is manipulated into the same format as accumdf and then added to
## it.
## 6. meta (string) = meta-data regarding the data passed to be used for saving files
## produced. A directory with this name should exist inside 'outdir'.
## 7. NNlist (list of named character vectors) = A list whose elements are named by
## the stations for which pre-selected stations ("nearest neighbours") should
## preferably be used in constructing CCW reference series. The elements should
## be character vectors containing the stations to be used as nearest neighbours
## for the station specified by the element label.
## 8. NNstrict (vector or list of strings) = as above, where NN is to be preferred to
## GAMLSS-informed infilling.
## 9. ids (character vector) = the names of columns that are not stations. The first
## of these should be a date column, in `Date`, `POSIXct`/`POSIXlt` or similar
## format, from which year, month and day of the year data may be obtained. Default:
## c("date", "year", "month")
## 10. monthc (character string) = name of the column containing month data. If NULL
## (the default) a new month column is constructed from the date column, whose name
## is assumed to be the first entry in the ids vector. Default: NULL
## 11. equivs (list of character vectors) = Each vector is a list of stations
## considered too similar to be used together as neighbours for CCW reference series.
## Default: NULL
## 12. cores (integer) = number of cores to be used by gamlss procedure. Default: 3
## 13. stepRDS (boolean) = should the procedure be split into two steps (1--9 and
## 10--13)? If TRUE, 2 tibbles are returned, if FALSE, 4 tibbles are returned.
## The 2 tibbles returned if stepRDS is FALSE are the cleaned and gap-filled series
## after the first GAMLSS iteration. The 4 tibbles returned if stepRDS is FALSE are
## the final CCW cleaned and gap-filled datasets and the final cleaned and gap-filled
## datasets after the second GAMLSS iteration. Default: FALSE
## 14. RDSname (string) = name to provide to RDS files written after the first GAMLSS
## fitting iteration if procedure is to be run in 2 steps and all data and parameters
## need to be read back into the environment for `cleanfill_cont()`. Only relevant
## if `stepRDS` is TRUE, in which case it is required. Default: NULL
## 15. dep_exp (numeric) = dependency correction exponent used in the estimation of
## dry day probabilities in the initial detection and masking of suspicious sequences
## of zero rainfall recordings. See `rem0()` for details. Default: 0.5 (square root)
## 16. nmin (integer) = minimum number of neighbouring stations for CCW. Default: 4
## 17. nmax (integer) = maximum number of neighbouring stations for CCW. Default: 8
## 18. nmin_day (integer) = minimum number of stations to use for each day when using
## robust CCW reference series, in contrast to `nmin` which is the total number of
## stations considered. Robust CCW reference series are used in cases
## (time-shifting, source selection, decimal error detection, cumulate detection)
## where it is essential that errors at individual neighbouring stations do not
## dominate reference values. Default: 3
## 19. nmin_ts (integer) = minimum number of neighbouring stations for
## robust CCW reference series. Default: 6
## 20. nmax_ts (integer) = maximum allowable number of neighbouring stations for
## robust CCW reference series. Default: 10
## 21. thresi (numeric) = a number between 0 and 1, the initial minimum correlation
## for inclusion as a neighbouring station for CCW reference series construction.
## Default: 0.8
## 22. thresd (numeric) = the increment by which to adjust thresi if there are too
## few or too many neighbouring stations. Default: 0.01
## 23. preliable (numeric) = climatological dry day probability above which the
## initial zero sequence detection scheme is considered to be less reliable.
## Default: 0.93
## 24. pignore (numeric) = probability of a dry day above which the wet day
## probability is considered negligible and therefore set to 0. Default: 0.975
## 25. r_thres (numeric) = minimum rainfall amount for a rain day. Values below this
## from reference series will not be infilled. Default: 0.2
## 26. thres_ms (numeric) = For multi-source time-shifting, the minimum improvement
## (reduction) required in the product of the ratios between observed and expected
## rainfall (adjusted so that for all days' ratio >= 1), between the proposed shifted
## and original sub-series for shifting to be performed. Default: 1.5
## 27. thres_p (numeric) = The equivalent of `thres_ms` for single-source shifting.
## Default: 10/3
## 28. thres_max (numeric) = For single source shifting, the additional requirement
## is imposed that at least one day should see an improvement by at least
## `thres_max`. This check can be disabled by setting `thres_max = 1`. Default: 2.5
## 29. max_ts_iter (integer) = The maximum number of iterations of time-shifting that
## should be performed. An upper bound is required, since otherwise it is possible
## that an infinite loop of shifts at neighbouring stations prevents convergence (the
## preferable basis for halting the process).
## 30. min_shift_frac (numeric) = number in [0, 1], should be small. If the
## proportion of rainfall data points that are time-shifted during a given iteration
## of time-shifting is less than this fraction, the procedure is considered to be
## near enough to convergence and the iterative time-shifting is ceased.
## Default: 2e-4
## 31. thres_cov (numeric) = minimum proportion of valid available data for stations
## for unadjusted mean rainfall to be deemed sufficiently reliable. Stations whose
## proportion of valid daily data falls below this level at any stage in the QC
## process, will have their mean daily rainfall adjusted by the ratio between mean
## rainfall on days when the target station has valid data, to the overall mean at
## predictor stations, averaged over a set of predictor stations, all of which need
## to have at least `thres_cov` of their data be valid. Default: 0.925
## 32. nmon (integer) = number of stations per month to use for monthly CCW reference
## series. Default: 4
## 33. min_rat (numeric) = minimum ratio between recorded value and CCW reference
## value for a value to be considered a candidate for cumulative disaggregation.
## Default: 5
## 34. min_back_rat (numeric) = minimum ratio between the expected (from the CCW
## reference series) rainfall over a potential disaggregation period and the expected
## rainfall on the day of the suspected cumulate. This is assumed to indicate that
## the probability of rainfall on the days during the distribution sequence is
## sufficiently large compared to the day to be distributed from. Default: 2
## 35. min_cum_rat (numeric) = minimum ratio between the suspected cumulate and the
## expected (from the CCW reference series) rainfall over the proposed disaggregation
## period. If the predicted total is too large, this is assumed to indicate that
## additional rain occurred during the period where data are missing or have been
## incorrectly set to 0. Default: 0.5
## 36. max_cum_rat (numeric) = maximum ratio between the suspected cumulate and the
## expected (from the CCW reference series) rainfall over the proposed disaggregation
## period. If this ratio is too large, unrealistically large rainfall totals would be
## produced. Default: 5
## 37. min_accum (numeric) = minimum rainfall value to be considered as candidate
## cumulate for disaggregation. Default: 10
## 38. min_repl (integer) = minimum number of rainfall days across which the cumulate
## is to be disaggregated. Default: 2
## 39. dec_thres (numeric) = minimum rainfall value to be considered for division by
## 10 is calculated from this parameter as `dec_thres + dec_thres * \mu` where `\mu`
## is the mean daily rainfall at the station. Default: 10
## 40. mult_thres (numeric) = minimum ratio between expected and recorded rainfall to
## be considered for division by 10. See `dec_error()` for a more complete description
## of the checks performed. Default: 9
## 41. sbc_thres (numeric) = minimum Schwarz Bayesian Criterion (SBC) improvement
## obtained from including an additional station in the GAMLSS model for a given
## target station for the search for additional predictor stations to be continued
## thereafter. Default: 10
## 42. max_stations (integer) = maximum number of predictor stations to be included
## in the GAMLSS model for a given target station. Default: 9
## 43. max_excl (integer) = maximum number of stations to exclude from considering
## as potential predictors for a GAMLSS fit for a given target station due to
## non-convergence, before the search for additional predictor stations is abandoned.
## Default: 9
## 44. pthres (numeric) = threshold probability value; if the estimated probability
## of a dry day sequence falls below this threshold, the sequence will be regarded as
## spurious and the corresponding observations marked as missing. Default: 0.0025
## 45. min_accum_zg (numeric) = Equivalent of `min_accum` for GAMLSS-informed
## disaggregation of suspected cumulative totals. Default: 7.5
## 46. min_rat_zg (numeric) = Equivalent of `min_rat` for GAMLSS-informed
## disaggregation of suspected cumulative totals. Default: 1.5
## 47. max_prob (numeric) = probability threshold for GAMLSS-informed disaggregation
## of suspected cumulates. The modelled probability of observing a value at least as
## large as the recorded suspected cumulate must be no more than this value.
## Default: 0.01
## 48. min_improve (numeric) = for GAMLSS-informed disaggregation, the minimum ratio
## between the product of CDF deviations from the median for a proposed disaggregation
## sequence relative to the original sequence, in the case when the sequence preceding
## the suspected cumulate is all 0s, with no missing values. Default: 50
## 49. min_prob (numeric) = for GAMLSS-informed decimal error detection, a probability
## threshold, multiples of which (see `dec_error_prob() for details`) are used to
## determine whether a particular recording is sufficiently unlikely to be regarded as
## being likely to be a decimal error. Default: 1e-6
## 50. p_max (numeric) = probability threshold parameter. If the modelled probability
## of observing less than 1/10*recorded value (for division by 10 cases) or the
## probability of observed more than 10*recorded value is greater than `p_max`, it
## is assumed that the error is not a decimal error, but some other recorded
## error that is not likely to be mitigated effectively either by decimal correction
## or by `NA` masking (which may adversely affect aggregates). Hence, in these cases,
## the suspect recordings are retained. To disable this check, set ``p_max = 1``.
## Default: 0.95
## 51. savemodel (boolean) = Should RDS output of the GAMLSS objects produced by the
## GAMLSS fits be saved to file? Note that the outputs from the first iteration are
## overwritten by the second iteration. If results from both iterations are to be
## retained, the files should be renamed or transferred to a different location before
## the second GAMLSS fitting iteration is initiated. Default: TRUE
## 52. verbose (boolean) = should additional information regarding the stage of the
## QC procedure and cases for which action is not taken be output? It should be noted
## that the `gamlss()` functions always provide feedback on the fitting process.
## Default: FALSE
######## =========================================================================
######## =========================================================================
## -------------------------------------------------------------------------------
## Cleaning and filling process: -------------------------------------------------
## -------------------------------------------------------------------------------
## Format all data.frames correctly prior to procedure Steps:
## 1. Run `rep_nz_cum()` (or `rem_rep_nz()`) and `rem0()` to mask out repeated
## non-zero rainfall readings and suspicious sequences of dry days.
## 2. Use Mean-adjusted CCW (rank correlation) reference series for infill (I)
## 3. Use Mean-adjusted nearest neighbour (CCW with preselected predictors) where
## available, followed by step 2 (N).
## 4. Iteratively perform time-shifting to near-convergence (TS).
## 5. Use Source selection (SS) on the output of step 4.
## 6. Do disaggregation of known cumulates (FA) on output of step 6.
## 7. Disaggregate suspected accumulated values on step 6. output (A) and then redo a
## single iteration of step 4.
## 8. Perform decimal (and related) error detection and correction (L).
## 9. Fit GAMLSS models to the result of applying step 3 to the output of step 8 (ZG).
## 10. Do GAMLSS-informed equivalent of step 1 (dry day sequences component) (SZ).
## 11. Do GAMLSS-informed application of steps 6, 7 on the output of step 5 (AP).
## 12. Do GAMLSS-informed application of step 8 on the output of step 11.
## 13. Rerun step 9 on the output of step 12 and use it to fill gaps, except for
## stations specified in `NNstrict`, for which final gap-filling is performed using
## step 3.
### Setup
outdir = paste0(outdir, meta, "/")
message("running step 1")
## Remove repeat (>= 3 consecutive) non-zero values
if (verbose) {message("removing consecutive length-3+ non-zeroes")}
c(NN, repcumNN, remdf) %<-% (NN %>%
rep_nz_cum(remcols = ids))
remdf %>%
jsonlite::flatten() %>%
write_csv(paste0(outdir, "rem_nz_recNN.csv"))
c(dfwide, repcum, remdf) %<-% (dfwide %>%
rep_nz_cum(remcols = ids))
remdf %>%
jsonlite::flatten() %>%
write_csv(paste0(outdir, "rem_nz_rec.csv"))
for (dsource in names(sourceslist))
{
c(sourceslist[[dsource]], remdf) %<-% (sourceslist[[dsource]] %>%
rem_rep_nz(remcols = ids))
remdf %>%
jsonlite::flatten() %>%
write_csv(paste0(outdir, "rem_nz_rec_", dsource, ".csv"))
}
## Remove spurious sequences of zeroes
if (verbose) {message("removing suspicious zero sequences")}
# Since these sequences may co-occur at numerous stations, it is necessary to
# perform this step iteratively until no further suspect sequences are identified
k = 1 # number of times rem0 has been run
nt = Inf # number of sequences masked. Start with Inf to ensure process starts
nNN = Inf
nWRZ = Inf
while (nt > 0)
{
if (nNN > 0)
{
c(NN, rem0ls) %<-% (
NN %>%
rem0(dfcomp = bind_cols(dfwide,
NN %>%
dplyr::select(-any_of(ids))),
datec = ids[1], monthc = NULL, thresi = thresi,
thresdry = r_thres, nmin = nmin_ts, thresd = thresd,
exponent = dep_exp, pignore = pignore,
preliable = preliable, pthres = pthres))
nNN = length(rem0ls)
} else
{
rem0ls = tibble()
}
if (nWRZ > 0)
{
c(dfwide, rem0lst) %<-% (
dfwide %>%
rem0(dfcomp = bind_cols(dfwide, NN %>%
dplyr::select(-any_of(ids))),
datec = ids[1], monthc = NULL, thresi = thresi,
thresdry = r_thres, nmin = nmin_ts, thresd = thresd,
exponent = dep_exp, pignore = pignore, preliable = preliable,
pthres = pthres))
nWRZ = length(rem0lst)
rem0ls = bind_rows(rem0ls, rem0lst)
}
rem0ls %>%
write_csv(paste0(outdir, "rem0rec", k, ".csv"))
k = k + 1
nt = nNN + nWRZ
}
# Now for the individual sources
k = 1
# Initialise vector with number of shifts made for each source
nsources = rep(Inf, length(sourceslist))
while (sum(nsources) > 0)
{
rem0ls = tibble() # initialise for the sources cycle
for (j in 1:length(sourceslist))
{
# if changes were made in the previous iteration:
if (nsources[j] > 0)
{
dsource = names(sourceslist)[j]
c(sourceslist[[dsource]],
rem0lst) %<-% (
sourceslist[[dsource]] %>%
rem0(dfcomp = bind_cols(dfwide,
NN %>%
dplyr::select(-any_of(ids))),
datec = ids[1], monthc = NULL, thresi = thresi,
thresdry = r_thres, nmin = nmin_ts, thresd = thresd,
exponent = dep_exp, pignore = pignore,
preliable = preliable, pthres = pthres))
nsources[j] = nrow(rem0lst)
rem0ls %<>% bind_rows(rem0lst)
}
}
rem0ls %>%
write_csv(paste0(outdir, "rem0sources", k, ".csv"))
k = k + 1
}
NN %>%
write_csv(paste0(outdir, "NN_", meta, "_cz.csv"))
dfwide %>%
write_csv(paste0(outdir, meta, "_cz.csv"))
for (dsource in names(sourceslist))
{
sourceslist[[dsource]] %>%
write_csv(paste0(outdir, meta, "_", dsource, "_cz.csv"))
}
# assume repeated non-zeroes are equally distributed cumulates they will be
# considered in step 6
accumdf %<>% bind_rows(repcum) %>%
bind_rows(repcumNN)
### 2.
message("running step 2")
stations = dfwide %>%
get_stations(remcols = ids)
# include nearest neighbour stations as potential predictors
predictor = bind_cols(dfwide,
NN %>% dplyr::select(-any_of(ids)))
controls = predictor %>%
get_stations(remcols = ids)
cor_mat = cormat(dfwide, predictor, remcols = ids)
predf = expected(dfwide, predictor, cor_mat = cor_mat, remcols = ids,
thres = thresi, thresd = thresd, nmin = nmin, nmax = nmax,
equivs = equivs)
meta_exp = paste0(meta, "_expected")
saveRDS(predf, paste0(outdir, meta_exp, ".rds")) # Save the prediction to file
# for potential subsequent use
message("performing infilling...")
dfwide_I = infill(dfwide, predf, remcols = ids)
dfwide_I %>%
write_csv(paste0(outdir, meta, "_I.csv"))
### 3.
message("running step 3")
predNN = nn_exp(dfwide, NN, NNlist, predf, cor_mat, remcols = ids,
thres_cov = thres_cov, verbose = verbose)
# Save the prediction to file for potential subsequent use
saveRDS(predNN, paste0(outdir, meta_exp, "_N.rds"))
message("performing infilling...")
dfwide_I_N = infill(dfwide, predNN)
write_csv(dfwide_I_N, paste0(outdir, meta, "_I_N.csv"))
### 4.
message("running step 4: time-shifting")
mus = predictor %>%
corrected_smeans(predictor, thres_cov = thres_cov, remcols = ids)
predf_ts = ts_expected(dfwide, predictor, cor_mat = cor_mat, remcols = ids,
thres = thresi, thresd = thresd, nmin = nmin_ts,
nmax = nmax_ts, nmin_day = nmin_day, equivs = equivs,
verbose = verbose)
# use prediction ts, rather than prediction N, to have more stations here
c(dfwide_TS, tsr,
shift1_df,
n_shifts,
ms_shifts) %<-% shift_time(dfwide, predf_ts, mus, sourceslist,
r_thres = r_thres, thres_ms = thres_ms,
thres_p = thres_p, thres_max = thres_max,
remcols = ids, verbose = verbose)
predictor = bind_cols(dfwide_TS, NN %>%
dplyr::select(-any_of(ids)))
shift1_df %>%
write_csv(paste0(outdir, meta, "_shifts1.csv"))
ms_shifts %>%
write_csv(paste0(outdir, meta, "_ms_shifts1.csv"))
corNN = cormat(NN, predictor)
# 'prediction' on NN dataset
NNpred = ts_expected(NN, predictor, cor_mat = corNN, remcols = ids,
thres = thresi, thresd = thresd, nmin = nmin_ts,
nmax = nmax_ts, nmin_day = nmin_day, equivs = equivs,
verbose = verbose)
saveRDS(NNpred, paste0(outdir, "NN_", meta_exp, ".csv"))
message("time-shifting nearest neighbour dataset")
c(NN_TS, tsrNN, NNshifts, nNN, NNms) %<-% shift_time(NN, NNpred, mus,
r_thres = r_thres,
thres_ms = thres_ms,
thres_p = thres_p,
thres_max = thres_max,
remcols = ids,
verbose = verbose)
n_shifts = n_shifts + nNN # update shift countc
NNshifts %>%
write_csv(paste0(outdir, meta, "_NN_shifts1.csv"))
sources_TS = sourceslist # initialise
message("recursively time-shifting, including multi-source shifting, ",
"all sources until sufficiently few changes are made")
nt = length(dfwide[[1]]) # number of days
ns = length(controls) # number of stations
k = 2 # iterations
# default: stop when fewer than 0.1% of days are affected or a maximum of 5 iterations is
# reached (risk of infinite loop)
while (n_shifts > min_shift_frac * nt * ns & k < (1 + max_ts_iter))
{
if (verbose)
{
message("Iteration number: ", k, ". Number of shifts made = ", n_shifts,
" hence continuing")
}
for (dsource in names(sourceslist))
{
source_stations = get_stations(sources_TS[[dsource]], remcols = ids)
# only consider stations available for this source
source_pred = predf_ts %>%
dplyr::select(any_of(source_stations))
c(sources_TS[[dsource]],
tsrs, shift_source,
n_src_shifts, ms1) %<-% shift_time(sources_TS[[dsource]],
source_pred, mus, sources_TS,
r_thres = r_thres,
thres_ms = thres_ms,
thres_p = thres_p,
thres_max = thres_max,
remcols = ids,
verbose = verbose)
shift_source %>%
write_csv(paste0(outdir, meta, "shifts_", dsource, "_", k - 1, ".csv"))
ms1 %>%
write_csv(paste0(outdir, meta, "ms_shifts_", dsource, "_", k - 1, ".csv"))
}
predictor = bind_cols(dfwide_TS, NN_TS %>%
dplyr::select(-any_of(ids)))
corNN = cormat(NN_TS, predictor)
NNpred = ts_expected(NN_TS, predictor, cor_mat = corNN, remcols = ids,
thres = thresi, thresd = thresd, nmin = nmin_ts,
nmax = nmax_ts, nmin_day = nmin_day, equivs = equivs,
verbose = verbose)
c(NN_TS, tsrNNi, NNshifts, nNN, NNms) %<-% shift_time(NN_TS, NNpred, mus,
r_thres = r_thres,
thres_ms = thres_ms,
thres_p = thres_p,
thres_max = thres_max,
remcols = ids,
verbose = verbose)
# join together all previously considered multi-source timesteps
# names to ensure the same order
tsrNN = mapply(c, tsrNN, tsrNNi[names(tsrNN)])
NNshifts %>%
write_csv(paste0(outdir, meta, "_NN_shifts", k, ".csv"))
predictor = bind_cols(dfwide_TS, NN_TS %>%
dplyr::select(-any_of(ids)))
cor_mat = cormat(dfwide_TS, predictor, remcols = ids)
predf_ts = ts_expected(dfwide_TS, predictor, cor_mat = cor_mat,
thres = thresi, thresd = thresd, nmin = nmin_ts,
nmax = nmax_ts, nmin_day = nmin_day, remcols = ids,
equivs = equivs, verbose = verbose)
c(dfwide_TS, tsi, shift1_df,
n_shifts, ms_shifts) %<-% shift_time(dfwide_TS, predf_ts, mus,
sources_TS, r_thres = r_thres,
thres_ms = thres_ms,
thres_p = thres_p,
thres_max = thres_max,
remcols = ids,
verbose = verbose)
tsr = mapply(c, tsr, tsi[names(tsr)])
shift1_df %>%
write_csv(paste0(outdir, meta, "_shifts", k, ".csv"))
ms_shifts %>%
write_csv(paste0(outdir, meta, "_ms_shifts", k, ".csv"))
n_shifts = n_shifts + nNN
if (verbose)
{
message(n_shifts, " shifts after ", k, " iterations")
}
k = k + 1
}
if (verbose)
{
message("iterative time-shifting complete;")
message("differences in last iteration: ", n_shifts, " iterations: ", k - 1)
}
predictor = bind_cols(dfwide_TS, NN_TS %>%
dplyr::select(-any_of(ids)))
cor_mat = cormat(dfwide_TS, predictor, remcols = ids)
predf = expected(dfwide_TS, predictor, cor_mat = cor_mat, remcols = ids,
thres = thresi, thresd = thresd, nmin = nmin, nmax = nmax,
equivs = equivs)
predf_ts = ts_expected(dfwide_TS, predictor, thres = thresi, thresd = thresd,
cor_mat = cor_mat, nmin = nmin_ts, nmax = nmax_ts,
nmin_day = nmin_day, remcols = ids, equivs = equivs,
verbose = verbose)
predNN = nn_exp(dfwide_TS, NN_TS, NNlist, predf, cor_mat, remcols = ids,
thres_cov = thres_cov, verbose = verbose)
dfwide_I_N_TS = infill(dfwide_TS, predNN)
saveRDS(predNN, paste0(outdir, meta_exp, "_N_TS.rds"))
saveRDS(predf, paste0(outdir, meta_exp, "_TS.rds"))
saveRDS(predf_ts, paste0(outdir, meta_exp, "_nmin_day.rds"))
write_csv(dfwide_TS, paste0(outdir, meta, "_TS.csv"))
write_csv(NN_TS, paste0(outdir, "NN_", meta, "_TS.csv"))
saveRDS(tsr, paste0(outdir, meta, "_ms_completed_ts.rds"))
corNN = cormat(NN_TS, predictor)
NNpred = expected(NN_TS, predictor, cor_mat = corNN, remcols = ids,
thres = thresi, thresd = thresd, nmin = nmin, nmax = nmax,
equivs = equivs)
saveRDS(NNpred, paste0(outdir, "NN_", meta_exp, "_TS.rds"))
source_rats = list()
for (dsource in names(sourceslist))
{
write_csv(sources_TS[[dsource]], paste0(outdir, dsource, "_TS.csv"))
# prepare for source selection, while we're in this loop
source_stations = get_stations(sources_TS[[dsource]], remcols = ids)
# only consider stations available for this source
source_pred = predf_ts %>%
dplyr::select(any_of(source_stations))
# assuming that TS has minimal impact on mu
source_rats[[dsource]] = calc_rat(sources_TS[[dsource]], source_pred,
remcols = ids, mus = mus)
}
write_csv(dfwide_I_N_TS, paste0(outdir, meta, "_I_N_TS.csv"))
write_csv(cor_mat %>%
data.frame(), paste0(outdir, meta, "cor_mat_TS.csv"))
### 5.
message("running step 5: performing source selection")
rat = calc_rat(dfwide_TS, predf_ts)
c(dfwide_TS_SS, ss_df) %<-% source_select(dfwide_TS, sources_TS, rat, source_rats,
excl = tsr, remcols = ids,
verbose = verbose)
write_csv(ss_df, paste0(outdir, meta, "_sourceselections.csv"))
rm(source_rats)
predictor = bind_cols(dfwide_TS_SS, NN_TS %>%
dplyr::select(-any_of(ids)))
cor_mat = cormat(dfwide_TS_SS, predictor, remcols = ids)
predf = expected(dfwide_TS_SS, predictor, cor_mat = cor_mat, remcols = ids,
thres = thresi, thresd = thresd, nmin = nmin, nmax = nmax,
equivs = equivs)
predNN = nn_exp(dfwide_TS_SS, NN_TS, NNlist, predf, cor_mat, remcols = ids,
thres_cov = thres_cov, verbose = verbose)
print("performing infilling...")
dfwide_I_N_TS_SS = infill(dfwide_TS_SS, predNN)
saveRDS(predNN, paste0(outdir, meta_exp, "_N_TS_SS.rds"))
saveRDS(predf, paste0(outdir, meta_exp, "_TS_SS.rds"))
write_csv(dfwide_TS_SS, paste0(outdir, meta, "_TS_SS.csv"))
write_csv(dfwide_I_N_TS_SS, paste0(outdir, meta, "_I_N_TS_SS.csv"))
### 6.
message("running step 6")
# change `sourcecol` to `NULL` if monthly data are all from a single source
dc = remcols[1] # name of the date column
# derive year and month columns from the date column `dc`
# (month previously extracted with "%Y" by mistake; corrected to "%m")
mon_accum %<>% prep_mon_accum(dfwide %>%
                                mutate(year = as.integer(format(.[[dc]], "%Y")),
                                       month = as.integer(format(.[[dc]], "%m"))),
                              sourcecol = "source", n_select = nmon)
accumdf %<>% bind_rows(mon_accum) %>%
shorter2top() %>%
dplyr::filter(station %in% controls)
# combined predictor
predc = bind_cols(predNN, NNpred %>%
dplyr::select(-any_of(ids)))
# needs to be split into NN and dfwide components
predictor %<>% distr_accum_known(predc, accumdf, remcols = ids, minrain = r_thres)
nn_stations = setdiff(controls, stations)
NN_TS_FA = predictor %>%
dplyr::select(union(ids, nn_stations))
dfwide_TS_SS_FA = predictor %>%
dplyr::select(union(ids, stations))
# Data in accumdf should be arranged from shortest to longest period, as data are
# not overwritten once they are included, so data over longer periods are ignored
# where shorter period data exist.
cor_mat = cormat(dfwide_TS_SS_FA, predictor, remcols = ids)
predf = expected(dfwide_TS_SS_FA, predictor, cor_mat = cor_mat, remcols = ids,
thres = thresi, thresd = thresd, nmin = nmin, nmax = nmax,
equivs = equivs, verbose = verbose)
# use more stations again for accumulations
predf_ts = ts_expected(dfwide_TS_SS_FA, predictor, cor_mat = cor_mat,
remcols = ids, thres = thresi, thresd = thresd,
nmin = nmin_ts, nmax = nmax_ts, nmin_day = nmin_day,
equivs = equivs, verbose = verbose)
predNN = nn_exp(dfwide_TS_SS_FA, NN_TS_FA, NNlist, predf, cor_mat, remcols = ids,
thres_cov = thres_cov, verbose = verbose)
corNN = cormat(NN_TS_FA, predictor)
NNpred = ts_expected(NN_TS_FA, predictor, cor_mat = corNN, remcols = ids,
thres = thresi, thresd = thresd, nmin = nmin_ts,
nmax = nmax_ts, nmin_day = nmin_day, equivs = equivs,
verbose = verbose)
dfwide_I_N_TS_SS_FA = infill(dfwide_TS_SS_FA, predNN)
saveRDS(NNpred, paste0(outdir, "NN_", meta_exp, "_TS_FA.rds"))
saveRDS(predNN, paste0(outdir, meta_exp, "_N_TS_SS_FA.rds"))
saveRDS(predf, paste0(outdir, meta_exp, "_TS_SS_FA.rds"))
write_csv(dfwide_TS_SS_FA, paste0(outdir, meta, "_TS_SS_FA.csv"))
write_csv(dfwide_I_N_TS_SS_FA, paste0(outdir, meta, "_I_N_TS_SS_FA.csv"))
write_csv(NN_TS_FA, paste0(outdir, "NN_", meta, "_TS_FA.csv"))
### 7.
message("running step 7")
c(dfwide_TS_SS_FA_A,
accum_performed) %<-% distr_accum(dfwide_TS_SS_FA, predf_ts,
min_rain = r_thres, min_accum = min_accum,
min_rat = min_rat,
min_back_rat = min_back_rat,
min_cum_rat = min_cum_rat,
max_cum_rat = max_cum_rat,
min_repl = min_repl, remcols = ids,
verbose = verbose)
write_csv(accum_performed, paste0(outdir, meta, "_accum_record1.csv"))
c(NN_TS_FA_A,
accum_performed) %<-% distr_accum(NN_TS_FA, NNpred, min_rain = r_thres,
min_accum = min_accum, min_rat = min_rat,
min_back_rat = min_back_rat,
min_cum_rat = min_cum_rat,
max_cum_rat = max_cum_rat,
min_repl = min_repl, remcols = ids,
verbose = verbose)
write_csv(accum_performed, paste0(outdir, meta, "_accum_record_NN.csv"))
predictor = bind_cols(dfwide_TS_SS_FA_A,
NN_TS_FA_A %>%
dplyr::select(-any_of(ids)))
cor_mat = cormat(dfwide_TS_SS_FA_A, predictor)
predf_ts = ts_expected(dfwide_TS_SS_FA_A, predictor, cor_mat = cor_mat,
remcols = ids, thres = thresi, thresd = thresd,
nmin = nmin_ts, nmax = nmax_ts, nmin_day = nmin_day,
equivs = equivs, verbose = verbose)
corNN = cormat(NN_TS_FA_A, predictor)
NNpred = ts_expected(NN_TS_FA_A, predictor, cor_mat = corNN, remcols = ids,
thres = thresi, thresd = thresd, nmin = nmin, nmax = nmax,
equivs = equivs)
# perform distr_accum a second time, in case multiple stations require it
c(dfwide_TS_SS_FA_A,
accum_performed) %<-% distr_accum(dfwide_TS_SS_FA_A, predf_ts,
min_rain = r_thres, min_accum = min_accum,
min_rat = min_rat,
min_back_rat = min_back_rat,
min_cum_rat = min_cum_rat,
max_cum_rat = max_cum_rat,
min_repl = min_repl, remcols = ids,
verbose = verbose)
write_csv(accum_performed, paste0(outdir, meta, "_accum_record2.csv"))
c(NN_TS_FA_A, accum_performed) %<-% distr_accum(NN_TS_FA_A, NNpred,
min_rain = r_thres,
min_accum = min_accum,
min_rat = min_rat,
min_back_rat = min_back_rat,
min_cum_rat = min_cum_rat,
max_cum_rat = max_cum_rat,
min_repl = min_repl,
remcols = ids, verbose = verbose)
write_csv(accum_performed, paste0(outdir, meta, "_accum_record_NN2.csv"))
predictor = bind_cols(dfwide_TS_SS_FA_A, NN_TS_FA_A %>%
dplyr::select(-any_of(ids)))
cor_mat = cormat(dfwide_TS_SS_FA_A, predictor)
predf = expected(dfwide_TS_SS_FA_A, predictor, cor_mat = cor_mat, remcols = ids,
thres = thresi, thresd = thresd, nmin = nmin, nmax = nmax,
equivs = equivs)
predNN = nn_exp(dfwide_TS_SS_FA_A, NN_TS_FA_A, NNlist, predf, cor_mat,
remcols = ids, thres_cov = thres_cov, verbose = verbose)
corNN = cormat(NN_TS_FA_A, predictor)
# for time-shifting below
NNpred = ts_expected(NN_TS_FA_A, predictor, cor_mat = corNN, remcols = ids,
thres = thresi, thresd = thresd, nmin = nmin_ts,
nmax = nmax_ts, nmin_day = nmin_day, equivs = equivs,
verbose = verbose)
dfwide_I_N_TS_SS_FA_A = infill(dfwide_TS_SS_FA_A, predNN)
# for next set of time-shifts
predf_ts = ts_expected(dfwide_TS_SS_FA_A, predictor, cor_mat = cor_mat,
remcols = ids, thres = thresi, thresd = thresd,
nmin = nmin_ts, nmax = nmax_ts, nmin_day = nmin_day,
equivs = equivs, verbose = verbose)
saveRDS(NNpred, paste0(outdir, "NN_", meta_exp, "_TS_FA_A.rds"))
saveRDS(predNN, paste0(outdir, meta_exp, "_N_TS_SS_FA_A.rds"))
saveRDS(predf, paste0(outdir, meta_exp, "_TS_SS_FA_A.rds"))
write_csv(dfwide_TS_SS_FA_A, paste0(outdir, meta, "_TS_SS_FA_A.csv"))
write_csv(dfwide_I_N_TS_SS_FA_A, paste0(outdir, meta, "_I_N_TS_SS_FA_A.csv"))
write_csv(NN_TS_FA_A, paste0(outdir, "NN_", meta, "_TS_FA_A.csv"))
message("performing time-shifting with accumulation-distributed reference series")
# no multi-source shifting to be done, as source selection is complete
# update mean estimates
mus = predictor %>%
corrected_smeans(predictor, thres_cov = thres_cov, remcols = ids)
c(NN_FA_TS, tsrNNi, NNshifts, nNN, NNms) %<-% shift_time(NN_TS_FA, NNpred, mus,
r_thres = r_thres,
thres_ms = thres_ms,
thres_p = thres_p,
thres_max = thres_max,
remcols = ids,
verbose = verbose)
NNshifts %>%
write_csv(paste0(outdir, meta, "_NN_FA_shifts", ".csv"))
c(dfwide_SS_TS, tsi,
shift1_df,
n_shifts, ms_shifts) %<-% shift_time(dfwide_TS_SS, predf, mus,
r_thres = r_thres,
thres_ms = thres_ms,
thres_p = thres_p,
thres_max = thres_max,
remcols = ids,
verbose = verbose)
shift1_df %>%
write_csv(paste0(outdir, meta, "_FA_shifts", ".csv"))
write_csv(NN_FA_TS, paste0(outdir, "NN_", meta, "_SS_FA_TS.csv"))
write_csv(dfwide_SS_TS, paste0(outdir, meta, "_SS_TS.csv"))
### 8.
message("running step 8")
c(dfwide_TS_SS_FA_A_L, dec_changes) %<-% dec_error(dfwide_TS_SS_FA_A, predf_ts,
predNN, dec_thres = dec_thres,
mult_thres = mult_thres,
r_thres = r_thres, mus = mus,
remcols = ids)
write_csv(dec_changes, paste0(outdir, meta, "_decimals1.csv"))
predictor = bind_cols(dfwide_TS_SS_FA_A_L, NN_TS_FA_A %>%
dplyr::select(-any_of(ids)))
cor_mat = cormat(dfwide_TS_SS_FA_A_L, predictor)
predf = expected(dfwide_TS_SS_FA_A_L, predictor, cor_mat = cor_mat, remcols = ids,
thres = thresi, thresd = thresd, nmin = nmin, nmax = nmax,
equivs = equivs)
predNN = nn_exp(dfwide_TS_SS_FA_A_L, NN_TS_FA_A, NNlist, predf, cor_mat,
remcols = ids, thres_cov = thres_cov, verbose = verbose)
dfwide_I_N_TS_SS_FA_A_L = infill(dfwide_TS_SS_FA_A_L, predNN)
# filenames follow the file-wide convention: predNN carries the "_N_" marker
# (the two names were previously swapped between the objects)
saveRDS(predf, paste0(outdir, meta_exp, "_TS_SS_FA_A_L.rds"))
saveRDS(predNN, paste0(outdir, meta_exp, "_N_TS_SS_FA_A_L.rds"))
write_csv(dfwide_TS_SS_FA_A_L, paste0(outdir, meta, "_TS_SS_FA_A_L.csv"))
write_csv(dfwide_I_N_TS_SS_FA_A_L, paste0(outdir, meta, "_I_N_TS_SS_FA_A_L.csv"))
# remove large record-keeping datasets
rm(ms_shifts, shift1_df, NNms, NNshifts, shift_source, ms1, dec_changes)
# remove large infilling datasets that have already been saved to file
rm(dfwide_I, dfwide_I_N, dfwide_I_N_TS, dfwide_I_N_TS_SS, dfwide_I_N_TS_SS_FA_A,
dfwide_I_N_TS_SS_FA)
### 9.
message("running step 9")
na_df = is.na(dfwide_TS_SS_FA_A_L %>%
dplyr::select(-any_of(ids)))
c(predZG, df_p0,
df_mu, df_sigma) %<-% zg_exp(dfwide_I_N_TS_SS_FA_A_L, na_df, cor_mat,
sbc_thres = sbc_thres, cores = cores,
max_stations = max_stations, max_excl = max_excl,
savemodel = savemodel, verbose = verbose)
predNS = nn_exp(dfwide_TS_SS_FA_A_L, NN_TS_FA_A, NNstrict, predZG, cor_mat,
remcols = ids, thres_cov = thres_cov, verbose = verbose)
# strict nearest neighbour
dfwide_I_N_TS_SS_A_L_ZG = infill(dfwide_TS_SS_FA_A_L, predNS)
saveRDS(predNS, paste0(outdir, meta_exp, "_NS.rds"))
saveRDS(predZG, paste0(outdir, meta_exp, "_ZG.rds"))
saveRDS(df_p0, paste0(outdir, meta_exp, "_p0ZG.rds"))
saveRDS(df_mu, paste0(outdir, meta_exp, "_muZG.rds"))
saveRDS(df_sigma, paste0(outdir, meta_exp, "_sigmaZG.rds"))
write_csv(dfwide_I_N_TS_SS_A_L_ZG, paste0(outdir, meta, "_I_N_TS_SS_A_L_ZG.csv"))
# if the process is to be broken into 2, first run this step and save output to file
# for later
if (stepRDS)
{
save(list = ls(all.names = TRUE), file = paste0(RDSname, ".rds"))
return(list(simple_cleaned = dfwide_TS_SS_FA_A_L,
simple_filled = dfwide_I_N_TS_SS_FA_A_L))
}
rm(dfwide_I_N_TS_SS_A_L_ZG)
### 10.
message("running step 10")
# exponent = 1, since no dep. correct needed, with `yest` in `nu` model
c(dfwide_TS_SS_FA_A_L_SZ, rem0df) %<-% rem0seq(dfwide_TS_SS_FA_A_L, df_p0,
pthres = pthres,
pignore = pignore, exponent = 1,
remcols = ids, verbose = verbose,
r_thres = r_thres)
write_csv(rem0df, paste0(outdir, meta, "rem0_1.csv"))
# to be used for second phase accum distribution
c(dfwide_SS_TS_SZ, rem0df) %<-% rem0seq(dfwide_SS_TS, df_p0, pthres = pthres,
pignore = pignore, exponent = 1,
r_thres = r_thres, remcols = ids,
verbose = verbose)
write_csv(rem0df, paste0(outdir, meta, "rem0_SS_TS_SZ.csv"))
write_csv(dfwide_TS_SS_FA_A_L_SZ, paste0(outdir, meta, "_TS_SS_FA_A_L_SZ.csv"))
write_csv(dfwide_SS_TS_SZ, paste0(outdir, meta, "_SS_TS_SZ.csv"))
predictor = bind_cols(dfwide_TS_SS_FA_A_L_SZ, NN_TS_FA_A %>%
dplyr::select(-any_of(ids)))
cor_mat = cormat(dfwide_TS_SS_FA_A_L_SZ, predictor)
predNS = nn_exp(dfwide_TS_SS_FA_A_L_SZ, NN_TS_FA_A, NNstrict, predZG, cor_mat,
remcols = ids, thres_cov = thres_cov, verbose = verbose)
saveRDS(predNS, paste0(outdir, meta_exp, "_NS_SZ.rds"))
rm(rem0df)
### 11.
message("running step 11")
# First rerun the forced accumulation script, then move to suspected accumulations
dfwide_SS_TS_SZ_FA = distr_accum_known(dfwide_SS_TS_SZ, predNS, accumdf %>%
dplyr::filter(station %in% stations),
remcols = ids, minrain = r_thres)
write_csv(dfwide_SS_TS_SZ_FA, paste0(outdir, meta, "_SS_TS_SZ_FA.csv"))
c(dfwide_SS_TS_SZ_FA_AP,
accums_df) %<-% distr_accum_prob(dfwide_SS_TS_SZ_FA, predNS, df_p0,
df_mu, df_sigma, min_rain = r_thres,
min_accum = min_accum_zg,
min_rat = min_rat_zg,
max_prob = max_prob,
min_improve = min_improve,
max_cum_rat = max_cum_rat,
remcols = ids, verbose = verbose)
write_csv(accums_df, paste0(outdir, meta, "_accums_df_ZG"))
write_csv(dfwide_SS_TS_SZ_FA_AP, paste0(outdir, meta, "_SS_TS_SZ_FA_AP.csv"))
### 12.
message("step 12: rerunning step 8 on step 11 output")
c(dfwide_SS_TS_SZ_FA_AP_DE,
df_fixed) %<-% dec_error_prob(dfwide_SS_TS_SZ_FA_AP, df_p0,
df_mu, df_sigma, remcols = ids,
p_max = p_max, p_thres = min_prob,
r_thres = r_thres)
write_csv(df_fixed, paste0(outdir, meta, "_dec_error_prob_record.csv"))
write_csv(dfwide_SS_TS_SZ_FA_AP_DE, paste0(outdir, meta, "_SS_TS_SZ_FA_AP_DE.csv"))
predictor = bind_cols(dfwide_SS_TS_SZ_FA_AP_DE, NN_TS_FA_A %>%
dplyr::select(-any_of(ids)))
cor_mat = cormat(dfwide_SS_TS_SZ_FA_AP_DE, predictor)
predNS = nn_exp(dfwide_SS_TS_SZ_FA_AP_DE, NN_TS_FA_A, NNstrict, predZG, cor_mat,
remcols = ids, thres_cov = thres_cov, verbose = verbose)
dfwide_I_N_TS_SS_ZG_SZ_FA_AP_DE = infill(dfwide_SS_TS_SZ_FA_AP_DE, predNS)
saveRDS(predNS, paste0(outdir, meta_exp, "_NS_SZ_AP.rds"))
write_csv(dfwide_I_N_TS_SS_ZG_SZ_FA_AP_DE, paste0(outdir, meta,
"_I_N_TS_SS_ZG_SZ_AP_DE.csv"))
rm(mon_accum, accums_df, df_fixed, accumdf, predNN, predf_ts, dfwide_SS_TS_SZ_FA,
dfwide_TS_SS_FA, dfwide_TS, dfwide_TS_SS, dfwide_SS_TS_SZ_FA_AP, dfwide_TS_SS_FA_A,
dfwide_TS_SS_FA_A_L_SZ)
gc()
### 13.
message("step 13: rerunning step 8")
# For the GAMLSS prediction, only consider the stations in the 'full' dataset, not NN
# stations
cor_mat = cormat(dfwide_SS_TS_SZ_FA_AP_DE, dfwide_SS_TS_SZ_FA_AP_DE)
na_df = is.na(dfwide_SS_TS_SZ_FA_AP_DE %>%
dplyr::select(-any_of(ids)))
c(predZG, df_p0,
df_mu, df_sigma) %<-% zg_exp(dfwide_I_N_TS_SS_ZG_SZ_FA_AP_DE, na_df, cor_mat,
max_excl = max_excl, sbc_thres = sbc_thres,
cores = cores, max_stations = max_stations,
savemodel = savemodel, verbose = verbose)
cor_mat = cormat(dfwide_SS_TS_SZ_FA_AP_DE, predictor)
predNS = nn_exp(dfwide_SS_TS_SZ_FA_AP_DE, NN_TS_FA_A, NNstrict, predZG, cor_mat,
remcols = ids,thres_cov = thres_cov, verbose = verbose)
dfwide_I_N_SS_TS_SZ_FA_AP_DE_ZG = infill(dfwide_SS_TS_SZ_FA_AP_DE, predNS)
write_csv(dfwide_I_N_SS_TS_SZ_FA_AP_DE_ZG, paste0(outdir, meta,
"_I_N_SS_TS_SZ_FA_AP_DE_ZG.csv"))
saveRDS(df_p0, paste0(outdir, meta_exp, "_p0ZG_SZ_AP.rds"))
saveRDS(df_mu, paste0(outdir, meta_exp, "_muZG_SZ_AP.rds"))
saveRDS(df_sigma, paste0(outdir, meta_exp, "_sigmaZG_SZ_AP.rds"))
saveRDS(predNS, paste0(outdir, meta_exp, "_NS_SZ_AP_ZG.rds"))
return(list(simple_cleaned = dfwide_TS_SS_FA_A_L,
simple_filled = dfwide_I_N_TS_SS_FA_A_L,
cleaned = dfwide_SS_TS_SZ_FA_AP_DE,
clean_filled = dfwide_I_N_SS_TS_SZ_FA_AP_DE_ZG))
}
cleanfill_cont = function(RDSname, ncores = 3)
{
  ### This function is used to continue the cleanfill procedure if, in the initial
  ### application of it, `stepRDS = 1` was set and the namespace saved successfully to
  ### `RDSname`. This function reads in this file and completes the procedure
  ### (steps 10-13). This is done to help with memory constraints.
  ### The only parameter that can be set separately from the initial input is
  ### `ncores`, the number of cores to be used for the second iteration of the
  ### GAMLSS fitting procedure. This parameter overwrites the value of `cores`
  ### supplied to `cleanfill()`.
  ### Returns a list with the simply-cleaned/filled and fully-cleaned/filled
  ### wide-format data.frames, mirroring the return value of `cleanfill()`.
  # read in the data and parameters saved; all objects used below (dfwide_*, ids,
  # outdir, meta, meta_exp, thresholds, verbose, ...) come from this image
  load(paste0(RDSname, ".rds"))
  rm(dfwide_I_N_TS_SS_A_L_ZG) # not needed for the continuation; free memory
  ### 10.
  message("running step 10")
  # exponent = 1, since no dep. correct needed, with yest in `nu` model
  c(dfwide_TS_SS_FA_A_L_SZ, rem0df) %<-% rem0seq(dfwide_TS_SS_FA_A_L, df_p0,
                                                 pthres = pthres,
                                                 pignore = pignore, exponent = 1,
                                                 remcols = ids, verbose = verbose,
                                                 r_thres = r_thres)
  write_csv(rem0df, paste0(outdir, meta, "rem0_1.csv"))
  # to be used for second phase accum distribution
  c(dfwide_SS_TS_SZ, rem0df) %<-% rem0seq(dfwide_SS_TS, df_p0, pthres = pthres,
                                          pignore = pignore, exponent = 1,
                                          r_thres = r_thres, remcols = ids,
                                          verbose = verbose)
  write_csv(rem0df, paste0(outdir, meta, "rem0_SS_TS_SZ.csv"))
  write_csv(dfwide_TS_SS_FA_A_L_SZ, paste0(outdir, meta, "_TS_SS_FA_A_L_SZ.csv"))
  write_csv(dfwide_SS_TS_SZ, paste0(outdir, meta, "_SS_TS_SZ.csv"))
  predictor = bind_cols(dfwide_TS_SS_FA_A_L_SZ, NN_TS_FA_A %>%
                          dplyr::select(-any_of(ids)))
  cor_mat = cormat(dfwide_TS_SS_FA_A_L_SZ, predictor)
  predNS = nn_exp(dfwide_TS_SS_FA_A_L_SZ, NN_TS_FA_A, NNstrict, predZG, cor_mat,
                  remcols = ids, thres_cov = thres_cov, verbose = verbose)
  saveRDS(predNS, paste0(outdir, meta_exp, "_NS_SZ.rds"))
  rm(rem0df)
  ### 11.
  message("running step 11")
  # First rerun the forced accumulation script, then move to suspected accumulations
  dfwide_SS_TS_SZ_FA = distr_accum_known(dfwide_SS_TS_SZ, predNS, accumdf %>%
                                           dplyr::filter(station %in% stations),
                                         remcols = ids, minrain = r_thres)
  write_csv(dfwide_SS_TS_SZ_FA, paste0(outdir, meta, "_SS_TS_SZ_FA.csv"))
  c(dfwide_SS_TS_SZ_FA_AP,
    accums_df) %<-% distr_accum_prob(dfwide_SS_TS_SZ_FA, predNS, df_p0,
                                     df_mu, df_sigma, min_rain = r_thres,
                                     min_accum = min_accum_zg,
                                     min_rat = min_rat_zg,
                                     max_prob = max_prob,
                                     min_improve = min_improve,
                                     max_cum_rat = max_cum_rat,
                                     remcols = ids, verbose = verbose)
  # record of distributed accumulations (".csv" added: was previously missing)
  write_csv(accums_df, paste0(outdir, meta, "_accums_df_ZG.csv"))
  write_csv(dfwide_SS_TS_SZ_FA_AP, paste0(outdir, meta, "_SS_TS_SZ_FA_AP.csv"))
  ### 12.
  message("step 12: rerunning step 8 on step 11 output")
  c(dfwide_SS_TS_SZ_FA_AP_DE,
    df_fixed) %<-% dec_error_prob(dfwide_SS_TS_SZ_FA_AP, df_p0,
                                  df_mu, df_sigma, remcols = ids,
                                  p_max = p_max, p_thres = min_prob,
                                  r_thres = r_thres)
  write_csv(df_fixed, paste0(outdir, meta, "_dec_error_prob_record.csv"))
  write_csv(dfwide_SS_TS_SZ_FA_AP_DE, paste0(outdir, meta, "_SS_TS_SZ_FA_AP_DE.csv"))
  predictor = bind_cols(dfwide_SS_TS_SZ_FA_AP_DE, NN_TS_FA_A %>%
                          dplyr::select(-any_of(ids)))
  cor_mat = cormat(dfwide_SS_TS_SZ_FA_AP_DE, predictor)
  predNS = nn_exp(dfwide_SS_TS_SZ_FA_AP_DE, NN_TS_FA_A, NNstrict, predZG, cor_mat,
                  remcols = ids, thres_cov = thres_cov, verbose = verbose)
  dfwide_I_N_TS_SS_ZG_SZ_FA_AP_DE = infill(dfwide_SS_TS_SZ_FA_AP_DE, predNS)
  saveRDS(predNS, paste0(outdir, meta_exp, "_NS_SZ_AP.rds"))
  write_csv(dfwide_I_N_TS_SS_ZG_SZ_FA_AP_DE, paste0(outdir, meta,
                                                    "_I_N_TS_SS_ZG_SZ_AP_DE.csv"))
  # remove large record-keeping and intermediate datasets before the GAMLSS fit
  rm(mon_accum, accums_df, df_fixed, accumdf, predNN, predf_ts, dfwide_SS_TS_SZ_FA,
     dfwide_TS_SS_FA, dfwide_TS, dfwide_TS_SS, dfwide_SS_TS_SZ_FA_AP,
     dfwide_TS_SS_FA_A, dfwide_TS_SS_FA_A_L_SZ)
  gc()
  ### 13.
  message("step 13: rerunning step 8")
  # For the GAMLSS prediction, only consider the stations in the 'full' dataset,
  # not NN stations
  cor_mat = cormat(dfwide_SS_TS_SZ_FA_AP_DE, dfwide_SS_TS_SZ_FA_AP_DE)
  na_df = is.na(dfwide_SS_TS_SZ_FA_AP_DE %>%
                  dplyr::select(-any_of(ids)))
  c(predZG, df_p0,
    df_mu, df_sigma) %<-% zg_exp(dfwide_I_N_TS_SS_ZG_SZ_FA_AP_DE, na_df, cor_mat,
                                 max_excl = max_excl, sbc_thres = sbc_thres,
                                 cores = ncores, max_stations = max_stations,
                                 savemodel = savemodel, verbose = verbose)
  cor_mat = cormat(dfwide_SS_TS_SZ_FA_AP_DE, predictor)
  predNS = nn_exp(dfwide_SS_TS_SZ_FA_AP_DE, NN_TS_FA_A, NNstrict, predZG, cor_mat,
                  remcols = ids, thres_cov = thres_cov, verbose = verbose)
  dfwide_I_N_SS_TS_SZ_FA_AP_DE_ZG = infill(dfwide_SS_TS_SZ_FA_AP_DE, predNS)
  write_csv(dfwide_I_N_SS_TS_SZ_FA_AP_DE_ZG, paste0(outdir, meta,
                                                    "_I_N_SS_TS_SZ_FA_AP_DE_ZG.csv"))
  saveRDS(df_p0, paste0(outdir, meta_exp, "_p0ZG_SZ_AP.rds"))
  saveRDS(df_mu, paste0(outdir, meta_exp, "_muZG_SZ_AP.rds"))
  saveRDS(df_sigma, paste0(outdir, meta_exp, "_sigmaZG_SZ_AP.rds"))
  saveRDS(predNS, paste0(outdir, meta_exp, "_NS_SZ_AP_ZG.rds"))
  return(list(simple_cleaned = dfwide_TS_SS_FA_A_L,
              simple_filled = dfwide_I_N_TS_SS_FA_A_L,
              cleaned = dfwide_SS_TS_SZ_FA_AP_DE,
              clean_filled = dfwide_I_N_SS_TS_SZ_FA_AP_DE_ZG))
}
PK{vmLLPK
WpDpD
gamlss_fit.RUT keke### This script contains the function to fit GAMLSS models for each station in the
### dataset
scriptsroot = "~/Documents/Academic/PhD/Scripts/R/pptClean/share/gamlss_cleaner/"
source(paste0(scriptsroot, "helpers.R"))
library(zeallot)
library(gamlss)
library(tidyverse)
library(magrittr)
zg_exp = function(dfwide, na_df, cor_mat, mus = NULL, sbc_thres = 10, cores = 3,
full_gaic = F, savemodel = TRUE, r_thres = 0.2, max_stations = 9,
max_excl = 9, ss = NULL, remcols = c("date", "year", "month"),
datecol = remcols[1], verbose = FALSE)
{
### This function fits GAMLSS models to station rainfall series contained in a wide-
### format data.frame `dfwide`, using the Zero-Inflated Gamma/Zero Adjusted Gamma
### (ZAGA) distribution. For a given target (predictand) station, the model is fit
### only for days where valid data are available for the target station (but not
### necessarily the predictor stations). Model selection for each parameter is
### performed and the final models output to file iff `savemodel` is set to TRUE.
### The estimated model parameter time series for each station are output in
### data.frames with the same structure as `dfwide`. A single rainfall realisation
### series obtained using a binomial random variable for rainfall occurrence and the
### fitted mean value for rainfall amount, is also included in a data.frame of the
### same structure.
### `dfwide` must be a gap-free station-wide format data frame, whose column names are
### either station names or date/time identifiers specified in the remcols vector.
### Which days do not have valid data should be specified in `na_df`, a data.frame
### containing exactly the same station columns and time steps as `dfwide`, but where
### the values in all station columns are boolean variables indicating whether the
### given data point is missing.
### `cor_mat` is a matrix with column and row names for all stations to be considered
### and elements indicating the Spearman rank correlation between the corresponding
### rainfall series.
### `mus` is a labelled (with station names) numeric vector containing the
### climatological daily mean rainfall values at each station in mm. If NULL (default)
### will be computed from the means of gap-filled data.frame `dfwide`.
### If `ss` is NULL (default) models are fit for all stations in `dfwide`, otherwise
### only those stations listed in the character vector `ss` are considered.
### `sbc_thres` is the minimum Schwarz Bayesian Criterion (SBC) improvement
### obtained from including an additional station in the GAMLSS model for a given
### target station for the search for additional predictor stations to be continued
### thereafter.
### `cores` is the number of CPUs to use for the model selection component of the
### function.
### `r_thres` is the minimum rainfall amount to be considered as a rain day.
### `max_stations` is the maximum number of predictor stations to be included
### in the model for a given target station.
### `max_excl` is the maximum number of stations to exclude from considering
### as potential predictors for a GAMLSS fit for a given target station due to
### non-convergence, before the search for additional predictor stations is abandoned.
### If `full_gaic` is set to TRUE both forward and backward selection of stations
### is performed, as outlined by
### Ramires et al. (2021) [https://doi.org/10.6339/21-JDS1003]. Otherwise, only
### the forward selection component is run.
## Internal helper function to transform station data from a (assumed)
## ZAGA/ZIG distribution to N(0, 1) [the std. normal]
zaga_transform = function(dfwide, stations, mus, r_thres = 0.2)
{
  ## Transform each station's rainfall series from an assumed ZAGA/ZIG
  ## distribution to approximately N(0, 1) via the probability integral
  ## transform. Relies on `nt` (timestep count) and `datecol` from the
  ## enclosing zg_exp() scope.
  for (stn in stations)
  {
    rain = dfwide %>%
      pull(stn)
    p_dry = sum(rain < r_thres)/nt # empirical dry-day probability
    # station cdf under the fitted ZAGA (wet-day mean mus[stn]/(1 - p_dry))
    cdf_vals = pZAGA(rain, mu = mus[stn]/(1 - p_dry), sigma = 1, nu = p_dry)
    # halve the minimum quantile (the rain == 0 days) so the transformed
    # series has mean ~ 0
    cdf_vals[cdf_vals == min(cdf_vals)] = min(cdf_vals)/2
    dfwide[, stn] = qnorm(cdf_vals) # map quantiles to the standard normal
  }
  # very large rainfall values map to Inf under qnorm; cap these at 8
  numcols = setdiff(colnames(dfwide), datecol) # numeric columns
  dfwide[numcols][!is.finite(as.matrix(dfwide[numcols]))] = 8
  return(dfwide)
}
### ERROR HELPER FUNCTIONS:
### MUST BE INSIDE THIS FUNCTION TO SEE VARIABLES IN ZG_exp SCOPE
zaga_log_error = function(cond)
{
  ### Error handler used when the initial ZAGA fit (identity mu link) fails to
  ### converge: refit using a log link for mu with a log-linear predictor.
  ### Uses `S`, `s`, `sigmaf`, `nuf`, `ref_df` and `param` from the enclosing
  ### zg_exp() scope.
  # conditionMessage() extracts the error text; pasting the raw condition
  # object would also concatenate its deparsed call
  message("got error: ", conditionMessage(cond),
          " hence, initialising with log link for mu, as:")
  # NOTE(review): this rebinds muf locally only; the enclosing muf set via
  # `<<-` is unchanged — confirm that is intended
  muf = as.formula(paste(S, paste0("log(", s, " + 1)"), sep = " ~ "))
  message(muf)
  return(gamlss(muf, sigma.fo = sigmaf, nu.fo = nuf,
                family = ZAGA(mu.link = log, nu.link = cloglog),
                data = ref_df, control = param))
}
zaga_mu_error = function(cond)
{
  ### Fallback fit: retry with mu initialised at the station climatological
  ### mean `mus[S]`. Uses `muf`, `sigmaf`, `nuf`, `ref_df`, `param`, `mus`
  ### and `S` from the enclosing zg_exp() scope.
  # conditionMessage() avoids pasting the raw condition object into the message
  message("got error:", conditionMessage(cond),
          "; hence, initialising with station mean")
  return(gamlss(muf, sigma.fo = sigmaf, nu.fo = nuf,
                family = ZAGA(mu.link = identity, nu.link = cloglog),
                data = ref_df, control = param, mu.start = mus[S]))
}
zaga_def_error = function(cond)
{
  ### Fallback fit: retry with the gamlss default initialisation (no mu.start).
  ### Uses `muf`, `sigmaf`, `nuf`, `ref_df` and `param` from the enclosing
  ### zg_exp() scope.
  # conditionMessage() avoids pasting the raw condition object into the message
  message("got error:", conditionMessage(cond),
          "; hence, initialising from default")
  return(gamlss(muf, sigma.fo = sigmaf, nu.fo = nuf,
                family = ZAGA(mu.link = identity, nu.link = cloglog),
                data = ref_df, control = param))
}
zaga_final_error = function(cond)
{
  ### Last fitting fallback: no initialisation options remain, so revert to the
  ### previous model `m1`, drop the offending candidate station `snext` and
  ### record it in `excl`. Works on copies of `s`, `nm`, `excl` and `sbc1`
  ### from the enclosing zg_exp() scope; the CALLER must reassign them from
  ### the returned list (list(m1, s, nm, excl, sbc1)).
  # conditionMessage() avoids pasting the raw condition object into the message
  message("got error:", conditionMessage(cond),
          "; no initialisation options remain.",
          " Reverting to previous model")
  s = s[s != snext]
  nm = nm - 1
  excl = c(excl, snext)
  if (length(excl) > (max_excl - 1))
  {
    message("alternative stations not yielding convergence; ",
            "end search for superior station model")
    # sbc1 is returned unchanged, which ends the enclosing search loop
  } else
  {
    # inflate sbc1 past the improvement threshold so the search loop tries
    # again with another candidate station
    sbc1 = sbc1 + sbc_thres + 1
  }
  return(list(m1, s, nm, excl, sbc1))
}
zaga_update_error = function(cond)
{
  ### Fallback for a failed model update: strip the nu and sigma models back
  ### to intercept-only, restore the mu formula of `m2` and initialise mu at
  ### the station mean `mus[S]`. Uses `m2`, `mus` and `S` from the enclosing
  ### zg_exp() scope.
  # conditionMessage() avoids pasting the raw condition object into the message
  message("got error:", conditionMessage(cond),
          "; hence, initialising with station mean")
  return(update(update(update(m2, what = "nu", formula = ~1),
                       what = "sigma", formula = ~1),
                formula = m2$mu.formula, mu.start = mus[S]))
}
m2_error = function(cond)
{
  ### Last-resort handler: give up on the attempted refit and keep the
  ### existing model `m2` (from the enclosing zg_exp() scope) unchanged.
  # conditionMessage() avoids pasting the raw condition object into the message
  message("got error:", conditionMessage(cond),
          "; hence, returning existing model m2:")
  return(m2)
}
## perform work
stations = dfwide %>%
get_stations(remcols = remcols)
if (length(mus) == 0)
{
mus = dfwide %>%
smeans(remcols = remcols)
}
mus %<>% .[stations]
# day of year; + 1 since function return value is in [0, 365]
doy = (dfwide %>%
pull(datecol) %>%
as.POSIXlt() %>%
.$yday) + 1
nt = nrow(dfwide)
# initialisation
df_fit = tibble(dfwide %>%
dplyr::select(-any_of(remcols)), doy = doy)
df_p0 = tibble(date = dfwide %>%
pull(datecol)) # nu values
df_mu = df_p0
df_sigma = df_p0
sbcs = NULL
preds = NULL
models = NULL
## for assessment of residuals: transform data to normal by normal quantiles
## (SPI-type)
norm_df = dfwide %>% zaga_transform(stations, mus, r_thres = r_thres)
# default to running for all stations
if (length(ss) == 0)
{
ss = stations
}
# build model for each station...
for (S in ss)
{
corvec = cor_mat[rownames(cor_mat) != S, S] # exclude current station from
# potential predictors
# the previous day's rain: assume the day before first day had same rain
# as day 1
yest = df_fit %>%
pull(S) %>%
.[c(1, 1:(nt - 1))]
df_fit$yest = yest # for later 'prediction'; replaced for every new station
## ref_df needs to be in the global scope as this is assumed by stepGAIC.VR later
# consider only days with valid data at station S
ref_df <<- df_fit[!na_df[, S], ]
obs = ref_df %>%
pull(all_of(S)) # also only valid data included
nvalid = length(obs)
norm_s = norm_df[!na_df[, S], stations]
# as first predictor station, select the station with highest correlation with S
s = names(which.max(corvec)) # returns the name of 1st occurrence of max val
if (verbose)
{
message("predictand is ", S)
message("1st predictor is ", s)
}
# formulae for mean (mu), sd (sigma) & dry day prob (nu/p0)
# <<- to make this visible to the gamlss fns
sigmaf <<- as.formula(paste("~ log(", s, "+ 1)"))
muf <<- as.formula(paste(S, s, sep = " ~ "))
# ensure that the probability of a dry day decreases monotonically with
# increasing rainfall at predictor stations
nuf <<- as.formula(paste0("~pbm(log(", s, " + 1), mono = 'down')"))
# less strict convergence criterium & fewer cycles allowed; offsets tendency to
# get stuck in a loop
param <<- gamlss.control(c.crit = 0.1, n.cyc = 6)
sbc2 = 0 # initialisation
# build initial model! for errors that are known to occur, directly attempt
# alternative approach:
m1 <<- tryCatch(gamlss(muf, sigma.fo = sigmaf, nu.fo = nuf,
family = ZAGA(mu.link = identity, nu.link = cloglog),
data = ref_df, control = param), error = zaga_log_error)
sbc1 = m1$sbc
dsbc = sbc1 - sbc2 # change in sbc with addition of new station
nm = 1 # number of terms in the model
excl = NULL # stations to exclude from consideration for this station
# as they have previously led to non-convergence
# joint probability of wet (dry) at station and reference;
# not really a probability, but proportional to it, as it's not divided by nt
j_pr_wet = colSums((obs >= r_thres) * (ref_df[, stations] >= r_thres))
j_pr_dry = colSums((obs < r_thres) * (ref_df[, stations] < r_thres))
# reqiure at least 3 stations;
# thereafter continue until model stops improving sufficiently
while (nm < 3 | dsbc > sbc_thres)
{
## select next station to add to model, based on correlation with residuals
res1 = get.rqres(m1, howmany = 100) # 100 randomised, normalised quantile
# residuals
res1[!is.finite(res1)] = 8 # set 'infinite' values to high value
corres = colMeans(cor(res1, norm_s)) # correlation with residuals of
# previous model
meas = corres * j_pr_wet * j_pr_dry # measure of similarity
# incorporates rain day co-frequency
ignore = c(S, s, excl) # can't predict itself & no repeats
meas %<>% .[setdiff(stations, ignore)]
snext = names(meas)[which.max(meas)] # next selected station
if (verbose)
{
message("new predictor station is: ", snext)
}
s = c(s, snext)
sigmaf <<- as.formula(paste("~", paste("log(", s, "+ 1)", collapse = " + ")))
muf <<- as.formula(paste(S, paste(s, collapse = " + "), sep = " ~ "))
nuf <<- as.formula(paste("~", paste0("pbm(log(", s, " + 1), mono = 'down')",
collapse = " + "),
"+ pbm(log(yest + 1), mono = 'down')",
"+ pbc(doy, max.df = 2)"))
# next model iteration!
m2 = tryCatch(tryCatch(tryCatch(gamlss(muf, sigma.fo = sigmaf, nu.fo = nuf,
family = ZAGA(mu.link = identity,
nu.link = cloglog),
data = ref_df, control = param,
mu.start = fitted(m1, "mu")),
error = zaga_mu_error),
error = zaga_def_error),
error = zaga_final_error)
m2 <<- m2 # directly assigning to the global m2 only does not work inside
# the function if a failure previously occurred, since then a local copy of m2
# already exists with higher precedence than the global one within the
# function scope
# if the output comes from zaga_final_error
if (!is.gamlss(m2))
{
c(m2, s, nm, excl, sbc1) %<-% m2
m2 <<- m2
}
sbc2 = m2$sbc
dsbc = sbc1 - sbc2
if (verbose)
{
message("change in sbc = ", dsbc)
}
nm = nm + 1
# model not improved relative to previous set-up
if (dsbc <= 0)
{
m2 = m1 # take previous model as current
m2 <<- m2 # need to make sure global and local scope versions are equal
nm = nm - 1
s = s[1:nm]
if (verbose)
{
message("model not improved; revert to previous set-up")
}
break
}
if (nm >= max_stations)
{
if (verbose)
{
message("Reached max number of stations, exit")
}
break
}
m1 <<- m2 # update previous model
sbc1 = m1$sbc
}
if (verbose)
{
message("number of predictand stations = ", nm)
message("min rainfall prob = ", 1 - max(fitted(m2, "nu")))
message("max rainfall prob = ", 1 - min(fitted(m2, "nu")))
}
## Now select the best of model using m2 to define the scope. We start with a
## model for nu, then sigma, then mu.
m4 = gamlss(as.formula(paste(S, "~1")), sigma.fo = ~1, nu.fo = ~1,
family = ZAGA(mu.link = identity, nu.link = cloglog),
data = ref_df, control = param)
# incorporate yesterday and seasonality
muf <<- as.formula(paste(S, paste(c(s, "yest", "pbc(doy, max.df = 2)"),
collapse = " + "), sep = " ~ "))
m3 = stepGAIC(m4, what = "mu", direction = "forward",
scope = list(lower = ~1, upper = muf),
k = log(nvalid), ncpus = cores, parallel = "multicore")
m3 = stepGAIC(m3, what = "sigma", direction = "forward",
scope = list(lower = ~1, upper = m2$sigma.formula),
k = log(nvalid), ncpus = cores, parallel = "multicore")
# re-include nu. Need to add it back first, as backward stepping is used so the
# full formula must be initially included. If this fails totally, return to the
# model we know was at least stable and ok
m3 = tryCatch(update(m3, what = "nu", formula = m2$nu.formula), error = m2_error)
m3 = stepGAIC(m3, what = "nu", direction = "backward",
scope = list(lower = ~1, upper = m3$nu.formula),
k = log(nvalid), ncpus = cores, parallel = "multicore")
# This should be done in line with the `stepGAICAll.A` procedure, but has minimal
# impact while taking substantial computational time
if (full_gaic)
{
m3 = stepGAIC(m3, what = "sigma", k = log(nvalid), ncpus = cores,
parallel = "multicore")
m3 = stepGAIC(m3, what = "mu", k = log(nvalid), ncpus = cores,
parallel = "multicore")
}
# update input dataframe with a single random realisation; ignore sigma
pred0 = predict(m3, what = "nu", newdata = df_fit, type = "response")
predmu = predict(m3, what = "mu", newdata = df_fit, type = "response")
dfwide[[S]] = rbinom(n = nt, p = 1 - pred0, size = 1) * predmu
if (savemodel)
{
saveRDS(m3, paste0(S, "_gamlss.rds"))
}
df_p0[[S]] = predict(m3, what = "nu", newdata = df_fit, type = "response")
df_mu[[S]] = predict(m3, what = "mu", newdata = df_fit, type = "response")
df_sigma[[S]] = predict(m3, what = "sigma", newdata = df_fit, type = "response")
sbcs[[S]] = m3$sbc
preds[[S]] = s
}
if (savemodel)
{
saveRDS(sbcs, "sbclist.rds")
saveRDS(preds, "predvars.rds")
}
rm(m2, m3, m4, ref_df) # remove variables with global scope
return(list(dfpred = dfwide, p0 = df_p0, mu = df_mu, sigma = df_sigma))
}
PKMpDpDPK
W--
estimators.RUT keke### This script contains functions used to produce rainfall estimates by day for each
### station to be used for gap-filling and data-cleaning
library(tidyverse)
library(magrittr)
scriptsroot = "~/Documents/Academic/PhD/Scripts/R/pptClean/share/gamlss_cleaner/"
source(paste0(scriptsroot, "helpers.R"))
expected = function(dfwide, dfcomp, cor_mat = cormat(dfwide, dfcomp),
remcols = c("date", "year", "month"), thres = 0.8, thresd = 0.01,
nmin = 4, nmax = 8, equivs = NULL, thres_cov = 0.95, verbose = TRUE)
{
### This function supersedes ExpectedPPT. Given a daily rainfall df (`dfwide`) with
### station data in columns labelled by station name and date-time related info in
### `remcols`, and a comparison dataframe (`dfcomp`) from which to compute them,
### returns the expected rainfall by station and day, through mean-adjusted
### inverse-distance weighted infilling, using the reciprocal square Spearman's
### Rank correlation between stations as distance measure and neighbour station
### selector. `cor_mat` is a matrix of the correlations between stations in
### `dfwide` and `dfcomp`. `thres` is the default minimum correlation for
### selecting neighbouring stations; `thresd` is the amount by which this
### threshold is recursively adjusted downwards until at least `nmin` stations
### are selected, or increased until the number of neighbouring stations is no
### more than `nmax`. Returns a df with the `remcols` columns of `dfwide`
### followed by one prediction column per station.
# dummy function to use name-repair to get correct column names for vector;
# must be defined here so the loop variable S is in its scope
sfun = function(x)
{
return(S)
}
# returns a vector with mean daily rainfall by station
mus = dfcomp %>%
corrected_smeans(dfcomp = dfcomp, thres_cov = thres_cov, remcols = remcols)
predf = dfwide %>%
dplyr::select(any_of(remcols)) # initialise the prediction df with date cols
# per-station named vectors of neighbour correlations used as weights below
neighbours = neighbour_select(dfwide, dfcomp, remcols = remcols, nmin = nmin,
nmax = nmax, thresi = thres, thresd = thresd,
equivs = equivs, verbose = FALSE)
stations = dfwide %>%
get_stations(remcols = remcols)
for (S in stations)
{
vec = dfwide %>%
pull(S)
mu0 = mean(vec, na.rm = TRUE)
corvec = as.matrix(neighbours[[S]])[1, ] # get a named vector out
selected = names(corvec)
# station df to use for prediction; transpose so that column recycling of
# corvec and mus[selected] applies row-wise (per station)
pred0 = (t(dfcomp %>%
dplyr::select(all_of(selected))) * corvec/(mus)[selected])
# number of valid data points, weighted by the station correlations
nv = colSums((!is.na(pred0)) * corvec)
# transpose back, as order of R calculations is by row not column
pred = rowSums(t(pred0) * mu0/nv, na.rm = TRUE)
## where no valid data are available from the chosen stations:
# find the station with highest correlation among those that have valid
# data for this entire set of days, to ensure only one station is req'd
scols = setdiff(names(dfcomp %>%
dplyr::select(-any_of(remcols))), S)
# select the ones with complete valid data for all relevant time steps
scols = scols[which(colSums(is.na(dfcomp[nv == 0, scols])) == 0)]
# select the best one (returns the first if there are multiple)
srep = if(length(scols) > 1) {names(which.max(cor_mat[S, scols]))} else {scols}
pred[nv == 0] = dfcomp[[srep]][nv == 0] * mu0/mus[srep]
if (verbose)
{
message(S, " correlation between prediction and observation = ",
cor(pred, vec, method = "spearman", use = "pairwise.complete.obs"))
}
predf = bind_cols(predf, tibble(pred, .name_repair = sfun))
}
return(predf)
}
ts_expected = function(dfwide, dfcomp, cor_mat = cormat(dfwide, dfcomp), thres = 0.8,
thresd = 0.01, remcols = c("date", "year", "month"), equivs = NULL,
thres_cov = 0.95, nmin_day = 3, nmin = 6, nmax = 10, verbose = T)
{
### This function is a generalisation of `expected`, to be used primarily when
### time-shifting or performing similar operations (e.g. identifying suspiciously
### large rainfall values), where it is essential that a single station---at which a
### faulty reading may have been recorded on any particular day---does not dominate
### the expected rainfall series for any other station.
### Hence, this function has an additional parameter `nmin_day`, by default set to 3,
### which is used to ensure that the expected value of rainfall for all days is based
### on a minimum station set size. In addition, to reduce the likelihood of infinite
### loops or of multiple stations with questionable data influencing the rainfall
### expectation unduly, `nmin=6` and `nmax=10` by default.
### All other parameters and functioning are as with `expected`. This function is
### included separately as the additional generalisation involves substantial
### additional computational cost, to be avoided with the frequent calling of
### `expected` in `clean_fill()`.
# dummy function to use name-repair to get correct column names for vector; must be
# in the scope of this function so that the value of the loop variable S is known
sfun = function(x)
{
return(S)
}
# returns a vector with mean daily rainfall by station
mus = dfcomp %>%
corrected_smeans(dfcomp = dfcomp, thres_cov = thres_cov, remcols = remcols)
# NOTE(review): nt is computed but never used in this function
nt = nrow(dfwide)
predf = dfwide %>%
dplyr::select(any_of(remcols)) # initialise the prediction df with the date cols
# per-station named vectors of neighbour correlations used as weights below
neighbours = neighbour_select(dfwide, dfcomp, remcols = remcols, nmin = nmin,
nmax = nmax, thresi = thres, thresd = thresd,
equivs = equivs, verbose = F)
stations = dfwide %>%
get_stations(remcols = remcols)
# all candidate predictor stations available in the comparison df
controls = dfcomp %>%
get_stations(remcols = remcols)
for (S in stations)
{
vec = dfwide %>%
pull(S)
mu0 = mean(vec, na.rm = T)
corvec = as.matrix(neighbours[[S]])[1, ] # get a named vector out
selected = names(corvec)
# station df to use for prediction; transpose so that column recycling of
# corvec and mus[selected] applies row-wise (per station)
pred0 = (t(dfcomp %>%
dplyr::select(all_of(selected))) * corvec/(mus)[selected])
# number of valid data points; nvw: weighted by the station correlations
nv = colSums(!is.na(pred0))
nvw = colSums((!is.na(pred0)) * corvec)
# transpose back, as order of R calculations is by row not column
pred = rowSums(t(pred0) * mu0/nvw, na.rm = T)
## where fewer than nmin_day stations have valid data, rebuild the prediction
## from the best remaining stations, chosen per day below
# all stations to consider (the predictand itself excluded)
scols = setdiff(controls, S)
# select the longest one of each 'equiv' set; assume that it doesn't matter too
# much which is selected further; certainly don't want to be removing equivalents
# individually for all days. This does mean that sometimes a good station will
# not be included because an equivalent station will not be included while a
# near-equivalent that was considered is empty; this should not be too
# consequential
if (length(equivs) != 0)
{
for (eq in equivs)
{
len_eqs = colSums(!is.na(dfcomp[, scols[scols %in% eq]]))
retain = names(len_eqs)[which.max(len_eqs)]
# drop the stations in eq and not in retain
scols = setdiff(scols, setdiff(eq, retain))
}
}
# now cycle individually through the days when insufficient data are available;
# select the best nmin_day stations in each case, weighting by cor^4 as above
for (d in which(nv < nmin_day))
{
# candidate stations with a valid observation on day d
scols_d = scols[which(!(is.na(dfcomp[d, scols])))]
# top nmin_day correlations with S, raised to the 4th power as weights
corvec_d = (cor_mat[S, scols_d][order(cor_mat[S, scols_d],
decreasing = T)][1:nmin_day])^4
scols_d = names(corvec_d)
# mean-adjusted weighted average across the selected stations for day d
pred[d] = sum(mu0 * (dfcomp[d, ] %>% dplyr::select(all_of(scols_d)) *
corvec_d) / (mus[scols_d] * sum(corvec_d)))
}
if (verbose)
{
message(S, " correlation between prediction and observation = ",
cor(pred, vec, method = "spearman", use = "pairwise.complete.obs"))
}
predf = bind_cols(predf, tibble(pred, .name_repair = sfun))
}
return(predf)
}
nn_exp = function(dfwide, NN, NNlist, predf, cor_mat = NULL,
remcols = c("date", "year", "month"),
thres_cov = 0.925, verbose = TRUE)
{
## This function supersedes NNexp.
## dfwide (df) = data frame in wide format, with columns for station rainfall
## labelled by station names and columns for date-related info (labelled by remcols)
## NN (df) = data frame of the same structure as dfwide, to be used for nearest
## neighbour stations not included in dfwide
## predf (df) = data frame with the same structure as dfwide and NN, to be used where
## NNs are not specified or days when no NN has data
## NNlist = a list labelled by station, containing character vectors of names of
## stations selected as NNs
## cor_mat (Matrix) = Spearman-rank correlations between dfwide (rows) and dfwide+NN
## (cols); computed if not provided (i.e. when, as in default, = NULL)
## Returns predf with the columns named in NNlist overwritten by the NN estimates.
NN = bind_cols(dfwide, NN %>%
dplyr::select(-any_of(remcols)))
if (is.null(cor_mat))
{
cor_mat = cormat(dfwide, NN, remcols = remcols)
}
stations = dfwide %>%
get_stations(remcols)
mus = NN %>%
corrected_smeans(dfcomp = NN, remcols = remcols, thres_cov = thres_cov)
nn_stations = names(NNlist) # stations for which nearest neighbours are provided
for (S in nn_stations)
{
neighbours = NNlist[[S]]
if (verbose)
{
message("Station is: ", S, ". Neighbour(s) are: ")
print(neighbours)
}
# 4th power of correlation used as weight, consistent with ts_expected
corvec = cor_mat[S, neighbours]^4
# for some neighbour stations there is no overlap with original, so cor is NA. In
# these cases assume that the other station is being used as a 'replacement', i.e.
# that their climatologies should be equal: hence corvec = 1
corvec[is.na(corvec)] = 1
muNN = mus[neighbours]
mu0 = mus[S] # mean for this station
# nearest neighbour df for this station; transpose so that column recycling of
# corvec and muNN applies row-wise (per neighbour station)
NN_s = t(NN %>%
dplyr::select(all_of(neighbours))) * corvec / muNN
# number of valid data points, weighted by the station correlations
nv = colSums((!is.na(NN_s)) * corvec)
NN_s = rowSums(t(NN_s * mu0) / nv, na.rm = TRUE)
NN_s[nv == 0] = predf[[S]][nv == 0] # use old prediction where nn fails
predf[[S]] = NN_s
}
return(predf)
}
PK/?--PK-
WI @time_shifters.RUTkePK-
Wl~?`?` @Ghelpers.RUTkePK-
WBB
@cleaners.RUTkePK-
W{vmLL @Qcleanfill_fn.RUTkePK-
WMpDpD @gamlss_fit.RUTkePK-
W/?-- @Pestimators.RUTkePK~