Project

General

Profile

1
#!/usr/bin/env python
# Maps one datasource to another, using a map spreadsheet if needed
# The exit status is the number of errors in the import, up to the maximum exit
# status
# When outputting an XML file to a PostgreSQL database, use the general format
# of http://vegbank.org/vegdocs/xml/vegbank_example_ver1.0.2.xml
6

    
7
import csv
8
import os.path
9
import sys
10
import xml.dom.minidom as minidom
11
import xml.parsers.expat as expat
12

    
13
sys.path.append(os.path.dirname(__file__)+"/../lib")
14

    
15
import csvs
16
import exc
17
import opts
18
import Parser
19
import profiling
20
import sql
21
import strings
22
import term
23
import util
24
import xpath
25
import xml_dom
26
import xml_func
27

    
28
def get_with_prefix(map_, prefixes, key):
    '''Gets all entries for the given key with any of the given prefixes
    @param map_ the mapping to look the candidate keys up in
    @param prefixes list of prefixes to try; the unprefixed key is tried too
    @param key the base key
    @return list of the values found, in the order that
        strings.with_prefixes() produced the candidate keys
    @raise KeyError if none of the candidate keys is in map_
    '''
    values = []
    missing_key = key # reported if no candidate keys were produced at all
    for key_ in strings.with_prefixes(['']+prefixes, key): # also with no prefix
        try: values.append(map_[key_])
        except KeyError: missing_key = key_ # remember last miss and keep going
    
    if values: return values
    # Raise explicitly instead of re-raising the except clause's variable,
    # which is not reliably bound once the handler has finished
    raise KeyError(missing_key)
38

    
39
def metadata_value(name):
    '''Always returns None, because the metadata feature has been removed.
    Kept so that callers which still check for a metadata value keep working.
    '''
    return None
40

    
41
def cleanup(val):
    '''Normalizes an input value before it is put into the output tree
    @param val the raw input value, or None
    @return None if val is None; otherwise the result of passing val through
        strings.ustr(), strings.cleanup(), and util.none_if()
    '''
    # Identity test: None is a singleton, and == could invoke a custom __eq__
    if val is None: return val
    # NOTE(review): presumably none_if() turns the empty string and the
    # backslash-N NULL marker into None -- confirm against util.none_if()
    return util.none_if(strings.cleanup(strings.ustr(val)), u'', u'\\N')
44

    
45
def main_():
    '''Maps the input datasource(s) to the output datasource
    
    Configuration is read from env vars and the map spreadsheet paths from the
    command line. Input is a database table, XML on stdin, or CSV on stdin.
    Output is either inserted into a database or written to stdout as XML.
    Finishes by calling ex_tracker.exit().
    '''
    env_names = []
    def usage_err():
        raise SystemExit('Usage: '+opts.env_usage(env_names, True)+' '
            +sys.argv[0]+' [map_path...] [<input] [>output]')
    
    ## Get config from env vars
    
    # Modes
    test = opts.env_flag('test', False, env_names)
    commit = opts.env_flag('commit', False, env_names) and not test
        # never commit in test mode
    redo = opts.env_flag('redo', test, env_names) and not commit
        # never redo in commit mode (manually run `make empty_db` instead)
    
    # Ranges
    start = util.cast(int, opts.get_env_var('start', '0', env_names))
    if test: end_default = 1
    else: end_default = None
    end = util.cast(int, util.none_if(
        opts.get_env_var('n', end_default, env_names), u''))
    if end is not None: end += start # n is a row count, end is a row index
    
    # Debugging
    debug = opts.env_flag('debug', False, env_names)
    sql.run_raw_query.debug = debug
    verbose = debug or opts.env_flag('verbose', not test, env_names)
    opts.get_env_var('profile_to', None, env_names) # add to env_names
    
    # DB
    db_config_names = ['engine', 'host', 'user', 'password', 'database']
    def get_db_config(prefix):
        return opts.get_env_vars(db_config_names, prefix, env_names)
    in_db_config = get_db_config('in')
    out_db_config = get_db_config('out')
    in_is_db = 'engine' in in_db_config
    out_is_db = 'engine' in out_db_config
    
    ##
    
    # Logging
    def log(msg, on=verbose):
        if on: sys.stderr.write(msg)
    def log_start(action, on=verbose): log(action+'...\n', on)
    
    # Parse args
    map_paths = sys.argv[1:]
    if not map_paths:
        # Running without a map is only allowed for non-DB input to a DB
        if in_is_db or not out_is_db: usage_err()
        else: map_paths = [None]
    
    def connect_db(db_config):
        log_start('Connecting to '+sql.db_config_str(db_config))
        return sql.connect(db_config)
    
    if end is not None: end_str = str(end-1) # end is exclusive
    else: end_str = 'end'
    log_start('Processing input rows '+str(start)+'-'+end_str)
    
    ex_tracker = exc.ExPercentTracker(iter_text='row')
    profiler = profiling.ItersProfiler(start_now=True, iter_text='row')
    
    doc = xml_dom.create_doc()
    root = doc.documentElement
    # One-element lists so that nested functions can update the value
    out_is_xml_ref = [False]
    in_label_ref = [None]
    def update_in_label():
        if in_label_ref[0] is not None:
            xpath.get(root, '/_ignore/inLabel="'+in_label_ref[0]+'"', True)
    def prep_root():
        '''Empties the output root and re-adds the input label to it'''
        root.clear()
        update_in_label()
    prep_root()
    
    def process_input(root, row_ready, map_path):
        '''Inputs datasource to XML tree, mapping if needed
        @param root the XML element that output rows are put into
        @param row_ready(row_num, input_row) called after each processed row
        @param map_path path of the map spreadsheet, or None to use the XML
            input without mapping
        @return the number of input rows processed
        '''
        # Load map header
        in_is_xpaths = True
        out_is_xpaths = True
        out_label = None
        if map_path is not None:
            mappings = []
            stream = open(map_path, 'rb')
            try: # close the map file even if parsing it fails
                reader = csv.reader(stream)
                in_label, out_label = reader.next()[:2]
                
                def split_col_name(name):
                    '''Splits a map header cell of the form
                    "label[prefix,...]:root"
                    @return (label, whether a ":" was present, root, prefixes)
                    '''
                    label, sep, root_path = name.partition(':')
                    # extract datasrc from "datasrc[data_format]"
                    label, sep2, prefixes_str = label.partition('[')
                    prefixes_str = strings.remove_suffix(']', prefixes_str)
                    prefixes = strings.split(',', prefixes_str)
                    return label, sep != '', root_path, prefixes
                
                in_label, in_is_xpaths, in_root, prefixes = split_col_name(
                    in_label)
                in_label_ref[0] = in_label
                update_in_label()
                out_label, out_is_xpaths, out_root = split_col_name(
                    out_label)[:3]
                has_types = out_root.startswith('/*s/')
                    # outer elements are types
                
                for row in reader:
                    in_, out = row[:2]
                    if out != '': # skip inputs that have no output mapping
                        if out_is_xpaths: out = xpath.parse(out_root+out)
                        mappings.append((in_, out))
            finally: stream.close()
            
            root.ownerDocument.documentElement.tagName = out_label
        in_is_xml = in_is_xpaths and not in_is_db
        out_is_xml_ref[0] = out_is_xpaths and not out_is_db
        
        def process_rows(process_row, rows):
            '''Processes input rows
            @param process_row(in_row, i)
            @return the number of rows that were actually processed
            '''
            # Count the processed rows directly. Deriving the count from the
            # last loop index is off by one when the loop stops at `end`, and
            # goes negative when there are fewer than `start` rows.
            row_ct = 0
            for i, row in enumerate(rows):
                if i < start: continue
                if end is not None and i >= end: break
                process_row(row, i)
                row_ready(i, row)
                row_ct += 1
            return row_ct
        
        def map_rows(get_value, rows):
            '''Maps input rows
            @param get_value(in_, row):str
            @return the number of rows that were processed
            '''
            def process_row(row, i):
                row_id = str(i)
                for in_, out in mappings:
                    value = metadata_value(in_)
                    if value is None:
                        log_start('Getting '+str(in_), debug)
                        value = cleanup(get_value(in_, row))
                    if value is not None:
                        log_start('Putting '+str(out), debug)
                        xpath.put_obj(root, out, row_id, has_types, value)
            return process_rows(process_row, rows)
        
        def map_table(col_names, rows):
            '''Maps the rows of a table whose columns are col_names
            @return the number of rows that were processed
            '''
            col_names_ct = len(col_names)
            col_idxs = util.list_flip(col_names)
            
            # Replace each mapping's input name by the col_idxs entries of the
            # matching columns, dropping mappings whose column is not present
            i = 0
            while i < len(mappings): # mappings len changes in loop
                in_, out = mappings[i]
                if metadata_value(in_) is None:
                    try: mappings[i] = (
                        get_with_prefix(col_idxs, prefixes, in_), out)
                    except KeyError:
                        del mappings[i]
                        continue # keep i the same
                i += 1
            
            def get_value(in_, row):
                return util.coalesce(*util.list_subset(row.list, in_))
            def wrap_row(row):
                return util.ListDict(util.list_as_length(row, col_names_ct),
                    col_names, col_idxs) # handle CSV rows of different lengths
            
            return map_rows(get_value, util.WrapIter(wrap_row, rows))
        
        if in_is_db:
            assert in_is_xpaths
            
            in_db = connect_db(in_db_config)
            try: # close the connection even if the mapping fails
                cur = sql.select(in_db, table=in_root, fields=None, conds=None,
                    limit=end, start=0)
                row_ct = map_table(list(sql.col_names(cur)), sql.rows(cur))
            finally: in_db.close()
        elif in_is_xml:
            row_ct = 0 # stays 0 if stdin contains no XML document
            while True:
                try: in_xml_root = minidom.parse(sys.stdin).documentElement
                except expat.ExpatError as e:
                    if str(e).startswith('no element found:'): break
                        # no more concatenated XML documents
                    else: raise
                
                if map_path is None:
                    iter_ = xml_dom.NodeElemIter(in_xml_root)
                    util.skip(iter_, xml_dom.is_text) # skip metadata
                    row_ct += process_rows(
                        lambda row, i: root.appendChild(row), iter_)
                else:
                    rows = xpath.get(in_xml_root, in_root, limit=end)
                    if rows == []: raise SystemExit('Map error: Root "'+in_root
                        +'" not found in input')
                    
                    def get_value(in_, row):
                        in_ = './{'+(','.join(strings.with_prefixes(
                            ['']+prefixes, in_)))+'}' # also with no prefix
                        nodes = xpath.get(row, in_, allow_rooted=False)
                        if nodes != []: return xml_dom.value(nodes[0])
                        else: return None
                    
                    row_ct += map_rows(get_value, rows)
        else: # input is CSV
            reader, col_names = csvs.reader_and_header(sys.stdin)
            row_ct = map_table(col_names, reader)
        
        return row_ct
    
    def process_inputs(root, row_ready):
        '''Processes every map path and returns the total number of rows'''
        row_ct = 0
        for map_path in map_paths:
            row_ct += process_input(root, row_ready, map_path)
        return row_ct
    
    if out_is_db:
        import db_xml
        
        out_db = connect_db(out_db_config)
        out_pkeys = {}
        try:
            if redo: sql.empty_db(out_db)
            row_ins_ct_ref = [0]
            
            def row_ready(row_num, input_row):
                '''Puts the current output row into the DB, then resets root'''
                def on_error(e):
                    exc.add_msg(e, term.emph('row #:')+' '+str(row_num))
                    exc.add_msg(e, term.emph('input row:')+'\n'+str(input_row))
                    exc.add_msg(e, term.emph('output row:')+'\n'+str(root))
                    ex_tracker.track(e, row_num)
                
                xml_func.process(root, on_error)
                if not xml_dom.is_empty(root):
                    assert xml_dom.has_one_child(root)
                    try:
                        sql.with_savepoint(out_db,
                            lambda: db_xml.put(out_db, root.firstChild,
                                out_pkeys, row_ins_ct_ref, on_error))
                        if commit: out_db.commit()
                    except sql.DatabaseErrors as e: on_error(e)
                prep_root()
            
            row_ct = process_inputs(root, row_ready)
            sys.stdout.write('Inserted '+str(row_ins_ct_ref[0])+
                ' new rows into database\n')
        finally:
            out_db.rollback() # rolls back whatever was not committed above
            out_db.close()
    else:
        def on_error(e): ex_tracker.track(e)
        def row_ready(row_num, input_row): pass
        row_ct = process_inputs(root, row_ready)
        xml_func.process(root, on_error)
        if out_is_xml_ref[0]:
            doc.writexml(sys.stdout, **xml_dom.prettyxml_config)
        else: # output is CSV
            raise NotImplementedError('CSV output not supported yet')
    
    profiler.stop(row_ct)
    ex_tracker.add_iters(row_ct)
    if verbose:
        sys.stderr.write('Processed '+str(row_ct)+' input rows\n')
        sys.stderr.write(profiler.msg()+'\n')
        sys.stderr.write(ex_tracker.msg()+'\n')
    ex_tracker.exit()
309

    
310
def main():
    '''Runs main_(), turning a Parser.SyntaxException into a SystemExit that
    carries only the exception's message
    '''
    try: main_()
    # `as` form of the except clause: accepted by Python 2.6+ and required by
    # later versions, unlike the comma form
    except Parser.SyntaxException as e: raise SystemExit(str(e))
313

    
314
if __name__ == '__main__':
    # Run main() directly, or under cProfile when profile_to is set, in which
    # case the profile stats are saved to that path
    profile_to = opts.get_env_var('profile_to', None)
    if profile_to is None: main()
    else:
        import cProfile
        sys.stderr.write('Profiling to '+profile_to+'\n')
        cProfile.run(main.func_code, profile_to)
(21-21/40)