Project

General

Profile

1
#!/usr/bin/env python
2
# Maps one datasource to another, using a map spreadsheet if needed
3
# Exit status is the # of errors in the import, up to the maximum exit status
4
# For outputting an XML file to a PostgreSQL database, use the general format of
5
# http://vegbank.org/vegdocs/xml/vegbank_example_ver1.0.2.xml
6

    
7
import csv
8
import itertools
9
import os.path
10
import sys
11
import xml.dom.minidom as minidom
12

    
13
sys.path.append(os.path.dirname(__file__)+"/../lib")
14

    
15
import csvs
16
import exc
17
import iters
18
import maps
19
import opts
20
import Parser
21
import profiling
22
import sql
23
import strings
24
import term
25
import util
26
import xpath
27
import xml_dom
28
import xml_func
29
import xml_parse
30

    
31
def get_with_prefix(map_, prefixes, key):
32
    '''Gets all entries for the given key with any of the given prefixes'''
33
    values = []
34
    for key_ in strings.with_prefixes(['']+prefixes, key): # also with no prefix
35
        try: value = map_[key_]
36
        except KeyError, e: continue # keep going
37
        values.append(value)
38
    
39
    if values != []: return values
40
    else: raise e # re-raise last KeyError
41

    
42
def metadata_value(name): return None # this feature has been removed
43

    
44
def cleanup(val):
45
    if val == None: return val
46
    return util.none_if(strings.cleanup(strings.ustr(val)), u'', u'\\N')
47

    
48
def main_():
49
    env_names = []
50
    def usage_err():
51
        raise SystemExit('Usage: '+opts.env_usage(env_names, True)+' '
52
            +sys.argv[0]+' [map_path...] [<input] [>output]')
53
    
54
    ## Get config from env vars
55
    
56
    # Modes
57
    test = opts.env_flag('test', False, env_names)
58
    commit = opts.env_flag('commit', False, env_names) and not test
59
        # never commit in test mode
60
    redo = opts.env_flag('redo', test, env_names) and not commit
61
        # never redo in commit mode (manually run `make empty_db` instead)
62
    
63
    # Ranges
64
    start = util.cast(int, opts.get_env_var('start', '0', env_names))
65
    if test: end_default = 1
66
    else: end_default = None
67
    end = util.cast(int, util.none_if(
68
        opts.get_env_var('n', end_default, env_names), u''))
69
    if end != None: end += start
70
    
71
    # Debugging
72
    debug = opts.env_flag('debug', False, env_names)
73
    sql.run_raw_query.debug = debug
74
    verbose = debug or opts.env_flag('verbose', not test, env_names)
75
    opts.get_env_var('profile_to', None, env_names) # add to env_names
76
    
77
    # DB
78
    db_config_names = ['engine', 'host', 'user', 'password', 'database']
79
    def get_db_config(prefix):
80
        return opts.get_env_vars(db_config_names, prefix, env_names)
81
    in_db_config = get_db_config('in')
82
    out_db_config = get_db_config('out')
83
    in_is_db = 'engine' in in_db_config
84
    out_is_db = 'engine' in out_db_config
85
    
86
    ##
87
    
88
    # Logging
89
    def log(msg, on=verbose):
90
        if on: sys.stderr.write(msg)
91
    def log_start(action, on=verbose): log(action+'...\n', on)
92
    
93
    # Parse args
94
    map_paths = sys.argv[1:]
95
    if map_paths == []:
96
        if in_is_db or not out_is_db: usage_err()
97
        else: map_paths = [None]
98
    
99
    def connect_db(db_config):
100
        log_start('Connecting to '+sql.db_config_str(db_config))
101
        return sql.connect(db_config)
102
    
103
    if end != None: end_str = str(end-1)
104
    else: end_str = 'end'
105
    log_start('Processing input rows '+str(start)+'-'+end_str)
106
    
107
    ex_tracker = exc.ExPercentTracker(iter_text='row')
108
    profiler = profiling.ItersProfiler(start_now=True, iter_text='row')
109
    
110
    doc = xml_dom.create_doc()
111
    root = doc.documentElement
112
    out_is_xml_ref = [False]
113
    in_label_ref = [None]
114
    def update_in_label():
115
        if in_label_ref[0] != None:
116
            xpath.get(root, '/_ignore/inLabel="'+in_label_ref[0]+'"', True)
117
    def prep_root():
118
        root.clear()
119
        update_in_label()
120
    prep_root()
121
    
122
    def process_input(root, row_ready, map_path):
123
        '''Inputs datasource to XML tree, mapping if needed'''
124
        # Load map header
125
        in_is_xpaths = True
126
        out_is_xpaths = True
127
        out_label = None
128
        if map_path != None:
129
            metadata = []
130
            mappings = []
131
            stream = open(map_path, 'rb')
132
            reader = csv.reader(stream)
133
            in_label, out_label = reader.next()[:2]
134
            
135
            def split_col_name(name):
136
                label, sep, root = name.partition(':')
137
                label, sep2, prefixes_str = label.partition('[')
138
                prefixes_str = strings.remove_suffix(']', prefixes_str)
139
                prefixes = strings.split(',', prefixes_str)
140
                return label, sep != '', root, prefixes
141
                    # extract datasrc from "datasrc[data_format]"
142
            
143
            in_label, in_root, prefixes = maps.col_info(in_label)
144
            in_is_xpaths = in_root != None
145
            in_label_ref[0] = in_label
146
            update_in_label()
147
            out_label, out_root = maps.col_info(out_label)[:2]
148
            out_is_xpaths = out_root != None
149
            if out_is_xpaths: has_types = out_root.startswith('/*s/')
150
                # outer elements are types
151
            
152
            for row in reader:
153
                in_, out = row[:2]
154
                if out != '':
155
                    if out_is_xpaths: out = xpath.parse(out_root+out)
156
                    mappings.append((in_, out))
157
            
158
            stream.close()
159
            
160
            root.ownerDocument.documentElement.tagName = out_label
161
        in_is_xml = in_is_xpaths and not in_is_db
162
        out_is_xml_ref[0] = out_is_xpaths and not out_is_db
163
        
164
        def process_rows(process_row, rows):
165
            '''Processes input rows
166
            @param process_row(in_row, i)
167
            '''
168
            i = -1 # in case for loop does not execute
169
            for i, row in enumerate(rows):
170
                if i < start: continue
171
                if end != None and i >= end: break
172
                process_row(row, i)
173
                row_ready(i, row)
174
            row_ct = i-start+1
175
            return row_ct
176
        
177
        def map_rows(get_value, rows):
178
            '''Maps input rows
179
            @param get_value(in_, row):str
180
            '''
181
            def process_row(row, i):
182
                row_id = str(i)
183
                for in_, out in mappings:
184
                    value = metadata_value(in_)
185
                    if value == None:
186
                        log_start('Getting '+str(in_), debug)
187
                        value = cleanup(get_value(in_, row))
188
                    if value != None:
189
                        log_start('Putting '+str(out), debug)
190
                        xpath.put_obj(root, out, row_id, has_types, value)
191
            return process_rows(process_row, rows)
192
        
193
        def map_table(col_names, rows):
194
            col_names_ct = len(col_names)
195
            col_idxs = util.list_flip(col_names)
196
            
197
            i = 0
198
            while i < len(mappings): # mappings len changes in loop
199
                in_, out = mappings[i]
200
                if metadata_value(in_) == None:
201
                    try: mappings[i] = (
202
                        get_with_prefix(col_idxs, prefixes, in_), out)
203
                    except KeyError:
204
                        del mappings[i]
205
                        continue # keep i the same
206
                i += 1
207
            
208
            def get_value(in_, row):
209
                return util.coalesce(*util.list_subset(row.list, in_))
210
            def wrap_row(row):
211
                return util.ListDict(util.list_as_length(row, col_names_ct),
212
                    col_names, col_idxs) # handle CSV rows of different lengths
213
            
214
            return map_rows(get_value, util.WrapIter(wrap_row, rows))
215
        
216
        if in_is_db:
217
            assert in_is_xpaths
218
            
219
            in_db = connect_db(in_db_config)
220
            in_pkeys = {}
221
            cur = sql.select(in_db, table=in_root, fields=None, conds=None,
222
                limit=end, start=0)
223
            row_ct = map_table(list(sql.col_names(cur)), sql.rows(cur))
224
            
225
            in_db.close()
226
        elif in_is_xml:
227
            if map_path == None:
228
                for in_xml_root in xml_parse.docs_iter(sys.stdin):
229
                    iter_ = xml_dom.NodeElemIter(in_xml_root)
230
                    util.skip(iter_, xml_dom.is_text) # skip metadata
231
                    row_ct = process_rows(lambda row, i: root.appendChild(row),
232
                        iter_)
233
            else:
234
                def doc_rows(in_xml_root):
235
                    rows = xpath.get(in_xml_root, in_root, limit=end)
236
                    if rows == []: raise SystemExit('Map error: Root "'
237
                        +in_root+'" not found in input')
238
                    return rows
239
                
240
                def get_value(in_, row):
241
                    in_ = './{'+(','.join(strings.with_prefixes(
242
                        ['']+prefixes, in_)))+'}' # also with no prefix
243
                    nodes = xpath.get(row, in_, allow_rooted=False)
244
                    if nodes != []: return xml_dom.value(nodes[0])
245
                    else: return None
246
                
247
                row_ct = map_rows(get_value, iters.flatten(itertools.imap(
248
                    doc_rows, xml_parse.docs_iter(sys.stdin))))
249
        else: # input is CSV
250
            map_ = dict(mappings)
251
            reader, col_names = csvs.reader_and_header(sys.stdin)
252
            row_ct = map_table(col_names, reader)
253
        
254
        return row_ct
255
    
256
    def process_inputs(root, row_ready):
257
        row_ct = 0
258
        for map_path in map_paths:
259
            row_ct += process_input(root, row_ready, map_path)
260
        return row_ct
261
    
262
    if out_is_db:
263
        import db_xml
264
        
265
        out_db = connect_db(out_db_config)
266
        out_pkeys = {}
267
        try:
268
            if redo: sql.empty_db(out_db)
269
            row_ins_ct_ref = [0]
270
            
271
            def row_ready(row_num, input_row):
272
                def on_error(e):
273
                    exc.add_msg(e, term.emph('row #:')+' '+str(row_num))
274
                    exc.add_msg(e, term.emph('input row:')+'\n'+str(input_row))
275
                    exc.add_msg(e, term.emph('output row:')+'\n'+str(root))
276
                    ex_tracker.track(e, row_num)
277
                
278
                xml_func.process(root, on_error)
279
                if not xml_dom.is_empty(root):
280
                    assert xml_dom.has_one_child(root)
281
                    try:
282
                        sql.with_savepoint(out_db,
283
                            lambda: db_xml.put(out_db, root.firstChild,
284
                                out_pkeys, row_ins_ct_ref, on_error))
285
                        if commit: out_db.commit()
286
                    except sql.DatabaseErrors, e: on_error(e)
287
                prep_root()
288
            
289
            row_ct = process_inputs(root, row_ready)
290
            sys.stdout.write('Inserted '+str(row_ins_ct_ref[0])+
291
                ' new rows into database\n')
292
        finally:
293
            out_db.rollback()
294
            out_db.close()
295
    else:
296
        def on_error(e): ex_tracker.track(e)
297
        def row_ready(row_num, input_row): pass
298
        row_ct = process_inputs(root, row_ready)
299
        xml_func.process(root, on_error)
300
        if out_is_xml_ref[0]:
301
            doc.writexml(sys.stdout, **xml_dom.prettyxml_config)
302
        else: # output is CSV
303
            raise NotImplementedError('CSV output not supported yet')
304
    
305
    profiler.stop(row_ct)
306
    ex_tracker.add_iters(row_ct)
307
    if verbose:
308
        sys.stderr.write('Processed '+str(row_ct)+' input rows\n')
309
        sys.stderr.write(profiler.msg()+'\n')
310
        sys.stderr.write(ex_tracker.msg()+'\n')
311
    ex_tracker.exit()
312

    
313
def main():
314
    try: main_()
315
    except Parser.SyntaxException, e: raise SystemExit(str(e))
316

    
317
if __name__ == '__main__':
318
    profile_to = opts.get_env_var('profile_to', None)
319
    if profile_to != None:
320
        import cProfile
321
        sys.stderr.write('Profiling to '+profile_to+'\n')
322
        cProfile.run(main.func_code, profile_to)
323
    else: main()
(21-21/40)