Project

General

Profile

1 53 aaronmk
#!/usr/bin/env python
2
# Maps one datasource to another, using a map spreadsheet if needed
3 986 aaronmk
# Exit status is the # of errors in the import, up to the maximum exit status
4 53 aaronmk
# For outputting an XML file to a PostgreSQL database, use the general format of
5
# http://vegbank.org/vegdocs/xml/vegbank_example_ver1.0.2.xml
6
7 1014 aaronmk
import csv
8 1714 aaronmk
import itertools
9 53 aaronmk
import os.path
10
import sys
11 299 aaronmk
import xml.dom.minidom as minidom
12 53 aaronmk
13 266 aaronmk
sys.path.append(os.path.dirname(__file__)+"/../lib")
14 53 aaronmk
15 1389 aaronmk
import csvs
16 344 aaronmk
import exc
17 1714 aaronmk
import iters
18 1705 aaronmk
import maps
19 64 aaronmk
import opts
20 1859 aaronmk
import parallel
21 281 aaronmk
import Parser
22 982 aaronmk
import profiling
23 131 aaronmk
import sql
24 1758 aaronmk
import streams
25 715 aaronmk
import strings
26 828 aaronmk
import term
27 310 aaronmk
import util
28 1014 aaronmk
import xpath
29 133 aaronmk
import xml_dom
30 86 aaronmk
import xml_func
31 1713 aaronmk
import xml_parse
32 53 aaronmk
33 1404 aaronmk
def get_with_prefix(map_, prefixes, key):
34 1484 aaronmk
    '''Gets all entries for the given key with any of the given prefixes'''
35
    values = []
36 1681 aaronmk
    for key_ in strings.with_prefixes(['']+prefixes, key): # also with no prefix
37
        try: value = map_[key_]
38 1484 aaronmk
        except KeyError, e: continue # keep going
39
        values.append(value)
40
41
    if values != []: return values
42
    else: raise e # re-raise last KeyError
43 1404 aaronmk
44 1018 aaronmk
def metadata_value(name): return None # this feature has been removed
45 84 aaronmk
46 1360 aaronmk
def cleanup(val):
47
    if val == None: return val
48 1374 aaronmk
    return util.none_if(strings.cleanup(strings.ustr(val)), u'', u'\\N')
49 1360 aaronmk
50 847 aaronmk
def main_():
51 131 aaronmk
    env_names = []
52
    def usage_err():
53 944 aaronmk
        raise SystemExit('Usage: '+opts.env_usage(env_names, True)+' '
54 1945 aaronmk
            +sys.argv[0]+' [map_path...] [<input] [>output]\n'
55 1946 aaronmk
            'Note: Row #s start with 1')
56 838 aaronmk
57 1570 aaronmk
    ## Get config from env vars
58 838 aaronmk
59 1570 aaronmk
    # Modes
60 946 aaronmk
    test = opts.env_flag('test', False, env_names)
61 947 aaronmk
    commit = opts.env_flag('commit', False, env_names) and not test
62 944 aaronmk
        # never commit in test mode
63 947 aaronmk
    redo = opts.env_flag('redo', test, env_names) and not commit
64
        # never redo in commit mode (manually run `make empty_db` instead)
65 1570 aaronmk
66
    # Ranges
67 1946 aaronmk
    start = util.cast(int, opts.get_env_var('start', 1, env_names)) # 1-based
68
    # Make start interally 0-based.
69
    # It's 1-based to the user to match up with the staging table row #s.
70
    start -= 1
71 1945 aaronmk
    if test: n_default = 1
72
    else: n_default = None
73
    n = util.cast(int, util.none_if(opts.get_env_var('n', n_default, env_names),
74
        u''))
75
    end = n
76 1570 aaronmk
    if end != None: end += start
77
78 1846 aaronmk
    # Optimization
79 1851 aaronmk
    if test: cpus_default = 0
80
    else: cpus_default = None
81 1856 aaronmk
    cpus = util.cast(int, util.none_if(opts.get_env_var('cpus', cpus_default,
82
        env_names), u''))
83 1846 aaronmk
84 1570 aaronmk
    # Debugging
85 946 aaronmk
    debug = opts.env_flag('debug', False, env_names)
86 859 aaronmk
    sql.run_raw_query.debug = debug
87 1574 aaronmk
    verbose = debug or opts.env_flag('verbose', not test, env_names)
88 944 aaronmk
    opts.get_env_var('profile_to', None, env_names) # add to env_names
89 131 aaronmk
90 1570 aaronmk
    # DB
91
    def get_db_config(prefix):
92 1926 aaronmk
        return opts.get_env_vars(sql.db_config_names, prefix, env_names)
93 1570 aaronmk
    in_db_config = get_db_config('in')
94
    out_db_config = get_db_config('out')
95
    in_is_db = 'engine' in in_db_config
96
    out_is_db = 'engine' in out_db_config
97
98
    ##
99
100 838 aaronmk
    # Logging
101 662 aaronmk
    def log(msg, on=verbose):
102 1901 aaronmk
        if on: sys.stderr.write(msg+'\n')
103
    if debug: log_debug = lambda msg: log(msg, debug)
104
    else: log_debug = sql.log_debug_none
105 662 aaronmk
106 53 aaronmk
    # Parse args
107 510 aaronmk
    map_paths = sys.argv[1:]
108 512 aaronmk
    if map_paths == []:
109
        if in_is_db or not out_is_db: usage_err()
110
        else: map_paths = [None]
111 53 aaronmk
112 646 aaronmk
    def connect_db(db_config):
113 1901 aaronmk
        log('Connecting to '+sql.db_config_str(db_config))
114
        return sql.connect(db_config, log_debug=log_debug)
115 646 aaronmk
116 1945 aaronmk
    if end != None: end_str = str(end-1) # end is one past the last #
117 1573 aaronmk
    else: end_str = 'end'
118 1901 aaronmk
    log('Processing input rows '+str(start)+'-'+end_str)
119 1573 aaronmk
120 1014 aaronmk
    ex_tracker = exc.ExPercentTracker(iter_text='row')
121
    profiler = profiling.ItersProfiler(start_now=True, iter_text='row')
122
123 1856 aaronmk
    # Parallel processing
124 1862 aaronmk
    pool = parallel.MultiProducerPool(cpus)
125 1901 aaronmk
    log('Using '+str(pool.process_ct)+' parallel CPUs')
126 1846 aaronmk
127 1014 aaronmk
    doc = xml_dom.create_doc()
128
    root = doc.documentElement
129 751 aaronmk
    out_is_xml_ref = [False]
130 1014 aaronmk
    in_label_ref = [None]
131
    def update_in_label():
132
        if in_label_ref[0] != None:
133
            xpath.get(root, '/_ignore/inLabel="'+in_label_ref[0]+'"', True)
134
    def prep_root():
135
        root.clear()
136
        update_in_label()
137
    prep_root()
138 751 aaronmk
139 838 aaronmk
    def process_input(root, row_ready, map_path):
140 512 aaronmk
        '''Inputs datasource to XML tree, mapping if needed'''
141
        # Load map header
142
        in_is_xpaths = True
143 751 aaronmk
        out_is_xpaths = True
144 512 aaronmk
        out_label = None
145
        if map_path != None:
146
            metadata = []
147
            mappings = []
148
            stream = open(map_path, 'rb')
149
            reader = csv.reader(stream)
150
            in_label, out_label = reader.next()[:2]
151 1402 aaronmk
152 512 aaronmk
            def split_col_name(name):
153 1402 aaronmk
                label, sep, root = name.partition(':')
154
                label, sep2, prefixes_str = label.partition('[')
155
                prefixes_str = strings.remove_suffix(']', prefixes_str)
156
                prefixes = strings.split(',', prefixes_str)
157
                return label, sep != '', root, prefixes
158 1198 aaronmk
                    # extract datasrc from "datasrc[data_format]"
159 1402 aaronmk
160 1705 aaronmk
            in_label, in_root, prefixes = maps.col_info(in_label)
161
            in_is_xpaths = in_root != None
162 1014 aaronmk
            in_label_ref[0] = in_label
163
            update_in_label()
164 1705 aaronmk
            out_label, out_root = maps.col_info(out_label)[:2]
165
            out_is_xpaths = out_root != None
166 1841 aaronmk
            if out_is_xpaths: has_types = out_root.find('/*s/') >= 0
167 1705 aaronmk
                # outer elements are types
168 1402 aaronmk
169 512 aaronmk
            for row in reader:
170
                in_, out = row[:2]
171
                if out != '':
172
                    if out_is_xpaths: out = xpath.parse(out_root+out)
173
                    mappings.append((in_, out))
174 1402 aaronmk
175 512 aaronmk
            stream.close()
176
177
            root.ownerDocument.documentElement.tagName = out_label
178
        in_is_xml = in_is_xpaths and not in_is_db
179 751 aaronmk
        out_is_xml_ref[0] = out_is_xpaths and not out_is_db
180 56 aaronmk
181 1897 aaronmk
        def process_rows(process_row, rows, rows_start=0):
182
            '''Processes input rows
183 838 aaronmk
            @param process_row(in_row, i)
184 1945 aaronmk
            @rows_start The (0-based) row # of the first row in rows. Set this
185
                only if the pre-start rows have already been skipped.
186 297 aaronmk
            '''
187 1897 aaronmk
            rows = iter(rows)
188
189
            if end != None: row_nums = xrange(rows_start, end)
190
            else: row_nums = itertools.count(rows_start)
191
            for i in row_nums:
192 1718 aaronmk
                try: row = rows.next()
193
                except StopIteration: break # no more rows
194
                if i < start: continue # not at start row yet
195
196 838 aaronmk
                process_row(row, i)
197
                row_ready(i, row)
198 1718 aaronmk
            row_ct = i-start
199 982 aaronmk
            return row_ct
200 838 aaronmk
201 1898 aaronmk
        def map_rows(get_value, rows, **kw_args):
202 838 aaronmk
            '''Maps input rows
203
            @param get_value(in_, row):str
204
            '''
205
            def process_row(row, i):
206 316 aaronmk
                row_id = str(i)
207
                for in_, out in mappings:
208
                    value = metadata_value(in_)
209 662 aaronmk
                    if value == None:
210 1901 aaronmk
                        log_debug('Getting '+str(in_))
211 1360 aaronmk
                        value = cleanup(get_value(in_, row))
212 1348 aaronmk
                    if value != None:
213 1901 aaronmk
                        log_debug('Putting '+str(out))
214 1360 aaronmk
                        xpath.put_obj(root, out, row_id, has_types, value)
215 1898 aaronmk
            return process_rows(process_row, rows, **kw_args)
216 297 aaronmk
217 1898 aaronmk
        def map_table(col_names, rows, **kw_args):
218 1416 aaronmk
            col_names_ct = len(col_names)
219 1403 aaronmk
            col_idxs = util.list_flip(col_names)
220
221
            i = 0
222
            while i < len(mappings): # mappings len changes in loop
223
                in_, out = mappings[i]
224
                if metadata_value(in_) == None:
225 1404 aaronmk
                    try: mappings[i] = (
226
                        get_with_prefix(col_idxs, prefixes, in_), out)
227
                    except KeyError:
228
                        del mappings[i]
229
                        continue # keep i the same
230 1403 aaronmk
                i += 1
231
232
            def get_value(in_, row):
233 1484 aaronmk
                return util.coalesce(*util.list_subset(row.list, in_))
234 1416 aaronmk
            def wrap_row(row):
235
                return util.ListDict(util.list_as_length(row, col_names_ct),
236
                    col_names, col_idxs) # handle CSV rows of different lengths
237 1403 aaronmk
238 1898 aaronmk
            return map_rows(get_value, util.WrapIter(wrap_row, rows), **kw_args)
239 1403 aaronmk
240 1758 aaronmk
        stdin = streams.LineCountStream(sys.stdin)
241
        def on_error(e):
242
            exc.add_msg(e, term.emph('input line #:')+' '+str(stdin.line_num))
243
            ex_tracker.track(e)
244
245 1700 aaronmk
        if in_is_db:
246 130 aaronmk
            assert in_is_xpaths
247 126 aaronmk
248 1136 aaronmk
            in_db = connect_db(in_db_config)
249 1945 aaronmk
            cur = sql.select(in_db, table=in_root, limit=n, start=start)
250 1898 aaronmk
            row_ct = map_table(list(sql.col_names(cur)), sql.rows(cur),
251
                rows_start=start) # rows_start: pre-start rows have been skipped
252 1136 aaronmk
253 1849 aaronmk
            in_db.db.close()
254 161 aaronmk
        elif in_is_xml:
255 1715 aaronmk
            def get_rows(doc2rows):
256 1758 aaronmk
                return iters.flatten(itertools.imap(doc2rows,
257
                    xml_parse.docs_iter(stdin, on_error)))
258 1715 aaronmk
259 1714 aaronmk
            if map_path == None:
260 1715 aaronmk
                def doc2rows(in_xml_root):
261 1701 aaronmk
                    iter_ = xml_dom.NodeElemIter(in_xml_root)
262
                    util.skip(iter_, xml_dom.is_text) # skip metadata
263 1715 aaronmk
                    return iter_
264
265
                row_ct = process_rows(lambda row, i: root.appendChild(row),
266
                    get_rows(doc2rows))
267 1714 aaronmk
            else:
268 1715 aaronmk
                def doc2rows(in_xml_root):
269 1701 aaronmk
                    rows = xpath.get(in_xml_root, in_root, limit=end)
270 1714 aaronmk
                    if rows == []: raise SystemExit('Map error: Root "'
271
                        +in_root+'" not found in input')
272
                    return rows
273
274
                def get_value(in_, row):
275
                    in_ = './{'+(','.join(strings.with_prefixes(
276
                        ['']+prefixes, in_)))+'}' # also with no prefix
277
                    nodes = xpath.get(row, in_, allow_rooted=False)
278
                    if nodes != []: return xml_dom.value(nodes[0])
279
                    else: return None
280
281 1715 aaronmk
                row_ct = map_rows(get_value, get_rows(doc2rows))
282 56 aaronmk
        else: # input is CSV
283 133 aaronmk
            map_ = dict(mappings)
284 1389 aaronmk
            reader, col_names = csvs.reader_and_header(sys.stdin)
285 1403 aaronmk
            row_ct = map_table(col_names, reader)
286 982 aaronmk
287
        return row_ct
288 53 aaronmk
289 838 aaronmk
    def process_inputs(root, row_ready):
290 982 aaronmk
        row_ct = 0
291
        for map_path in map_paths:
292
            row_ct += process_input(root, row_ready, map_path)
293
        return row_ct
294 512 aaronmk
295 1886 aaronmk
    pool.share_vars(locals())
296 130 aaronmk
    if out_is_db:
297 53 aaronmk
        import db_xml
298
299 646 aaronmk
        out_db = connect_db(out_db_config)
300 53 aaronmk
        try:
301 947 aaronmk
            if redo: sql.empty_db(out_db)
302 982 aaronmk
            row_ins_ct_ref = [0]
303 1886 aaronmk
            pool.share_vars(locals())
304 449 aaronmk
305 838 aaronmk
            def row_ready(row_num, input_row):
306 452 aaronmk
                def on_error(e):
307 1946 aaronmk
                    exc.add_msg(e, term.emph('row #:')+' '+str(row_num+1))
308
                        # row # is interally 0-based, but 1-based to the user
309 828 aaronmk
                    exc.add_msg(e, term.emph('input row:')+'\n'+str(input_row))
310
                    exc.add_msg(e, term.emph('output row:')+'\n'+str(root))
311 1617 aaronmk
                    ex_tracker.track(e, row_num)
312 1886 aaronmk
                pool.share_vars(locals())
313 452 aaronmk
314 449 aaronmk
                xml_func.process(root, on_error)
315 442 aaronmk
                if not xml_dom.is_empty(root):
316
                    assert xml_dom.has_one_child(root)
317
                    try:
318 982 aaronmk
                        sql.with_savepoint(out_db,
319
                            lambda: db_xml.put(out_db, root.firstChild,
320 1850 aaronmk
                                row_ins_ct_ref, on_error))
321 1849 aaronmk
                        if commit: out_db.db.commit()
322 449 aaronmk
                    except sql.DatabaseErrors, e: on_error(e)
323 1010 aaronmk
                prep_root()
324 449 aaronmk
325 982 aaronmk
            row_ct = process_inputs(root, row_ready)
326
            sys.stdout.write('Inserted '+str(row_ins_ct_ref[0])+
327 460 aaronmk
                ' new rows into database\n')
328 1868 aaronmk
329
            # Consume asynchronous tasks
330
            pool.main_loop()
331 53 aaronmk
        finally:
332 1849 aaronmk
            out_db.db.rollback()
333
            out_db.db.close()
334 751 aaronmk
    else:
335 759 aaronmk
        def on_error(e): ex_tracker.track(e)
336 838 aaronmk
        def row_ready(row_num, input_row): pass
337 982 aaronmk
        row_ct = process_inputs(root, row_ready)
338 759 aaronmk
        xml_func.process(root, on_error)
339 751 aaronmk
        if out_is_xml_ref[0]:
340
            doc.writexml(sys.stdout, **xml_dom.prettyxml_config)
341
        else: # output is CSV
342
            raise NotImplementedError('CSV output not supported yet')
343 985 aaronmk
344 1868 aaronmk
    # Consume any asynchronous tasks not already consumed above
345 1862 aaronmk
    pool.main_loop()
346
347 982 aaronmk
    profiler.stop(row_ct)
348
    ex_tracker.add_iters(row_ct)
349 990 aaronmk
    if verbose:
350
        sys.stderr.write('Processed '+str(row_ct)+' input rows\n')
351
        sys.stderr.write(profiler.msg()+'\n')
352
        sys.stderr.write(ex_tracker.msg()+'\n')
353 985 aaronmk
    ex_tracker.exit()
354 53 aaronmk
355 847 aaronmk
def main():
356
    try: main_()
357 1719 aaronmk
    except Parser.SyntaxError, e: raise SystemExit(str(e))
358 847 aaronmk
359 846 aaronmk
if __name__ == '__main__':
360 847 aaronmk
    profile_to = opts.get_env_var('profile_to', None)
361
    if profile_to != None:
362
        import cProfile
363 1584 aaronmk
        sys.stderr.write('Profiling to '+profile_to+'\n')
364 847 aaronmk
        cProfile.run(main.func_code, profile_to)
365
    else: main()