Project

General

Profile

1
#!/usr/bin/env python
2
# Maps one datasource to another, using a map spreadsheet if needed
3
# Exit status is the # of errors in the import, up to the maximum exit status
4
# For outputting an XML file to a PostgreSQL database, use the general format of
5
# http://vegbank.org/vegdocs/xml/vegbank_example_ver1.0.2.xml
6

    
7
import csv
8
import itertools
9
import os.path
10
import sys
11
import xml.dom.minidom as minidom
12

    
13
sys.path.append(os.path.dirname(__file__)+"/../lib")
14

    
15
import csvs
16
import exc
17
import iters
18
import maps
19
import opts
20
import Parser
21
import profiling
22
import sql
23
import streams
24
import strings
25
import term
26
import util
27
import xpath
28
import xml_dom
29
import xml_func
30
import xml_parse
31

    
32
def get_with_prefix(map_, prefixes, key):
33
    '''Gets all entries for the given key with any of the given prefixes'''
34
    values = []
35
    for key_ in strings.with_prefixes(['']+prefixes, key): # also with no prefix
36
        try: value = map_[key_]
37
        except KeyError, e: continue # keep going
38
        values.append(value)
39
    
40
    if values != []: return values
41
    else: raise e # re-raise last KeyError
42

    
43
def metadata_value(name): return None # this feature has been removed
44

    
45
def cleanup(val):
46
    if val == None: return val
47
    return util.none_if(strings.cleanup(strings.ustr(val)), u'', u'\\N')
48

    
49
def main_():
50
    env_names = []
51
    def usage_err():
52
        raise SystemExit('Usage: '+opts.env_usage(env_names, True)+' '
53
            +sys.argv[0]+' [map_path...] [<input] [>output]')
54
    
55
    ## Get config from env vars
56
    
57
    # Modes
58
    test = opts.env_flag('test', False, env_names)
59
    commit = opts.env_flag('commit', False, env_names) and not test
60
        # never commit in test mode
61
    redo = opts.env_flag('redo', test, env_names) and not commit
62
        # never redo in commit mode (manually run `make empty_db` instead)
63
    
64
    # Ranges
65
    start = util.cast(int, opts.get_env_var('start', '0', env_names))
66
    if test: end_default = 1
67
    else: end_default = None
68
    end = util.cast(int, util.none_if(
69
        opts.get_env_var('n', end_default, env_names), u''))
70
    if end != None: end += start
71
    
72
    # Debugging
73
    debug = opts.env_flag('debug', False, env_names)
74
    sql.run_raw_query.debug = debug
75
    verbose = debug or opts.env_flag('verbose', not test, env_names)
76
    opts.get_env_var('profile_to', None, env_names) # add to env_names
77
    
78
    # DB
79
    db_config_names = ['engine', 'host', 'user', 'password', 'database']
80
    def get_db_config(prefix):
81
        return opts.get_env_vars(db_config_names, prefix, env_names)
82
    in_db_config = get_db_config('in')
83
    out_db_config = get_db_config('out')
84
    in_is_db = 'engine' in in_db_config
85
    out_is_db = 'engine' in out_db_config
86
    
87
    ##
88
    
89
    # Logging
90
    def log(msg, on=verbose):
91
        if on: sys.stderr.write(msg)
92
    def log_start(action, on=verbose): log(action+'...\n', on)
93
    
94
    # Parse args
95
    map_paths = sys.argv[1:]
96
    if map_paths == []:
97
        if in_is_db or not out_is_db: usage_err()
98
        else: map_paths = [None]
99
    
100
    def connect_db(db_config):
101
        log_start('Connecting to '+sql.db_config_str(db_config))
102
        return sql.connect(db_config)
103
    
104
    if end != None: end_str = str(end-1)
105
    else: end_str = 'end'
106
    log_start('Processing input rows '+str(start)+'-'+end_str)
107
    
108
    ex_tracker = exc.ExPercentTracker(iter_text='row')
109
    profiler = profiling.ItersProfiler(start_now=True, iter_text='row')
110
    
111
    doc = xml_dom.create_doc()
112
    root = doc.documentElement
113
    out_is_xml_ref = [False]
114
    in_label_ref = [None]
115
    def update_in_label():
116
        if in_label_ref[0] != None:
117
            xpath.get(root, '/_ignore/inLabel="'+in_label_ref[0]+'"', True)
118
    def prep_root():
119
        root.clear()
120
        update_in_label()
121
    prep_root()
122
    
123
    def process_input(root, row_ready, map_path):
124
        '''Inputs datasource to XML tree, mapping if needed'''
125
        # Load map header
126
        in_is_xpaths = True
127
        out_is_xpaths = True
128
        out_label = None
129
        if map_path != None:
130
            metadata = []
131
            mappings = []
132
            stream = open(map_path, 'rb')
133
            reader = csv.reader(stream)
134
            in_label, out_label = reader.next()[:2]
135
            
136
            def split_col_name(name):
137
                label, sep, root = name.partition(':')
138
                label, sep2, prefixes_str = label.partition('[')
139
                prefixes_str = strings.remove_suffix(']', prefixes_str)
140
                prefixes = strings.split(',', prefixes_str)
141
                return label, sep != '', root, prefixes
142
                    # extract datasrc from "datasrc[data_format]"
143
            
144
            in_label, in_root, prefixes = maps.col_info(in_label)
145
            in_is_xpaths = in_root != None
146
            in_label_ref[0] = in_label
147
            update_in_label()
148
            out_label, out_root = maps.col_info(out_label)[:2]
149
            out_is_xpaths = out_root != None
150
            if out_is_xpaths: has_types = out_root.startswith('/*s/')
151
                # outer elements are types
152
            
153
            for row in reader:
154
                in_, out = row[:2]
155
                if out != '':
156
                    if out_is_xpaths: out = xpath.parse(out_root+out)
157
                    mappings.append((in_, out))
158
            
159
            stream.close()
160
            
161
            root.ownerDocument.documentElement.tagName = out_label
162
        in_is_xml = in_is_xpaths and not in_is_db
163
        out_is_xml_ref[0] = out_is_xpaths and not out_is_db
164
        
165
        def process_rows(process_row, rows):
166
            '''Processes input rows
167
            @param process_row(in_row, i)
168
            '''
169
            i = 0
170
            while end == None or i < end:
171
                try: row = rows.next()
172
                except StopIteration: break # no more rows
173
                if i < start: continue # not at start row yet
174
                
175
                process_row(row, i)
176
                row_ready(i, row)
177
                i += 1
178
            row_ct = i-start
179
            return row_ct
180
        
181
        def map_rows(get_value, rows):
182
            '''Maps input rows
183
            @param get_value(in_, row):str
184
            '''
185
            def process_row(row, i):
186
                row_id = str(i)
187
                for in_, out in mappings:
188
                    value = metadata_value(in_)
189
                    if value == None:
190
                        log_start('Getting '+str(in_), debug)
191
                        value = cleanup(get_value(in_, row))
192
                    if value != None:
193
                        log_start('Putting '+str(out), debug)
194
                        xpath.put_obj(root, out, row_id, has_types, value)
195
            return process_rows(process_row, rows)
196
        
197
        def map_table(col_names, rows):
198
            col_names_ct = len(col_names)
199
            col_idxs = util.list_flip(col_names)
200
            
201
            i = 0
202
            while i < len(mappings): # mappings len changes in loop
203
                in_, out = mappings[i]
204
                if metadata_value(in_) == None:
205
                    try: mappings[i] = (
206
                        get_with_prefix(col_idxs, prefixes, in_), out)
207
                    except KeyError:
208
                        del mappings[i]
209
                        continue # keep i the same
210
                i += 1
211
            
212
            def get_value(in_, row):
213
                return util.coalesce(*util.list_subset(row.list, in_))
214
            def wrap_row(row):
215
                return util.ListDict(util.list_as_length(row, col_names_ct),
216
                    col_names, col_idxs) # handle CSV rows of different lengths
217
            
218
            return map_rows(get_value, util.WrapIter(wrap_row, rows))
219
        
220
        stdin = streams.LineCountStream(sys.stdin)
221
        def on_error(e):
222
            exc.add_msg(e, term.emph('input line #:')+' '+str(stdin.line_num))
223
            ex_tracker.track(e)
224
        
225
        if in_is_db:
226
            assert in_is_xpaths
227
            
228
            in_db = connect_db(in_db_config)
229
            in_pkeys = {}
230
            cur = sql.select(in_db, table=in_root, fields=None, conds=None,
231
                limit=end, start=0)
232
            row_ct = map_table(list(sql.col_names(cur)), sql.rows(cur))
233
            
234
            in_db.close()
235
        elif in_is_xml:
236
            def get_rows(doc2rows):
237
                return iters.flatten(itertools.imap(doc2rows,
238
                    xml_parse.docs_iter(stdin, on_error)))
239
            
240
            if map_path == None:
241
                def doc2rows(in_xml_root):
242
                    iter_ = xml_dom.NodeElemIter(in_xml_root)
243
                    util.skip(iter_, xml_dom.is_text) # skip metadata
244
                    return iter_
245
                
246
                row_ct = process_rows(lambda row, i: root.appendChild(row),
247
                    get_rows(doc2rows))
248
            else:
249
                def doc2rows(in_xml_root):
250
                    rows = xpath.get(in_xml_root, in_root, limit=end)
251
                    if rows == []: raise SystemExit('Map error: Root "'
252
                        +in_root+'" not found in input')
253
                    return rows
254
                
255
                def get_value(in_, row):
256
                    in_ = './{'+(','.join(strings.with_prefixes(
257
                        ['']+prefixes, in_)))+'}' # also with no prefix
258
                    nodes = xpath.get(row, in_, allow_rooted=False)
259
                    if nodes != []: return xml_dom.value(nodes[0])
260
                    else: return None
261
                
262
                row_ct = map_rows(get_value, get_rows(doc2rows))
263
        else: # input is CSV
264
            map_ = dict(mappings)
265
            reader, col_names = csvs.reader_and_header(sys.stdin)
266
            row_ct = map_table(col_names, reader)
267
        
268
        return row_ct
269
    
270
    def process_inputs(root, row_ready):
271
        row_ct = 0
272
        for map_path in map_paths:
273
            row_ct += process_input(root, row_ready, map_path)
274
        return row_ct
275
    
276
    if out_is_db:
277
        import db_xml
278
        
279
        out_db = connect_db(out_db_config)
280
        out_pkeys = {}
281
        try:
282
            if redo: sql.empty_db(out_db)
283
            row_ins_ct_ref = [0]
284
            
285
            def row_ready(row_num, input_row):
286
                def on_error(e):
287
                    exc.add_msg(e, term.emph('row #:')+' '+str(row_num))
288
                    exc.add_msg(e, term.emph('input row:')+'\n'+str(input_row))
289
                    exc.add_msg(e, term.emph('output row:')+'\n'+str(root))
290
                    ex_tracker.track(e, row_num)
291
                
292
                xml_func.process(root, on_error)
293
                if not xml_dom.is_empty(root):
294
                    assert xml_dom.has_one_child(root)
295
                    try:
296
                        sql.with_savepoint(out_db,
297
                            lambda: db_xml.put(out_db, root.firstChild,
298
                                out_pkeys, row_ins_ct_ref, on_error))
299
                        if commit: out_db.commit()
300
                    except sql.DatabaseErrors, e: on_error(e)
301
                prep_root()
302
            
303
            row_ct = process_inputs(root, row_ready)
304
            sys.stdout.write('Inserted '+str(row_ins_ct_ref[0])+
305
                ' new rows into database\n')
306
        finally:
307
            out_db.rollback()
308
            out_db.close()
309
    else:
310
        def on_error(e): ex_tracker.track(e)
311
        def row_ready(row_num, input_row): pass
312
        row_ct = process_inputs(root, row_ready)
313
        xml_func.process(root, on_error)
314
        if out_is_xml_ref[0]:
315
            doc.writexml(sys.stdout, **xml_dom.prettyxml_config)
316
        else: # output is CSV
317
            raise NotImplementedError('CSV output not supported yet')
318
    
319
    profiler.stop(row_ct)
320
    ex_tracker.add_iters(row_ct)
321
    if verbose:
322
        sys.stderr.write('Processed '+str(row_ct)+' input rows\n')
323
        sys.stderr.write(profiler.msg()+'\n')
324
        sys.stderr.write(ex_tracker.msg()+'\n')
325
    ex_tracker.exit()
326

    
327
def main():
328
    try: main_()
329
    except Parser.SyntaxError, e: raise SystemExit(str(e))
330

    
331
if __name__ == '__main__':
332
    profile_to = opts.get_env_var('profile_to', None)
333
    if profile_to != None:
334
        import cProfile
335
        sys.stderr.write('Profiling to '+profile_to+'\n')
336
        cProfile.run(main.func_code, profile_to)
337
    else: main()
(23-23/43)