#!/usr/bin/env python
# Maps one datasource to another, using a map spreadsheet if needed
# Exit status is the # of errors in the import, up to the maximum exit status
# For outputting an XML file to a PostgreSQL database, use the general format of
# http://vegbank.org/vegdocs/xml/vegbank_example_ver1.0.2.xml

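# The map spreadsheet format is not documented here, but from the parsing code
# below it appears to be a CSV whose header row names the input and output
# datasources (roughly "datasrc[prefix,...]:/root", as parsed by maps.col_info()
# and split_col_name()), and whose remaining rows map an input column or XPath
# to an output XPath under that root. Illustrative sketch with hypothetical
# column names:
#   VegX:/*s/plotObservation,VegBIEN:/*s/locationevent
#   plotName,/location/authorlocationcode
#   obsStartDate,/obsstartdate
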
import csv
import itertools
import os.path
import sys
import xml.dom.minidom as minidom

sys.path.append(os.path.dirname(__file__)+"/../lib")

import csvs
import exc
import iters
import maps
import opts
import Parser
import profiling
import sql
import strings
import term
import util
import xpath
import xml_dom
import xml_func
import xml_parse

def get_with_prefix(map_, prefixes, key):
    '''Gets all entries for the given key with any of the given prefixes'''
    values = []
    for key_ in strings.with_prefixes(['']+prefixes, key): # also with no prefix
        try: value = map_[key_]
        except KeyError, e: continue # keep going
        values.append(value)
    
    if values != []: return values
    else: raise e # re-raise last KeyError

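# Illustrative use of get_with_prefix(), assuming strings.with_prefixes() simply
# prepends each prefix (in order, unprefixed first) to the key; the dict keys
# below are hypothetical:
#   get_with_prefix({'x': 1, 'acd.x': 2}, ['acd.'], 'x') -> [1, 2]
#   get_with_prefix({'y': 1}, ['acd.'], 'x') re-raises the last KeyError
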
def metadata_value(name): return None # this feature has been removed

def cleanup(val):
    # Normalize to unicode and treat '' and '\N' (PostgreSQL's NULL marker in
    # dump/COPY output) as missing, assuming util.none_if() maps matching
    # values to None
    if val == None: return val
    return util.none_if(strings.cleanup(strings.ustr(val)), u'', u'\\N')

def main_():
    env_names = []
    def usage_err():
        raise SystemExit('Usage: '+opts.env_usage(env_names, True)+' '
            +sys.argv[0]+' [map_path...] [<input] [>output]')
    
    ## Get config from env vars
    
    # Modes
    test = opts.env_flag('test', False, env_names)
    commit = opts.env_flag('commit', False, env_names) and not test
        # never commit in test mode
    redo = opts.env_flag('redo', test, env_names) and not commit
        # never redo in commit mode (manually run `make empty_db` instead)
    
    # Ranges
    start = util.cast(int, opts.get_env_var('start', '0', env_names))
    if test: end_default = 1
    else: end_default = None
    end = util.cast(int, util.none_if(
        opts.get_env_var('n', end_default, env_names), u''))
    if end != None: end += start
    
    # Debugging
    debug = opts.env_flag('debug', False, env_names)
    sql.run_raw_query.debug = debug
    verbose = debug or opts.env_flag('verbose', not test, env_names)
    opts.get_env_var('profile_to', None, env_names) # add to env_names
    
    # DB
    db_config_names = ['engine', 'host', 'user', 'password', 'database']
    def get_db_config(prefix):
        return opts.get_env_vars(db_config_names, prefix, env_names)
    in_db_config = get_db_config('in')
    out_db_config = get_db_config('out')
    in_is_db = 'engine' in in_db_config
    out_is_db = 'engine' in out_db_config
    
    ##
    
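    # Example invocation (illustrative only: the flag and range variables above
    # are read by the names shown, but the exact spelling of the DB variables
    # depends on how opts.get_env_vars() joins the 'in'/'out' prefix with each
    # name, and the script and map filenames below are hypothetical):
    #   test=1 n=10 ./map VegX-VegBIEN.csv <input.csv >output.xml
    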
    # Logging
    def log(msg, on=verbose):
        if on: sys.stderr.write(msg)
    def log_start(action, on=verbose): log(action+'...\n', on)
    
    # Parse args
    map_paths = sys.argv[1:]
    if map_paths == []:
        if in_is_db or not out_is_db: usage_err()
        else: map_paths = [None]
    
    def connect_db(db_config):
        log_start('Connecting to '+sql.db_config_str(db_config))
        return sql.connect(db_config)
    
    if end != None: end_str = str(end-1)
    else: end_str = 'end'
    log_start('Processing input rows '+str(start)+'-'+end_str)
    
    ex_tracker = exc.ExPercentTracker(iter_text='row')
    profiler = profiling.ItersProfiler(start_now=True, iter_text='row')
    
    doc = xml_dom.create_doc()
    root = doc.documentElement
    out_is_xml_ref = [False]
    in_label_ref = [None]
    def update_in_label():
        if in_label_ref[0] != None:
            xpath.get(root, '/_ignore/inLabel="'+in_label_ref[0]+'"', True)
    def prep_root():
        root.clear()
        update_in_label()
    prep_root()
    
    def process_input(root, row_ready, map_path):
        '''Inputs datasource to XML tree, mapping if needed'''
        # Load map header
        in_is_xpaths = True
        out_is_xpaths = True
        out_label = None
        if map_path != None:
            metadata = []
            mappings = []
            stream = open(map_path, 'rb')
            reader = csv.reader(stream)
            in_label, out_label = reader.next()[:2]
            
            def split_col_name(name):
                label, sep, root = name.partition(':')
                label, sep2, prefixes_str = label.partition('[')
                prefixes_str = strings.remove_suffix(']', prefixes_str)
                prefixes = strings.split(',', prefixes_str)
                return label, sep != '', root, prefixes
                    # extract datasrc from "datasrc[data_format]"
            
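            # split_col_name() appears unused in this file (maps.col_info()
            # below does the equivalent parsing). Illustrative parse, assuming
            # strings.remove_suffix()/strings.split() behave as their names
            # suggest, with hypothetical labels:
            #   split_col_name('VegX[vegx.,veg.]:/plot')
            #       -> ('VegX', True, '/plot', ['vegx.', 'veg.'])
            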
            in_label, in_root, prefixes = maps.col_info(in_label)
            in_is_xpaths = in_root != None
            in_label_ref[0] = in_label
            update_in_label()
            out_label, out_root = maps.col_info(out_label)[:2]
            out_is_xpaths = out_root != None
            if out_is_xpaths: has_types = out_root.startswith('/*s/')
                # outer elements are types
            
            for row in reader:
                in_, out = row[:2]
                if out != '':
                    if out_is_xpaths: out = xpath.parse(out_root+out)
                    mappings.append((in_, out))
            
            stream.close()
            
            root.ownerDocument.documentElement.tagName = out_label
        in_is_xml = in_is_xpaths and not in_is_db
        out_is_xml_ref[0] = out_is_xpaths and not out_is_db
        
        def process_rows(process_row, rows):
            '''Processes input rows
            @param process_row(in_row, i)
            '''
            i = 0
            while end == None or i < end:
                try: row = rows.next()
                except StopIteration: break # no more rows
                if i < start: # not at start row yet; skip it but keep counting
                    i += 1
                    continue
                
                process_row(row, i)
                row_ready(i, row)
                i += 1
            row_ct = i-start
            return row_ct
        
        def map_rows(get_value, rows):
            '''Maps input rows
            @param get_value(in_, row):str
            '''
            def process_row(row, i):
                row_id = str(i)
                for in_, out in mappings:
                    value = metadata_value(in_)
                    if value == None:
                        log_start('Getting '+str(in_), debug)
                        value = cleanup(get_value(in_, row))
                    if value != None:
                        log_start('Putting '+str(out), debug)
                        xpath.put_obj(root, out, row_id, has_types, value)
            return process_rows(process_row, rows)
        
        def map_table(col_names, rows):
            col_names_ct = len(col_names)
            col_idxs = util.list_flip(col_names)
            
            i = 0
            while i < len(mappings): # mappings len changes in loop
                in_, out = mappings[i]
                if metadata_value(in_) == None:
                    try: mappings[i] = (
                        get_with_prefix(col_idxs, prefixes, in_), out)
                    except KeyError:
                        del mappings[i]
                        continue # keep i the same
                i += 1
            
            def get_value(in_, row):
                return util.coalesce(*util.list_subset(row.list, in_))
            def wrap_row(row):
                return util.ListDict(util.list_as_length(row, col_names_ct),
                    col_names, col_idxs) # handle CSV rows of different lengths
            
            return map_rows(get_value, util.WrapIter(wrap_row, rows))
        
        if in_is_db:
            assert in_is_xpaths
            
            in_db = connect_db(in_db_config)
            in_pkeys = {}
            cur = sql.select(in_db, table=in_root, fields=None, conds=None,
                limit=end, start=0)
            row_ct = map_table(list(sql.col_names(cur)), sql.rows(cur))
            
            in_db.close()
        elif in_is_xml:
            def get_rows(doc2rows):
                return iters.flatten(itertools.imap(
                    doc2rows, xml_parse.docs_iter(sys.stdin)))
            
            if map_path == None:
                def doc2rows(in_xml_root):
                    iter_ = xml_dom.NodeElemIter(in_xml_root)
                    util.skip(iter_, xml_dom.is_text) # skip metadata
                    return iter_
                
                row_ct = process_rows(lambda row, i: root.appendChild(row),
                    get_rows(doc2rows))
            else:
                def doc2rows(in_xml_root):
                    rows = xpath.get(in_xml_root, in_root, limit=end)
                    if rows == []: raise SystemExit('Map error: Root "'
                        +in_root+'" not found in input')
                    return rows
                
                def get_value(in_, row):
                    in_ = './{'+(','.join(strings.with_prefixes(
                        ['']+prefixes, in_)))+'}' # also with no prefix
                    nodes = xpath.get(row, in_, allow_rooted=False)
                    if nodes != []: return xml_dom.value(nodes[0])
                    else: return None
                
                row_ct = map_rows(get_value, get_rows(doc2rows))
        else: # input is CSV
            map_ = dict(mappings)
            reader, col_names = csvs.reader_and_header(sys.stdin)
            row_ct = map_table(col_names, reader)
        
        return row_ct
    
    def process_inputs(root, row_ready):
        row_ct = 0
        for map_path in map_paths:
            row_ct += process_input(root, row_ready, map_path)
        return row_ct
    
    if out_is_db:
        import db_xml
        
        out_db = connect_db(out_db_config)
        out_pkeys = {}
        try:
            if redo: sql.empty_db(out_db)
            row_ins_ct_ref = [0]
            
            def row_ready(row_num, input_row):
                def on_error(e):
                    exc.add_msg(e, term.emph('row #:')+' '+str(row_num))
                    exc.add_msg(e, term.emph('input row:')+'\n'+str(input_row))
                    exc.add_msg(e, term.emph('output row:')+'\n'+str(root))
                    ex_tracker.track(e, row_num)
                
                xml_func.process(root, on_error)
                if not xml_dom.is_empty(root):
                    assert xml_dom.has_one_child(root)
                    try:
                        sql.with_savepoint(out_db,
                            lambda: db_xml.put(out_db, root.firstChild,
                                out_pkeys, row_ins_ct_ref, on_error))
                        if commit: out_db.commit()
                    except sql.DatabaseErrors, e: on_error(e)
                prep_root()
            
            row_ct = process_inputs(root, row_ready)
            sys.stdout.write('Inserted '+str(row_ins_ct_ref[0])+
                ' new rows into database\n')
        finally:
            out_db.rollback()
            out_db.close()
    else:
        def on_error(e): ex_tracker.track(e)
        def row_ready(row_num, input_row): pass
        row_ct = process_inputs(root, row_ready)
        xml_func.process(root, on_error)
        if out_is_xml_ref[0]:
            doc.writexml(sys.stdout, **xml_dom.prettyxml_config)
        else: # output is CSV
            raise NotImplementedError('CSV output not supported yet')
    
    profiler.stop(row_ct)
    ex_tracker.add_iters(row_ct)
    if verbose:
        sys.stderr.write('Processed '+str(row_ct)+' input rows\n')
        sys.stderr.write(profiler.msg()+'\n')
        sys.stderr.write(ex_tracker.msg()+'\n')
    ex_tracker.exit()

def main():
    try: main_()
    except Parser.SyntaxError, e: raise SystemExit(str(e))

if __name__ == '__main__':
    profile_to = opts.get_env_var('profile_to', None)
    if profile_to != None:
        import cProfile
        sys.stderr.write('Profiling to '+profile_to+'\n')
        cProfile.run(main.func_code, profile_to)
    else: main()
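
# To inspect a profile written via profile_to after a run (illustrative; uses
# the standard library's pstats module, and 'map.prof' is a hypothetical output
# filename):
#   import pstats
#   pstats.Stats('map.prof').sort_stats('cumulative').print_stats(20)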