#!/usr/bin/env python
# Maps one datasource to another, using a map spreadsheet if needed
# For outputting an XML file to a PostgreSQL database, use the general format of
# http://vegbank.org/vegdocs/xml/vegbank_example_ver1.0.2.xml

6
import os.path
7
import sys
8 299 aaronmk
import xml.dom.minidom as minidom
9 53 aaronmk
10 266 aaronmk
sys.path.append(os.path.dirname(__file__)+"/../lib")
11 53 aaronmk
12 344 aaronmk
import exc
13 64 aaronmk
import opts
14 281 aaronmk
import Parser
15 131 aaronmk
import sql
16 310 aaronmk
import util
17 133 aaronmk
import xml_dom
18 86 aaronmk
import xml_func
19 53 aaronmk
20 84 aaronmk
def metadata_value(name):
    '''Extracts the literal value from a metadata-style input name
    @param name An input name from a mapping. May be a non-string (the callers
        in this file replace some names with column indexes or XML nodes).
    @return The text after the leading ':' if name is a string that starts
        with ':', otherwise None
    '''
    # isinstance() rather than type() == str, so str subclasses are accepted
    if isinstance(name, str) and name.startswith(':'): return name[1:]
    else: return None
23
24 53 aaronmk
def main():
    '''Maps the input datasource to the output datasource
    
    Settings are read from env vars through the opts module: n (max # of input
    rows), the test and commit flags, and the in/out database configs.
    Command-line args are map spreadsheet paths; only the first one is used.
    Input comes from a database or from stdin (XML or CSV). Output goes to a
    database or is written to stdout as XML. Row counts are written to stderr.
    
    @raise SystemExit With a usage message if no map path was given and either
        the input is a database or the output is not a database
    '''
    ex_tracker = exc.ExTracker()
    
    env_names = [] # handed to the opts calls and later to opts.env_usage()
    def usage_err():
        raise SystemExit('Usage: '+opts.env_usage(env_names, True)+' [commit=1]'
            ' [test=1] '+sys.argv[0]+' [map_path...] [<input] [>output]')
    limit = opts.get_env_var('n', None, env_names)
    if limit is not None: limit = int(limit)
    test = opts.env_flag('test')
    commit = not test and opts.env_flag('commit') # never commit in test mode
    
    # Get db config from env vars
    db_config_names = ['engine', 'host', 'user', 'password', 'database']
    def get_db_config(prefix):
        return opts.get_env_vars(db_config_names, prefix, env_names)
    in_db_config = get_db_config('in')
    out_db_config = get_db_config('out')
    # A datasource is treated as a database iff its config has an engine
    in_is_db = 'engine' in in_db_config
    out_is_db = 'engine' in out_db_config
    
    # Parse args
    map_paths = sys.argv[1:]
    # Only XML stdin -> database output can run without a map
    if map_paths == [] and (in_is_db or not out_is_db): usage_err()
    map_path = None
    if map_paths != []: map_path = map_paths[0]
    
    # Load map header
    in_is_xpaths = True
    out_label = None
    if map_path is not None:
        # Imported here so they are only required when a map is used. The
        # nested functions below use csv and xpath through this scope.
        import csv
        
        import xpath
        
        def split_col_name(name):
            '''Splits a "label:root" header cell
            @return tuple (label, whether a ':' was present, root)
            '''
            name, sep, root = name.partition(':')
            return name, sep != '', root
        
        mappings = []
        stream = open(map_path, 'rb')
        try: # close the map even if the header or an XPath fails to parse
            reader = csv.reader(stream)
            in_label, out_label = reader.next()[:2]
            in_label, in_is_xpaths, in_root = split_col_name(in_label)
            out_label, out_is_xpaths, out_root = split_col_name(out_label)
            assert out_is_xpaths # CSV output not supported yet
            has_types = out_root.startswith('/*s/') # outer elements are types
            for row in reader:
                in_, out = row[:2]
                if out != '': # rows with no output column are not mapped
                    if out_is_xpaths: out = xpath.parse(out_root+out)
                    mappings.append((in_, out))
        finally: stream.close()
    in_is_xml = in_is_xpaths and not in_is_db
    
    if in_is_xml:
        doc0 = minidom.parse(sys.stdin)
        if out_label is None: out_label = doc0.documentElement.tagName
    
    def process_input(root, process_row):
        '''Inputs datasource to XML tree, mapping if needed
        @param root The output document's root element, which is filled in
        @param process_row f(input_row) Called after each input row has been
            added to root
        '''
        def process_rows(get_value, rows):
            '''Processes input values
            @param get_value f(in_, row):str
            @param rows Iterable of input rows. At most limit rows are used.
            '''
            # Counted separately from i: i is unbound if rows is empty and is
            # one too high after the limit check breaks out of the loop
            row_ct = 0
            for i, row in enumerate(rows):
                if not (limit is None or i < limit): break
                row_id = str(i)
                for in_, out in mappings:
                    # Metadata names (':value') supply a constant; any other
                    # name is looked up in the input row
                    value = metadata_value(in_)
                    if value is None: value = get_value(in_, row)
                    if value is not None:
                        xpath.put_obj(root, out, row_id, has_types, value)
                process_row(row)
                row_ct += 1
            sys.stderr.write('Processed '+str(row_ct)+' input rows\n')
        
        if map_path is None:
            # No map: pass the input document's child elements through as-is
            iter_ = xml_dom.NodeElemIter(doc0.documentElement)
            util.skip(iter_, xml_dom.is_text) # skip metadata
            for child in iter_:
                root.appendChild(child)
                process_row(child)
        elif in_is_db:
            assert in_is_xpaths
            
            import db_xml
            
            in_root_xml = xpath.path2xml(in_root)
            # Replace each non-metadata input name with its XML query form
            for i, mapping in enumerate(mappings):
                in_, out = mapping
                if metadata_value(in_) is None:
                    mappings[i] = (xpath.path2xml(in_root+'/'+in_), out)
            
            in_db = sql.connect(in_db_config)
            try: # close the input connection even if a row fails
                in_pkeys = {}
                def get_value(in_, row):
                    pkey, = row
                    in_ = in_.cloneNode(True) # don't modify orig value!
                    xml_dom.set_id(xpath.get(in_, in_root), pkey)
                    value = sql.value_or_none(db_xml.get(in_db, in_, in_pkeys))
                    if value is not None: return str(value)
                    else: return None
                process_rows(get_value, sql.rows(db_xml.get(in_db, in_root_xml,
                    in_pkeys, limit)))
            finally: in_db.close()
        elif in_is_xml:
            def get_value(in_, row):
                node = xpath.get(row, in_)
                if node is not None: return xml_dom.value(node)
                else: return None
            # The rows are the element children of the first match's parent
            row0 = xpath.get(doc0.documentElement, in_root)
            process_rows(get_value, xml_dom.NodeElemIter(row0.parentNode))
        else: # input is CSV
            reader = csv.reader(sys.stdin)
            cols = reader.next()
            col_idxs = dict([(value, idx) for idx, value in enumerate(cols)])
            # Replace each non-metadata input name with its column index
            for i, mapping in enumerate(mappings):
                in_, out = mapping
                if metadata_value(in_) is None:
                    try: mappings[i] = (col_idxs[in_], out)
                    # NOTE(review): a name that is not in the CSV header is left
                    # as a string, so row[in_] below will fail on it -- confirm
                    # whether such mappings should be skipped instead
                    except KeyError: pass
            
            def get_value(in_, row):
                value = row[in_]
                if value != '': return value
                else: return None # empty cells count as missing values
            process_rows(get_value, reader)
    
    # Output XML tree
    doc = xml_dom.create_doc(out_label)
    root = doc.documentElement
    if out_is_db:
        from psycopg2.extensions import ISOLATION_LEVEL_SERIALIZABLE
        import db_xml
        
        out_db = sql.connect(out_db_config)
        out_db.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
        out_pkeys = {}
        try:
            if test: sql.empty_db(out_db)
            row_ct_ref = [0] # one-element list so db_xml.put() can update it
            
            def process_row(input_row):
                '''Puts the row currently in root into the output database'''
                def on_error(e):
                    exc.add_msg(e, 'output row:\n'+str(root))
                    exc.add_msg(e, 'input row:\n'+str(input_row))
                    ex_tracker.track(e)
                
                xml_func.process(root, on_error)
                if not xml_dom.is_empty(root):
                    assert xml_dom.has_one_child(root)
                    try:
                        sql.with_savepoint(out_db, lambda: db_xml.put(out_db,
                            root.firstChild, out_pkeys, row_ct_ref, on_error))
                        if commit: out_db.commit()
                    # A database error is tracked instead of aborting the run
                    except sql.DatabaseErrors as e: on_error(e)
                root.clear() # start the next row with an empty tree
            
            process_input(root, process_row)
            sys.stderr.write('Inserted '+str(row_ct_ref[0])+
                ' new rows into database\n')
        finally:
            # Runs after any per-row commits, so this only discards work that
            # was not committed
            out_db.rollback()
            out_db.close()
    else: # output is XML
        def process_row(input_row): pass
        process_input(root, process_row)
        xml_func.process(root)
        doc.writexml(sys.stdout, **xml_dom.prettyxml_config)
196 53 aaronmk
197 133 aaronmk
# Run the script. A Parser syntax error is turned into SystemExit with the
# error's message, rather than propagating as an uncaught exception.
try: main()
except Parser.SyntaxException as e: raise SystemExit(str(e))