# (removed: repository web-viewer header accidentally captured in export)
# R script for batch parsing and loading GHCN daily station files
# (*.dly) into a PostgreSQL database. Script will process all such files
# found in the configured data directory (datadir, below).
#
# As currently written, the script assumes that the 'ghcn' database
# already exists locally but has no tables, and that it can be accessed
# via 'ident' authentication (i.e., as the user executing the script,
# without password prompt).
#
# At the moment, PRCP, TMIN, and TMAX records are loaded.
#
# Jim Regetz
# NCEAS
# Created on 10-May-2012

    
# use library() rather than require(): require() merely returns FALSE
# when the package is unavailable, silently deferring the failure to
# the first dbDriver() call; library() stops immediately with an error
library(RPostgreSQL)

#-------------#
# "constants" #
#-------------#

# directory holding the GHCN daily station files (on atlas)
datadir <- "/home/layers/data/climate/ghcn/ghcnd_all"
# progress log written as each station file is processed
logfile <- "~/ghcn-psql-load.log"

# name of the target PostgreSQL database
db.name <- "ghcn"

# GHCN elements to load; all other records are filtered out
VARS <- c("PRCP", "TMIN", "TMAX")

# pre-generate several command statements to avoid doing the work
# repeatedly when processing each of the 75k daily files

# gawk command splitting the fixed-width .dly records into CSV fields:
# station id (11), year (4), month (2), element (4), then
# value/mflag/qflag/sflag (5/1/1/1) for each of the 31 days
# (FIELDWIDTHS is a gawk extension)
AWK.CMD <- paste(
    "awk -v FIELDWIDTHS='",
    paste(c(11, 4, 2, 4, rep(c(5,1,1,1), times=31)), collapse=" "),
    "' -v OFS=',' '{ $1=$1 \"\"; print }'", sep="")
# 31-way union that unpivots the wide staging table into long form.
# BUGFIX: the flag columns are now selected in the order (value,
# mflag, qflag, sflag) to match the column order of the target 'ghcn'
# table; the insert in wideToLong() has no explicit column list, so
# PostgreSQL matches columns positionally and the original order
# (value, sflag, mflag, qflag) scrambled the three flag columns
SQL.UNION <- paste(
    sprintf(
        "(select id, year, month, %d as day, element,
            value%d as value, mflag%d as mflag,
            qflag%d as qflag, sflag%d as sflag from ghcn_wide)",
        1:31, 1:31, 1:31, 1:31, 1:31),
    collapse=" union all ")

#------------------#
# helper functions #
#------------------#

# Function invoking OS to preprocess data into CSV format with
# grep/awk/tr, then pipe directly into psql to invoke COPY statement for
# bulk load. Returns FALSE if filtering yields zero rows (or fails for
# some other reason), hence no data staged into the database.
#
# Args:
#   dly: path to a GHCN daily (.dly) fixed-width station file
#   patt: optional character vector of patterns (e.g., element codes)
#     or'd together and passed to grep to filter records before
#     loading; NULL loads all records
#
# NOTE: reads the global connection 'db' and command string 'AWK.CMD'
loadAsCSV <- function(dly, patt=NULL) {
    tmpfile <- tempfile(tmpdir="/tmp")
    # always clean up the staging file, even when the shell pipeline
    # fails or yields no rows (originally it was only removed on the
    # success path, leaking a file per skipped station)
    on.exit(unlink(tmpfile), add=TRUE)
    tr <- "tr -d ' '"
    if (is.null(patt)) {
        cmd <- paste(AWK.CMD, dly, "|", tr)
    } else {
        patt <- shQuote(paste(patt, collapse="\\|"))
        cmd <- paste("grep -e", patt, dly, "|", AWK.CMD, "|", tr)
    }
    cmd <- paste(cmd, tmpfile, sep=" > ")
    # scalar && (not vectorized &) in the scalar if condition; isTRUE()
    # guards against file.info() returning NA if tmpfile was never
    # created
    if (system(cmd) == 0 && isTRUE(file.info(tmpfile)$size > 0)) {
        dbGetQuery(db,
            sprintf("COPY ghcn_wide FROM '%s' WITH CSV", tmpfile))
        TRUE
    } else {
        FALSE
    }
}

# Make PostgreSQL move data from the wide-form staging table (days
# across columns) into the long-form 'ghcn' table (one row per
# day*element), via the pre-built 31-way union query SQL.UNION.
wideToLong <- function(db) {
    dbGetQuery(db, sprintf("insert into ghcn %s", SQL.UNION))
}


#-----------------#
# procedural code #
#-----------------#

# establish database connection
drv <- dbDriver("PostgreSQL")
db <- dbConnect(drv, dbname=db.name)

# create ghcn staging table (wide form: one row per station-month,
# with value/mflag/qflag/sflag columns for each of the 31 days)
day.cols <- paste(sprintf("value%d integer, mflag%d char(1),
            qflag%d char(1), sflag%d char(1)", 1:31, 1:31, 1:31, 1:31),
            collapse=", ")
dbGetQuery(db, paste0(
    "CREATE TABLE ghcn_wide (
        id char(11),
        year integer,
        month integer,
        element char(4), ",
        day.cols,
        ")"))

# create main ghcn table (long form: one row per station-day-element)
dbGetQuery(db,
    "CREATE TABLE ghcn (
        id char(11),
        year integer,
        month integer,
        day integer,
        element char(4),
        value integer,
        mflag char(1),
        qflag char(1),
        sflag char(1)
        )")

    
# process and insert daily data, one station file at a time, logging
# progress (timestamp + filename) to logfile as we go
# BUGFIX: list.files() takes a regular expression, not a shell glob;
# the original pattern "*.dly" would also match any filename merely
# containing "dly" preceded by any character
dailies <- list.files(datadir, pattern="\\.dly$")
for (file in dailies) {
    cat(date(), "\t", file=logfile, append=TRUE)
    if (loadAsCSV(file.path(datadir, file), VARS)) {
        wideToLong(db)
        # clear the staging table so the next file starts clean
        dbGetQuery(db, 'delete from ghcn_wide')
    }
    cat(file, "\n", file=logfile, append=TRUE)
}

dbDisconnect(db)

    
# alternative approach: load some number (>1) of station data files into
# the wide staging table before applying the wideToLong step; initial
# testing suggests that this approach is a little faster than
# one-at-a-time, but I don't know what the optimal number would be,
# particularly given that the amount of data per file is variable
# (note: as sketched below, any files remaining after the last full
# batch would never be flushed to the long table — a final wideToLong
# call after the loop would be needed)
#BATCH.SIZE <- 10
#counter <- 1
#for (file in dailies) {
#    loadAsCSV(file.path(datadir, file), VARS)
#    if (counter %% BATCH.SIZE == 0) {
#        wideToLong(db)
#        dbGetQuery(db, 'delete from ghcn_wide')
#    }
#    counter <- counter + 1
#}