# R script for batch parsing and loading GHCN daily station files
# (*.dly) into a PostgreSQL database. Script will process all such files
# in the configured GHCN data directory (see 'datadir' below).
#
# As currently written, the script assumes that the 'ghcn' database
# already exists locally but has no tables, and that it can be accessed
# via 'ident' authentication (i.e., as the user executing the script,
# without password prompt).
#
# At the moment, PRCP, TMIN, and TMAX records are loaded.
#
# Jim Regetz
# NCEAS
# Created on 10-May-2012

# Attach the DBI-compliant PostgreSQL driver package. library() (unlike
# require(), which merely returns FALSE) aborts the script immediately
# if the package is not installed.
library(RPostgreSQL)
#-------------#
# "constants" #
#-------------#

# Directory holding the GHCN-Daily per-station *.dly files (on atlas)
datadir <- "/home/layers/data/climate/ghcn/ghcnd_all"

# Progress log, appended to as each station file is processed
logfile <- "~/ghcn-psql-load.log"

# Name of the (pre-existing, empty) target database
db.name <- "ghcn"

# GHCN element codes to retain; all other records are filtered out
VARS <- c("PRCP", "TMIN", "TMAX")
# Pre-generate command/SQL fragments once, rather than rebuilding them
# for each of the ~75k daily station files.

# awk command converting the fixed-width .dly format to CSV. Field
# widths: 11-char station id, 4-char year, 2-char month, 4-char
# element, then 31 repetitions of (5-char value + three 1-char flags).
# NOTE(review): FIELDWIDTHS is a gawk extension -- confirm gawk is the
# system awk.
AWK.CMD <- paste0(
    "awk -v FIELDWIDTHS='",
    paste(c(11, 4, 2, 4, rep(c(5, 1, 1, 1), times=31)), collapse=" "),
    "' -v OFS=',' '{ $1=$1 \"\"; print }'")

# SQL fragment unpivoting the 31 per-day column groups of ghcn_wide
# into one row per day. Columns are emitted as (id, year, month, day,
# element, value, mflag, qflag, sflag) to match the declared column
# order of the long-form 'ghcn' table, because wideToLong() inserts
# positionally (no column list). Previously sflag was listed before
# mflag/qflag, silently scrambling the three flag columns on insert.
SQL.UNION <- paste(
    sprintf(
        "(select id, year, month, %d as day, element,
            value%d as value, mflag%d as mflag,
            qflag%d as qflag, sflag%d as sflag from ghcn_wide)",
        1:31, 1:31, 1:31, 1:31, 1:31),
    collapse=" union all ")
#------------------#
# helper functions #
#------------------#

# Preprocess one GHCN .dly file into CSV via a shell pipeline
# (grep/awk/tr) written to a temp file, then bulk-load that file into
# the ghcn_wide staging table with a COPY statement.
#
# Args:
#   dly: path to a fixed-width GHCN daily station file
#   patt: optional character vector of element codes (e.g. "PRCP") to
#     retain; NULL keeps all records
#
# Relies on the globals AWK.CMD (command string) and db (connection).
# Returns TRUE if rows were staged into the database, FALSE if the
# filtering yielded zero rows (or the pipeline failed for some other
# reason).
loadAsCSV <- function(dly, patt=NULL) {
    tmpfile <- tempfile(tmpdir="/tmp")
    # clean up the temp file on every exit path (the original leaked it
    # whenever no rows were loaded)
    on.exit(unlink(tmpfile), add=TRUE)
    tr <- "tr -d ' '"
    if (is.null(patt)) {
        cmd <- paste(AWK.CMD, dly, "|", tr)
    } else {
        patt <- shQuote(paste(patt, collapse="\\|"))
        cmd <- paste("grep -e", patt, dly, "|", AWK.CMD, "|", tr)
    }
    cmd <- paste(cmd, tmpfile, sep=" > ")
    # scalar && short-circuits, so a (possibly NA) file size is never
    # tested when the shell pipeline itself failed; isTRUE() guards
    # against NA from file.info() on a missing file
    if (system(cmd)==0 && isTRUE(file.info(tmpfile)$size > 0)) {
        dbGetQuery(db,
            sprintf("COPY ghcn_wide FROM '%s' WITH CSV", tmpfile))
        TRUE
    } else {
        FALSE
    }
}
# Ask PostgreSQL to move staged rows from the wide-form table (day
# columns across) into the long-form table (one row per day*element).
wideToLong <- function(db) {
    dbGetQuery(db, sprintf("insert into ghcn %s", SQL.UNION))
}
#-----------------#
# procedural code #
#-----------------#

# Open the database connection; no credentials are passed because the
# database is assumed to allow 'ident' authentication for this user
drv <- dbDriver("PostgreSQL")
db <- dbConnect(drv, dbname=db.name)
# Create the wide-format staging table: key columns plus 31 repeated
# (value, mflag, qflag, sflag) column groups, one group per day of month
dbGetQuery(db, paste0(
    "CREATE TABLE ghcn_wide (
        id char(11),
        year integer,
        month integer,
        element char(4), ",
    paste(sprintf("value%d integer, mflag%d char(1),
        qflag%d char(1), sflag%d char(1)", 1:31, 1:31, 1:31, 1:31),
        collapse=", "),
    ")"))
# Create the long-format destination table: one row per
# station/date/element observation, carrying its three QA flag columns
dbGetQuery(db,
    "CREATE TABLE ghcn (
        id char(11),
        year integer,
        month integer,
        day integer,
        element char(4),
        value integer,
        mflag char(1),
        qflag char(1),
        sflag char(1)
    )")
# Process each station file: stage the filtered rows, unpivot them into
# the long table, then clear the staging table for the next file.
# NOTE: list.files() takes a regular expression, not a glob; the
# original "*.dly" was a glob-ism that (as a regex) also matched names
# merely containing "dly". Anchor on the literal ".dly" suffix instead.
dailies <- list.files(datadir, pattern="\\.dly$")
for (dly.file in dailies) {     # 'dly.file' avoids shadowing base::file
    # timestamp logged before the work, filename after, so that a stall
    # in processing is attributable to a specific file
    cat(date(), "\t", file=logfile, append=TRUE)
    if (loadAsCSV(file.path(datadir, dly.file), VARS)) {
        wideToLong(db)
        dbGetQuery(db, 'delete from ghcn_wide')
    }
    cat(dly.file, "\n", file=logfile, append=TRUE)
}

dbDisconnect(db)
# alternative approach: load some number (>1) of station data files into
# the wide staging table before applying the wideToLong step; initial
# testing suggests that this approach is a little faster than
# one-at-a-time, but I don't know what the optimal number would be,
# particularly given that the amount of data per file is variable
#BATCH.SIZE <- 10
#counter <- 1
#for (file in dailies) {
#    loadAsCSV(file.path(datadir, file), VARS)
#    if (counter %% BATCH.SIZE == 0) {
#        wideToLong(db)
#        dbGetQuery(db, 'delete from ghcn_wide')
#    }
#    counter <- counter + 1
#}