Project

General

Profile

Download (5.52 KB) Statistics
| Branch: | Revision:
1
# R script for batch parsing and loading GHCN daily station files
2
# (*.dly) into a PostgreSQL database. The appropriate GHCN files are
3
# assumed to have been downloaded to the location specified by
4
# 'ghcndir', with the daily files themselves in a "ghcnd_all"
5
# subdirectory exactly as generated by unpacking "ghcnd_all.tar.gz";
6
# note that for the purposes of storage efficiency, we're not currently
7
# keeping these uncompressed files on disk, so this tarball would need
8
# to be unpacked again if for some reason this script needs to be
9
# re-run.
10
#
11
# The station data (names, locations) are now also imported as a
12
# separate 'stations' table in the database.
13
#
14
# As currently written, the script assumes that the 'ghcn' database
15
# already exists locally but has no tables, and that it can be accessed
16
# via 'ident' authentication (i.e., as the user executing the script,
17
# without password prompt).
18
#
19
# At the moment, PRCP, TMIN, and TMAX records are loaded.
20
#
21
# Jim Regetz
22
# NCEAS
23
# Created on 10-May-2012
24

    
25
require(RPostgreSQL)
26

    
27
#-------------#
28
# "constants" #
29
#-------------#
30

    
31
# location of ghcn daily data (on atlas)
32
ghcndir <- "/home/layers/data/climate/ghcn/v2.92-upd-2012052822"
33
# output file
34
logfile <- "~/ghcn-psql-load.log"
35

    
36
# name of target db
37
db.name <- "ghcn"
38

    
39
# variables to keep
40
VARS <- c("PRCP", "TMIN", "TMAX")
41

    
42
# pre-generate several command statements to avoid doing the work
43
# repeatedly when processing each of the 75k daily files
44
AWK.CMD <- paste(
45
    "awk -v FIELDWIDTHS='",
46
    paste(c(11, 4, 2, 4, rep(c(5,1,1,1), times=31)), collapse=" "),
47
    "' -v OFS=',' '{ $1=$1 \"\"; print }'", sep="")
48
SQL.UNION <- paste(
49
    sprintf(
50
        "(select id, year, month, %d as day, element,
51
            value%d as value, sflag%d as sflag,
52
            mflag%d as mflag, qflag%d as qflag from ghcn_wide)",
53
        1:31, 1:31, 1:31, 1:31, 1:31),
54
    collapse=" union all ")
55

    
56
#------------------#
57
# helper functions #
58
#------------------#
59

    
60
# Function invoking OS to preprocess data into CSV format with
61
# grep/awk/tr, then pipe directly into psql to invoke COPY statement for
62
# bulk load. Returns FALSE if filtering yields zero rows (or fails for
63
# some other reason), hence no data staged into the database.
64
loadAsCSV <- function(dly, patt=NULL) {
65
    tmpfile <- tempfile(tmpdir="/tmp")
66
    tr <- "tr -d ' '"
67
    if (is.null(patt)) {
68
        cmd <- paste(AWK.CMD, dly, "|", tr)
69
    } else {
70
        patt <- shQuote(paste(patt, collapse="\\|"))
71
        cmd <- paste("grep -e", patt, dly, "|", AWK.CMD, "|", tr)
72
    }
73
    cmd <- paste(cmd, tmpfile, sep=" > ")
74
    if (system(cmd)==0 & 0<file.info(tmpfile)$size) {
75
        dbGetQuery(db,
76
            sprintf("COPY ghcn_wide FROM '%s' WITH CSV", tmpfile))
77
        file.remove(tmpfile)
78
        TRUE
79
    } else {
80
        FALSE
81
    }
82
}
83

    
84
# Function to make postgresl move data from the wide-form table (days
85
# across columns) into the long form table (unique row for each
86
# day*element).
87
wideToLong <- function(db) {
88
   dbGetQuery(db, paste("insert into ghcn", SQL.UNION))
89
}
90

    
91

    
92
#-----------------#
93
# procedural code #
94
#-----------------#
95

    
96
# establish database connection
97
drv <- dbDriver("PostgreSQL")
98
db <- dbConnect(drv, dbname=db.name)
99

    
100
# create ghcn staging table
101
dbGetQuery(db, paste(
102
    "CREATE TABLE ghcn_wide (
103
        id char(11),
104
        year integer,
105
        month integer,
106
        element char(4), ",
107
        paste(sprintf("value%d integer, mflag%d char(1),
108
            qflag%d char(1), sflag%d char(1)", 1:31, 1:31, 1:31, 1:31),
109
            collapse=", "),
110
        ")", sep=""))
111

    
112
# create main ghcn table
113
dbGetQuery(db, paste(
114
    "CREATE TABLE ghcn (
115
        id char(11),
116
        year integer,
117
        month integer,
118
        day integer,
119
        element char(4),
120
        value integer,
121
        mflag char(1),
122
        qflag char(1),
123
        sflag char(1)
124
        )"))
125

    
126
# process and insert daily data
127
dailies <- list.files(file.path(ghcndir, "ghcnd_all"), pattern="*.dly")
128
for (file in dailies) {
129
    cat(date(), "\t", file=logfile, append=TRUE)
130
    if (loadAsCSV(file.path(ghcndir, "ghcnd_all", file), VARS)) {
131
        wideToLong(db)
132
        dbGetQuery(db, 'delete from ghcn_wide')
133
    }
134
    cat(file, "\n", file=logfile, append=TRUE)
135
}
136

    
137
# create ghcn stations table
138
dbGetQuery(db, paste(
139
    "CREATE TABLE stations (
140
        id char(11) primary key,
141
        latitude numeric,
142
        longitude numeric,
143
        elevation numeric,
144
        state char(2),
145
        name char(30),
146
        gsnflag char(3),
147
        hcnflag char(3),
148
        wmoid char(5)
149
        )"))
150

    
151
# process and insert station data
152
FMT.COLS <- strsplit(paste(c("A11", "F8", "F9", "F6", "A2", "A30", "A3",
153
    "A3", "A5"), collapse=",X1,"), ",")[[1]]
154
stations <- read.fortran(file.path(ghcndir, "ghcnd-stations.txt"),
155
    FMT.COLS, comment.char="", strip.white=TRUE, na="")
156
dbWriteTable(db, "stations", stations, row.names=FALSE, append=TRUE)
157

    
158
# add foreign key constraint to ghcn table
159
dbGetQuery(db,
160
    "ALTER TABLE ghcn
161
        ADD FOREIGN KEY (station)
162
        REFERENCES stations (id)")
163

    
164
dbDisconnect(db)
165

    
166
# alternative approach: load some number (>1) of station data files into
167
# the wide staging table before applying the wideToLong step; initial
168
# testing suggests that this approach is a little faster than
169
# one-at-a-time, but I don't know what the optimal number would be,
170
# particularly given that the amount of data per file is variable
171
#BATCH.SIZE <- 10
172
#counter <- 1
173
#for (file in dailies) {
174
#    loadAsCSV(file.path(ghcndir, "ghcnd_all", file), VARS)
175
#    if (counter %% BATCH.SIZE == 0) {
176
#        wideToLong(db)
177
#        dbGetQuery(db, 'delete from ghcn_wide')
178
#    }
179
#    counter <- counter + 1
180
#}
(2-2/3)