Project

General

Profile

1 53 aaronmk
#!/usr/bin/env python
2
# Maps one datasource to another, using a map spreadsheet if needed
3
# For outputting an XML file to a PostgreSQL database, use the general format of
4
# http://vegbank.org/vegdocs/xml/vegbank_example_ver1.0.2.xml
5
6
import os.path
7
import sys
8 299 aaronmk
import xml.dom.minidom as minidom
9 53 aaronmk
10 266 aaronmk
sys.path.append(os.path.dirname(__file__)+"/../lib")
11 53 aaronmk
12 344 aaronmk
import exc
13 64 aaronmk
import opts
14 281 aaronmk
import Parser
15 131 aaronmk
import sql
16 310 aaronmk
import util
17 133 aaronmk
import xml_dom
18 86 aaronmk
import xml_func
19 53 aaronmk
20 84 aaronmk
def metadata_value(name):
    '''Extracts the literal value embedded in a mapping's input name
    @param name The input side of a mapping. May be a non-string: main()
        replaces mapped input names with column indexes or XML nodes.
    @return str|None The text after the leading ':' if name is a string that
        starts with ':', otherwise None
    '''
    # isinstance() rather than an exact type comparison so that str subclasses
    # are recognized as well
    if isinstance(name, str) and name.startswith(':'): return name[1:]
    else: return None
23
24 53 aaronmk
def main():
25 434 aaronmk
    ex_tracker = exc.ExTracker()
26
27 131 aaronmk
    env_names = []
28
    def usage_err():
29 400 aaronmk
        raise SystemExit('Usage: '+opts.env_usage(env_names, True)+' [commit=1]'
30 510 aaronmk
            ' [test=1] '+sys.argv[0]+' [map_path...] [<input] [>output]')
31 146 aaronmk
    limit = opts.get_env_var('n', None, env_names)
32
    if limit != None: limit = int(limit)
33 400 aaronmk
    test = opts.env_flag('test')
34
    commit = not test and opts.env_flag('commit') # never commit in test mode
35 131 aaronmk
36 53 aaronmk
    # Get db config from env vars
37 131 aaronmk
    db_config_names = ['engine', 'host', 'user', 'password', 'database']
38 53 aaronmk
    def get_db_config(prefix):
39 64 aaronmk
        return opts.get_env_vars(db_config_names, prefix, env_names)
40 67 aaronmk
    in_db_config = get_db_config('in')
41
    out_db_config = get_db_config('out')
42 131 aaronmk
    in_is_db = 'engine' in in_db_config
43
    out_is_db = 'engine' in out_db_config
44 53 aaronmk
45
    # Parse args
46 510 aaronmk
    map_paths = sys.argv[1:]
47 512 aaronmk
    if map_paths == []:
48
        if in_is_db or not out_is_db: usage_err()
49
        else: map_paths = [None]
50 53 aaronmk
51 512 aaronmk
    def process_input(root, process_row, map_path):
52
        '''Inputs datasource to XML tree, mapping if needed'''
53
        # Load map header
54
        in_is_xpaths = True
55
        out_label = None
56
        if map_path != None:
57
            import copy
58
            import csv
59
60
            import xpath
61
62
            metadata = []
63
            mappings = []
64
            stream = open(map_path, 'rb')
65
            reader = csv.reader(stream)
66
            in_label, out_label = reader.next()[:2]
67
            def split_col_name(name):
68
                name, sep, root = name.partition(':')
69
                return name, sep != '', root
70
            in_label, in_is_xpaths, in_root = split_col_name(in_label)
71
            out_label, out_is_xpaths, out_root = split_col_name(out_label)
72
            assert out_is_xpaths # CSV output not supported yet
73
            has_types = out_root.startswith('/*s/') # outer elements are types
74
            for row in reader:
75
                in_, out = row[:2]
76
                if out != '':
77
                    if out_is_xpaths: out = xpath.parse(out_root+out)
78
                    mappings.append((in_, out))
79
            stream.close()
80
81
            root.ownerDocument.documentElement.tagName = out_label
82
        in_is_xml = in_is_xpaths and not in_is_db
83 56 aaronmk
84 512 aaronmk
        if in_is_xml:
85
            doc0 = minidom.parse(sys.stdin)
86
            if out_label == None: out_label = doc0.documentElement.tagName
87 53 aaronmk
88 309 aaronmk
        def process_rows(get_value, rows):
89 297 aaronmk
            '''Processes input values
90
            @param get_value f(in_, row):str
91
            '''
92 314 aaronmk
            for i, row in enumerate(rows):
93 316 aaronmk
                if not (limit == None or i < limit): break
94
                row_id = str(i)
95
                for in_, out in mappings:
96
                    value = metadata_value(in_)
97
                    if value == None: value = get_value(in_, row)
98
                    if value != None:
99
                        xpath.put_obj(root, out, row_id, has_types, value)
100 452 aaronmk
                process_row(row)
101 460 aaronmk
            sys.stderr.write('Processed '+str(i+1)+' input rows\n')
102 297 aaronmk
103 310 aaronmk
        if map_path == None:
104
            iter_ = xml_dom.NodeElemIter(doc0.documentElement)
105
            util.skip(iter_, xml_dom.is_text) # skip metadata
106 317 aaronmk
            for child in iter_:
107
                root.appendChild(child)
108 452 aaronmk
                process_row(child)
109 309 aaronmk
        elif in_is_db:
110 130 aaronmk
            assert in_is_xpaths
111 126 aaronmk
112 117 aaronmk
            import db_xml
113
114 161 aaronmk
            in_root_xml = xpath.path2xml(in_root)
115 164 aaronmk
            for i, mapping in enumerate(mappings):
116
                in_, out = mapping
117
                if metadata_value(in_) == None:
118 168 aaronmk
                    mappings[i] = (xpath.path2xml(in_root+'/'+in_), out)
119 126 aaronmk
120 131 aaronmk
            in_db = sql.connect(in_db_config)
121 133 aaronmk
            in_pkeys = {}
122 297 aaronmk
            def get_value(in_, row):
123 167 aaronmk
                pkey, = row
124 297 aaronmk
                in_ = in_.cloneNode(True) # don't modify orig value!
125
                xml_dom.set_id(xpath.get(in_, in_root), pkey)
126
                value = sql.value_or_none(db_xml.get(in_db, in_, in_pkeys))
127
                if value != None: return str(value)
128
                else: return None
129 309 aaronmk
            process_rows(get_value, sql.rows(db_xml.get(in_db, in_root_xml,
130
                in_pkeys, limit)))
131 117 aaronmk
            in_db.close()
132 161 aaronmk
        elif in_is_xml:
133 297 aaronmk
            def get_value(in_, row):
134
                node = xpath.get(row, in_)
135
                if node != None: return xml_dom.value(node)
136
                else: return None
137
            row0 = xpath.get(doc0.documentElement, in_root)
138 309 aaronmk
            process_rows(get_value, xml_dom.NodeElemIter(row0.parentNode))
139 56 aaronmk
        else: # input is CSV
140 133 aaronmk
            map_ = dict(mappings)
141 59 aaronmk
            reader = csv.reader(sys.stdin)
142 84 aaronmk
            cols = reader.next()
143 162 aaronmk
            col_idxs = dict([(value, idx) for idx, value in enumerate(cols)])
144 164 aaronmk
            for i, mapping in enumerate(mappings):
145
                in_, out = mapping
146
                if metadata_value(in_) == None:
147
                    try: mappings[i] = (col_idxs[in_], out)
148
                    except KeyError: pass
149 162 aaronmk
150 297 aaronmk
            def get_value(in_, row):
151
                value = row[in_]
152
                if value != '': return value
153
                else: return None
154 309 aaronmk
            process_rows(get_value, reader)
155 53 aaronmk
156 512 aaronmk
    def process_inputs(root, process_row):
157
        for map_path in map_paths: process_input(root, process_row, map_path)
158
159 53 aaronmk
    # Output XML tree
160 512 aaronmk
    doc = xml_dom.create_doc()
161 316 aaronmk
    root = doc.documentElement
162 130 aaronmk
    if out_is_db:
163 53 aaronmk
        from psycopg2.extensions import ISOLATION_LEVEL_SERIALIZABLE
164
        import db_xml
165
166 133 aaronmk
        out_db = sql.connect(out_db_config)
167
        out_db.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
168 310 aaronmk
        out_pkeys = {}
169 53 aaronmk
        try:
170 400 aaronmk
            if test: sql.empty_db(out_db)
171 53 aaronmk
            row_ct_ref = [0]
172 449 aaronmk
173 452 aaronmk
            def process_row(input_row):
174
                def on_error(e):
175
                    exc.add_msg(e, 'output row:\n'+str(root))
176
                    exc.add_msg(e, 'input row:\n'+str(input_row))
177
                    ex_tracker.track(e)
178
179 449 aaronmk
                xml_func.process(root, on_error)
180 442 aaronmk
                if not xml_dom.is_empty(root):
181
                    assert xml_dom.has_one_child(root)
182
                    try:
183
                        sql.with_savepoint(out_db, lambda: db_xml.put(out_db,
184 462 aaronmk
                            root.firstChild, out_pkeys, row_ct_ref, on_error))
185 442 aaronmk
                        if commit: out_db.commit()
186 449 aaronmk
                    except sql.DatabaseErrors, e: on_error(e)
187 316 aaronmk
                root.clear()
188 449 aaronmk
189 512 aaronmk
            process_inputs(root, process_row)
190 460 aaronmk
            sys.stderr.write('Inserted '+str(row_ct_ref[0])+
191
                ' new rows into database\n')
192 53 aaronmk
        finally:
193 133 aaronmk
            out_db.rollback()
194
            out_db.close()
195 299 aaronmk
    else: # output is XML
196 452 aaronmk
        def process_row(input_row): pass
197 512 aaronmk
        process_inputs(root, process_row)
198 316 aaronmk
        xml_func.process(root)
199
        doc.writexml(sys.stdout, **xml_dom.prettyxml_config)
200 53 aaronmk
201 133 aaronmk
try: main()
202 294 aaronmk
except Parser.SyntaxException, e: raise SystemExit(str(e))