1
|
# R script for batch parsing and loading GHCN daily station files
|
2
|
# (*.dly) into a SQLite database. Script will process all such files in
|
3
|
# the current working directory.
|
4
|
#
|
5
|
# As written, the script is intended to create and populate the database
|
6
|
# from scratch, reporting an error if it already exists. In principle
|
7
|
# though, the code that processes and loads a given *.dly file could be
|
8
|
# run on its own to load additional data into an already existing
|
9
|
# database file.
|
10
|
#
|
11
|
# At the moment, only TMIN and TMAX are loaded, but that can easily be
|
12
|
# changed.
|
13
|
#
|
14
|
# Jim Regetz
|
15
|
# NCEAS
|
16
|
# Created on 09-May-2012
|
17
|
|
18
|
require(RSQLite)
|
19
|
|
20
|
#-------------#
|
21
|
# "constants" #
|
22
|
#-------------#
|
23
|
|
24
|
# name of target db
|
25
|
db.path <- "ghcn_all.db"
|
26
|
|
27
|
# variables to keep
|
28
|
VARS <- c("TMIN", "TMAX")
|
29
|
|
30
|
# column characteristics of the *.dly data files
|
31
|
DLY.COLS <- c("character", "integer", "integer", "character",
|
32
|
rep(c("numeric", "character", "character", "character"), times=31))
|
33
|
NUM.WIDE.COLS <- 4 + 4*31
|
34
|
DAYS <- lapply(seq(from=5, to=NUM.WIDE.COLS, by=4), function(i) i:(i+3))
|
35
|
|
36
|
|
37
|
#------------------#
|
38
|
# helper functions #
|
39
|
#------------------#
|
40
|
|
41
|
# bulk insert helper function (adapted from RSQLite documentation)
|
42
|
ghcn_bulk_insert <- function(db, sql, dat) {
|
43
|
dbBeginTransaction(db)
|
44
|
dbGetPreparedQuery(db, sql, bind.data = dat)
|
45
|
dbCommit(db)
|
46
|
dbGetQuery(db, "select count(*) from ghcn")[[1]]
|
47
|
}
|
48
|
|
49
|
# shell out to OS to leverage grep/awk/tr for faster initial parsing and
|
50
|
# filtering of data; if no data records are read in, this function
|
51
|
# returns NULL
|
52
|
loadAsCSV <- function(dly, patt=NULL) {
|
53
|
awk <- paste(
|
54
|
"awk -v FIELDWIDTHS='",
|
55
|
paste(c(11, 4, 2, 4, rep(c(5,1,1,1), times=31)), collapse=" "),
|
56
|
"' -v OFS=',' '{ $1=$1 \"\"; print }'", sep="")
|
57
|
tr <- "tr -d ' '"
|
58
|
if (is.null(patt)) {
|
59
|
cmd <- paste(awk, dly, "|", tr)
|
60
|
} else {
|
61
|
patt <- shQuote(paste(patt, collapse="\\|"))
|
62
|
cmd <- paste("grep -e", patt, dly, "|", awk, "|", tr)
|
63
|
}
|
64
|
csv <- system(cmd, intern=TRUE)
|
65
|
if (length(csv)>0) {
|
66
|
read.csv(textConnection(csv), header=FALSE, colClasses=DLY.COLS)
|
67
|
} else {
|
68
|
NULL
|
69
|
}
|
70
|
}
|
71
|
|
72
|
# split data columnwise by day, then recombine into long format; note
|
73
|
# that the indexing here is hard-coded to work for the *.dly files, and
|
74
|
# simply assumes that they are all consistent
|
75
|
wideToLong <- function(dat, days) {
|
76
|
daily.data <- lapply(seq_along(days), function(i) {
|
77
|
dat <- data.frame(dat[1:4], day=i, dat[days[[i]]])
|
78
|
dat$srcrowid <- seq(nrow(dat))
|
79
|
names(dat) <- 1:ncol(dat)
|
80
|
dat
|
81
|
})
|
82
|
do.call("rbind", daily.data)
|
83
|
}
|
84
|
|
85
|
|
86
|
#-----------------#
|
87
|
# procedural code #
|
88
|
#-----------------#
|
89
|
|
90
|
# establish database connection
|
91
|
con <- dbDriver("SQLite")
|
92
|
if (file.exists(db.path)) {
|
93
|
stop("database already exists at ", db.path)
|
94
|
}
|
95
|
db <- dbConnect(con, dbname=db.path)
|
96
|
|
97
|
# create main ghcn table
|
98
|
sql <- "
|
99
|
CREATE TABLE ghcn (
|
100
|
id text,
|
101
|
year int,
|
102
|
month int,
|
103
|
element text,
|
104
|
day int,
|
105
|
value int,
|
106
|
mflag text,
|
107
|
qflag text,
|
108
|
sflag text,
|
109
|
srcrowid int)
|
110
|
"
|
111
|
dbGetQuery(db, sql)
|
112
|
|
113
|
# prepare sql insert statement
|
114
|
params.clist <- paste(rep("?", length(dbListFields(db, "ghcn"))),
|
115
|
collapse=", ")
|
116
|
sql <- paste("insert into ghcn values (", params.clist, ")", sep="")
|
117
|
|
118
|
# process and insert daily data
|
119
|
dailies <- list.files(pattern="*.dly")
|
120
|
for (file in dailies) {
|
121
|
dly <- loadAsCSV(file, VARS)
|
122
|
if (!is.null(dly)) {
|
123
|
long <- wideToLong(dly, DAYS)
|
124
|
ghcn_bulk_insert(db, sql, long)
|
125
|
} else {
|
126
|
message("no rows imported for ", file)
|
127
|
}
|
128
|
}
|