Project

General

Profile

Download (4.61 KB) Statistics
| Branch: | Revision:
1
# R script for batch parsing and loading GHCN daily station files
2
# (*.dly) into a PostgreSQL database. The appropriate GHCN files are
3
# assumed to have been downloaded to the location specified by
4
# 'ghcndir', with the daily files themselves in a "ghcnd_all"
5
# subdirectory exactly as generated by unpacking "ghcnd_all.tar.gz";
6
# note that for the purposes of storage efficiency, we're not currently
7
# keeping these uncompressed files on disk, so this tarball would need
8
# to be unpacked again if for some reason this script needs to be
9
# re-run.
10
#
11
# As currently written, the script assumes that the 'ghcn' database
12
# already exists locally but has no tables, and that it can be accessed
13
# via 'ident' authentication (i.e., as the user executing the script,
14
# without password prompt).
15
#
16
# At the moment, PRCP, TMIN, and TMAX records are loaded.
17
#
18
# Jim Regetz
19
# NCEAS
20
# Created on 10-May-2012
21

    
22
require(RPostgreSQL)
23

    
24
#-------------#
25
# "constants" #
26
#-------------#
27

    
28
# location of ghcn daily data (on atlas)
29
ghcndir <- "/home/layers/data/climate/ghcn/v2.92-upd-2012052822"
30
# output file
31
logfile <- "~/ghcn-psql-load.log"
32

    
33
# name of target db
34
db.name <- "ghcn"
35

    
36
# variables to keep
37
VARS <- c("PRCP", "TMIN", "TMAX")
38

    
39
# pre-generate several command statements to avoid doing the work
40
# repeatedly when processing each of the 75k daily files
41
AWK.CMD <- paste(
42
    "awk -v FIELDWIDTHS='",
43
    paste(c(11, 4, 2, 4, rep(c(5,1,1,1), times=31)), collapse=" "),
44
    "' -v OFS=',' '{ $1=$1 \"\"; print }'", sep="")
45
SQL.UNION <- paste(
46
    sprintf(
47
        "(select id, year, month, %d as day, element,
48
            value%d as value, sflag%d as sflag,
49
            mflag%d as mflag, qflag%d as qflag from ghcn_wide)",
50
        1:31, 1:31, 1:31, 1:31, 1:31),
51
    collapse=" union all ")
52

    
53
#------------------#
54
# helper functions #
55
#------------------#
56

    
57
# Function invoking OS to preprocess data into CSV format with
58
# grep/awk/tr, then pipe directly into psql to invoke COPY statement for
59
# bulk load. Returns FALSE if filtering yields zero rows (or fails for
60
# some other reason), hence no data staged into the database.
61
loadAsCSV <- function(dly, patt=NULL) {
62
    tmpfile <- tempfile(tmpdir="/tmp")
63
    tr <- "tr -d ' '"
64
    if (is.null(patt)) {
65
        cmd <- paste(AWK.CMD, dly, "|", tr)
66
    } else {
67
        patt <- shQuote(paste(patt, collapse="\\|"))
68
        cmd <- paste("grep -e", patt, dly, "|", AWK.CMD, "|", tr)
69
    }
70
    cmd <- paste(cmd, tmpfile, sep=" > ")
71
    if (system(cmd)==0 & 0<file.info(tmpfile)$size) {
72
        dbGetQuery(db,
73
            sprintf("COPY ghcn_wide FROM '%s' WITH CSV", tmpfile))
74
        file.remove(tmpfile)
75
        TRUE
76
    } else {
77
        FALSE
78
    }
79
}
80

    
81
# Function to make postgresl move data from the wide-form table (days
82
# across columns) into the long form table (unique row for each
83
# day*element).
84
wideToLong <- function(db) {
85
   dbGetQuery(db, paste("insert into ghcn", SQL.UNION))
86
}
87

    
88

    
89
#-----------------#
90
# procedural code #
91
#-----------------#
92

    
93
# establish database connection
94
drv <- dbDriver("PostgreSQL")
95
db <- dbConnect(drv, dbname=db.name)
96

    
97
# create ghcn staging table
98
dbGetQuery(db, paste(
99
    "CREATE TABLE ghcn_wide (
100
        id char(11),
101
        year integer,
102
        month integer,
103
        element char(4), ",
104
        paste(sprintf("value%d integer, mflag%d char(1),
105
            qflag%d char(1), sflag%d char(1)", 1:31, 1:31, 1:31, 1:31),
106
            collapse=", "),
107
        ")", sep=""))
108

    
109
# create main ghcn table
110
dbGetQuery(db, paste(
111
    "CREATE TABLE ghcn (
112
        id char(11),
113
        year integer,
114
        month integer,
115
        day integer,
116
        element char(4),
117
        value integer,
118
        mflag char(1),
119
        qflag char(1),
120
        sflag char(1)
121
        )"))
122

    
123
# process and insert daily data
124
dailies <- list.files(file.path(ghcndir, "ghcnd_all"), pattern="*.dly")
125
for (file in dailies) {
126
    cat(date(), "\t", file=logfile, append=TRUE)
127
    if (loadAsCSV(file.path(ghcndir, "ghcnd_all", file), VARS)) {
128
        wideToLong(db)
129
        dbGetQuery(db, 'delete from ghcn_wide')
130
    }
131
    cat(file, "\n", file=logfile, append=TRUE)
132
}
133

    
134
dbDisconnect(db)
135

    
136
# alternative approach: load some number (>1) of station data files into
137
# the wide staging table before applying the wideToLong step; initial
138
# testing suggests that this approach is a little faster than
139
# one-at-a-time, but I don't know what the optimal number would be,
140
# particularly given that the amount of data per file is variable
141
#BATCH.SIZE <- 10
142
#counter <- 1
143
#for (file in dailies) {
144
#    loadAsCSV(file.path(ghcndir, "ghcnd_all", file), VARS)
145
#    if (counter %% BATCH.SIZE == 0) {
146
#        wideToLong(db)
147
#        dbGetQuery(db, 'delete from ghcn_wide')
148
#    }
149
#    counter <- counter + 1
150
#}
(2-2/3)