Project

General

Profile

1 53 aaronmk
#!/usr/bin/env python
2
# Maps one datasource to another, using a map spreadsheet if needed
3 986 aaronmk
# Exit status is the # of errors in the import, up to the maximum exit status
4 53 aaronmk
# For outputting an XML file to a PostgreSQL database, use the general format of
5
# http://vegbank.org/vegdocs/xml/vegbank_example_ver1.0.2.xml
6
7 2550 aaronmk
import copy
8 1014 aaronmk
import csv
9 1714 aaronmk
import itertools
10 53 aaronmk
import os.path
11
import sys
12 299 aaronmk
import xml.dom.minidom as minidom
13 53 aaronmk
14 266 aaronmk
sys.path.append(os.path.dirname(__file__)+"/../lib")
15 53 aaronmk
16 1389 aaronmk
import csvs
17 2040 aaronmk
import db_xml
18 344 aaronmk
import exc
19 1714 aaronmk
import iters
20 1705 aaronmk
import maps
21 64 aaronmk
import opts
22 2035 aaronmk
import parallelproc
23 281 aaronmk
import Parser
24 982 aaronmk
import profiling
25 131 aaronmk
import sql
26 2288 aaronmk
import sql_gen
27 3103 aaronmk
import sql_io
28 1758 aaronmk
import streams
29 715 aaronmk
import strings
30 828 aaronmk
import term
31 310 aaronmk
import util
32 1014 aaronmk
import xpath
33 133 aaronmk
import xml_dom
34 86 aaronmk
import xml_func
35 1713 aaronmk
import xml_parse
36 53 aaronmk
37 1404 aaronmk
def get_with_prefix(map_, prefixes, key):
38 2040 aaronmk
    '''Gets all entries for the given key with any of the given prefixes
39
    @return tuple(found_key, found_value)
40
    '''
41 1484 aaronmk
    values = []
42 1681 aaronmk
    for key_ in strings.with_prefixes(['']+prefixes, key): # also with no prefix
43
        try: value = map_[key_]
44 1484 aaronmk
        except KeyError, e: continue # keep going
45 2040 aaronmk
        values.append((key_, value))
46 1484 aaronmk
47
    if values != []: return values
48
    else: raise e # re-raise last KeyError
49 1404 aaronmk
50 1018 aaronmk
def metadata_value(name): return None # this feature has been removed
51 84 aaronmk
52 1360 aaronmk
def cleanup(val):
53
    if val == None: return val
54 1374 aaronmk
    return util.none_if(strings.cleanup(strings.ustr(val)), u'', u'\\N')
55 1360 aaronmk
56 847 aaronmk
def main_():
57 131 aaronmk
    env_names = []
58
    def usage_err():
59 944 aaronmk
        raise SystemExit('Usage: '+opts.env_usage(env_names, True)+' '
60 1945 aaronmk
            +sys.argv[0]+' [map_path...] [<input] [>output]\n'
61 1946 aaronmk
            'Note: Row #s start with 1')
62 838 aaronmk
63 1570 aaronmk
    ## Get config from env vars
64 838 aaronmk
65 1570 aaronmk
    # Modes
66 946 aaronmk
    test = opts.env_flag('test', False, env_names)
67 947 aaronmk
    commit = opts.env_flag('commit', False, env_names) and not test
68 944 aaronmk
        # never commit in test mode
69 947 aaronmk
    redo = opts.env_flag('redo', test, env_names) and not commit
70 2977 aaronmk
        # never redo in commit mode (run `make schemas/reinstall` instead)
71 1570 aaronmk
72
    # Ranges
73 1946 aaronmk
    start = util.cast(int, opts.get_env_var('start', 1, env_names)) # 1-based
74
    # Make start interally 0-based.
75
    # It's 1-based to the user to match up with the staging table row #s.
76
    start -= 1
77 1945 aaronmk
    if test: n_default = 1
78
    else: n_default = None
79
    n = util.cast(int, util.none_if(opts.get_env_var('n', n_default, env_names),
80
        u''))
81
    end = n
82 1570 aaronmk
    if end != None: end += start
83
84
    # Debugging
85 3132 aaronmk
    verbosity = util.cast(float, opts.get_env_var('verbosity', None, env_names))
86 944 aaronmk
    opts.get_env_var('profile_to', None, env_names) # add to env_names
87 131 aaronmk
88 1570 aaronmk
    # DB
89
    def get_db_config(prefix):
90 1926 aaronmk
        return opts.get_env_vars(sql.db_config_names, prefix, env_names)
91 1570 aaronmk
    in_db_config = get_db_config('in')
92
    out_db_config = get_db_config('out')
93
    in_is_db = 'engine' in in_db_config
94
    out_is_db = 'engine' in out_db_config
95 1982 aaronmk
    in_schema = opts.get_env_var('in_schema', None, env_names)
96
    in_table = opts.get_env_var('in_table', None, env_names)
97 3339 aaronmk
    if in_schema != None:
98
        for config in [in_db_config, out_db_config]:
99
            config['schemas'] += ','+in_schema
100 1570 aaronmk
101 1989 aaronmk
    # Optimization
102 2050 aaronmk
    cache_sql = opts.env_flag('cache_sql', True, env_names)
103 1989 aaronmk
    by_col = in_db_config == out_db_config and opts.env_flag('by_col', False,
104
        env_names) # by-column optimization only applies if mapping to same DB
105
    if test: cpus_default = 0
106 2044 aaronmk
    else: cpus_default = 0 # or None to use parallel processing by default
107 1989 aaronmk
    cpus = util.cast(int, util.none_if(opts.get_env_var('cpus', cpus_default,
108
        env_names), u''))
109
110 3132 aaronmk
    # Set default verbosity. Must happen after by_col is set.
111
    if verbosity == None:
112
        if test: verbosity = 0.5 # automated tests should not be verbose
113
        elif by_col: verbosity = 3 # show all queries to assist profiling
114
        else: verbosity = 1.1 # just show row progress
115
116 1570 aaronmk
    ##
117
118 838 aaronmk
    # Logging
119 2457 aaronmk
    verbose_errors = test and verbosity > 0
120 3243 aaronmk
    debug = verbosity >= 1.5
121 2439 aaronmk
    def log(msg, level=1):
122
        '''Higher level -> more verbose'''
123 2452 aaronmk
        if level <= verbosity:
124 2490 aaronmk
            if verbosity <= 2:
125
                if level == 1.5: msg = '# '+msg # msg is Redmine list item
126
                elif msg.startswith('DB query:'): # remove extra debug info
127
                    first_line, nl, msg = msg.partition('\n')
128
            elif level > 1: msg = '['+str(level)+'] '+msg # include level in msg
129
130 2883 aaronmk
            sys.stderr.write(strings.to_raw_str(msg.rstrip('\n')+'\n'))
131 2439 aaronmk
    if debug: log_debug = lambda msg, level=2: log(msg, level)
132 1901 aaronmk
    else: log_debug = sql.log_debug_none
133 662 aaronmk
134 53 aaronmk
    # Parse args
135 510 aaronmk
    map_paths = sys.argv[1:]
136 512 aaronmk
    if map_paths == []:
137
        if in_is_db or not out_is_db: usage_err()
138
        else: map_paths = [None]
139 53 aaronmk
140 646 aaronmk
    def connect_db(db_config):
141 1901 aaronmk
        log('Connecting to '+sql.db_config_str(db_config))
142 2920 aaronmk
        return sql.connect(db_config, caching=cache_sql, autocommit=commit,
143 2916 aaronmk
            debug_temp=verbosity > 3 and commit, log_debug=log_debug)
144 646 aaronmk
145 1945 aaronmk
    if end != None: end_str = str(end-1) # end is one past the last #
146 1573 aaronmk
    else: end_str = 'end'
147 1901 aaronmk
    log('Processing input rows '+str(start)+'-'+end_str)
148 1573 aaronmk
149 1014 aaronmk
    ex_tracker = exc.ExPercentTracker(iter_text='row')
150
    profiler = profiling.ItersProfiler(start_now=True, iter_text='row')
151
152 1856 aaronmk
    # Parallel processing
153 2035 aaronmk
    pool = parallelproc.MultiProducerPool(cpus)
154 1901 aaronmk
    log('Using '+str(pool.process_ct)+' parallel CPUs')
155 1846 aaronmk
156 1014 aaronmk
    doc = xml_dom.create_doc()
157
    root = doc.documentElement
158 751 aaronmk
    out_is_xml_ref = [False]
159 1014 aaronmk
    in_label_ref = [None]
160
    def update_in_label():
161
        if in_label_ref[0] != None:
162
            xpath.get(root, '/_ignore/inLabel="'+in_label_ref[0]+'"', True)
163
    def prep_root():
164
        root.clear()
165
        update_in_label()
166
    prep_root()
167 751 aaronmk
168 1991 aaronmk
    # Define before the out_is_db section because it's used by by_col
169
    row_ins_ct_ref = [0]
170
171 2601 aaronmk
    if out_is_db:
172
        out_db = connect_db(out_db_config)
173 2602 aaronmk
        rel_funcs = set(sql.tables(out_db, schema_like='%',
174 2601 aaronmk
            table_like=r'\__%'))
175
176 838 aaronmk
    def process_input(root, row_ready, map_path):
177 512 aaronmk
        '''Inputs datasource to XML tree, mapping if needed'''
178
        # Load map header
179
        in_is_xpaths = True
180 751 aaronmk
        out_is_xpaths = True
181 512 aaronmk
        out_label = None
182
        if map_path != None:
183
            metadata = []
184
            mappings = []
185
            stream = open(map_path, 'rb')
186
            reader = csv.reader(stream)
187
            in_label, out_label = reader.next()[:2]
188 1402 aaronmk
189 512 aaronmk
            def split_col_name(name):
190 1402 aaronmk
                label, sep, root = name.partition(':')
191
                label, sep2, prefixes_str = label.partition('[')
192
                prefixes_str = strings.remove_suffix(']', prefixes_str)
193
                prefixes = strings.split(',', prefixes_str)
194
                return label, sep != '', root, prefixes
195 1198 aaronmk
                    # extract datasrc from "datasrc[data_format]"
196 1402 aaronmk
197 1705 aaronmk
            in_label, in_root, prefixes = maps.col_info(in_label)
198
            in_is_xpaths = in_root != None
199 1014 aaronmk
            in_label_ref[0] = in_label
200
            update_in_label()
201 1705 aaronmk
            out_label, out_root = maps.col_info(out_label)[:2]
202
            out_is_xpaths = out_root != None
203 1841 aaronmk
            if out_is_xpaths: has_types = out_root.find('/*s/') >= 0
204 1705 aaronmk
                # outer elements are types
205 1402 aaronmk
206 512 aaronmk
            for row in reader:
207
                in_, out = row[:2]
208 2029 aaronmk
                if out != '': mappings.append([in_, out_root+out])
209 1402 aaronmk
210 512 aaronmk
            stream.close()
211
212
            root.ownerDocument.documentElement.tagName = out_label
213
        in_is_xml = in_is_xpaths and not in_is_db
214 751 aaronmk
        out_is_xml_ref[0] = out_is_xpaths and not out_is_db
215 56 aaronmk
216 1897 aaronmk
        def process_rows(process_row, rows, rows_start=0):
217
            '''Processes input rows
218 838 aaronmk
            @param process_row(in_row, i)
219 1945 aaronmk
            @rows_start The (0-based) row # of the first row in rows. Set this
220
                only if the pre-start rows have already been skipped.
221 297 aaronmk
            '''
222 1897 aaronmk
            rows = iter(rows)
223
224
            if end != None: row_nums = xrange(rows_start, end)
225
            else: row_nums = itertools.count(rows_start)
226 2038 aaronmk
            i = -1
227 1897 aaronmk
            for i in row_nums:
228 1718 aaronmk
                try: row = rows.next()
229 2038 aaronmk
                except StopIteration:
230
                    i -= 1 # last row # didn't count
231
                    break # no more rows
232 1718 aaronmk
                if i < start: continue # not at start row yet
233
234 2881 aaronmk
                # Row # is interally 0-based, but 1-based to the user
235
                log('Processing input row #'+str(i+1), level=1.1)
236 838 aaronmk
                process_row(row, i)
237
                row_ready(i, row)
238 2033 aaronmk
            row_ct = i-start+1
239 982 aaronmk
            return row_ct
240 838 aaronmk
241 1898 aaronmk
        def map_rows(get_value, rows, **kw_args):
242 838 aaronmk
            '''Maps input rows
243
            @param get_value(in_, row):str
244
            '''
245 2030 aaronmk
            # Prevent collisions if multiple inputs mapping to same output
246
            outputs_idxs = dict()
247
            for i, mapping in enumerate(mappings):
248
                in_, out = mapping
249
                default = util.NamedTuple(count=1, first=i)
250
                idxs = outputs_idxs.setdefault(out, default)
251
                if idxs is not default: # key existed, so there was a collision
252
                    if idxs.count == 1: # first key does not yet have /_alt/#
253
                        mappings[idxs.first][1] += '/_alt/0'
254
                    mappings[i][1] += '/_alt/'+str(idxs.count)
255
                    idxs.count += 1
256
257 2026 aaronmk
            id_node = None
258
            if out_is_db:
259
                for i, mapping in enumerate(mappings):
260
                    in_, out = mapping
261
                    # All put_obj()s should return the same id_node
262
                    nodes, id_node = xpath.put_obj(root, out, '-1', has_types,
263
                        '$'+str(in_)) # value is placeholder that documents name
264 2032 aaronmk
                    mappings[i] = [in_, nodes]
265
                assert id_node != None
266 2026 aaronmk
267 2119 aaronmk
                if debug: # only calc if debug
268 2026 aaronmk
                    log_debug('Put template:\n'+str(root))
269
270 838 aaronmk
            def process_row(row, i):
271 316 aaronmk
                row_id = str(i)
272 2032 aaronmk
                if id_node != None: xml_dom.set_value(id_node, row_id)
273 316 aaronmk
                for in_, out in mappings:
274 2027 aaronmk
                    log_debug('Getting '+str(in_))
275 316 aaronmk
                    value = metadata_value(in_)
276 2027 aaronmk
                    if value == None: value = cleanup(get_value(in_, row))
277
                    log_debug('Putting '+repr(value)+' to '+str(out))
278 2032 aaronmk
                    if out_is_db: # out is list of XML nodes
279
                        for node in out: xml_dom.set_value(node, value)
280
                    elif value != None: # out is XPath
281 1360 aaronmk
                        xpath.put_obj(root, out, row_id, has_types, value)
282 1898 aaronmk
            return process_rows(process_row, rows, **kw_args)
283 297 aaronmk
284 1898 aaronmk
        def map_table(col_names, rows, **kw_args):
285 1416 aaronmk
            col_names_ct = len(col_names)
286 1403 aaronmk
            col_idxs = util.list_flip(col_names)
287
288 2029 aaronmk
            # Resolve prefixes
289 2025 aaronmk
            mappings_orig = mappings[:] # save a copy
290
            mappings[:] = [] # empty existing elements
291
            for in_, out in mappings_orig:
292 1403 aaronmk
                if metadata_value(in_) == None:
293 2040 aaronmk
                    try: cols = get_with_prefix(col_idxs, prefixes, in_)
294 2025 aaronmk
                    except KeyError: pass
295 2040 aaronmk
                    else: mappings[len(mappings):] = [[db_xml.ColRef(*col), out]
296
                        for col in cols] # can't use += because that uses =
297 1403 aaronmk
298 2040 aaronmk
            def get_value(in_, row): return row.list[in_.idx]
299 1416 aaronmk
            def wrap_row(row):
300
                return util.ListDict(util.list_as_length(row, col_names_ct),
301
                    col_names, col_idxs) # handle CSV rows of different lengths
302 1403 aaronmk
303 1898 aaronmk
            return map_rows(get_value, util.WrapIter(wrap_row, rows), **kw_args)
304 1403 aaronmk
305 1700 aaronmk
        if in_is_db:
306 2809 aaronmk
            def on_error(e): ex_tracker.track(e)
307
308 3186 aaronmk
            if by_col: in_db = out_db
309
            else: in_db = connect_db(in_db_config)
310 126 aaronmk
311 1982 aaronmk
            # Get table and schema name
312
            schema = in_schema # modified, so can't have same name as outer var
313
            table = in_table # modified, so can't have same name as outer var
314
            if table == None:
315
                assert in_is_xpaths
316
                schema, sep, table = in_root.partition('.')
317
                if sep == '': # only the table name was specified
318
                    table = schema
319
                    schema = None
320 2313 aaronmk
            table = sql_gen.Table(table, schema)
321 1982 aaronmk
322 1991 aaronmk
            # Fetch rows
323
            if by_col: limit = 0 # only fetch column names
324
            else: limit = n
325 2313 aaronmk
            cur = sql.select(in_db, table, limit=limit, start=start,
326
                cacheable=False)
327 1991 aaronmk
            col_names = list(sql.col_names(cur))
328
329 1989 aaronmk
            if by_col:
330 2042 aaronmk
                map_table(col_names, []) # just create the template
331 2550 aaronmk
332 2734 aaronmk
                if n == None: # doing full import
333
                    log('Clearing errors table')
334 3103 aaronmk
                    sql.truncate(in_db, sql_io.errors_table(in_db, table))
335 2734 aaronmk
336 2550 aaronmk
                # Strip XML functions not in the DB
337 2602 aaronmk
                special_funcs = db_xml.put_table_special_funcs | rel_funcs
338
                xml_func.process(root, rel_funcs=special_funcs)
339 2103 aaronmk
                if debug: log_debug('Putting stripped:\n'+str(root))
340 2119 aaronmk
                    # only calc if debug
341 2550 aaronmk
342
                # Import rows
343 2422 aaronmk
                in_row_ct_ref = [0]
344 2928 aaronmk
                db_xml.put_table(in_db, root.firstChild, table, in_row_ct_ref,
345
                    row_ins_ct_ref, n, start, on_error)
346 2422 aaronmk
                row_ct = in_row_ct_ref[0]
347 1984 aaronmk
            else:
348
                # Use normal by-row method
349 1991 aaronmk
                row_ct = map_table(col_names, sql.rows(cur), rows_start=start)
350
                    # rows_start: pre-start rows have been skipped
351 3186 aaronmk
352
                in_db.db.close()
353 161 aaronmk
        elif in_is_xml:
354 2809 aaronmk
            stdin = streams.LineCountStream(sys.stdin)
355
            def on_error(e):
356
                exc.add_msg(e, term.emph('input line #:')+' '
357
                    +str(stdin.line_num))
358
                ex_tracker.track(e)
359
360 1715 aaronmk
            def get_rows(doc2rows):
361 1758 aaronmk
                return iters.flatten(itertools.imap(doc2rows,
362
                    xml_parse.docs_iter(stdin, on_error)))
363 1715 aaronmk
364 1714 aaronmk
            if map_path == None:
365 1715 aaronmk
                def doc2rows(in_xml_root):
366 1701 aaronmk
                    iter_ = xml_dom.NodeElemIter(in_xml_root)
367
                    util.skip(iter_, xml_dom.is_text) # skip metadata
368 1715 aaronmk
                    return iter_
369
370
                row_ct = process_rows(lambda row, i: root.appendChild(row),
371
                    get_rows(doc2rows))
372 1714 aaronmk
            else:
373 1715 aaronmk
                def doc2rows(in_xml_root):
374 1701 aaronmk
                    rows = xpath.get(in_xml_root, in_root, limit=end)
375 1714 aaronmk
                    if rows == []: raise SystemExit('Map error: Root "'
376
                        +in_root+'" not found in input')
377
                    return rows
378
379
                def get_value(in_, row):
380
                    in_ = './{'+(','.join(strings.with_prefixes(
381
                        ['']+prefixes, in_)))+'}' # also with no prefix
382
                    nodes = xpath.get(row, in_, allow_rooted=False)
383
                    if nodes != []: return xml_dom.value(nodes[0])
384
                    else: return None
385
386 1715 aaronmk
                row_ct = map_rows(get_value, get_rows(doc2rows))
387 56 aaronmk
        else: # input is CSV
388 133 aaronmk
            map_ = dict(mappings)
389 1389 aaronmk
            reader, col_names = csvs.reader_and_header(sys.stdin)
390 1403 aaronmk
            row_ct = map_table(col_names, reader)
391 982 aaronmk
392
        return row_ct
393 53 aaronmk
394 838 aaronmk
    def process_inputs(root, row_ready):
395 982 aaronmk
        row_ct = 0
396
        for map_path in map_paths:
397
            row_ct += process_input(root, row_ready, map_path)
398
        return row_ct
399 512 aaronmk
400 1886 aaronmk
    pool.share_vars(locals())
401 130 aaronmk
    if out_is_db:
402 53 aaronmk
        try:
403 947 aaronmk
            if redo: sql.empty_db(out_db)
404 1886 aaronmk
            pool.share_vars(locals())
405 449 aaronmk
406 838 aaronmk
            def row_ready(row_num, input_row):
407 2119 aaronmk
                row_str_ = [None]
408
                def row_str():
409
                    if row_str_[0] == None:
410
                        # Row # is interally 0-based, but 1-based to the user
411
                        row_str_[0] = (term.emph('row #:')+' '+str(row_num+1)
412
                            +'\n'+term.emph('input row:')+'\n'+str(input_row))
413
                        if verbose_errors: row_str_[0] += ('\n'
414
                            +term.emph('output row:')+'\n'+str(root))
415
                    return row_str_[0]
416
417
                if debug: log_debug(row_str()) # only calc if debug
418
419 452 aaronmk
                def on_error(e):
420 2119 aaronmk
                    exc.add_msg(e, row_str())
421 2046 aaronmk
                    ex_tracker.track(e, row_num, detail=verbose_errors)
422 1886 aaronmk
                pool.share_vars(locals())
423 452 aaronmk
424 2032 aaronmk
                row_root = root.cloneNode(True) # deep copy so don't modify root
425 2639 aaronmk
                xml_func.process(row_root, on_error, rel_funcs, out_db)
426 2032 aaronmk
                if not xml_dom.is_empty(row_root):
427
                    assert xml_dom.has_one_child(row_root)
428 442 aaronmk
                    try:
429 982 aaronmk
                        sql.with_savepoint(out_db,
430 2032 aaronmk
                            lambda: db_xml.put(out_db, row_root.firstChild,
431 1850 aaronmk
                                row_ins_ct_ref, on_error))
432 449 aaronmk
                    except sql.DatabaseErrors, e: on_error(e)
433
434 982 aaronmk
            row_ct = process_inputs(root, row_ready)
435
            sys.stdout.write('Inserted '+str(row_ins_ct_ref[0])+
436 460 aaronmk
                ' new rows into database\n')
437 1868 aaronmk
438
            # Consume asynchronous tasks
439
            pool.main_loop()
440 3117 aaronmk
        finally: out_db.close()
441 751 aaronmk
    else:
442 759 aaronmk
        def on_error(e): ex_tracker.track(e)
443 838 aaronmk
        def row_ready(row_num, input_row): pass
444 982 aaronmk
        row_ct = process_inputs(root, row_ready)
445 759 aaronmk
        xml_func.process(root, on_error)
446 751 aaronmk
        if out_is_xml_ref[0]:
447
            doc.writexml(sys.stdout, **xml_dom.prettyxml_config)
448
        else: # output is CSV
449
            raise NotImplementedError('CSV output not supported yet')
450 985 aaronmk
451 1868 aaronmk
    # Consume any asynchronous tasks not already consumed above
452 1862 aaronmk
    pool.main_loop()
453
454 982 aaronmk
    profiler.stop(row_ct)
455 3324 aaronmk
    if not by_col: ex_tracker.add_iters(row_ct) # only if errors are done by row
456 2439 aaronmk
    log('Processed '+str(row_ct)+' input rows')
457
    log(profiler.msg())
458
    log(ex_tracker.msg())
459 985 aaronmk
    ex_tracker.exit()
460 53 aaronmk
461 847 aaronmk
def main():
462
    try: main_()
463 1719 aaronmk
    except Parser.SyntaxError, e: raise SystemExit(str(e))
464 847 aaronmk
465 846 aaronmk
if __name__ == '__main__':
466 847 aaronmk
    profile_to = opts.get_env_var('profile_to', None)
467
    if profile_to != None:
468
        import cProfile
469 1584 aaronmk
        sys.stderr.write('Profiling to '+profile_to+'\n')
470 847 aaronmk
        cProfile.run(main.func_code, profile_to)
471
    else: main()