Project

General

Profile

1 53 aaronmk
#!/usr/bin/env python
2
# Maps one datasource to another, using a map spreadsheet if needed
3 986 aaronmk
# Exit status is the # of errors in the import, up to the maximum exit status
4 53 aaronmk
# For outputting an XML file to a PostgreSQL database, use the general format of
5
# http://vegbank.org/vegdocs/xml/vegbank_example_ver1.0.2.xml
6
7 2550 aaronmk
import copy
8 1014 aaronmk
import csv
9 1714 aaronmk
import itertools
10 53 aaronmk
import os.path
11
import sys
12 299 aaronmk
import xml.dom.minidom as minidom
13 53 aaronmk
14 266 aaronmk
sys.path.append(os.path.dirname(__file__)+"/../lib")
15 53 aaronmk
16 1389 aaronmk
import csvs
17 2040 aaronmk
import db_xml
18 344 aaronmk
import exc
19 1714 aaronmk
import iters
20 1705 aaronmk
import maps
21 64 aaronmk
import opts
22 2035 aaronmk
import parallelproc
23 281 aaronmk
import Parser
24 982 aaronmk
import profiling
25 131 aaronmk
import sql
26 2288 aaronmk
import sql_gen
27 3103 aaronmk
import sql_io
28 1758 aaronmk
import streams
29 715 aaronmk
import strings
30 828 aaronmk
import term
31 310 aaronmk
import util
32 1014 aaronmk
import xpath
33 133 aaronmk
import xml_dom
34 86 aaronmk
import xml_func
35 1713 aaronmk
import xml_parse
36 53 aaronmk
37 1404 aaronmk
def get_with_prefix(map_, prefixes, key):
38 2040 aaronmk
    '''Gets all entries for the given key with any of the given prefixes
39
    @return tuple(found_key, found_value)
40
    '''
41 1484 aaronmk
    values = []
42 1681 aaronmk
    for key_ in strings.with_prefixes(['']+prefixes, key): # also with no prefix
43
        try: value = map_[key_]
44 1484 aaronmk
        except KeyError, e: continue # keep going
45 2040 aaronmk
        values.append((key_, value))
46 1484 aaronmk
47
    if values != []: return values
48
    else: raise e # re-raise last KeyError
49 1404 aaronmk
50 1018 aaronmk
def metadata_value(name): return None # this feature has been removed
51 84 aaronmk
52 1360 aaronmk
def cleanup(val):
53
    if val == None: return val
54 1374 aaronmk
    return util.none_if(strings.cleanup(strings.ustr(val)), u'', u'\\N')
55 1360 aaronmk
56 847 aaronmk
def main_():
57 131 aaronmk
    env_names = []
58
    def usage_err():
59 944 aaronmk
        raise SystemExit('Usage: '+opts.env_usage(env_names, True)+' '
60 1945 aaronmk
            +sys.argv[0]+' [map_path...] [<input] [>output]\n'
61 1946 aaronmk
            'Note: Row #s start with 1')
62 838 aaronmk
63 1570 aaronmk
    ## Get config from env vars
64 838 aaronmk
65 1570 aaronmk
    # Modes
66 946 aaronmk
    test = opts.env_flag('test', False, env_names)
67 947 aaronmk
    commit = opts.env_flag('commit', False, env_names) and not test
68 944 aaronmk
        # never commit in test mode
69 947 aaronmk
    redo = opts.env_flag('redo', test, env_names) and not commit
70 2977 aaronmk
        # never redo in commit mode (run `make schemas/reinstall` instead)
71 1570 aaronmk
72
    # Ranges
73 1946 aaronmk
    start = util.cast(int, opts.get_env_var('start', 1, env_names)) # 1-based
74
    # Make start interally 0-based.
75
    # It's 1-based to the user to match up with the staging table row #s.
76
    start -= 1
77 1945 aaronmk
    if test: n_default = 1
78
    else: n_default = None
79
    n = util.cast(int, util.none_if(opts.get_env_var('n', n_default, env_names),
80
        u''))
81
    end = n
82 1570 aaronmk
    if end != None: end += start
83
84
    # Debugging
85 3132 aaronmk
    verbosity = util.cast(float, opts.get_env_var('verbosity', None, env_names))
86 944 aaronmk
    opts.get_env_var('profile_to', None, env_names) # add to env_names
87 131 aaronmk
88 1570 aaronmk
    # DB
89
    def get_db_config(prefix):
90 1926 aaronmk
        return opts.get_env_vars(sql.db_config_names, prefix, env_names)
91 1570 aaronmk
    in_db_config = get_db_config('in')
92
    out_db_config = get_db_config('out')
93
    in_is_db = 'engine' in in_db_config
94
    out_is_db = 'engine' in out_db_config
95 1982 aaronmk
    in_schema = opts.get_env_var('in_schema', None, env_names)
96
    in_table = opts.get_env_var('in_table', None, env_names)
97 3339 aaronmk
    if in_schema != None:
98
        for config in [in_db_config, out_db_config]:
99
            config['schemas'] += ','+in_schema
100 1570 aaronmk
101 1989 aaronmk
    # Optimization
102 2050 aaronmk
    cache_sql = opts.env_flag('cache_sql', True, env_names)
103 1989 aaronmk
    by_col = in_db_config == out_db_config and opts.env_flag('by_col', False,
104
        env_names) # by-column optimization only applies if mapping to same DB
105
    if test: cpus_default = 0
106 2044 aaronmk
    else: cpus_default = 0 # or None to use parallel processing by default
107 1989 aaronmk
    cpus = util.cast(int, util.none_if(opts.get_env_var('cpus', cpus_default,
108
        env_names), u''))
109
110 3132 aaronmk
    # Set default verbosity. Must happen after by_col is set.
111
    if verbosity == None:
112
        if test: verbosity = 0.5 # automated tests should not be verbose
113
        elif by_col: verbosity = 3 # show all queries to assist profiling
114
        else: verbosity = 1.1 # just show row progress
115
116 1570 aaronmk
    ##
117
118 838 aaronmk
    # Logging
119 2457 aaronmk
    verbose_errors = test and verbosity > 0
120 3243 aaronmk
    debug = verbosity >= 1.5
121 2439 aaronmk
    def log(msg, level=1):
122
        '''Higher level -> more verbose'''
123 2452 aaronmk
        if level <= verbosity:
124 2490 aaronmk
            if verbosity <= 2:
125
                if level == 1.5: msg = '# '+msg # msg is Redmine list item
126
                elif msg.startswith('DB query:'): # remove extra debug info
127
                    first_line, nl, msg = msg.partition('\n')
128
            elif level > 1: msg = '['+str(level)+'] '+msg # include level in msg
129
130 2883 aaronmk
            sys.stderr.write(strings.to_raw_str(msg.rstrip('\n')+'\n'))
131 2439 aaronmk
    if debug: log_debug = lambda msg, level=2: log(msg, level)
132 1901 aaronmk
    else: log_debug = sql.log_debug_none
133 662 aaronmk
134 53 aaronmk
    # Parse args
135 510 aaronmk
    map_paths = sys.argv[1:]
136 512 aaronmk
    if map_paths == []:
137
        if in_is_db or not out_is_db: usage_err()
138
        else: map_paths = [None]
139 53 aaronmk
140 646 aaronmk
    def connect_db(db_config):
141 1901 aaronmk
        log('Connecting to '+sql.db_config_str(db_config))
142 2920 aaronmk
        return sql.connect(db_config, caching=cache_sql, autocommit=commit,
143 2916 aaronmk
            debug_temp=verbosity > 3 and commit, log_debug=log_debug)
144 646 aaronmk
145 1945 aaronmk
    if end != None: end_str = str(end-1) # end is one past the last #
146 1573 aaronmk
    else: end_str = 'end'
147 1901 aaronmk
    log('Processing input rows '+str(start)+'-'+end_str)
148 1573 aaronmk
149 1014 aaronmk
    ex_tracker = exc.ExPercentTracker(iter_text='row')
150
    profiler = profiling.ItersProfiler(start_now=True, iter_text='row')
151
152 1856 aaronmk
    # Parallel processing
153 2035 aaronmk
    pool = parallelproc.MultiProducerPool(cpus)
154 1901 aaronmk
    log('Using '+str(pool.process_ct)+' parallel CPUs')
155 1846 aaronmk
156 1014 aaronmk
    doc = xml_dom.create_doc()
157
    root = doc.documentElement
158 751 aaronmk
    out_is_xml_ref = [False]
159 1014 aaronmk
    in_label_ref = [None]
160
    def update_in_label():
161
        if in_label_ref[0] != None:
162
            xpath.get(root, '/_ignore/inLabel="'+in_label_ref[0]+'"', True)
163
    def prep_root():
164
        root.clear()
165
        update_in_label()
166
    prep_root()
167 751 aaronmk
168 1991 aaronmk
    # Define before the out_is_db section because it's used by by_col
169
    row_ins_ct_ref = [0]
170
171 2601 aaronmk
    if out_is_db:
172
        out_db = connect_db(out_db_config)
173 2602 aaronmk
        rel_funcs = set(sql.tables(out_db, schema_like='%',
174 2601 aaronmk
            table_like=r'\__%'))
175
176 838 aaronmk
    def process_input(root, row_ready, map_path):
177 512 aaronmk
        '''Inputs datasource to XML tree, mapping if needed'''
178
        # Load map header
179
        in_is_xpaths = True
180 751 aaronmk
        out_is_xpaths = True
181 512 aaronmk
        out_label = None
182
        if map_path != None:
183
            metadata = []
184
            mappings = []
185
            stream = open(map_path, 'rb')
186
            reader = csv.reader(stream)
187
            in_label, out_label = reader.next()[:2]
188 1402 aaronmk
189 512 aaronmk
            def split_col_name(name):
190 1402 aaronmk
                label, sep, root = name.partition(':')
191
                label, sep2, prefixes_str = label.partition('[')
192
                prefixes_str = strings.remove_suffix(']', prefixes_str)
193
                prefixes = strings.split(',', prefixes_str)
194
                return label, sep != '', root, prefixes
195 1198 aaronmk
                    # extract datasrc from "datasrc[data_format]"
196 1402 aaronmk
197 1705 aaronmk
            in_label, in_root, prefixes = maps.col_info(in_label)
198
            in_is_xpaths = in_root != None
199 1014 aaronmk
            in_label_ref[0] = in_label
200
            update_in_label()
201 1705 aaronmk
            out_label, out_root = maps.col_info(out_label)[:2]
202
            out_is_xpaths = out_root != None
203 1841 aaronmk
            if out_is_xpaths: has_types = out_root.find('/*s/') >= 0
204 1705 aaronmk
                # outer elements are types
205 1402 aaronmk
206 512 aaronmk
            for row in reader:
207
                in_, out = row[:2]
208 2029 aaronmk
                if out != '': mappings.append([in_, out_root+out])
209 1402 aaronmk
210 512 aaronmk
            stream.close()
211
212
            root.ownerDocument.documentElement.tagName = out_label
213
        in_is_xml = in_is_xpaths and not in_is_db
214 751 aaronmk
        out_is_xml_ref[0] = out_is_xpaths and not out_is_db
215 56 aaronmk
216 1897 aaronmk
        def process_rows(process_row, rows, rows_start=0):
217
            '''Processes input rows
218 838 aaronmk
            @param process_row(in_row, i)
219 1945 aaronmk
            @rows_start The (0-based) row # of the first row in rows. Set this
220
                only if the pre-start rows have already been skipped.
221 297 aaronmk
            '''
222 1897 aaronmk
            rows = iter(rows)
223
224
            if end != None: row_nums = xrange(rows_start, end)
225
            else: row_nums = itertools.count(rows_start)
226 2038 aaronmk
            i = -1
227 1897 aaronmk
            for i in row_nums:
228 1718 aaronmk
                try: row = rows.next()
229 2038 aaronmk
                except StopIteration:
230
                    i -= 1 # last row # didn't count
231
                    break # no more rows
232 1718 aaronmk
                if i < start: continue # not at start row yet
233
234 2881 aaronmk
                # Row # is interally 0-based, but 1-based to the user
235
                log('Processing input row #'+str(i+1), level=1.1)
236 838 aaronmk
                process_row(row, i)
237
                row_ready(i, row)
238 2033 aaronmk
            row_ct = i-start+1
239 982 aaronmk
            return row_ct
240 838 aaronmk
241 1898 aaronmk
        def map_rows(get_value, rows, **kw_args):
242 838 aaronmk
            '''Maps input rows
243
            @param get_value(in_, row):str
244
            '''
245 2030 aaronmk
            # Prevent collisions if multiple inputs mapping to same output
246
            outputs_idxs = dict()
247
            for i, mapping in enumerate(mappings):
248
                in_, out = mapping
249
                default = util.NamedTuple(count=1, first=i)
250
                idxs = outputs_idxs.setdefault(out, default)
251
                if idxs is not default: # key existed, so there was a collision
252
                    if idxs.count == 1: # first key does not yet have /_alt/#
253
                        mappings[idxs.first][1] += '/_alt/0'
254
                    mappings[i][1] += '/_alt/'+str(idxs.count)
255
                    idxs.count += 1
256
257 2026 aaronmk
            id_node = None
258
            if out_is_db:
259
                for i, mapping in enumerate(mappings):
260
                    in_, out = mapping
261
                    # All put_obj()s should return the same id_node
262
                    nodes, id_node = xpath.put_obj(root, out, '-1', has_types,
263
                        '$'+str(in_)) # value is placeholder that documents name
264 2032 aaronmk
                    mappings[i] = [in_, nodes]
265
                assert id_node != None
266 2026 aaronmk
267 2119 aaronmk
                if debug: # only calc if debug
268 2026 aaronmk
                    log_debug('Put template:\n'+str(root))
269
270 838 aaronmk
            def process_row(row, i):
271 316 aaronmk
                row_id = str(i)
272 2032 aaronmk
                if id_node != None: xml_dom.set_value(id_node, row_id)
273 316 aaronmk
                for in_, out in mappings:
274 2027 aaronmk
                    log_debug('Getting '+str(in_))
275 316 aaronmk
                    value = metadata_value(in_)
276 2027 aaronmk
                    if value == None: value = cleanup(get_value(in_, row))
277
                    log_debug('Putting '+repr(value)+' to '+str(out))
278 2032 aaronmk
                    if out_is_db: # out is list of XML nodes
279
                        for node in out: xml_dom.set_value(node, value)
280
                    elif value != None: # out is XPath
281 1360 aaronmk
                        xpath.put_obj(root, out, row_id, has_types, value)
282 1898 aaronmk
            return process_rows(process_row, rows, **kw_args)
283 297 aaronmk
284 1898 aaronmk
        def map_table(col_names, rows, **kw_args):
285 1416 aaronmk
            col_names_ct = len(col_names)
286 1403 aaronmk
            col_idxs = util.list_flip(col_names)
287
288 2029 aaronmk
            # Resolve prefixes
289 2025 aaronmk
            mappings_orig = mappings[:] # save a copy
290
            mappings[:] = [] # empty existing elements
291
            for in_, out in mappings_orig:
292 1403 aaronmk
                if metadata_value(in_) == None:
293 2040 aaronmk
                    try: cols = get_with_prefix(col_idxs, prefixes, in_)
294 2025 aaronmk
                    except KeyError: pass
295 2040 aaronmk
                    else: mappings[len(mappings):] = [[db_xml.ColRef(*col), out]
296
                        for col in cols] # can't use += because that uses =
297 1403 aaronmk
298 2040 aaronmk
            def get_value(in_, row): return row.list[in_.idx]
299 1416 aaronmk
            def wrap_row(row):
300
                return util.ListDict(util.list_as_length(row, col_names_ct),
301
                    col_names, col_idxs) # handle CSV rows of different lengths
302 1403 aaronmk
303 1898 aaronmk
            return map_rows(get_value, util.WrapIter(wrap_row, rows), **kw_args)
304 1403 aaronmk
305 1700 aaronmk
        if in_is_db:
306 2809 aaronmk
            def on_error(e): ex_tracker.track(e)
307
308 3186 aaronmk
            if by_col: in_db = out_db
309
            else: in_db = connect_db(in_db_config)
310 126 aaronmk
311 1982 aaronmk
            # Get table and schema name
312
            schema = in_schema # modified, so can't have same name as outer var
313
            table = in_table # modified, so can't have same name as outer var
314
            if table == None:
315
                assert in_is_xpaths
316
                schema, sep, table = in_root.partition('.')
317
                if sep == '': # only the table name was specified
318
                    table = schema
319
                    schema = None
320 2313 aaronmk
            table = sql_gen.Table(table, schema)
321 1982 aaronmk
322 1991 aaronmk
            # Fetch rows
323
            if by_col: limit = 0 # only fetch column names
324
            else: limit = n
325 2313 aaronmk
            cur = sql.select(in_db, table, limit=limit, start=start,
326
                cacheable=False)
327 1991 aaronmk
            col_names = list(sql.col_names(cur))
328
329 1989 aaronmk
            if by_col:
330 2042 aaronmk
                map_table(col_names, []) # just create the template
331 2550 aaronmk
332 2734 aaronmk
                if n == None: # doing full import
333
                    log('Clearing errors table')
334 3103 aaronmk
                    sql.truncate(in_db, sql_io.errors_table(in_db, table))
335 2734 aaronmk
336 2550 aaronmk
                # Strip XML functions not in the DB
337 3427 aaronmk
                def is_rel_func(name):
338
                    return (name in db_xml.put_table_special_funcs
339
                        or sql.function_exists(in_db, sql_gen.Function(name)))
340
                xml_func.process(root, is_rel_func=is_rel_func)
341 2103 aaronmk
                if debug: log_debug('Putting stripped:\n'+str(root))
342 2119 aaronmk
                    # only calc if debug
343 2550 aaronmk
344
                # Import rows
345 2422 aaronmk
                in_row_ct_ref = [0]
346 2928 aaronmk
                db_xml.put_table(in_db, root.firstChild, table, in_row_ct_ref,
347
                    row_ins_ct_ref, n, start, on_error)
348 2422 aaronmk
                row_ct = in_row_ct_ref[0]
349 1984 aaronmk
            else:
350
                # Use normal by-row method
351 1991 aaronmk
                row_ct = map_table(col_names, sql.rows(cur), rows_start=start)
352
                    # rows_start: pre-start rows have been skipped
353 3186 aaronmk
354
                in_db.db.close()
355 161 aaronmk
        elif in_is_xml:
356 2809 aaronmk
            stdin = streams.LineCountStream(sys.stdin)
357
            def on_error(e):
358
                exc.add_msg(e, term.emph('input line #:')+' '
359
                    +str(stdin.line_num))
360
                ex_tracker.track(e)
361
362 1715 aaronmk
            def get_rows(doc2rows):
363 1758 aaronmk
                return iters.flatten(itertools.imap(doc2rows,
364
                    xml_parse.docs_iter(stdin, on_error)))
365 1715 aaronmk
366 1714 aaronmk
            if map_path == None:
367 1715 aaronmk
                def doc2rows(in_xml_root):
368 1701 aaronmk
                    iter_ = xml_dom.NodeElemIter(in_xml_root)
369
                    util.skip(iter_, xml_dom.is_text) # skip metadata
370 1715 aaronmk
                    return iter_
371
372
                row_ct = process_rows(lambda row, i: root.appendChild(row),
373
                    get_rows(doc2rows))
374 1714 aaronmk
            else:
375 1715 aaronmk
                def doc2rows(in_xml_root):
376 1701 aaronmk
                    rows = xpath.get(in_xml_root, in_root, limit=end)
377 1714 aaronmk
                    if rows == []: raise SystemExit('Map error: Root "'
378
                        +in_root+'" not found in input')
379
                    return rows
380
381
                def get_value(in_, row):
382
                    in_ = './{'+(','.join(strings.with_prefixes(
383
                        ['']+prefixes, in_)))+'}' # also with no prefix
384
                    nodes = xpath.get(row, in_, allow_rooted=False)
385
                    if nodes != []: return xml_dom.value(nodes[0])
386
                    else: return None
387
388 1715 aaronmk
                row_ct = map_rows(get_value, get_rows(doc2rows))
389 56 aaronmk
        else: # input is CSV
390 133 aaronmk
            map_ = dict(mappings)
391 1389 aaronmk
            reader, col_names = csvs.reader_and_header(sys.stdin)
392 1403 aaronmk
            row_ct = map_table(col_names, reader)
393 982 aaronmk
394
        return row_ct
395 53 aaronmk
396 838 aaronmk
    def process_inputs(root, row_ready):
397 982 aaronmk
        row_ct = 0
398
        for map_path in map_paths:
399
            row_ct += process_input(root, row_ready, map_path)
400
        return row_ct
401 512 aaronmk
402 1886 aaronmk
    pool.share_vars(locals())
403 130 aaronmk
    if out_is_db:
404 53 aaronmk
        try:
405 947 aaronmk
            if redo: sql.empty_db(out_db)
406 1886 aaronmk
            pool.share_vars(locals())
407 449 aaronmk
408 838 aaronmk
            def row_ready(row_num, input_row):
409 2119 aaronmk
                row_str_ = [None]
410
                def row_str():
411
                    if row_str_[0] == None:
412
                        # Row # is interally 0-based, but 1-based to the user
413
                        row_str_[0] = (term.emph('row #:')+' '+str(row_num+1)
414
                            +'\n'+term.emph('input row:')+'\n'+str(input_row))
415
                        if verbose_errors: row_str_[0] += ('\n'
416
                            +term.emph('output row:')+'\n'+str(root))
417
                    return row_str_[0]
418
419
                if debug: log_debug(row_str()) # only calc if debug
420
421 452 aaronmk
                def on_error(e):
422 2119 aaronmk
                    exc.add_msg(e, row_str())
423 2046 aaronmk
                    ex_tracker.track(e, row_num, detail=verbose_errors)
424 1886 aaronmk
                pool.share_vars(locals())
425 452 aaronmk
426 2032 aaronmk
                row_root = root.cloneNode(True) # deep copy so don't modify root
427 3424 aaronmk
                xml_func.process(row_root, on_error, lambda f: f in rel_funcs,
428
                    out_db)
429 2032 aaronmk
                if not xml_dom.is_empty(row_root):
430
                    assert xml_dom.has_one_child(row_root)
431 442 aaronmk
                    try:
432 982 aaronmk
                        sql.with_savepoint(out_db,
433 2032 aaronmk
                            lambda: db_xml.put(out_db, row_root.firstChild,
434 1850 aaronmk
                                row_ins_ct_ref, on_error))
435 449 aaronmk
                    except sql.DatabaseErrors, e: on_error(e)
436
437 982 aaronmk
            row_ct = process_inputs(root, row_ready)
438
            sys.stdout.write('Inserted '+str(row_ins_ct_ref[0])+
439 460 aaronmk
                ' new rows into database\n')
440 1868 aaronmk
441
            # Consume asynchronous tasks
442
            pool.main_loop()
443 3117 aaronmk
        finally: out_db.close()
444 751 aaronmk
    else:
445 759 aaronmk
        def on_error(e): ex_tracker.track(e)
446 838 aaronmk
        def row_ready(row_num, input_row): pass
447 982 aaronmk
        row_ct = process_inputs(root, row_ready)
448 759 aaronmk
        xml_func.process(root, on_error)
449 751 aaronmk
        if out_is_xml_ref[0]:
450
            doc.writexml(sys.stdout, **xml_dom.prettyxml_config)
451
        else: # output is CSV
452
            raise NotImplementedError('CSV output not supported yet')
453 985 aaronmk
454 1868 aaronmk
    # Consume any asynchronous tasks not already consumed above
455 1862 aaronmk
    pool.main_loop()
456
457 982 aaronmk
    profiler.stop(row_ct)
458 3324 aaronmk
    if not by_col: ex_tracker.add_iters(row_ct) # only if errors are done by row
459 2439 aaronmk
    log('Processed '+str(row_ct)+' input rows')
460
    log(profiler.msg())
461
    log(ex_tracker.msg())
462 985 aaronmk
    ex_tracker.exit()
463 53 aaronmk
464 847 aaronmk
def main():
465
    try: main_()
466 1719 aaronmk
    except Parser.SyntaxError, e: raise SystemExit(str(e))
467 847 aaronmk
468 846 aaronmk
if __name__ == '__main__':
469 847 aaronmk
    profile_to = opts.get_env_var('profile_to', None)
470
    if profile_to != None:
471
        import cProfile
472 1584 aaronmk
        sys.stderr.write('Profiling to '+profile_to+'\n')
473 847 aaronmk
        cProfile.run(main.func_code, profile_to)
474
    else: main()