Project

General

Profile

1 53 aaronmk
#!/usr/bin/env python
2
# Maps one datasource to another, using a map spreadsheet if needed
3 986 aaronmk
# Exit status is the # of errors in the import, up to the maximum exit status
4 53 aaronmk
# For outputting an XML file to a PostgreSQL database, use the general format of
5
# http://vegbank.org/vegdocs/xml/vegbank_example_ver1.0.2.xml
6
7 1014 aaronmk
import csv
8 1714 aaronmk
import itertools
9 53 aaronmk
import os.path
10
import sys
11 299 aaronmk
import xml.dom.minidom as minidom
12 53 aaronmk
13 266 aaronmk
sys.path.append(os.path.dirname(__file__)+"/../lib")
14 53 aaronmk
15 1389 aaronmk
import csvs
16 344 aaronmk
import exc
17 1714 aaronmk
import iters
18 1705 aaronmk
import maps
19 64 aaronmk
import opts
20 1859 aaronmk
import parallel
21 281 aaronmk
import Parser
22 982 aaronmk
import profiling
23 131 aaronmk
import sql
24 1758 aaronmk
import streams
25 715 aaronmk
import strings
26 828 aaronmk
import term
27 310 aaronmk
import util
28 1014 aaronmk
import xpath
29 133 aaronmk
import xml_dom
30 86 aaronmk
import xml_func
31 1713 aaronmk
import xml_parse
32 53 aaronmk
33 1404 aaronmk
def get_with_prefix(map_, prefixes, key):
    '''Gets all entries for the given key with any of the given prefixes
    @param map_ The mapping to look the candidate keys up in
    @param prefixes list of prefixes to try. The key is also tried with no
        prefix.
    @param key The unprefixed key
    @return list of the values found, in the order that
        strings.with_prefixes() yields the candidate keys. Never empty.
    @raise KeyError if none of the candidate keys is in map_ (the last
        KeyError encountered is re-raised)
    '''
    values = []
    last_err = None # saved explicitly instead of relying on the except var
    for key_ in strings.with_prefixes(['']+prefixes, key): # also with no prefix
        try: value = map_[key_]
        except KeyError as e:
            last_err = e
            continue # keep going
        values.append(value)
    
    if values: return values
    if last_err is None: raise KeyError(key) # no candidate keys were tried
    raise last_err # re-raise last KeyError
43 1404 aaronmk
44 1018 aaronmk
def metadata_value(name):
    '''Placeholder for the removed metadata feature
    @param name Ignored
    @return None always, so callers fall through to the regular value lookup
    '''
    return None
45 84 aaronmk
46 1360 aaronmk
def cleanup(val):
    '''Normalizes an input value before it is put in the output
    @param val The raw value, or None
    @return None if val is None. Otherwise, the result of converting val with
        strings.ustr(), cleaning it with strings.cleanup(), and passing it
        through util.none_if() with u'' and u'\\N' (presumably the values that
        should become None -- confirm against util.none_if()).
    '''
    if val is None: return val # identity test: don't invoke a custom __eq__
    return util.none_if(strings.cleanup(strings.ustr(val)), u'', u'\\N')
49 1360 aaronmk
50 847 aaronmk
def main_():
    '''Maps the input datasource(s) to the output datasource
    
    Config is read from env vars (their names are collected in env_names for
    the usage message) and the map spreadsheet paths from sys.argv[1:].
    Input is a DB table, XML on stdin, or CSV on stdin; output is a DB or XML
    on stdout. Finishes by calling ex_tracker.exit().
    '''
    env_names = []
    def usage_err():
        raise SystemExit('Usage: '+opts.env_usage(env_names, True)+' '
            +sys.argv[0]+' [map_path...] [<input] [>output]\n'
            'Note: Row #s start with 1')
    
    ## Get config from env vars
    
    # Modes
    test = opts.env_flag('test', False, env_names)
    commit = opts.env_flag('commit', False, env_names) and not test
        # never commit in test mode
    redo = opts.env_flag('redo', test, env_names) and not commit
        # never redo in commit mode (manually run `make empty_db` instead)
    
    # Ranges
    start = util.cast(int, opts.get_env_var('start', 1, env_names)) # 1-based
    # Make start internally 0-based.
    # It's 1-based to the user to match up with the staging table row #s.
    start -= 1
    if test: n_default = 1
    else: n_default = None
    n = util.cast(int, util.none_if(opts.get_env_var('n', n_default, env_names),
        u''))
    end = n
    if end is not None: end += start # end is one past the last (0-based) row #
    
    # Optimization
    by_col = opts.env_flag('by_col', False, env_names)
    if test: cpus_default = 0
    else: cpus_default = None
    cpus = util.cast(int, util.none_if(opts.get_env_var('cpus', cpus_default,
        env_names), u''))
    
    # Debugging
    debug = opts.env_flag('debug', False, env_names)
    sql.run_raw_query.debug = debug
    verbose = debug or opts.env_flag('verbose', not test, env_names)
    opts.get_env_var('profile_to', None, env_names) # add to env_names
    
    # DB
    def get_db_config(prefix):
        return opts.get_env_vars(sql.db_config_names, prefix, env_names)
    in_db_config = get_db_config('in')
    out_db_config = get_db_config('out')
    in_is_db = 'engine' in in_db_config
    out_is_db = 'engine' in out_db_config
    in_schema = opts.get_env_var('in_schema', None, env_names)
    in_table = opts.get_env_var('in_table', None, env_names)
    
    ##
    
    # Logging
    def log(msg, on=verbose):
        if on: sys.stderr.write(msg+'\n')
    if debug: log_debug = lambda msg: log(msg, debug)
    else: log_debug = sql.log_debug_none
    
    # Parse args
    map_paths = sys.argv[1:]
    if not map_paths:
        # A map may only be omitted when mapping non-DB input to a DB
        if in_is_db or not out_is_db: usage_err()
        else: map_paths = [None]
    
    def connect_db(db_config):
        log('Connecting to '+sql.db_config_str(db_config))
        return sql.connect(db_config, log_debug=log_debug)
    
    # Row #s are shown 1-based to the user, like in the usage message and the
    # error messages. end is one past the last 0-based row #, which is the
    # same number as the last 1-based row #.
    if end is not None: end_str = str(end)
    else: end_str = 'end'
    log('Processing input rows '+str(start+1)+'-'+end_str)
    
    ex_tracker = exc.ExPercentTracker(iter_text='row')
    profiler = profiling.ItersProfiler(start_now=True, iter_text='row')
    
    # Parallel processing
    pool = parallel.MultiProducerPool(cpus)
    log('Using '+str(pool.process_ct)+' parallel CPUs')
    
    doc = xml_dom.create_doc()
    root = doc.documentElement
    # One-element lists so that nested functions can assign to them
    out_is_xml_ref = [False]
    in_label_ref = [None]
    def update_in_label():
        if in_label_ref[0] is not None:
            xpath.get(root, '/_ignore/inLabel="'+in_label_ref[0]+'"', True)
    def prep_root():
        '''Empties the output root and restores the input label in it'''
        root.clear()
        update_in_label()
    prep_root()
    
    def process_input(root, row_ready, map_path):
        '''Inputs datasource to XML tree, mapping if needed
        @param root The output XML root that the rows are put into
        @param row_ready(row_num, input_row) Called after each row is processed
        @param map_path Path of the map spreadsheet, or None for no mapping
        @return int The # of input rows processed
        '''
        # Load map header
        in_is_xpaths = True
        out_is_xpaths = True
        out_label = None
        if map_path is not None:
            mappings = []
            with open(map_path, 'rb') as stream: # closed even on error
                reader = csv.reader(stream)
                in_label, out_label = next(reader)[:2]
                
                in_label, in_root, prefixes = maps.col_info(in_label)
                in_is_xpaths = in_root is not None
                in_label_ref[0] = in_label
                update_in_label()
                out_label, out_root = maps.col_info(out_label)[:2]
                out_is_xpaths = out_root is not None
                if out_is_xpaths: has_types = out_root.find('/*s/') >= 0
                    # outer elements are types
                
                for row in reader:
                    in_, out = row[:2]
                    if out != '': # skip input columns with no output mapping
                        if out_is_xpaths: out = xpath.parse(out_root+out)
                        mappings.append((in_, out))
            
            root.ownerDocument.documentElement.tagName = out_label
        in_is_xml = in_is_xpaths and not in_is_db
        out_is_xml_ref[0] = out_is_xpaths and not out_is_db
        
        def process_rows(process_row, rows, rows_start=0):
            '''Processes input rows
            @param process_row(in_row, i)
            @param rows Iterable of the input rows
            @param rows_start The (0-based) row # of the first row in rows. Set
                this only if the pre-start rows have already been skipped.
            @return int The # of rows actually processed
            '''
            rows = iter(rows)
            
            if end is not None: row_nums = xrange(rows_start, end)
            else: row_nums = itertools.count(rows_start)
            # Count explicitly: the last value of i is not a reliable row count
            # (it's one too low when row_nums runs out before rows does, and
            # unset when row_nums is empty)
            row_ct = 0
            for i in row_nums:
                try: row = next(rows)
                except StopIteration: break # no more rows
                if i < start: continue # not at start row yet
                
                process_row(row, i)
                row_ready(i, row)
                row_ct += 1
            return row_ct
        
        def map_rows(get_value, rows, **kw_args):
            '''Maps input rows
            @param get_value(in_, row):str
            @param rows Iterable of the input rows
            @param kw_args Passed on to process_rows()
            @return int The # of rows processed
            '''
            def process_row(row, i):
                row_id = str(i)
                for in_, out in mappings:
                    value = metadata_value(in_)
                    if value is None:
                        log_debug('Getting '+str(in_))
                        value = cleanup(get_value(in_, row))
                    if value is not None:
                        log_debug('Putting '+str(out))
                        xpath.put_obj(root, out, row_id, has_types, value)
            return process_rows(process_row, rows, **kw_args)
        
        def map_table(col_names, rows, **kw_args):
            '''Maps the rows of a table (DB or CSV) with the given columns
            @param col_names list of the table's column names
            @param rows Iterable of the table's rows
            @param kw_args Passed on to process_rows()
            @return int The # of rows processed
            '''
            col_names_ct = len(col_names)
            col_idxs = util.list_flip(col_names)
            
            # Resolve each mapping's input column name to the matching column
            # indexes, dropping mappings whose column isn't in the table
            i = 0
            while i < len(mappings): # mappings len changes in loop
                in_, out = mappings[i]
                if metadata_value(in_) is None:
                    try: mappings[i] = (
                        get_with_prefix(col_idxs, prefixes, in_), out)
                    except KeyError:
                        del mappings[i]
                        continue # keep i the same
                i += 1
            
            def get_value(in_, row):
                return util.coalesce(*util.list_subset(row.list, in_))
            def wrap_row(row):
                return util.ListDict(util.list_as_length(row, col_names_ct),
                    col_names, col_idxs) # handle CSV rows of different lengths
            
            return map_rows(get_value, util.WrapIter(wrap_row, rows), **kw_args)
        
        stdin = streams.LineCountStream(sys.stdin)
        def on_error(e):
            exc.add_msg(e, term.emph('input line #:')+' '+str(stdin.line_num))
            ex_tracker.track(e)
        
        if in_is_db:
            in_db = connect_db(in_db_config)
            try:
                # Get table and schema name
                schema = in_schema # modified, so can't have same name as outer var
                table = in_table # modified, so can't have same name as outer var
                if table is None:
                    assert in_is_xpaths
                    schema, sep, table = in_root.partition('.')
                    if sep == '': # only the table name was specified
                        table = schema
                        schema = None
                table_is_esc = False
                if schema is not None:
                    table = sql.qual_name(in_db, schema, table)
                    table_is_esc = True
                
                if by_col and in_db_config == out_db_config:
                    # Mapping to same DB and by-column optimization enabled
                    raise NotImplementedError(
                        'By-column optimization not available yet')
                else:
                    # Use normal by-row method
                    cur = sql.select(in_db, table, limit=n, start=start,
                        table_is_esc=table_is_esc)
                    row_ct = map_table(list(sql.col_names(cur)), sql.rows(cur),
                        rows_start=start) # rows_start: pre-start rows were skipped
            finally: in_db.db.close() # also release the connection on error
        elif in_is_xml:
            def get_rows(doc2rows):
                '''Chains the rows of every XML document read from stdin'''
                return iters.flatten(itertools.imap(doc2rows,
                    xml_parse.docs_iter(stdin, on_error)))
            
            if map_path is None:
                # No map: the input elements are used as the output rows
                def doc2rows(in_xml_root):
                    iter_ = xml_dom.NodeElemIter(in_xml_root)
                    util.skip(iter_, xml_dom.is_text) # skip metadata
                    return iter_
                
                row_ct = process_rows(lambda row, i: root.appendChild(row),
                    get_rows(doc2rows))
            else:
                def doc2rows(in_xml_root):
                    rows = xpath.get(in_xml_root, in_root, limit=end)
                    if rows == []: raise SystemExit('Map error: Root "'
                        +in_root+'" not found in input')
                    return rows
                
                def get_value(in_, row):
                    in_ = './{'+(','.join(strings.with_prefixes(
                        ['']+prefixes, in_)))+'}' # also with no prefix
                    nodes = xpath.get(row, in_, allow_rooted=False)
                    if nodes != []: return xml_dom.value(nodes[0])
                    else: return None
                
                row_ct = map_rows(get_value, get_rows(doc2rows))
        else: # input is CSV
            reader, col_names = csvs.reader_and_header(sys.stdin)
            row_ct = map_table(col_names, reader)
        
        return row_ct
    
    def process_inputs(root, row_ready):
        '''Processes the input once per map path
        @return int The total # of input rows processed
        '''
        row_ct = 0
        for map_path in map_paths:
            row_ct += process_input(root, row_ready, map_path)
        return row_ct
    
    pool.share_vars(locals())
    if out_is_db:
        import db_xml
        
        out_db = connect_db(out_db_config)
        try:
            if redo: sql.empty_db(out_db)
            row_ins_ct_ref = [0]
            pool.share_vars(locals())
            
            def row_ready(row_num, input_row):
                '''Puts the current output row in the DB and resets the root'''
                def on_error(e):
                    exc.add_msg(e, term.emph('row #:')+' '+str(row_num+1))
                        # row # is internally 0-based, but 1-based to the user
                    exc.add_msg(e, term.emph('input row:')+'\n'+str(input_row))
                    exc.add_msg(e, term.emph('output row:')+'\n'+str(root))
                    ex_tracker.track(e, row_num)
                pool.share_vars(locals())
                
                xml_func.process(root, on_error)
                if not xml_dom.is_empty(root):
                    assert xml_dom.has_one_child(root)
                    try:
                        sql.with_savepoint(out_db,
                            lambda: db_xml.put(out_db, root.firstChild,
                                row_ins_ct_ref, on_error))
                        if commit: out_db.db.commit()
                    except sql.DatabaseErrors as e: on_error(e)
                prep_root()
            
            row_ct = process_inputs(root, row_ready)
            sys.stdout.write('Inserted '+str(row_ins_ct_ref[0])+
                ' new rows into database\n')
            
            # Consume asynchronous tasks
            pool.main_loop()
        finally:
            out_db.db.rollback() # discard anything that wasn't committed
            out_db.db.close()
    else:
        def on_error(e): ex_tracker.track(e)
        def row_ready(row_num, input_row): pass
        row_ct = process_inputs(root, row_ready)
        xml_func.process(root, on_error)
        if out_is_xml_ref[0]:
            doc.writexml(sys.stdout, **xml_dom.prettyxml_config)
        else: # output is CSV
            raise NotImplementedError('CSV output not supported yet')
    
    # Consume any asynchronous tasks not already consumed above
    pool.main_loop()
    
    profiler.stop(row_ct)
    ex_tracker.add_iters(row_ct)
    if verbose:
        sys.stderr.write('Processed '+str(row_ct)+' input rows\n')
        sys.stderr.write(profiler.msg()+'\n')
        sys.stderr.write(ex_tracker.msg()+'\n')
    ex_tracker.exit()
377 53 aaronmk
378 847 aaronmk
def main():
    '''Runs main_(), reporting a Parser.SyntaxError as a plain exit message
    @raise SystemExit with the error's message if main_() raises
        Parser.SyntaxError
    '''
    try: main_()
    except Parser.SyntaxError as e: raise SystemExit(str(e))
381 847 aaronmk
382 846 aaronmk
if __name__ == '__main__':
    # The profile_to env var, when set, names the file to write profiling
    # stats to
    profile_to = opts.get_env_var('profile_to', None)
    if profile_to == None: main() # normal, unprofiled run
    else:
        import cProfile
        sys.stderr.write('Profiling to '+profile_to+'\n')
        # Pass main()'s code object so it's run under the profiler
        cProfile.run(main.func_code, profile_to)