Project

General

Profile

1 53 aaronmk
#!/usr/bin/env python
2
# Maps one datasource to another, using a map spreadsheet if needed
3
# For outputting an XML file to a PostgreSQL database, use the general format of
4
# http://vegbank.org/vegdocs/xml/vegbank_example_ver1.0.2.xml
5
6
import os.path
7
import sys
8 299 aaronmk
import xml.dom.minidom as minidom
9 53 aaronmk
10 266 aaronmk
sys.path.append(os.path.dirname(__file__)+"/../lib")
11 53 aaronmk
12 344 aaronmk
import exc
13 64 aaronmk
import opts
14 281 aaronmk
import Parser
15 131 aaronmk
import sql
16 310 aaronmk
import util
17 133 aaronmk
import xml_dom
18 86 aaronmk
import xml_func
19 53 aaronmk
20 84 aaronmk
def metadata_value(name):
    '''Gets the literal value carried by a metadata map entry
    @param name A map input entry; may be any type, because main() replaces
        resolved inputs with non-string objects (column indexes, XML nodes)
    @return str The text after the leading ':' if name is a string starting
        with ':', otherwise None
    '''
    # isinstance() rather than an exact type() match so str subclasses count too
    if isinstance(name, str) and name.startswith(':'): return name[1:]
    else: return None
23
24 53 aaronmk
def main():
25 131 aaronmk
    env_names = []
26
    def usage_err():
27 400 aaronmk
        raise SystemExit('Usage: '+opts.env_usage(env_names, True)+' [commit=1]'
28
            ' [test=1] '+sys.argv[0]+' [map_path] [<input] [>output]')
29 146 aaronmk
    limit = opts.get_env_var('n', None, env_names)
30
    if limit != None: limit = int(limit)
31 400 aaronmk
    test = opts.env_flag('test')
32
    commit = not test and opts.env_flag('commit') # never commit in test mode
33 131 aaronmk
34 53 aaronmk
    # Get db config from env vars
35 131 aaronmk
    db_config_names = ['engine', 'host', 'user', 'password', 'database']
36 53 aaronmk
    def get_db_config(prefix):
37 64 aaronmk
        return opts.get_env_vars(db_config_names, prefix, env_names)
38 67 aaronmk
    in_db_config = get_db_config('in')
39
    out_db_config = get_db_config('out')
40 131 aaronmk
    in_is_db = 'engine' in in_db_config
41
    out_is_db = 'engine' in out_db_config
42 53 aaronmk
43
    # Parse args
44 73 aaronmk
    map_path = None
45 67 aaronmk
    try: _prog_name, map_path = sys.argv
46 53 aaronmk
    except ValueError:
47 338 aaronmk
        if in_is_db or not out_is_db: usage_err()
48 53 aaronmk
49 57 aaronmk
    # Load map header
50 130 aaronmk
    in_is_xpaths = True
51 318 aaronmk
    out_label = None
52 73 aaronmk
    if map_path != None:
53 56 aaronmk
        import copy
54
        import csv
55
56 53 aaronmk
        import xpath
57
58 133 aaronmk
        metadata = []
59 73 aaronmk
        mappings = []
60
        stream = open(map_path, 'rb')
61
        reader = csv.reader(stream)
62 161 aaronmk
        in_label, out_label = reader.next()[:2]
63 61 aaronmk
        def split_col_name(name):
64 72 aaronmk
            name, sep, root = name.partition(':')
65
            return name, sep != '', root
66 161 aaronmk
        in_label, in_is_xpaths, in_root = split_col_name(in_label)
67
        out_label, out_is_xpaths, out_root = split_col_name(out_label)
68 133 aaronmk
        assert out_is_xpaths # CSV output not supported yet
69 161 aaronmk
        has_types = out_root.startswith('/*s/') # outer elements are types
70 73 aaronmk
        for row in reader:
71
            in_, out = row[:2]
72
            if out != '':
73 164 aaronmk
                if out_is_xpaths: out = out_root+out
74
                mappings.append((in_, out))
75 73 aaronmk
        stream.close()
76 130 aaronmk
    in_is_xml = in_is_xpaths and not in_is_db
77 56 aaronmk
78 318 aaronmk
    if in_is_xml:
79
        doc0 = minidom.parse(sys.stdin)
80
        if out_label == None: out_label = doc0.documentElement.tagName
81 294 aaronmk
82 316 aaronmk
    def process_input(root, process_row):
83 309 aaronmk
        '''Inputs datasource to XML tree, mapping if needed'''
84
        def process_rows(get_value, rows):
85 297 aaronmk
            '''Processes input values
86
            @param get_value f(in_, row):str
87
            '''
88 314 aaronmk
            for i, row in enumerate(rows):
89 316 aaronmk
                if not (limit == None or i < limit): break
90
                row_id = str(i)
91
                for in_, out in mappings:
92
                    value = metadata_value(in_)
93
                    if value == None: value = get_value(in_, row)
94
                    if value != None:
95
                        xpath.put_obj(root, out, row_id, has_types, value)
96
                process_row()
97 297 aaronmk
98 310 aaronmk
        if map_path == None:
99
            iter_ = xml_dom.NodeElemIter(doc0.documentElement)
100
            util.skip(iter_, xml_dom.is_text) # skip metadata
101 317 aaronmk
            for child in iter_:
102
                root.appendChild(child)
103
                process_row()
104 309 aaronmk
        elif in_is_db:
105 130 aaronmk
            assert in_is_xpaths
106 126 aaronmk
107 117 aaronmk
            import db_xml
108
109 161 aaronmk
            in_root_xml = xpath.path2xml(in_root)
110 164 aaronmk
            for i, mapping in enumerate(mappings):
111
                in_, out = mapping
112
                if metadata_value(in_) == None:
113 168 aaronmk
                    mappings[i] = (xpath.path2xml(in_root+'/'+in_), out)
114 126 aaronmk
115 131 aaronmk
            in_db = sql.connect(in_db_config)
116 133 aaronmk
            in_pkeys = {}
117 297 aaronmk
            def get_value(in_, row):
118 167 aaronmk
                pkey, = row
119 297 aaronmk
                in_ = in_.cloneNode(True) # don't modify orig value!
120
                xml_dom.set_id(xpath.get(in_, in_root), pkey)
121
                value = sql.value_or_none(db_xml.get(in_db, in_, in_pkeys))
122
                if value != None: return str(value)
123
                else: return None
124 309 aaronmk
            process_rows(get_value, sql.rows(db_xml.get(in_db, in_root_xml,
125
                in_pkeys, limit)))
126 117 aaronmk
            in_db.close()
127 161 aaronmk
        elif in_is_xml:
128 297 aaronmk
            def get_value(in_, row):
129
                node = xpath.get(row, in_)
130
                if node != None: return xml_dom.value(node)
131
                else: return None
132
            row0 = xpath.get(doc0.documentElement, in_root)
133 309 aaronmk
            process_rows(get_value, xml_dom.NodeElemIter(row0.parentNode))
134 56 aaronmk
        else: # input is CSV
135 133 aaronmk
            map_ = dict(mappings)
136 59 aaronmk
            reader = csv.reader(sys.stdin)
137 84 aaronmk
            cols = reader.next()
138 162 aaronmk
            col_idxs = dict([(value, idx) for idx, value in enumerate(cols)])
139 164 aaronmk
            for i, mapping in enumerate(mappings):
140
                in_, out = mapping
141
                if metadata_value(in_) == None:
142
                    try: mappings[i] = (col_idxs[in_], out)
143
                    except KeyError: pass
144 162 aaronmk
145 297 aaronmk
            def get_value(in_, row):
146
                value = row[in_]
147
                if value != '': return value
148
                else: return None
149 309 aaronmk
            process_rows(get_value, reader)
150 53 aaronmk
151
    # Output XML tree
152 316 aaronmk
    doc = xml_dom.create_doc(out_label)
153
    root = doc.documentElement
154 130 aaronmk
    if out_is_db:
155 53 aaronmk
        from psycopg2.extensions import ISOLATION_LEVEL_SERIALIZABLE
156
        import db_xml
157
158 133 aaronmk
        out_db = sql.connect(out_db_config)
159
        out_db.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
160 310 aaronmk
        out_pkeys = {}
161 53 aaronmk
        try:
162 400 aaronmk
            if test: sql.empty_db(out_db)
163 53 aaronmk
            row_ct_ref = [0]
164 316 aaronmk
            def process_row():
165
                try: xml_func.process(root)
166 344 aaronmk
                except xml_func.SyntaxException, e: exc.print_ex(e, False)
167 316 aaronmk
                else:
168
                    assert xml_dom.has_one_child(root)
169
                    child = root.firstChild
170
                    try:
171 400 aaronmk
                        sql.with_savepoint(out_db, lambda: db_xml.put(out_db,
172
                            child, False, row_ct_ref, out_pkeys))
173 327 aaronmk
                        if commit: out_db.commit()
174 344 aaronmk
                    except sql.DatabaseErrors, e: exc.print_ex(e)
175 316 aaronmk
                root.clear()
176
            process_input(root, process_row)
177 53 aaronmk
            print 'Inserted '+str(row_ct_ref[0])+' rows'
178
        finally:
179 133 aaronmk
            out_db.rollback()
180
            out_db.close()
181 299 aaronmk
    else: # output is XML
182 316 aaronmk
        def process_row(): pass
183
        process_input(root, process_row)
184
        xml_func.process(root)
185
        doc.writexml(sys.stdout, **xml_dom.prettyxml_config)
186 53 aaronmk
187 133 aaronmk
try: main()
188 294 aaronmk
except Parser.SyntaxException, e: raise SystemExit(str(e))