Project

General

Profile

1 53 aaronmk
#!/usr/bin/env python
2
# Maps one datasource to another, using a map spreadsheet if needed
3
# For outputting an XML file to a PostgreSQL database, use the general format of
4
# http://vegbank.org/vegdocs/xml/vegbank_example_ver1.0.2.xml
5
6
import os.path
7
import sys
8 299 aaronmk
import xml.dom.minidom as minidom
9 53 aaronmk
10 266 aaronmk
sys.path.append(os.path.dirname(__file__)+"/../lib")
11 53 aaronmk
12 344 aaronmk
import exc
13 64 aaronmk
import opts
14 281 aaronmk
import Parser
15 131 aaronmk
import sql
16 715 aaronmk
import strings
17 310 aaronmk
import util
18 133 aaronmk
import xml_dom
19 86 aaronmk
import xml_func
20 53 aaronmk
21 84 aaronmk
def metadata_value(name):
    '''Extracts the literal value embedded in a map input name, if any.
    @param name A map input entry. Callers in this file also pass non-string
        objects here (e.g. an input entry that was already replaced by a
        column index), so any type is accepted.
    @return The text after the leading ':' if name is a string starting with
        ':', otherwise None
    '''
    # isinstance() rather than an exact type() comparison so that str
    # subclasses are recognized as well
    if isinstance(name, str) and name.startswith(':'): return name[1:]
    return None
24
25 53 aaronmk
def main():
    '''Maps the input datasource(s) to the output datasource.
    
    Configuration is read from env vars (see usage_err() for the usage line):
    n (max # input rows), test, commit, verbose, debug, plus the input and
    output DB settings fetched with the 'in' and 'out' prefixes. Command-line
    args are the map spreadsheet paths. With no map path, the XML on stdin is
    passed through to the output DB unmapped.
    '''
    ex_tracker = exc.ExTracker()
    
    env_names = [] # filled in by the opts.* calls; used for the usage message
    def usage_err():
        '''Exits with the usage message'''
        raise SystemExit('Usage: ' + opts.env_usage(env_names, True)
            +' [commit=1] [test=1] [verbose=1] [debug=1] '+sys.argv[0]
            +' [map_path...] [<input] [>output]')
    limit = opts.get_env_var('n', None, env_names)
    if limit is not None: limit = int(limit)
    test = opts.env_flag('test')
    commit = not test and opts.env_flag('commit') # never commit in test mode
    debug = opts.env_flag('debug')
    verbose = debug or opts.env_flag('verbose')
    
    def log(msg, on=verbose):
        '''Writes msg to stderr if on'''
        if on: sys.stderr.write(msg)
    def log_start(action, on=verbose): log(action+'...', on)
    def log_done(on=verbose): log('Done\n', on)
    
    # Get db config from env vars
    db_config_names = ['engine', 'host', 'user', 'password', 'database']
    def get_db_config(prefix):
        return opts.get_env_vars(db_config_names, prefix, env_names)
    in_db_config = get_db_config('in')
    out_db_config = get_db_config('out')
    in_is_db = 'engine' in in_db_config
    out_is_db = 'engine' in out_db_config
    
    # Parse args
    map_paths = sys.argv[1:]
    if not map_paths:
        # Without a map, the only supported mode is XML on stdin -> output DB
        if in_is_db or not out_is_db: usage_err()
        else: map_paths = [None]
    
    def connect_db(db_config):
        '''Opens a DB connection, logging progress if verbose'''
        log_start('Connecting to '+sql.db_config_str(db_config))
        db = sql.connect(db_config)
        log_done()
        return db
    
    def process_input(root, process_row, map_path):
        '''Inputs datasource to XML tree, mapping if needed
        @param root The output document's root element, which rows are put into
        @param process_row f(input_row) Called after each input row has been
            added to root
        @param map_path Path of the map spreadsheet, or None to pass the XML
            on stdin through unmapped
        '''
        # Load map header
        in_is_xpaths = True
        out_label = None
        if map_path is not None:
            import csv
            
            import xpath
            
            def split_col_name(name):
                '''Splits a "label:root" map header cell
                @return (label, whether a ':' was present, root)
                '''
                label, sep, root_path = name.partition(':')
                return label, sep != '', root_path
            
            mappings = []
            stream = open(map_path, 'rb')
            try: # make sure the map file is closed even if reading it fails
                reader = csv.reader(stream)
                in_label, out_label = next(reader)[:2]
                in_label, in_is_xpaths, in_root = split_col_name(in_label)
                out_label, out_is_xpaths, out_root = split_col_name(out_label)
                assert out_is_xpaths # CSV output not supported yet
                # outer elements are types
                has_types = out_root.startswith('/*s/')
                for row in reader:
                    in_, out = row[:2]
                    if out != '': # rows without an output entry are skipped
                        if out_is_xpaths: out = xpath.parse(out_root+out)
                        mappings.append((in_, out))
            finally: stream.close()
            
            root.ownerDocument.documentElement.tagName = out_label
        in_is_xml = in_is_xpaths and not in_is_db
        
        if in_is_xml:
            doc0 = minidom.parse(sys.stdin)
            if out_label is None: out_label = doc0.documentElement.tagName
        
        def process_rows(get_value, rows):
            '''Processes input values
            @param get_value f(in_, row):str
            @param rows Iterable of input rows; at most `limit` are processed
            '''
            # Counted separately from the loop index so that the total is
            # right when the limit stops the loop early (the index has then
            # already advanced to the skipped row) and when there are no rows
            # at all (the index would be unset)
            row_ct = 0
            for i, row in enumerate(rows):
                if limit is not None and i >= limit: break
                row_id = str(i)
                for in_, out in mappings:
                    value = metadata_value(in_)
                    if value is None: # not a literal, so look it up in the row
                        log_start('Getting '+str(in_), debug)
                        value = get_value(in_, row)
                        log_done(debug)
                    if value is not None: xpath.put_obj(root, out, row_id,
                        has_types, strings.cleanup(value))
                process_row(row)
                row_ct += 1
            sys.stderr.write('Processed '+str(row_ct)+' input rows\n')
        
        if map_path is None:
            # No mapping: move each input element into the output tree as-is
            iter_ = xml_dom.NodeElemIter(doc0.documentElement)
            util.skip(iter_, xml_dom.is_text) # skip metadata
            for child in iter_:
                root.appendChild(child)
                process_row(child)
        elif in_is_db:
            assert in_is_xpaths
            
            import db_xml
            
            in_root_xml = xpath.path2xml(in_root)
            # Replace each non-literal input path by its XML form
            for i, mapping in enumerate(mappings):
                in_, out = mapping
                if metadata_value(in_) is None:
                    mappings[i] = (xpath.path2xml(in_root+'/'+in_), out)
            
            in_db = connect_db(in_db_config)
            try: # make sure the connection is closed if processing fails
                in_pkeys = {}
                def get_value(in_, row):
                    pkey, = row
                    in_ = in_.cloneNode(True) # don't modify orig value!
                    xml_dom.set_id(xpath.get(in_, in_root), pkey)
                    value = sql.value_or_none(db_xml.get(in_db, in_,
                        in_pkeys))
                    if value is not None: return str(value)
                    else: return None
                process_rows(get_value, sql.rows(db_xml.get(in_db,
                    in_root_xml, in_pkeys, limit)))
            finally: in_db.close()
        elif in_is_xml:
            def get_value(in_, row):
                node = xpath.get(row, in_)
                if node is not None: return xml_dom.value(node)
                else: return None
            row0 = xpath.get(doc0.documentElement, in_root)
            process_rows(get_value, xml_dom.NodeElemIter(row0.parentNode))
        else: # input is CSV
            reader = csv.reader(sys.stdin)
            cols = next(reader)
            col_idxs = dict([(value, idx) for idx, value in enumerate(cols)])
            # Replace each non-literal input column name by its index
            for i, mapping in enumerate(mappings):
                in_, out = mapping
                if metadata_value(in_) is None:
                    try: mappings[i] = (col_idxs[in_], out)
                    # NOTE(review): a column missing from the input header is
                    # left as its name here, so get_value() below will index
                    # the row with a string -- confirm the intended handling
                    except KeyError: pass
            
            def get_value(in_, row):
                value = row[in_]
                if value != '': return value
                else: return None # empty cells count as missing values
            process_rows(get_value, reader)
    
    def process_inputs(root, process_row):
        '''Runs process_input() once for each map path'''
        for map_path in map_paths: process_input(root, process_row, map_path)
    
    # Output XML tree
    doc = xml_dom.create_doc()
    root = doc.documentElement
    if out_is_db:
        import db_xml
        
        out_db = connect_db(out_db_config)
        out_pkeys = {}
        try:
            if test: sql.empty_db(out_db)
            row_ct_ref = [0] # one-element list, passed on to db_xml.put()
            
            def process_row(input_row):
                '''Puts the row currently in root into the output DB, then
                clears root for the next row'''
                def on_error(e):
                    exc.add_msg(e, 'output row:\n'+str(root))
                    exc.add_msg(e, 'input row:\n'+str(input_row))
                    ex_tracker.track(e)
                
                xml_func.process(root, on_error)
                if not xml_dom.is_empty(root):
                    assert xml_dom.has_one_child(root)
                    try:
                        sql.with_savepoint(out_db, lambda: db_xml.put(out_db,
                            root.firstChild, out_pkeys, row_ct_ref, on_error))
                        if commit: out_db.commit()
                    except sql.DatabaseErrors as e: on_error(e)
                root.clear()
            
            process_inputs(root, process_row)
            sys.stdout.write('Inserted '+str(row_ct_ref[0])+
                ' new rows into database\n')
        finally:
            # Anything not committed above (e.g. when commit is off) is
            # rolled back here
            out_db.rollback()
            out_db.close()
    else: # output is XML
        def process_row(input_row): pass
        process_inputs(root, process_row)
        xml_func.process(root)
        doc.writexml(sys.stdout, **xml_dom.prettyxml_config)
216 53 aaronmk
217 133 aaronmk
# Run the script. A Parser.SyntaxException is reported as a plain exit message
# (via SystemExit) instead of a traceback.
try: main()
except Parser.SyntaxException as e: raise SystemExit(str(e))