From start to finish
Let’s pull all these steps together into a few functions.
The script below shows an example workflow that starts with 24-hour air toxics data from MPCA’s Air Toxics database, walks through each of the analysis steps outlined in the previous sections, and produces a suite of data validation tools, POC analysis tools, completeness checks, annual summaries, site comparisons, and pollution roses.
10.1 Using functions
The first step is to put our analysis methods into a few reusable functions, which can then be re-used for all of our monitoring conditions.
Click below to view the example functions.
# Functions
install.MPCAair.packages = function() {
install.packages(c(
"easypackages",
"tidyverse",
"data.table",
"nortest",
"car",
"DT",
"RcppRoll",
"shiny",
"rsconnect",
"EnvStats",
"openair",
"ggbeeswarm",
"reshape",
"corrplot",
"RODBC"
))
}
MPCA_air_libraries = function() {
library(easypackages)
libraries(c(
"tidyverse",
"lubridate",
"stats",
"nortest",
"car",
"DT",
"RcppRoll",
"shiny",
"rsconnect",
"EnvStats",
"openair",
"ggbeeswarm",
"reshape",
"corrplot",
"RODBC"
))
}
flag <- function(data) {
library(tidyverse)
library(lubridate)
data <- mutate(data,
AQSID_flag = is.na(AQSID),
POC_flag = is.na(as.numeric(POC) ) | as.numeric(POC) < 0 | as.numeric(POC) > 4,
Parameter_flag = is.na(as.numeric(Parameter)) | nchar(as.character(Parameter)) != 5,
Date_flag = is.na(ymd(Date)),
Result_flag = !is.na(Result) & (abs(as.numeric(Result) ) >= 900 | is.na(as.numeric(Result) ) ),
Null_flag = (is.na(Result) & is.na(Null_Data_Code) ) | (!is.na(Result) & !is.na(Null_Data_Code) ),
MDL_flag = is.character(MDL) | MDL < 0,
Pollutant_flag = is.na(Pollutant),
any_flag = (AQSID_flag + POC_flag + Parameter_flag + Date_flag + Result_flag + Null_flag +
MDL_flag + Pollutant_flag) > 0
)
}
remove_flagged <- function(data) {
library(tidyverse)
return(filter(data, !any_flag) %>%
select(-contains("flag") ) %>%
mutate(Result = as.numeric(Result) ) )
}
flag_duplicates = function(data) {
library(tidyverse)
return(data %>% group_by(AQSID, POC, Parameter, Date) %>% mutate(Count = n(), duplicate_flag = Count > 1))
}
average_duplicates = function(data) {
library(tidyverse)
dupe_averaging = function(Result, Censored) {
Result = Result[!is.na(Result)]
Censored = Censored[!is.na(Censored)]
if(all(Censored, na.rm = T)) {
return (mean(Result, na.rm = T))
}
else {
return (mean(Result[!Censored], na.rm = T ) )
}
}
data = data %>% group_by(AQSID, POC, Parameter, Pollutant, Date, MDL) %>% mutate(Result = dupe_averaging(Result, Censored), Censored = all(Censored, na.rm = T) ) %>% slice(1) %>% ungroup() %>%
mutate(Result = ifelse(is.na(Result), NA, Result), Censored = ifelse(is.na(Result), NA, Censored))
return (data)
}
time_series_plots = function(data) {
library(shiny)
library(tidyverse)
library(RcppRoll)
library(DT)
library(rsconnect)
data <- mutate(data, sitePOC = paste0(AQSID,"-", POC) )
pollutant <- unique(data$Pollutant)
site <- unique(data$sitePOC)
shinyApp(
ui = fluidPage(
fluidRow(
column(3,
style = "padding-bottom: 20px;",
inputPanel(
selectInput("pollutant", label="Choose a pollutant", choices = pollutant,
selected="Benzene"),
selectInput("site", label="Choose a site", choices = site, selected=270535501),
dateRangeInput("date", label = "Select date range", start = "2009-01-01", end =
"2013-12-31", min = "2009-01-01", max = "2013-12-31") ) ),
column(9,
plotOutput('detlim', height = "400px")))),
server = function(input, output) {
output$detlim <- renderPlot({
data_sub = filter(data, Pollutant==input$pollutant, sitePOC == input$site, Date >= input$date[1], Date <= input$date[2])
ggplot(data=data_sub, aes(x= Date, y=Result)) +
geom_point(aes(color=Censored), size =3, alpha=0.55) +
geom_line() +
scale_x_date(date_labels = "%D") +
xlab(NULL) +
ylab("Result (ug/m3)") +
expand_limits(y=c(0, max(data_sub$Result))) +
scale_colour_manual(values= c("#197519"[FALSE %in% unique(data_sub$Censored)], "#0000FF"[TRUE %in% unique(data_sub$Censored)]), breaks=c(FALSE, TRUE)) +
theme(text = element_text(size=15), axis.text.x = element_text(angle = -90, vjust = 0.3, size=14)) +
ggtitle(paste0("Time series for ", input$pollutant, " at site ", input$site))
})
})
}
POC_compare = function(data) {
library(shiny)
library(tidyverse)
library(rsconnect)
data <- distinct(data, AQSID, POC, Parameter, Date, Pollutant, Year, .keep_all = T) #replace with better cleaning function
data <- spread(data, POC, Result) %>%
mutate(Status = ifelse(`1` < MDL & `2` < MDL, "POCs 1 and 2 below MDL", ifelse(`1` < MDL, "POC 1 below MDL", ifelse(`2` < MDL, "POC 2 below MDL", "POCs 1 and 2 above MDL") ) ) ) %>%
drop_na(Status)
Pollutant <- unique(data$Pollutant)
Site <- unique(data$AQSID)
shinyApp(
ui = fluidPage(
fluidRow(
column(3,
style = "padding-bottom: 20px;",
inputPanel(
selectInput("Pollutant", label="Choose a pollutant", choices = Pollutant),
selectInput("Site", label="Choose a site", choices = Site),
dateRangeInput("date", label = "Select date range", start = "2009-01-01", end =
"2013-12-31", min = "2009-01-01", max = "2013-12-31"))),
column(9,
plotOutput('normviz', height = "500px")))),
server = function(input, output) {
output$normviz <- renderPlot({
data_sub = filter(data, Pollutant==input$Pollutant, AQSID == input$Site, Date >= input$date[1], Date <= input$date[2])
ggplot(data_sub, aes(x = `1`, y = `2`, color = Status)) +
geom_point(size = 3) +
geom_segment(x=-1000, xend=1000, y=-1000, yend=1000, color="red", size=1) +
labs(title = "POC comparison chart", x = "POC 1", y = "POC 2", subtitle = paste("Correlation =", round(cor(data_sub$`1`,data_sub$`2`, use = "complete"), 2) ) )
})
})
}
POC_average = function(data) {
library(tidyverse)
POC_averaging = function(Result, Censored) {
Result = Result[!is.na(Result)]
Censored = Censored[!is.na(Censored)]
if(all(Censored, na.rm = T)) {
return (mean(Result, na.rm = T))
}
else {
return (mean(Result[!Censored], na.rm = T ) )
}
}
data = data %>% group_by(AQSID, Parameter, Pollutant, Date, MDL) %>% mutate(Result = POC_averaging(Result, Censored), Censored = all(Censored, na.rm = T) ) %>% slice(1) %>% select(-POC) %>% ungroup() %>%
mutate(Result = ifelse(is.na(Result), NA, Result), Censored = ifelse(is.na(Result), NA, Censored))
return (data)
}
completeness_check = function(data) {
# Create a sampling calendar based on EPA's air toxics monitoring schedule
sample_calendar <- function(start = "2012-01-01",
end = "2016-12-31",
day_interval = 6,
type = "air_toxics") {
library(lubridate)
# Convert 'start' and 'end' to class date
start <- ymd(start)
end <- ymd(end)
# Set official start date to selected EPA calendar
if(type == "air_toxics") {
epa_start <- ymd("1989-12-24")
} else {
epa_start <- start
}
# Create full table of sampling dates
calendar <- seq(from = epa_start,
to = end,
by = paste(day_interval, "days"))
# Subset to user's date range
calendar <- calendar[calendar >= start & calendar <= end]
return(calendar)
}
# Find the year range of your data
date_range <- range(data$Date)
# Create expected sample calendar
epa_schedule <- tibble(Date = sample_calendar(start = format(date_range[1], "%Y-01-01"), #Extend range to first day of the year
end = format(date_range[2], "%Y-12-31"), #Extend range to last day of the year
day_interval = 6))
# Add year and calendar quarter columns
epa_schedule <- epa_schedule %>% mutate(Year = year(Date),
cal_quarter = quarter(Date))
# Count the expected number of samples per quarter and year.
epa_schedule <- epa_schedule %>%
group_by(Year, cal_quarter) %>%
summarize(expected_quarter_samples = length(unique(Date))) %>%
group_by(Year) %>%
mutate(expected_annual_samples = sum(expected_quarter_samples))
# Assign each date to a calendar quarter
data <- data %>% mutate(cal_quarter = quarter(Date))
# Count the number of sampling dates for each quarter and year.
data <- data %>%
group_by(AQSID, Parameter, Pollutant, Year, cal_quarter) %>%
mutate(valid_quarter_samples = length(unique(Date[!is.na(Result)]))) %>%
group_by(AQSID, Parameter, Pollutant, Year) %>%
mutate(valid_annual_samples = length(unique(Date[!is.na(Result)])))
# Join expected sample table to data by quarter and year columns
data <- left_join(data, epa_schedule, by = c("Year", "cal_quarter"))
# Divide valid samples by expected samples
data <- data %>%
group_by(AQSID, Parameter, Pollutant, Year, cal_quarter) %>%
summarise(pct_quarter_samples = round(valid_quarter_samples[1] / expected_quarter_samples[1], 2)) %>%
mutate(Complete = pct_quarter_samples >= 0.75) %>%
group_by(AQSID, Parameter, Pollutant, Year) %>%
summarise(Complete = sum(Complete, na.rm = T) == 4,
lowest_quarter = ifelse(n() == 4, min(pct_quarter_samples, na.rm = T), 0) )
return(data %>% select(AQSID, Parameter, Pollutant, Year, Complete, lowest_quarter) %>% ungroup() )
}
UCL_95 = function(data, Boot_Repeats = 1000) {
library(EnvStats)
set.seed(2017)
annual_AT_means = function(air_toxics) {
air_toxics = mutate(air_toxics, Year = year(ymd(Date)), Quarter = quarter(ymd(Date)) )
sample_complete = air_toxics %>% completeness_check()
enough_detects = air_toxics %>% group_by(AQSID, Parameter, Pollutant, Year) %>% summarise(Detected = mean(Censored, na.rm = T) <= 0.8 )
site_means = air_toxics %>% group_by(AQSID, Parameter, Pollutant, Year) %>% summarise(Mean = ifelse(length(unique(Result[!is.na(Result) & !Censored] ) ) < 2, NA,
ifelse (any(Censored, na.rm = T), elnormAltCensored(Result, Censored, method = "impute.w.mle", ci = F)$parameters[[1]], mean(Result, na.rm = T) ) ) )
site_means = left_join(site_means, sample_complete, by = c("AQSID", "Parameter", "Pollutant", "Year") ) %>%
left_join(enough_detects, by = c("AQSID", "Parameter", "Pollutant", "Year") ) %>% mutate(Mean = ifelse(Complete & Detected, Mean, NA), ID = paste(AQSID, Parameter, Pollutant, Year) )
return(site_means)
}
MLE_est <- function(data){
results = data$Result
censored = data$Censored
n = sum(!is.na(results))
if (length(unique(results[!is.na(results) & !censored] ) ) < 2 ) {
MLE_means = NA
}
else {
random.rows = NULL
random.rows = sample(which(!is.na(censored) & (!censored) & !duplicated(results) ), 2, replace = FALSE)
random.rows = c(random.rows, sample(which(!is.na(censored)), n-2, replace = TRUE))
MLE_means = ifelse(sum(censored[random.rows], na.rm = T) == 0, mean(results[random.rows]), elnormAltCensored(results[random.rows], censored[random.rows], method = "impute.w.mle", ci = F)$parameters[[1]] )
}
return(MLE_means)
}
data = mutate(data, ID = paste(AQSID, Parameter, Pollutant, Year), Result = ifelse(Censored, MDL, Result) )
Bootstrap_means = replicate(Boot_Repeats, (by(data, data$ID, MLE_est) ) )
CL = apply(Bootstrap_means, 1, function(x) sort(x)[ceiling(0.95 * Boot_Repeats)] )
CL = data.frame(ID = names(CL), UCL95 = unname(CL))
annual_summary = left_join(annual_AT_means(data), CL, by = "ID") %>% mutate(UCL95 = ifelse(Complete & Detected, UCL95, NA) ) %>% select(-ID)
return(annual_summary)
}
site_compare = function(data, site_number, Boot_Repeats = 1000) {
library(EnvStats)
set.seed(2017)
annual_AT_means = function(air_toxics) {
air_toxics = mutate(air_toxics, Year = year(ymd(Date)), Quarter = quarter(ymd(Date)) )
sample_complete = air_toxics %>% completeness_check()
enough_detects = air_toxics %>% group_by(AQSID, Parameter, Pollutant, Year) %>% summarise(Detected = mean(Censored, na.rm = T) <= 0.8 )
site_means = air_toxics %>% group_by(AQSID, Parameter, Pollutant, Year) %>% summarise(Mean = ifelse(length(unique(Result[!is.na(Result) & !Censored] ) ) < 2, NA,
ifelse (any(Censored, na.rm = T), elnormAltCensored(Result, Censored, method = "impute.w.mle", ci = F)$parameters[[1]], mean(Result, na.rm = T) ) ) )
site_means = left_join(site_means, sample_complete, by = c("AQSID", "Parameter", "Pollutant", "Year") ) %>%
left_join(enough_detects, by = c("AQSID", "Parameter", "Pollutant", "Year") ) %>% mutate(Mean = ifelse(Complete & Detected, Mean, NA), ID = paste(AQSID, Parameter, Pollutant, Year) )
return(site_means)
}
MLE_est <- function(data){
results = data$Result
censored = data$Censored
n = sum(!is.na(results))
if (length(unique(results[!is.na(results) & !censored] ) ) < 2 ) {
MLE_means = NA
}
else {
random.rows = NULL
random.rows = sample(which(!is.na(censored) & (!censored) & !duplicated(results) ), 2, replace = FALSE)
random.rows = c(random.rows, sample(which(!is.na(censored)), n-2, replace = TRUE))
MLE_means = ifelse(sum(censored[random.rows], na.rm = T) == 0, mean(results[random.rows]), elnormAltCensored(results[random.rows], censored[random.rows], method = "impute.w.mle", ci = F)$parameters[[1]] )
}
return(MLE_means)
}
data = mutate(data, Result = ifelse(Censored, MDL, Result), ID = paste(AQSID, Parameter, Pollutant, Year))
Bootstrap_means = replicate(Boot_Repeats, (by(data, data$ID, MLE_est) ) )
Bootstrap_means = rownames_to_column(as.data.frame(Bootstrap_means), "ID" )
Bootstrap_means = right_join(annual_AT_means(data), Bootstrap_means, by = "ID")
Bootstrap_means = Bootstrap_means %>% group_by(Parameter, Pollutant, Year) %>% arrange(desc(AQSID == site_number), .by_group = T ) %>%
group_by(Parameter, Pollutant, Year) %>% mutate_at(vars(num_range("V", 1:Boot_Repeats)), funs(c(first(.), (. - first(.))[-1])) ) %>% ungroup()
LB = select(Bootstrap_means, num_range("V", 1:Boot_Repeats) ) %>% apply(1, function(x) sort(-x)[floor(0.025 * Boot_Repeats)] )
UB = select(Bootstrap_means, num_range("V", 1:Boot_Repeats) ) %>% apply(1, function(x) sort(-x)[ceiling(0.975 * Boot_Repeats)] )
CI = data.frame(Lower = LB, Upper = UB)
CI = bind_cols(CI, Bootstrap_means) %>% select(Lower:ID) %>% group_by(Pollutant, Year) %>%
mutate(Lower = ifelse(any(AQSID == site_number & Complete & Detected) & AQSID != site_number & Complete &
Detected, Lower, NA), Upper = ifelse(any(AQSID == site_number & Complete & Detected) & AQSID !=
site_number & Complete & Detected, Upper, NA), Comparison = ifelse(Lower > 0, "Higher", ifelse(Upper < 0,
"Lower", "Same") ) )
return(CI %>% ungroup() )
}
correlation_plots = function(data, site) {
library(tidyverse)
library(corrplot)
data_site <- filter(data, AQSID %in% site) %>% select(Date, Pollutant, Result)
analytes <- spread(data_site, Pollutant, Result, drop=T)
analytes$Date <- NULL
coranalytes <- cor(analytes, method="kendall", use="pairwise.complete.obs") %>% as.data.frame()
coranalytes <- select_if(coranalytes, function(x) !all(is.na(x))) %>% filter_all(any_vars(!is.na(.))) %>%
as.matrix()
rownames(coranalytes) <- colnames(coranalytes)
return(corrplot(coranalytes, method = "circle", type="lower", tl.cex=0.6) )#plot matrix
}
correlation_plots = function(data) {
library(tidyverse)
library(corrplot)
site <- unique(data$AQSID)
shinyApp(
ui = fluidPage(
fluidRow(
column(3,
style = "padding-bottom: 20px;",
inputPanel(
selectInput("site", label="Choose a site", choices = site),
dateRangeInput("date", label = "Select date range", start = "2009-01-01", end =
"2013-12-31", min = "2009-01-01", max = "2013-12-31"))),
column(9,
plotOutput('normviz', height = "500px")))),
server = function(input, output) {
output$normviz <- renderPlot({
data_sub = filter(data, AQSID==input$site, Date >= input$date[1], Date <=
input$date[2]) %>% select(Date, Pollutant, Result)
analytes <- spread(data_sub, Pollutant, Result, drop=T)
analytes$Date <- NULL
coranalytes <- cor(analytes, method="kendall", use="pairwise.complete.obs") %>% as.data.frame()
coranalytes <- select_if(coranalytes, function(x) !all(is.na(x))) %>% filter_all(any_vars(!is.na(.))) %>%
as.matrix()
rownames(coranalytes) <- colnames(coranalytes)
dummy_obj = corrplot(coranalytes, method = "circle", type="lower", tl.cex=0.6)
})
})
}
beeswarm_plot = function(data) {
library(dplyr)
library(ggbeeswarm)
library(ggplot2)
data = mutate(data, AQSID = as.character(AQSID) )
Pollutant <- unique(data$Pollutant)
Year <- unique(data$Year)
shinyApp(
ui = fluidPage(
fluidRow(
column(3,
style = "padding-bottom: 20px;",
inputPanel(
selectInput("Pollutant", label="Choose a pollutant", choices = Pollutant),
dateRangeInput("date", label = "Select date range", start = "2009-01-01", end =
"2013-12-31", min = "2009-01-01", max = "2013-12-31") ) ),
column(9,
plotOutput('normviz', height = "500px")))),
server = function(input, output) {
output$normviz <- renderPlot({
data_sub = filter(data, Pollutant==input$Pollutant, Date >= input$date[1], Date <=
input$date[2], !is.na(Result) )
ggplot(data_sub, aes(y = AQSID, x = Result, color = Censored) ) +
geom_quasirandom(groupOnX=F) +
labs(title = paste(data_sub$Pollutant[1]), x = "Result (ug/m^3)" )
})
})
}
read_met_data_tableau <- function(years = 2009:2017, stations = "MSP") {
library(tidyverse)
years <- paste0(years, collapse = ",")
stations <- paste0(stations, collapse = ",")
url <- paste0("http://tableau.pca.state.mn.us/views/WeatherObservations2009-2017/HourTable.csv?Station=",
stations, "&Year=", years, "&Month=1,2,3,4,5,6,7,8,9,10,11,12")
met_data <- read_csv(url, col_types = "??????????-") %>%
select(Station, Year, Month, Day, Hour, everything()) %>%
arrange(Station, Year, Month, Day, Hour)
return(met_data)
}
pollution_roses = function(data, met_data, num_breaks = 5) {
# Met data must be in Tableau format
library(tidyverse)
library(openair)
library(reshape)
library(shiny)
library(rsconnect)
data$Date <- ymd(data$Date)
met_data <- dplyr::rename(met_data, wd = `Wind Dir`, ws = `Wind Spd MPH`) %>%
mutate(date = paste0(Year,"/",Month,"/",Day," ",Hour,":00"), date = ymd_hm(date)) %>%
select(-Day, -Month, -Hour, -Year) %>%
timeAverage(avg.time = "day") %>%
mutate(date = ymd(date))
data <- left_join(data, met_data, by = c("Date" = "date"))
Pollutant <- unique(data$Pollutant)
Site <- unique(data$AQSID)
shinyApp(
ui = fluidPage(
fluidRow(
column(3,
style = "padding-bottom: 20px;",
inputPanel(
selectInput("Pollutant", label="Choose a pollutant", choices = Pollutant),
selectInput("Site", label="Choose a site", choices = Site),
dateRangeInput("date", label = "Select date range", start = "2009-01-01", end =
"2013-12-31", min = "2009-01-01", max = "2013-12-31"))),
column(9,
plotOutput('normviz', height = "500px")))),
server = function(input, output) {
output$normviz <- renderPlot({
data_sub = filter(data, Pollutant==input$Pollutant, AQSID == input$Site, Date >= input$date[1], Date <=
input$date[2], !is.na(Result))
data_sub = data_sub %>% mutate(MDL = max(MDL), minimum = min(Result), maximum = max(Result), Result =
ifelse(Censored, 1e-16, Result))
breaks_site = NULL
if(!all(data_sub$Censored)){
breaks_site = c(breaks_site, 0,
round_any( c(data_sub$MDL[1], data_sub$MDL[1] + (data_sub$maximum[1] - data_sub$MDL[1]) *
(1:(num_breaks-1) / (num_breaks-1) ) ), 0.0001, ceiling ) )
pollutionRose(data_sub, statistic = "abs.count", pollutant = "Result", breaks = breaks_site,
key.footer="ug/m3", main=paste("Daily Average Pollution Rose for",
data_sub$Pollutant[1],"\n", data_sub$AQSID[1]) )
}
else {
breaks_site = c(breaks_site, c(0, round_any( c(data_sub$MDL[1], 2*data_sub$MDL[1] ), 0.0001, ceiling ) ) )
pollutionRose(data_sub, statistic = "abs.count", pollutant = "Result", breaks = breaks_site,
key.footer="ug/m3", main=paste("Daily Average Pollution Rose for",
data_sub$Pollutant[1],"\n", data_sub$AQSID[1]) )
}
})
})
}
10.2 A simpler analysis
Now we use our functions above to write a simpler and easier to read analysis script.
Click below to view the example.
library(tidyverse)
MPCA_air_libraries()
#Import data
data <- read_csv('https://raw.githubusercontent.com/MPCA-air/air-methods/master/airtoxics_data_2009_2013.csv')
names(data)[1:10] <- c("AQSID", "POC", "Parameter", "Date","Result",
"Null_Data_Code", "MDL", "Pollutant", "Year", "CAS")
# Data cleaning
flagged <- data %>% flag() #Check with QA about flagged values if necessary.
duplicates_flagged <- data %>% flag_duplicates()
data <- data %>% mutate(Censored = Result < MDL) %>% average_duplicates() #We decide to average the duplicates here
# Data Validation
time_series <- data %>% time_series_plots()
# Collocated monitors
poc_comparisons <- data %>% POC_compare()
data <- data %>% POC_average()
# Completeness checks
complete <- data %>% completeness_check()
# Summary statistics
annual_summary <- data %>% filter(AQSID == 270370020) %>% UCL_95(100)
# Site Comparisons
site_number <- 270370020 #Our favorite site
comparisons_to_FH <- data %>%
filter(AQSID %in% c(270370020, 270370470, 271230871) ) %>%
site_compare(site_number, 50)
pollutant_correlations <- data %>% correlation_plots()
beeswarms <- data %>% beeswarm_plot()
# Pollution Roses
poll_rose_data <- data
met_data <- read_met_data_tableau(2009:2013, "MSP")
num_breaks <- 5
pol_roses <- poll_rose_data %>% pollution_roses(met_data, num_breaks)