1
|
# R script for batch parsing and loading GHCN daily station files
|
2
|
# (*.dly) into a PostgreSQL database. The appropriate GHCN files are
|
3
|
# assumed to have been downloaded to the location specified by
|
4
|
# 'ghcndir', with the daily files themselves in a "ghcnd_all"
|
5
|
# subdirectory exactly as generated by unpacking "ghcnd_all.tar.gz";
|
6
|
# note that for the purposes of storage efficiency, we're not currently
|
7
|
# keeping these uncompressed files on disk, so this tarball would need
|
8
|
# to be unpacked again if for some reason this script needs to be
|
9
|
# re-run.
|
10
|
#
|
11
|
# The station data (names, locations) are now also imported as a
|
12
|
# separate 'stations' table in the database.
|
13
|
#
|
14
|
# As currently written, the script assumes that the 'ghcn' database
|
15
|
# already exists locally but has no tables, and that it can be accessed
|
16
|
# via 'ident' authentication (i.e., as the user executing the script,
|
17
|
# without password prompt).
|
18
|
#
|
19
|
# At the moment, PRCP, TMIN, and TMAX records are loaded.
|
20
|
#
|
21
|
# Jim Regetz
|
22
|
# NCEAS
|
23
|
# Created on 10-May-2012
|
24
|
|
25
|
require(RPostgreSQL)
|
26
|
|
27
|
#-------------#
|
28
|
# "constants" #
|
29
|
#-------------#
|
30
|
|
31
|
# location of ghcn daily data (on atlas)
|
32
|
ghcndir <- "/home/layers/data/climate/ghcn/v2.92-upd-2012052822"
|
33
|
# output file
|
34
|
logfile <- "~/ghcn-psql-load.log"
|
35
|
|
36
|
# name of target db
|
37
|
db.name <- "ghcn"
|
38
|
|
39
|
# variables to keep
|
40
|
VARS <- c("PRCP", "TMIN", "TMAX")
|
41
|
|
42
|
# pre-generate several command statements to avoid doing the work
|
43
|
# repeatedly when processing each of the 75k daily files
|
44
|
AWK.CMD <- paste(
|
45
|
"awk -v FIELDWIDTHS='",
|
46
|
paste(c(11, 4, 2, 4, rep(c(5,1,1,1), times=31)), collapse=" "),
|
47
|
"' -v OFS=',' '{ $1=$1 \"\"; print }'", sep="")
|
48
|
SQL.UNION <- paste(
|
49
|
sprintf(
|
50
|
"(select id, year, month, %d as day, element,
|
51
|
value%d as value, sflag%d as sflag,
|
52
|
mflag%d as mflag, qflag%d as qflag from ghcn_wide)",
|
53
|
1:31, 1:31, 1:31, 1:31, 1:31),
|
54
|
collapse=" union all ")
|
55
|
|
56
|
#------------------#
|
57
|
# helper functions #
|
58
|
#------------------#
|
59
|
|
60
|
# Function invoking OS to preprocess data into CSV format with
|
61
|
# grep/awk/tr, then pipe directly into psql to invoke COPY statement for
|
62
|
# bulk load. Returns FALSE if filtering yields zero rows (or fails for
|
63
|
# some other reason), hence no data staged into the database.
|
64
|
loadAsCSV <- function(dly, patt=NULL) {
|
65
|
tmpfile <- tempfile(tmpdir="/tmp")
|
66
|
tr <- "tr -d ' '"
|
67
|
if (is.null(patt)) {
|
68
|
cmd <- paste(AWK.CMD, dly, "|", tr)
|
69
|
} else {
|
70
|
patt <- shQuote(paste(patt, collapse="\\|"))
|
71
|
cmd <- paste("grep -e", patt, dly, "|", AWK.CMD, "|", tr)
|
72
|
}
|
73
|
cmd <- paste(cmd, tmpfile, sep=" > ")
|
74
|
if (system(cmd)==0 & 0<file.info(tmpfile)$size) {
|
75
|
dbGetQuery(db,
|
76
|
sprintf("COPY ghcn_wide FROM '%s' WITH CSV", tmpfile))
|
77
|
file.remove(tmpfile)
|
78
|
TRUE
|
79
|
} else {
|
80
|
FALSE
|
81
|
}
|
82
|
}
|
83
|
|
84
|
# Function to make postgresl move data from the wide-form table (days
|
85
|
# across columns) into the long form table (unique row for each
|
86
|
# day*element).
|
87
|
wideToLong <- function(db) {
|
88
|
dbGetQuery(db, paste("insert into ghcn", SQL.UNION))
|
89
|
}
|
90
|
|
91
|
|
92
|
#-----------------#
|
93
|
# procedural code #
|
94
|
#-----------------#
|
95
|
|
96
|
# establish database connection
|
97
|
drv <- dbDriver("PostgreSQL")
|
98
|
db <- dbConnect(drv, dbname=db.name)
|
99
|
|
100
|
# create ghcn staging table
|
101
|
dbGetQuery(db, paste(
|
102
|
"CREATE TABLE ghcn_wide (
|
103
|
id char(11),
|
104
|
year integer,
|
105
|
month integer,
|
106
|
element char(4), ",
|
107
|
paste(sprintf("value%d integer, mflag%d char(1),
|
108
|
qflag%d char(1), sflag%d char(1)", 1:31, 1:31, 1:31, 1:31),
|
109
|
collapse=", "),
|
110
|
")", sep=""))
|
111
|
|
112
|
# create main ghcn table
|
113
|
dbGetQuery(db, paste(
|
114
|
"CREATE TABLE ghcn (
|
115
|
id char(11),
|
116
|
year integer,
|
117
|
month integer,
|
118
|
day integer,
|
119
|
element char(4),
|
120
|
value integer,
|
121
|
mflag char(1),
|
122
|
qflag char(1),
|
123
|
sflag char(1)
|
124
|
)"))
|
125
|
|
126
|
# process and insert daily data
|
127
|
dailies <- list.files(file.path(ghcndir, "ghcnd_all"), pattern="*.dly")
|
128
|
for (file in dailies) {
|
129
|
cat(date(), "\t", file=logfile, append=TRUE)
|
130
|
if (loadAsCSV(file.path(ghcndir, "ghcnd_all", file), VARS)) {
|
131
|
wideToLong(db)
|
132
|
dbGetQuery(db, 'delete from ghcn_wide')
|
133
|
}
|
134
|
cat(file, "\n", file=logfile, append=TRUE)
|
135
|
}
|
136
|
|
137
|
# create ghcn stations table
|
138
|
dbGetQuery(db, paste(
|
139
|
"CREATE TABLE stations (
|
140
|
id char(11) primary key,
|
141
|
latitude numeric,
|
142
|
longitude numeric,
|
143
|
elevation numeric,
|
144
|
state char(2),
|
145
|
name char(30),
|
146
|
gsnflag char(3),
|
147
|
hcnflag char(3),
|
148
|
wmoid char(5)
|
149
|
)"))
|
150
|
|
151
|
# process and insert station data
|
152
|
FMT.COLS <- strsplit(paste(c("A11", "F8", "F9", "F6", "A2", "A30", "A3",
|
153
|
"A3", "A5"), collapse=",X1,"), ",")[[1]]
|
154
|
stations <- read.fortran(file.path(ghcndir, "ghcnd-stations.txt"),
|
155
|
FMT.COLS, comment.char="", strip.white=TRUE, na="")
|
156
|
dbWriteTable(db, "stations", stations, row.names=FALSE, append=TRUE)
|
157
|
|
158
|
# add foreign key constraint to ghcn table
|
159
|
dbGetQuery(db,
|
160
|
"ALTER TABLE ghcn
|
161
|
ADD FOREIGN KEY (station)
|
162
|
REFERENCES stations (id)")
|
163
|
|
164
|
dbDisconnect(db)
|
165
|
|
166
|
# alternative approach: load some number (>1) of station data files into
|
167
|
# the wide staging table before applying the wideToLong step; initial
|
168
|
# testing suggests that this approach is a little faster than
|
169
|
# one-at-a-time, but I don't know what the optimal number would be,
|
170
|
# particularly given that the amount of data per file is variable
|
171
|
#BATCH.SIZE <- 10
|
172
|
#counter <- 1
|
173
|
#for (file in dailies) {
|
174
|
# loadAsCSV(file.path(ghcndir, "ghcnd_all", file), VARS)
|
175
|
# if (counter %% BATCH.SIZE == 0) {
|
176
|
# wideToLong(db)
|
177
|
# dbGetQuery(db, 'delete from ghcn_wide')
|
178
|
# }
|
179
|
# counter <- counter + 1
|
180
|
#}
|