Project

General

Profile

1 53 aaronmk
#!/usr/bin/env python
2
# Maps one datasource to another, using a map spreadsheet if needed
3 986 aaronmk
# Exit status is the # of errors in the import, up to the maximum exit status
4 3768 aaronmk
# Multi-safe (supports an input appearing multiple times).
5 53 aaronmk
# For outputting an XML file to a PostgreSQL database, use the general format of
6
# http://vegbank.org/vegdocs/xml/vegbank_example_ver1.0.2.xml
7 4213 aaronmk
# Duplicate-column safe (supports multiple columns of the same name, which will
8
# be combined)
9 4505 aaronmk
# Case- and punctuation-insensitive.
10 53 aaronmk
11 2550 aaronmk
import copy
12 1014 aaronmk
import csv
13 1714 aaronmk
import itertools
14 53 aaronmk
import os.path
15 3577 aaronmk
import warnings
16 53 aaronmk
import sys
17 299 aaronmk
import xml.dom.minidom as minidom
18 53 aaronmk
19 266 aaronmk
sys.path.append(os.path.dirname(__file__)+"/../lib")
20 53 aaronmk
21 1389 aaronmk
import csvs
22 2040 aaronmk
import db_xml
23 344 aaronmk
import exc
24 1714 aaronmk
import iters
25 1705 aaronmk
import maps
26 64 aaronmk
import opts
27 2035 aaronmk
import parallelproc
28 281 aaronmk
import Parser
29 982 aaronmk
import profiling
30 131 aaronmk
import sql
31 2288 aaronmk
import sql_gen
32 3103 aaronmk
import sql_io
33 1758 aaronmk
import streams
34 715 aaronmk
import strings
35 828 aaronmk
import term
36 310 aaronmk
import util
37 1014 aaronmk
import xpath
38 133 aaronmk
import xml_dom
39 86 aaronmk
import xml_func
40 1713 aaronmk
import xml_parse
41 53 aaronmk
42 5911 aaronmk
# Prefix that marks a map input entry as metadata rather than a column/field
# name (tested by is_metadata(), stripped by metadata_value())
metadata_prefix = ':'
43 4068 aaronmk
# Appended, followed by an index, to output paths in map_rows() when several
# inputs map to the same output
collision_suffix = '/_alt/'
44 4047 aaronmk
45 1404 aaronmk
def get_with_prefix(map_, prefixes, key):
    '''Gets all entries for the given key with any of the given prefixes
    @param map_ Mapping to look the (prefixed) keys up in
    @param prefixes list of prefixes to try; the unprefixed key is tried too
    @param key The key to look up
    @return list of tuple(found_key, found_value), one for each key variant
        that exists in map_
    @raise KeyError if none of the key variants exists in map_
    '''
    values = []
    last_err = None
    for key_ in strings.with_prefixes(['']+prefixes, key): # also with no prefix
        try: value = map_[key_]
        except KeyError as e:
            # Save it under another name: the handler's own variable is not
            # reliably available after the except clause
            last_err = e
            continue # keep going
        values.append((key_, value))
    
    if values: return values
    # Nothing found. Re-raise the last KeyError, or make one if no key variant
    # was tried at all.
    if last_err is None: last_err = KeyError(key)
    raise last_err
57 1404 aaronmk
58 5927 aaronmk
def is_metadata(str_):
    '''Returns whether the given string starts with metadata_prefix'''
    return str_.startswith(metadata_prefix)
59
60 5911 aaronmk
def metadata_value(name):
    '''Strips metadata_prefix from the given name
    @return The name without the prefix, or None if strings.remove_prefix()
        did not report removing the prefix
    '''
    was_removed = [False] # out-param filled in by strings.remove_prefix()
    stripped = strings.remove_prefix(metadata_prefix, name, was_removed)
    if not was_removed[0]: return None
    return stripped
65 84 aaronmk
66 1360 aaronmk
def cleanup(val):
    '''Normalizes an input value before it is put to the output
    @param val The raw value, or None
    @return None if val is None; otherwise the result of converting val with
        strings.ustr(), passing it through strings.cleanup(), and applying
        util.none_if() with u'' and u'\\N'
    '''
    # Identity test, so that a value with a custom __eq__ can't pose as None
    if val is None: return val
    return util.none_if(strings.cleanup(strings.ustr(val)), u'', u'\\N')
69 1360 aaronmk
70 847 aaronmk
def main_():
    '''Runs the mapping: reads the config from env vars and the map paths from
    sys.argv, maps each input (DB table, XML on stdin, or CSV on stdin) to the
    output (DB, or XML on stdout), then logs a summary and calls
    ex_tracker.exit().
    @raise SystemExit with a usage message if no map path is given and the
        input is a DB or the output is not a DB
    @raise NotImplementedError if the output is neither a DB nor XML
    '''
71 131 aaronmk
    env_names = []
72
    def usage_err():
73 944 aaronmk
        raise SystemExit('Usage: '+opts.env_usage(env_names, True)+' '
74 1945 aaronmk
            +sys.argv[0]+' [map_path...] [<input] [>output]\n'
75 1946 aaronmk
            'Note: Row #s start with 1')
76 838 aaronmk
77 1570 aaronmk
    ## Get config from env vars
78 838 aaronmk
79 1570 aaronmk
    # Modes
80 946 aaronmk
    test = opts.env_flag('test', False, env_names)
81 947 aaronmk
    commit = opts.env_flag('commit', False, env_names) and not test
82 944 aaronmk
        # never commit in test mode
83 947 aaronmk
    redo = opts.env_flag('redo', test, env_names) and not commit
84 2977 aaronmk
        # never redo in commit mode (run `make schemas/reinstall` instead)
85 1570 aaronmk
86
    # Ranges
87 1946 aaronmk
    start = util.cast(int, opts.get_env_var('start', 1, env_names)) # 1-based
88
    # Make start internally 0-based.
89
    # It's 1-based to the user to match up with the staging table row #s.
90
    start -= 1
91 1945 aaronmk
    if test: n_default = 1
92
    else: n_default = None
93
    n = util.cast(int, util.none_if(opts.get_env_var('n', n_default, env_names),
94
        u''))
95
    end = n
96 1570 aaronmk
    if end != None: end += start
97
98
    # Debugging
99 3132 aaronmk
    verbosity = util.cast(float, opts.get_env_var('verbosity', None, env_names))
100 944 aaronmk
    opts.get_env_var('profile_to', None, env_names) # add to env_names
101 131 aaronmk
102 1570 aaronmk
    # DB
103
    def get_db_config(prefix):
104 1926 aaronmk
        return opts.get_env_vars(sql.db_config_names, prefix, env_names)
105 1570 aaronmk
    in_db_config = get_db_config('in')
106
    out_db_config = get_db_config('out')
107
    in_is_db = 'engine' in in_db_config
108
    out_is_db = 'engine' in out_db_config
109 1982 aaronmk
    in_schema = opts.get_env_var('in_schema', None, env_names)
110
    in_table = opts.get_env_var('in_table', None, env_names)
111 3339 aaronmk
    if in_schema != None:
112
        for config in [in_db_config, out_db_config]:
113
            config['schemas'] += ','+in_schema
114 1570 aaronmk
115 1989 aaronmk
    # Optimization
116 2050 aaronmk
    cache_sql = opts.env_flag('cache_sql', True, env_names)
117 1989 aaronmk
    by_col = in_db_config == out_db_config and opts.env_flag('by_col', False,
118
        env_names) # by-column optimization only applies if mapping to same DB
119
    if test: cpus_default = 0
120 2044 aaronmk
    else: cpus_default = 0 # or None to use parallel processing by default
121 1989 aaronmk
    cpus = util.cast(int, util.none_if(opts.get_env_var('cpus', cpus_default,
122
        env_names), u''))
123
124 3132 aaronmk
    # Set default verbosity. Must happen after by_col is set.
125 4026 aaronmk
    if verbosity == None:
126 3132 aaronmk
        if test: verbosity = 0.5 # automated tests should not be verbose
127
        elif by_col: verbosity = 3 # show all queries to assist profiling
128
        else: verbosity = 1.1 # just show row progress
129
130 1570 aaronmk
    ##
131
132 838 aaronmk
    # Logging
133 2457 aaronmk
    verbose_errors = test and verbosity > 0
134 3243 aaronmk
    debug = verbosity >= 1.5
135 2439 aaronmk
    def log(msg, level=1):
136
        '''Higher level -> more verbose'''
137 2452 aaronmk
        if level <= verbosity:
138 2490 aaronmk
            if verbosity <= 2:
139
                if level == 1.5: msg = '# '+msg # msg is Redmine list item
140
                elif msg.startswith('DB query:'): # remove extra debug info
141
                    first_line, nl, msg = msg.partition('\n')
142
            elif level > 1: msg = '['+str(level)+'] '+msg # include level in msg
143
144 2883 aaronmk
            sys.stderr.write(strings.to_raw_str(msg.rstrip('\n')+'\n'))
145 2439 aaronmk
    if debug: log_debug = lambda msg, level=2: log(msg, level)
146 1901 aaronmk
    else: log_debug = sql.log_debug_none
147 662 aaronmk
148 53 aaronmk
    # Parse args
149 510 aaronmk
    map_paths = sys.argv[1:]
150 512 aaronmk
    if map_paths == []:
151
        if in_is_db or not out_is_db: usage_err()
152
        else: map_paths = [None]
153 53 aaronmk
154 646 aaronmk
    def connect_db(db_config):
155 1901 aaronmk
        log('Connecting to '+sql.db_config_str(db_config))
156 2920 aaronmk
        return sql.connect(db_config, caching=cache_sql, autocommit=commit,
157 2916 aaronmk
            debug_temp=verbosity > 3 and commit, log_debug=log_debug)
158 646 aaronmk
159 1945 aaronmk
    if end != None: end_str = str(end-1) # end is one past the last #
160 1573 aaronmk
    else: end_str = 'end'
161 1901 aaronmk
    log('Processing input rows '+str(start)+'-'+end_str)
162 1573 aaronmk
163 1014 aaronmk
    ex_tracker = exc.ExPercentTracker(iter_text='row')
164
    profiler = profiling.ItersProfiler(start_now=True, iter_text='row')
165
166 1856 aaronmk
    # Parallel processing
167 2035 aaronmk
    pool = parallelproc.MultiProducerPool(cpus)
168 1901 aaronmk
    log('Using '+str(pool.process_ct)+' parallel CPUs')
169 1846 aaronmk
170 3661 aaronmk
    # Set up DB access
171
    row_ins_ct_ref = [0]
172
    if out_is_db:
173
        out_db = connect_db(out_db_config)
174 3689 aaronmk
        def is_rel_func(name):
175
            return (name in db_xml.put_special_funcs
176
                or sql.function_exists(out_db, sql_gen.Function(name)))
177 3661 aaronmk
178 1014 aaronmk
    doc = xml_dom.create_doc()
179
    root = doc.documentElement
180 751 aaronmk
    out_is_xml_ref = [False]
181 3653 aaronmk
182 1014 aaronmk
    in_label_ref = [None]
183 3653 aaronmk
    col_defaults = {}
184 1014 aaronmk
    def update_in_label():
185 3696 aaronmk
        if in_label_ref[0] != None and out_is_db:
186
            # TODO: Move this to the mappings as some kind of metadata
187 5242 aaronmk
            col_defaults['creator_id'] = db_xml.put(out_db, xpath.path2xml(
188
                'party:[creator_id=0]/organizationname="'+in_label_ref[0]+'"'
189 5234 aaronmk
                ), row_ins_ct_ref)
190 3653 aaronmk
191 1014 aaronmk
    def prep_root():
192
        root.clear()
193
        update_in_label()
194
    prep_root()
195 751 aaronmk
196 838 aaronmk
    def process_input(root, row_ready, map_path):
197 512 aaronmk
        '''Inputs datasource to XML tree, mapping if needed'''
198
        # Load map header
199
        in_is_xpaths = True
200 751 aaronmk
        out_is_xpaths = True
201 512 aaronmk
        out_label = None
202
        if map_path != None:
203
            metadata = []
204
            mappings = []
205
            # NOTE(review): the stream is only closed on the success path
            # below; an exception while reading the map leaves it open
            stream = open(map_path, 'rb')
206
            reader = csv.reader(stream)
207
            in_label, out_label = reader.next()[:2]
208 1402 aaronmk
209 512 aaronmk
            # NOTE(review): split_col_name is not called anywhere in main_()
            def split_col_name(name):
210 1402 aaronmk
                label, sep, root = name.partition(':')
211 4652 aaronmk
                return label, sep != '', root, []
212 1402 aaronmk
213 1705 aaronmk
            in_label, in_root, prefixes = maps.col_info(in_label)
214
            in_is_xpaths = in_root != None
215 1014 aaronmk
            in_label_ref[0] = in_label
216
            update_in_label()
217 1705 aaronmk
            out_label, out_root = maps.col_info(out_label)[:2]
218
            out_is_xpaths = out_root != None
219 1841 aaronmk
            # NOTE(review): has_types is only bound when out_is_xpaths, but
            # map_rows() reads it unconditionally
            if out_is_xpaths: has_types = out_root.find('/*s/') >= 0
220 1705 aaronmk
                # outer elements are types
221 1402 aaronmk
222 512 aaronmk
            for row in reader:
223
                in_, out = row[:2]
224 2029 aaronmk
                if out != '': mappings.append([in_, out_root+out])
225 1402 aaronmk
226 512 aaronmk
            stream.close()
227
228
            root.ownerDocument.documentElement.tagName = out_label
229
        in_is_xml = in_is_xpaths and not in_is_db
230 751 aaronmk
        out_is_xml_ref[0] = out_is_xpaths and not out_is_db
231 56 aaronmk
232 1897 aaronmk
        def process_rows(process_row, rows, rows_start=0):
233 4026 aaronmk
            '''Processes input rows
234 838 aaronmk
            @param process_row(in_row, i)
235 1945 aaronmk
            @rows_start The (0-based) row # of the first row in rows. Set this
236
                only if the pre-start rows have already been skipped.
237 297 aaronmk
            '''
238 1897 aaronmk
            rows = iter(rows)
239
240
            if end != None: row_nums = xrange(rows_start, end)
241
            else: row_nums = itertools.count(rows_start)
242 2038 aaronmk
            i = -1 # so that i is bound below even if row_nums yields nothing
243 1897 aaronmk
            for i in row_nums:
244 1718 aaronmk
                try: row = rows.next()
245 2038 aaronmk
                except StopIteration:
246
                    i -= 1 # last row # didn't count
247
                    break # no more rows
248 1718 aaronmk
                if i < start: continue # not at start row yet
249
250 2881 aaronmk
                # Row # is internally 0-based, but 1-based to the user
251
                log('Processing input row #'+str(i+1), level=1.1)
252 838 aaronmk
                process_row(row, i)
253
                row_ready(i, row)
254 2033 aaronmk
            # i is now the # of the last row read.
            # NOTE(review): this is negative if the input ran out before the
            # start row was reached.
            row_ct = i-start+1
255 982 aaronmk
            return row_ct
256 838 aaronmk
257 1898 aaronmk
        def map_rows(get_value, rows, **kw_args):
258 838 aaronmk
            '''Maps input rows
259
            @param get_value(in_, row):str
260
            '''
261 2030 aaronmk
            # Prevent collisions if multiple inputs mapping to same output
262
            outputs_idxs = dict()
263
            for i, mapping in enumerate(mappings):
264
                in_, out = mapping
265
                default = util.NamedTuple(count=1, first=i)
266
                idxs = outputs_idxs.setdefault(out, default)
267
                if idxs is not default: # key existed, so there was a collision
268 4048 aaronmk
                    if idxs.count == 1: # first key does not yet have suffix
269 4047 aaronmk
                        mappings[idxs.first][1] += collision_suffix+'0'
270
                    mappings[i][1] += collision_suffix+str(idxs.count)
271 2030 aaronmk
                    idxs.count += 1
272
273 2026 aaronmk
            id_node = None
274
            if out_is_db:
275 5927 aaronmk
                # mappings belongs to the enclosing process_input() scope, so
                # it is refilled in place (slice assignment) rather than rebound
                mappings_orig = mappings[:] # save a copy
276
                mappings[:] = [] # empty existing elements
277
                for in_, out in mappings_orig:
278
                    in_str = strings.ustr(in_)
279
                    is_metadata_ = is_metadata(in_str)
280
                    if is_metadata_: value = metadata_value(in_str)
281
                    else: value = '$'+in_str # mark as name
282
283 2026 aaronmk
                    # All put_obj()s should return the same id_node
284
                    nodes, id_node = xpath.put_obj(root, out, '-1', has_types,
285 5927 aaronmk
                        value) # value is placeholder that documents name
286
                    if not is_metadata_: mappings.append([in_, nodes])
287 3580 aaronmk
                if id_node == None:
288 4026 aaronmk
                    warnings.warn(UserWarning('Map warning: No mappings'))
289 4042 aaronmk
                xml_func.simplify(root)
290 4491 aaronmk
                sys.stdout.write(strings.to_raw_str('Put template:\n'
291
                    +strings.ustr(root)))
292 3669 aaronmk
                sys.stdout.flush()
293 2026 aaronmk
294 838 aaronmk
            def process_row(row, i):
295 316 aaronmk
                row_id = str(i)
296 2032 aaronmk
                if id_node != None: xml_dom.set_value(id_node, row_id)
297 316 aaronmk
                for in_, out in mappings:
298 4491 aaronmk
                    log_debug('Getting '+strings.ustr(in_))
299 5927 aaronmk
                    value = cleanup(get_value(in_, row))
300 4492 aaronmk
                    log_debug('Putting '+strings.urepr(value)+' to '
301
                        +strings.ustr(out))
302 2032 aaronmk
                    if out_is_db: # out is list of XML nodes
303
                        for node in out: xml_dom.set_value(node, value)
304
                    elif value != None: # out is XPath
305 1360 aaronmk
                        xpath.put_obj(root, out, row_id, has_types, value)
306 1898 aaronmk
            return process_rows(process_row, rows, **kw_args)
307 297 aaronmk
308 1898 aaronmk
        def map_table(col_names, rows, **kw_args):
            '''Maps rows of a table (DB or CSV) whose columns are col_names
            @param kw_args Passed through map_rows() to process_rows()
            @return The row count returned by map_rows()
            '''
309 1416 aaronmk
            col_names_ct = len(col_names)
310 1403 aaronmk
            col_idxs = util.list_flip(col_names)
311 4503 aaronmk
            col_names_simp = map(maps.simplify, col_names)
312 4505 aaronmk
            col_names_map = dict(zip(col_names_simp, col_names))
313 4503 aaronmk
            prefixes_simp = map(maps.simplify, prefixes)
314 1403 aaronmk
315 2029 aaronmk
            # Resolve prefixes
316 2025 aaronmk
            mappings_orig = mappings[:] # save a copy
317
            mappings[:] = [] # empty existing elements
318
            for in_, out in mappings_orig:
319 5928 aaronmk
                if is_metadata(in_): mappings.append((in_, out))
320
                else:
321 4503 aaronmk
                    try:
322 4505 aaronmk
                        cols = get_with_prefix(col_names_map, prefixes_simp,
323 4503 aaronmk
                            maps.simplify(in_))
324 2025 aaronmk
                    except KeyError: pass # no such column, so drop the mapping
325 4503 aaronmk
                    else:
326 4505 aaronmk
                        cols = [(orig, col_idxs[orig]) for simp, orig in cols]
327 4503 aaronmk
                        mappings[len(mappings):] = [[db_xml.ColRef(*col), out]
328
                            for col in cols] # can't use += because that uses =
329 1403 aaronmk
330 2040 aaronmk
            def get_value(in_, row): return row.list[in_.idx]
331 1416 aaronmk
            def wrap_row(row):
332
                return util.ListDict(util.list_as_length(row, col_names_ct),
333
                    col_names, col_idxs) # handle CSV rows of different lengths
334 1403 aaronmk
335 1898 aaronmk
            return map_rows(get_value, util.WrapIter(wrap_row, rows), **kw_args)
336 1403 aaronmk
337 1700 aaronmk
        if in_is_db:
338 2809 aaronmk
            def on_error(e): ex_tracker.track(e)
339
340 3186 aaronmk
            if by_col: in_db = out_db
341
            else: in_db = connect_db(in_db_config)
342 126 aaronmk
343 1982 aaronmk
            # Get table and schema name
344
            schema = in_schema # modified, so can't have same name as outer var
345
            table = in_table # modified, so can't have same name as outer var
346
            if table == None:
347
                assert in_is_xpaths
348
                schema, sep, table = in_root.partition('.')
349
                if sep == '': # only the table name was specified
350
                    table = schema
351
                    schema = None
352 2313 aaronmk
            table = sql_gen.Table(table, schema)
353 1982 aaronmk
354 1991 aaronmk
            # Fetch rows
355
            if by_col: limit = 0 # only fetch column names
356
            else: limit = n
357 2313 aaronmk
            cur = sql.select(in_db, table, limit=limit, start=start,
358 5523 aaronmk
                cacheable=False)
359 1991 aaronmk
            col_names = list(sql.col_names(cur))
360
361 1989 aaronmk
            if by_col:
362 2042 aaronmk
                map_table(col_names, []) # just create the template
363 2550 aaronmk
364 3617 aaronmk
                if start == 0 and n == None: # doing full re-import
365 2734 aaronmk
                    log('Clearing errors table')
366 4474 aaronmk
                    errors_table_ = sql_io.errors_table(in_db, table)
367
                    if errors_table_ != None:
368
                        sql.drop_table(in_db, errors_table_)
369 2734 aaronmk
370 2550 aaronmk
                # Strip XML functions not in the DB
371 3427 aaronmk
                xml_func.process(root, is_rel_func=is_rel_func)
372 4491 aaronmk
                if debug: log_debug('Putting stripped:\n'+strings.ustr(root))
373 2119 aaronmk
                    # only calc if debug
374 2550 aaronmk
375
                # Import rows
376 2422 aaronmk
                in_row_ct_ref = [0]
377 2928 aaronmk
                db_xml.put_table(in_db, root.firstChild, table, in_row_ct_ref,
378 3627 aaronmk
                    row_ins_ct_ref, n, start, on_error, col_defaults)
379 2422 aaronmk
                row_ct = in_row_ct_ref[0]
380 1984 aaronmk
            else:
381
                # Use normal by-row method
382 1991 aaronmk
                row_ct = map_table(col_names, sql.rows(cur), rows_start=start)
383
                    # rows_start: pre-start rows have been skipped
384 3186 aaronmk
385
                # Only closed in this branch, where in_db is a separate
                # connection (with by_col, in_db is out_db)
                in_db.db.close()
386 161 aaronmk
        elif in_is_xml:
387 2809 aaronmk
            stdin = streams.LineCountStream(sys.stdin)
388
            def on_error(e):
389
                exc.add_msg(e, term.emph('input line #:')+' '
390
                    +str(stdin.line_num))
391
                ex_tracker.track(e)
392
393 1715 aaronmk
            def get_rows(doc2rows):
394 1758 aaronmk
                return iters.flatten(itertools.imap(doc2rows,
395
                    xml_parse.docs_iter(stdin, on_error)))
396 1715 aaronmk
397 1714 aaronmk
            if map_path == None:
398 1715 aaronmk
                def doc2rows(in_xml_root):
399 1701 aaronmk
                    iter_ = xml_dom.NodeElemIter(in_xml_root)
400
                    util.skip(iter_, xml_dom.is_text) # skip metadata
401 1715 aaronmk
                    return iter_
402
403
                row_ct = process_rows(lambda row, i: root.appendChild(row),
404
                    get_rows(doc2rows))
405 1714 aaronmk
            else:
406 1715 aaronmk
                def doc2rows(in_xml_root):
407 1701 aaronmk
                    rows = xpath.get(in_xml_root, in_root, limit=end)
408 3581 aaronmk
                    if rows == []: warnings.warn(UserWarning('Map warning: '
409
                        'Root "'+in_root+'" not found in input'))
410 1714 aaronmk
                    return rows
411
412
                def get_value(in_, row):
413
                    in_ = './{'+(','.join(strings.with_prefixes(
414
                        ['']+prefixes, in_)))+'}' # also with no prefix
415
                    nodes = xpath.get(row, in_, allow_rooted=False)
416
                    if nodes != []: return xml_dom.value(nodes[0])
417
                    else: return None
418
419 1715 aaronmk
                row_ct = map_rows(get_value, get_rows(doc2rows))
420 56 aaronmk
        else: # input is CSV
421 1389 aaronmk
            reader, col_names = csvs.reader_and_header(sys.stdin)
422 1403 aaronmk
            row_ct = map_table(col_names, reader)
423 982 aaronmk
424
        return row_ct
425 53 aaronmk
426 838 aaronmk
    def process_inputs(root, row_ready):
        '''Runs process_input() on every map path
        @return The total of the row counts returned by process_input()
        '''
427 982 aaronmk
        row_ct = 0
428
        for map_path in map_paths:
429
            row_ct += process_input(root, row_ready, map_path)
430
        return row_ct
431 512 aaronmk
432 1886 aaronmk
    pool.share_vars(locals())
433 130 aaronmk
    if out_is_db:
434 53 aaronmk
        try:
435 947 aaronmk
            if redo: sql.empty_db(out_db)
436 1886 aaronmk
            pool.share_vars(locals())
437 449 aaronmk
438 838 aaronmk
            def row_ready(row_num, input_row):
439 2119 aaronmk
                row_str_ = [None] # cache for row_str(), which is built lazily
440
                def row_str():
441
                    if row_str_[0] == None:
442
                        # Row # is internally 0-based, but 1-based to the user
443
                        row_str_[0] = (term.emph('row #:')+' '+str(row_num+1)
444 4491 aaronmk
                            +'\n'+term.emph('input row:')+'\n'
445
                            +strings.ustr(input_row))
446 2119 aaronmk
                        if verbose_errors: row_str_[0] += ('\n'
447 4491 aaronmk
                            +term.emph('output row:')+'\n'+strings.ustr(root))
448 2119 aaronmk
                    return row_str_[0]
449
450
                if debug: log_debug(row_str()) # only calc if debug
451
452 452 aaronmk
                def on_error(e):
453 2119 aaronmk
                    exc.add_msg(e, row_str())
454 2046 aaronmk
                    ex_tracker.track(e, row_num, detail=verbose_errors)
455 1886 aaronmk
                pool.share_vars(locals())
456 452 aaronmk
457 2032 aaronmk
                row_root = root.cloneNode(True) # deep copy so don't modify root
458 3689 aaronmk
                xml_func.process(row_root, on_error, is_rel_func, out_db)
459 4491 aaronmk
                if debug: log_debug('Putting processed:\n'
460
                    +strings.ustr(row_root)) # only calc if debug
461 2032 aaronmk
                if not xml_dom.is_empty(row_root):
462
                    assert xml_dom.has_one_child(row_root)
463 442 aaronmk
                    try:
464 982 aaronmk
                        sql.with_savepoint(out_db,
465 2032 aaronmk
                            lambda: db_xml.put(out_db, row_root.firstChild,
466 3653 aaronmk
                                row_ins_ct_ref, on_error, col_defaults))
467 449 aaronmk
                    except sql.DatabaseErrors, e: on_error(e)
468
469 982 aaronmk
            row_ct = process_inputs(root, row_ready)
470
            sys.stdout.write('Inserted '+str(row_ins_ct_ref[0])+
471 460 aaronmk
                ' new rows into database\n')
472 3669 aaronmk
            sys.stdout.flush()
473 1868 aaronmk
474
            # Consume asynchronous tasks
475
            pool.main_loop()
476 3117 aaronmk
        finally: out_db.close()
477 751 aaronmk
    else:
478 759 aaronmk
        def on_error(e): ex_tracker.track(e)
479 838 aaronmk
        def row_ready(row_num, input_row): pass
480 982 aaronmk
        row_ct = process_inputs(root, row_ready)
481 759 aaronmk
        xml_func.process(root, on_error)
482 751 aaronmk
        if out_is_xml_ref[0]:
483
            doc.writexml(sys.stdout, **xml_dom.prettyxml_config)
484
        else: # output is CSV
485
            raise NotImplementedError('CSV output not supported yet')
486 985 aaronmk
487 1868 aaronmk
    # Consume any asynchronous tasks not already consumed above
488 1862 aaronmk
    pool.main_loop()
489
490 982 aaronmk
    profiler.stop(row_ct)
491 3324 aaronmk
    if not by_col: ex_tracker.add_iters(row_ct) # only if errors are done by row
492 2439 aaronmk
    log('Processed '+str(row_ct)+' input rows')
493
    log(profiler.msg())
494
    log(ex_tracker.msg())
495 985 aaronmk
    ex_tracker.exit()
496 53 aaronmk
497 847 aaronmk
def main():
    '''Runs main_(), converting a Parser.SyntaxError into a SystemExit
    @raise SystemExit with the error's strings.ustr() text as the message, if
        main_() raised a Parser.SyntaxError
    '''
    try: main_()
    # `as` form: accepted by Python 2.6+ as well as Python 3
    except Parser.SyntaxError as e: raise SystemExit(strings.ustr(e))
500 847 aaronmk
501 846 aaronmk
if __name__ == '__main__':
    # If the profile_to env var is set, run main()'s code under cProfile,
    # passing that value as the stats filename; otherwise just run main()
    profile_to = opts.get_env_var('profile_to', None)
    if profile_to is not None:
        import cProfile
        sys.stderr.write('Profiling to '+profile_to+'\n')
        # __code__ is the same code object as func_code, but the attribute
        # also exists on Python 3 (func_code is Python 2 only)
        cProfile.run(main.__code__, profile_to)
    else: main()