Project

General

Profile

1 53 aaronmk
#!/usr/bin/env python
2
# Maps one datasource to another, using a map spreadsheet if needed
3 986 aaronmk
# Exit status is the # of errors in the import, up to the maximum exit status
4 53 aaronmk
# For outputting an XML file to a PostgreSQL database, use the general format of
5
# http://vegbank.org/vegdocs/xml/vegbank_example_ver1.0.2.xml
6
7 1014 aaronmk
import csv
8 1714 aaronmk
import itertools
9 53 aaronmk
import os.path
10
import sys
11 299 aaronmk
import xml.dom.minidom as minidom
12 53 aaronmk
13 266 aaronmk
sys.path.append(os.path.dirname(__file__)+"/../lib")
14 53 aaronmk
15 1389 aaronmk
import csvs
16 344 aaronmk
import exc
17 1714 aaronmk
import iters
18 1705 aaronmk
import maps
19 64 aaronmk
import opts
20 281 aaronmk
import Parser
21 982 aaronmk
import profiling
22 131 aaronmk
import sql
23 1758 aaronmk
import streams
24 715 aaronmk
import strings
25 828 aaronmk
import term
26 310 aaronmk
import util
27 1014 aaronmk
import xpath
28 133 aaronmk
import xml_dom
29 86 aaronmk
import xml_func
30 1713 aaronmk
import xml_parse
31 53 aaronmk
32 1404 aaronmk
def get_with_prefix(map_, prefixes, key):
33 1484 aaronmk
    '''Gets all entries for the given key with any of the given prefixes'''
34
    values = []
35 1681 aaronmk
    for key_ in strings.with_prefixes(['']+prefixes, key): # also with no prefix
36
        try: value = map_[key_]
37 1484 aaronmk
        except KeyError, e: continue # keep going
38
        values.append(value)
39
40
    if values != []: return values
41
    else: raise e # re-raise last KeyError
42 1404 aaronmk
43 1018 aaronmk
def metadata_value(name): return None # this feature has been removed
44 84 aaronmk
45 1360 aaronmk
def cleanup(val):
46
    if val == None: return val
47 1374 aaronmk
    return util.none_if(strings.cleanup(strings.ustr(val)), u'', u'\\N')
48 1360 aaronmk
49 847 aaronmk
def main_():
50 131 aaronmk
    env_names = []
51
    def usage_err():
52 944 aaronmk
        raise SystemExit('Usage: '+opts.env_usage(env_names, True)+' '
53 847 aaronmk
            +sys.argv[0]+' [map_path...] [<input] [>output]')
54 838 aaronmk
55 1570 aaronmk
    ## Get config from env vars
56 838 aaronmk
57 1570 aaronmk
    # Modes
58 946 aaronmk
    test = opts.env_flag('test', False, env_names)
59 947 aaronmk
    commit = opts.env_flag('commit', False, env_names) and not test
60 944 aaronmk
        # never commit in test mode
61 947 aaronmk
    redo = opts.env_flag('redo', test, env_names) and not commit
62
        # never redo in commit mode (manually run `make empty_db` instead)
63 1570 aaronmk
64
    # Ranges
65
    start = util.cast(int, opts.get_env_var('start', '0', env_names))
66 1572 aaronmk
    if test: end_default = 1
67
    else: end_default = None
68
    end = util.cast(int, util.none_if(
69
        opts.get_env_var('n', end_default, env_names), u''))
70 1570 aaronmk
    if end != None: end += start
71
72 1846 aaronmk
    # Optimization
73
    cpus = util.cast(int, opts.get_env_var('cpus', None, env_names))
74
75 1570 aaronmk
    # Debugging
76 946 aaronmk
    debug = opts.env_flag('debug', False, env_names)
77 859 aaronmk
    sql.run_raw_query.debug = debug
78 1574 aaronmk
    verbose = debug or opts.env_flag('verbose', not test, env_names)
79 944 aaronmk
    opts.get_env_var('profile_to', None, env_names) # add to env_names
80 131 aaronmk
81 1570 aaronmk
    # DB
82
    db_config_names = ['engine', 'host', 'user', 'password', 'database']
83
    def get_db_config(prefix):
84
        return opts.get_env_vars(db_config_names, prefix, env_names)
85
    in_db_config = get_db_config('in')
86
    out_db_config = get_db_config('out')
87
    in_is_db = 'engine' in in_db_config
88
    out_is_db = 'engine' in out_db_config
89
90
    ##
91
92 838 aaronmk
    # Logging
93 662 aaronmk
    def log(msg, on=verbose):
94
        if on: sys.stderr.write(msg)
95 859 aaronmk
    def log_start(action, on=verbose): log(action+'...\n', on)
96 662 aaronmk
97 53 aaronmk
    # Parse args
98 510 aaronmk
    map_paths = sys.argv[1:]
99 512 aaronmk
    if map_paths == []:
100
        if in_is_db or not out_is_db: usage_err()
101
        else: map_paths = [None]
102 53 aaronmk
103 646 aaronmk
    def connect_db(db_config):
104 662 aaronmk
        log_start('Connecting to '+sql.db_config_str(db_config))
105 1014 aaronmk
        return sql.connect(db_config)
106 646 aaronmk
107 1573 aaronmk
    if end != None: end_str = str(end-1)
108
    else: end_str = 'end'
109
    log_start('Processing input rows '+str(start)+'-'+end_str)
110
111 1014 aaronmk
    ex_tracker = exc.ExPercentTracker(iter_text='row')
112
    profiler = profiling.ItersProfiler(start_now=True, iter_text='row')
113
114 1846 aaronmk
    try: import pp
115
    except ImportError: return str_
116
    else:
117
        if cpus != None: args = [cpus]
118
        else: args = []
119
        job_server = pp.Server(*args)
120
        log_start('Using '+str(job_server.get_ncpus())+' CPUs')
121
122 1014 aaronmk
    doc = xml_dom.create_doc()
123
    root = doc.documentElement
124 751 aaronmk
    out_is_xml_ref = [False]
125 1014 aaronmk
    in_label_ref = [None]
126
    def update_in_label():
127
        if in_label_ref[0] != None:
128
            xpath.get(root, '/_ignore/inLabel="'+in_label_ref[0]+'"', True)
129
    def prep_root():
130
        root.clear()
131
        update_in_label()
132
    prep_root()
133 751 aaronmk
134 838 aaronmk
    def process_input(root, row_ready, map_path):
135 512 aaronmk
        '''Inputs datasource to XML tree, mapping if needed'''
136
        # Load map header
137
        in_is_xpaths = True
138 751 aaronmk
        out_is_xpaths = True
139 512 aaronmk
        out_label = None
140
        if map_path != None:
141
            metadata = []
142
            mappings = []
143
            stream = open(map_path, 'rb')
144
            reader = csv.reader(stream)
145
            in_label, out_label = reader.next()[:2]
146 1402 aaronmk
147 512 aaronmk
            def split_col_name(name):
148 1402 aaronmk
                label, sep, root = name.partition(':')
149
                label, sep2, prefixes_str = label.partition('[')
150
                prefixes_str = strings.remove_suffix(']', prefixes_str)
151
                prefixes = strings.split(',', prefixes_str)
152
                return label, sep != '', root, prefixes
153 1198 aaronmk
                    # extract datasrc from "datasrc[data_format]"
154 1402 aaronmk
155 1705 aaronmk
            in_label, in_root, prefixes = maps.col_info(in_label)
156
            in_is_xpaths = in_root != None
157 1014 aaronmk
            in_label_ref[0] = in_label
158
            update_in_label()
159 1705 aaronmk
            out_label, out_root = maps.col_info(out_label)[:2]
160
            out_is_xpaths = out_root != None
161 1841 aaronmk
            if out_is_xpaths: has_types = out_root.find('/*s/') >= 0
162 1705 aaronmk
                # outer elements are types
163 1402 aaronmk
164 512 aaronmk
            for row in reader:
165
                in_, out = row[:2]
166
                if out != '':
167
                    if out_is_xpaths: out = xpath.parse(out_root+out)
168
                    mappings.append((in_, out))
169 1402 aaronmk
170 512 aaronmk
            stream.close()
171
172
            root.ownerDocument.documentElement.tagName = out_label
173
        in_is_xml = in_is_xpaths and not in_is_db
174 751 aaronmk
        out_is_xml_ref[0] = out_is_xpaths and not out_is_db
175 56 aaronmk
176 1148 aaronmk
        def process_rows(process_row, rows):
177 838 aaronmk
            '''Processes input rows
178
            @param process_row(in_row, i)
179 297 aaronmk
            '''
180 1718 aaronmk
            i = 0
181
            while end == None or i < end:
182
                try: row = rows.next()
183
                except StopIteration: break # no more rows
184
                if i < start: continue # not at start row yet
185
186 838 aaronmk
                process_row(row, i)
187
                row_ready(i, row)
188 1718 aaronmk
                i += 1
189
            row_ct = i-start
190 982 aaronmk
            return row_ct
191 838 aaronmk
192
        def map_rows(get_value, rows):
193
            '''Maps input rows
194
            @param get_value(in_, row):str
195
            '''
196
            def process_row(row, i):
197 316 aaronmk
                row_id = str(i)
198
                for in_, out in mappings:
199
                    value = metadata_value(in_)
200 662 aaronmk
                    if value == None:
201 857 aaronmk
                        log_start('Getting '+str(in_), debug)
202 1360 aaronmk
                        value = cleanup(get_value(in_, row))
203 1348 aaronmk
                    if value != None:
204
                        log_start('Putting '+str(out), debug)
205 1360 aaronmk
                        xpath.put_obj(root, out, row_id, has_types, value)
206 982 aaronmk
            return process_rows(process_row, rows)
207 297 aaronmk
208 1403 aaronmk
        def map_table(col_names, rows):
209 1416 aaronmk
            col_names_ct = len(col_names)
210 1403 aaronmk
            col_idxs = util.list_flip(col_names)
211
212
            i = 0
213
            while i < len(mappings): # mappings len changes in loop
214
                in_, out = mappings[i]
215
                if metadata_value(in_) == None:
216 1404 aaronmk
                    try: mappings[i] = (
217
                        get_with_prefix(col_idxs, prefixes, in_), out)
218
                    except KeyError:
219
                        del mappings[i]
220
                        continue # keep i the same
221 1403 aaronmk
                i += 1
222
223
            def get_value(in_, row):
224 1484 aaronmk
                return util.coalesce(*util.list_subset(row.list, in_))
225 1416 aaronmk
            def wrap_row(row):
226
                return util.ListDict(util.list_as_length(row, col_names_ct),
227
                    col_names, col_idxs) # handle CSV rows of different lengths
228 1403 aaronmk
229
            return map_rows(get_value, util.WrapIter(wrap_row, rows))
230
231 1758 aaronmk
        stdin = streams.LineCountStream(sys.stdin)
232
        def on_error(e):
233
            exc.add_msg(e, term.emph('input line #:')+' '+str(stdin.line_num))
234
            ex_tracker.track(e)
235
236 1700 aaronmk
        if in_is_db:
237 130 aaronmk
            assert in_is_xpaths
238 126 aaronmk
239 1136 aaronmk
            in_db = connect_db(in_db_config)
240
            in_pkeys = {}
241
            cur = sql.select(in_db, table=in_root, fields=None, conds=None,
242
                limit=end, start=0)
243 1403 aaronmk
            row_ct = map_table(list(sql.col_names(cur)), sql.rows(cur))
244 1136 aaronmk
245 117 aaronmk
            in_db.close()
246 161 aaronmk
        elif in_is_xml:
247 1715 aaronmk
            def get_rows(doc2rows):
248 1758 aaronmk
                return iters.flatten(itertools.imap(doc2rows,
249
                    xml_parse.docs_iter(stdin, on_error)))
250 1715 aaronmk
251 1714 aaronmk
            if map_path == None:
252 1715 aaronmk
                def doc2rows(in_xml_root):
253 1701 aaronmk
                    iter_ = xml_dom.NodeElemIter(in_xml_root)
254
                    util.skip(iter_, xml_dom.is_text) # skip metadata
255 1715 aaronmk
                    return iter_
256
257
                row_ct = process_rows(lambda row, i: root.appendChild(row),
258
                    get_rows(doc2rows))
259 1714 aaronmk
            else:
260 1715 aaronmk
                def doc2rows(in_xml_root):
261 1701 aaronmk
                    rows = xpath.get(in_xml_root, in_root, limit=end)
262 1714 aaronmk
                    if rows == []: raise SystemExit('Map error: Root "'
263
                        +in_root+'" not found in input')
264
                    return rows
265
266
                def get_value(in_, row):
267
                    in_ = './{'+(','.join(strings.with_prefixes(
268
                        ['']+prefixes, in_)))+'}' # also with no prefix
269
                    nodes = xpath.get(row, in_, allow_rooted=False)
270
                    if nodes != []: return xml_dom.value(nodes[0])
271
                    else: return None
272
273 1715 aaronmk
                row_ct = map_rows(get_value, get_rows(doc2rows))
274 56 aaronmk
        else: # input is CSV
275 133 aaronmk
            map_ = dict(mappings)
276 1389 aaronmk
            reader, col_names = csvs.reader_and_header(sys.stdin)
277 1403 aaronmk
            row_ct = map_table(col_names, reader)
278 982 aaronmk
279
        return row_ct
280 53 aaronmk
281 838 aaronmk
    def process_inputs(root, row_ready):
282 982 aaronmk
        row_ct = 0
283
        for map_path in map_paths:
284
            row_ct += process_input(root, row_ready, map_path)
285
        return row_ct
286 512 aaronmk
287 130 aaronmk
    if out_is_db:
288 53 aaronmk
        import db_xml
289
290 646 aaronmk
        out_db = connect_db(out_db_config)
291 310 aaronmk
        out_pkeys = {}
292 53 aaronmk
        try:
293 947 aaronmk
            if redo: sql.empty_db(out_db)
294 982 aaronmk
            row_ins_ct_ref = [0]
295 449 aaronmk
296 838 aaronmk
            def row_ready(row_num, input_row):
297 452 aaronmk
                def on_error(e):
298 835 aaronmk
                    exc.add_msg(e, term.emph('row #:')+' '+str(row_num))
299 828 aaronmk
                    exc.add_msg(e, term.emph('input row:')+'\n'+str(input_row))
300
                    exc.add_msg(e, term.emph('output row:')+'\n'+str(root))
301 1617 aaronmk
                    ex_tracker.track(e, row_num)
302 452 aaronmk
303 449 aaronmk
                xml_func.process(root, on_error)
304 442 aaronmk
                if not xml_dom.is_empty(root):
305
                    assert xml_dom.has_one_child(root)
306
                    try:
307 982 aaronmk
                        sql.with_savepoint(out_db,
308
                            lambda: db_xml.put(out_db, root.firstChild,
309
                                out_pkeys, row_ins_ct_ref, on_error))
310 442 aaronmk
                        if commit: out_db.commit()
311 449 aaronmk
                    except sql.DatabaseErrors, e: on_error(e)
312 1010 aaronmk
                prep_root()
313 449 aaronmk
314 982 aaronmk
            row_ct = process_inputs(root, row_ready)
315
            sys.stdout.write('Inserted '+str(row_ins_ct_ref[0])+
316 460 aaronmk
                ' new rows into database\n')
317 53 aaronmk
        finally:
318 133 aaronmk
            out_db.rollback()
319
            out_db.close()
320 751 aaronmk
    else:
321 759 aaronmk
        def on_error(e): ex_tracker.track(e)
322 838 aaronmk
        def row_ready(row_num, input_row): pass
323 982 aaronmk
        row_ct = process_inputs(root, row_ready)
324 759 aaronmk
        xml_func.process(root, on_error)
325 751 aaronmk
        if out_is_xml_ref[0]:
326
            doc.writexml(sys.stdout, **xml_dom.prettyxml_config)
327
        else: # output is CSV
328
            raise NotImplementedError('CSV output not supported yet')
329 985 aaronmk
330 982 aaronmk
    profiler.stop(row_ct)
331
    ex_tracker.add_iters(row_ct)
332 990 aaronmk
    if verbose:
333
        sys.stderr.write('Processed '+str(row_ct)+' input rows\n')
334
        sys.stderr.write(profiler.msg()+'\n')
335
        sys.stderr.write(ex_tracker.msg()+'\n')
336 985 aaronmk
    ex_tracker.exit()
337 53 aaronmk
338 847 aaronmk
def main():
339
    try: main_()
340 1719 aaronmk
    except Parser.SyntaxError, e: raise SystemExit(str(e))
341 847 aaronmk
342 846 aaronmk
if __name__ == '__main__':
343 847 aaronmk
    profile_to = opts.get_env_var('profile_to', None)
344
    if profile_to != None:
345
        import cProfile
346 1584 aaronmk
        sys.stderr.write('Profiling to '+profile_to+'\n')
347 847 aaronmk
        cProfile.run(main.func_code, profile_to)
348
    else: main()