Project

General

Profile

1 53 aaronmk
#!/usr/bin/env python
2
# Maps one datasource to another, using a map spreadsheet if needed
3
# For outputting an XML file to a PostgreSQL database, use the general format of
4
# http://vegbank.org/vegdocs/xml/vegbank_example_ver1.0.2.xml
5
6
import os.path
7
import sys
8 299 aaronmk
import xml.dom.minidom as minidom
9 53 aaronmk
10 266 aaronmk
sys.path.append(os.path.dirname(__file__)+"/../lib")
11 53 aaronmk
12 344 aaronmk
import exc
13 64 aaronmk
import opts
14 281 aaronmk
import Parser
15 131 aaronmk
import sql
16 715 aaronmk
import strings
17 828 aaronmk
import term
18 310 aaronmk
import util
19 133 aaronmk
import xml_dom
20 86 aaronmk
import xml_func
21 53 aaronmk
22 84 aaronmk
def metadata_value(name):
    '''Returns the literal value carried by a metadata mapping input
    
    A mapping input that is a string starting with ':' supplies its value
    directly (the text after the ':') instead of naming something to look up
    in the input row.
    
    @param name The mapping input; may be any type (non-strings are never
        metadata)
    @return str|None The text after the leading ':', or None if name is not a
        metadata value
    '''
    # isinstance() rather than an exact type comparison, so that str
    # subclasses are recognized too
    if isinstance(name, str) and name.startswith(':'): return name[1:]
    else: return None
25
26 53 aaronmk
def main():
    '''Maps each input datasource to an XML tree and outputs it
    
    Settings are read through the opts helpers: `n` (max # input rows),
    `test`, `commit`, `debug`, `verbose`, and the input/output DB configs
    (looked up with the prefixes 'in' and 'out'). Command-line args are the
    paths of the map spreadsheets to apply.
    
    The output goes to a database if an output DB engine is configured,
    otherwise it is written to stdout as XML.
    
    @raise SystemExit with a usage message if no map path is given and the
        input is a DB or the output is not a DB
    @raise NotImplementedError if the output would be CSV
    '''
    ex_tracker = exc.ExTracker()
    
    env_names = [] # filled in by the opts.get_env_var*() calls below
    def usage_err():
        '''Aborts the program with a usage message'''
        raise SystemExit('Usage: ' + opts.env_usage(env_names, True)
            +' [commit=1] [test=1] [verbose=1] [debug=1] '+sys.argv[0]
            +' [map_path...] [<input] [>output]')
    limit = opts.get_env_var('n', None, env_names)
    if limit is not None: limit = int(limit)
    test = opts.env_flag('test')
    commit = not test and opts.env_flag('commit') # never commit in test mode
    debug = opts.env_flag('debug')
    verbose = debug or opts.env_flag('verbose') # debug implies verbose
    
    def log(msg, on=verbose):
        '''Writes msg to stderr if `on` is set'''
        if on: sys.stderr.write(msg)
    def log_start(action, on=verbose): log(action+'...', on)
    def log_done(on=verbose): log('Done\n', on)
    
    # Get db config from env vars
    db_config_names = ['engine', 'host', 'user', 'password', 'database']
    def get_db_config(prefix):
        return opts.get_env_vars(db_config_names, prefix, env_names)
    in_db_config = get_db_config('in')
    out_db_config = get_db_config('out')
    # A datasource is a DB iff an engine was configured for it
    in_is_db = 'engine' in in_db_config
    out_is_db = 'engine' in out_db_config
    
    # Parse args
    map_paths = sys.argv[1:]
    if map_paths == []:
        # Running without a map is only supported for non-DB input -> DB
        if in_is_db or not out_is_db: usage_err()
        else: map_paths = [None]
    
    def connect_db(db_config):
        '''Opens a DB connection, logging the attempt when verbose'''
        log_start('Connecting to '+sql.db_config_str(db_config))
        db = sql.connect(db_config)
        log_done()
        return db
    
    # One-element list so process_input() can assign to it from a nested scope
    out_is_xml_ref = [False]
    
    def process_input(root, process_row, map_path):
        '''Inputs datasource to XML tree, mapping if needed
        @param root The output XML element to add the mapped rows to
        @param process_row f(input_row) Called after each input row is added
        @param map_path Path of the map spreadsheet, or None to pass the
            input XML through unmapped
        '''
        # Load map header
        in_is_xpaths = True
        out_is_xpaths = True
        out_label = None
        if map_path is not None:
            import csv
            
            import xpath
            
            def split_col_name(name):
                '''Splits a map header cell of the form "label[:root]"
                @return tuple (label, whether a ':' was present, root)
                '''
                name, sep, root_path = name.partition(':')
                return name, sep != '', root_path
            
            mappings = []
            # Binary mode is what the Python 2 csv module expects. The with
            # block closes the map file even if parsing it raises.
            with open(map_path, 'rb') as stream:
                reader = csv.reader(stream)
                in_label, out_label = next(reader)[:2]
                in_label, in_is_xpaths, in_root = split_col_name(in_label)
                out_label, out_is_xpaths, out_root = split_col_name(out_label)
                # outer elements are types
                has_types = out_root.startswith('/*s/')
                for row in reader:
                    in_, out = row[:2]
                    if out != '': # rows without an output are not mapped
                        if out_is_xpaths: out = xpath.parse(out_root+out)
                        mappings.append((in_, out))
            
            root.ownerDocument.documentElement.tagName = out_label
        in_is_xml = in_is_xpaths and not in_is_db
        out_is_xml_ref[0] = out_is_xpaths and not out_is_db
        
        if in_is_xml:
            doc0 = minidom.parse(sys.stdin)
            if out_label is None: out_label = doc0.documentElement.tagName
        
        def process_rows(get_value, rows):
            '''Processes input values
            @param get_value f(in_, row):str
            @param rows Iterable of input rows; at most `limit` rows are used
            '''
            # Counted separately from the loop index so the total is right
            # both when the limit stops the loop and when there are no rows
            row_ct = 0
            for i, row in enumerate(rows):
                if limit is not None and i >= limit: break
                row_id = str(i)
                for in_, out in mappings:
                    value = metadata_value(in_)
                    if value is None: # not metadata, so look it up in the row
                        log_start('Getting '+str(in_), debug)
                        value = get_value(in_, row)
                        log_done(debug)
                    if value is not None: xpath.put_obj(root, out, row_id,
                        has_types, strings.cleanup(value))
                process_row(row)
                row_ct += 1
            sys.stderr.write('Processed '+str(row_ct)+' input rows\n')
        
        if map_path is None:
            # No map: move each input element into the output tree as-is
            iter_ = xml_dom.NodeElemIter(doc0.documentElement)
            util.skip(iter_, xml_dom.is_text) # skip metadata
            for child in iter_:
                root.appendChild(child)
                process_row(child)
        elif in_is_db:
            assert in_is_xpaths
            
            import db_xml
            
            # Convert the input paths to the XML form passed to db_xml.get()
            in_root_xml = xpath.path2xml(in_root)
            for i, mapping in enumerate(mappings):
                in_, out = mapping
                if metadata_value(in_) is None:
                    mappings[i] = (xpath.path2xml(in_root+'/'+in_), out)
            
            in_db = connect_db(in_db_config)
            try:
                in_pkeys = {}
                def get_value(in_, row):
                    pkey, = row
                    in_ = in_.cloneNode(True) # don't modify orig value!
                    xml_dom.set_id(xpath.get(in_, in_root), pkey)
                    value = sql.value_or_none(db_xml.get(in_db, in_,
                        in_pkeys))
                    if value is not None: return str(value)
                    else: return None
                process_rows(get_value, sql.rows(db_xml.get(in_db,
                    in_root_xml, in_pkeys, limit)))
            finally: in_db.close() # also release the connection on error
        elif in_is_xml:
            def get_value(in_, row):
                node = xpath.get(row, in_)
                if node is not None: return xml_dom.value(node)
                else: return None
            # The rows are the sibling elements of the first in_root match
            row0 = xpath.get(doc0.documentElement, in_root)
            process_rows(get_value, xml_dom.NodeElemIter(row0.parentNode))
        else: # input is CSV
            reader = csv.reader(sys.stdin)
            cols = next(reader)
            col_idxs = dict((value, idx) for idx, value in enumerate(cols))
            # Replace each input column name with its index in the CSV rows
            for i, mapping in enumerate(mappings):
                in_, out = mapping
                if metadata_value(in_) is None:
                    # NOTE(review): a name missing from the CSV header is
                    # left as a str, which get_value() would then use as a
                    # list index -- confirm this is intended
                    try: mappings[i] = (col_idxs[in_], out)
                    except KeyError: pass
            
            def get_value(in_, row):
                value = row[in_]
                if value != '': return value # empty cells have no value
                else: return None
            process_rows(get_value, reader)
    
    def process_inputs(root, process_row):
        '''Runs process_input() once for each map path'''
        for map_path in map_paths: process_input(root, process_row, map_path)
    
    # Output XML tree
    doc = xml_dom.create_doc()
    root = doc.documentElement
    if out_is_db:
        import db_xml
        
        out_db = connect_db(out_db_config)
        out_pkeys = {}
        try:
            if test: sql.empty_db(out_db)
            row_ct_ref = [0] # passed to db_xml.put(); reported below
            
            def process_row(input_row):
                '''Puts the row currently in the output tree into the DB,
                then empties the tree for the next row'''
                def on_error(e):
                    exc.add_msg(e, term.emph('input row:')+'\n'+str(input_row))
                    exc.add_msg(e, term.emph('output row:')+'\n'+str(root))
                    ex_tracker.track(e)
                
                xml_func.process(root, on_error)
                if not xml_dom.is_empty(root):
                    assert xml_dom.has_one_child(root)
                    try:
                        sql.with_savepoint(out_db, lambda: db_xml.put(out_db,
                            root.firstChild, out_pkeys, row_ct_ref, on_error))
                        if commit: out_db.commit()
                    # A failed row is tracked and the next row still runs
                    except sql.DatabaseErrors as e: on_error(e)
                root.clear()
            
            process_inputs(root, process_row)
            sys.stdout.write('Inserted '+str(row_ct_ref[0])+
                ' new rows into database\n')
        finally:
            out_db.rollback() # runs after any per-row commits above
            out_db.close()
    else:
        def on_error(e): ex_tracker.track(e)
        def process_row(input_row): pass # rows accumulate in the tree
        process_inputs(root, process_row)
        xml_func.process(root, on_error)
        if out_is_xml_ref[0]:
            doc.writexml(sys.stdout, **xml_dom.prettyxml_config)
        else: # output is CSV
            raise NotImplementedError('CSV output not supported yet')
224 53 aaronmk
225 133 aaronmk
# Run the script; a Parser syntax error exits with just its message
# ("except ... as" works on Python 2.6+ as well as Python 3)
try: main()
except Parser.SyntaxException as e: raise SystemExit(str(e))