Project

General

Profile

1
#!/usr/bin/env python
2
# Maps one datasource to another, using a map spreadsheet if needed
3
# Exit status is the # of errors in the import, up to the maximum exit status
4
# For outputting an XML file to a PostgreSQL database, use the general format of
5
# http://vegbank.org/vegdocs/xml/vegbank_example_ver1.0.2.xml
6

    
7
import csv
8
import os.path
9
import sys
10
import xml.dom.minidom as minidom
11

    
12
sys.path.append(os.path.dirname(__file__)+"/../lib")
13

    
14
import csvs
15
import exc
16
import opts
17
import Parser
18
import profiling
19
import sql
20
import strings
21
import term
22
import util
23
import xpath
24
import xml_dom
25
import xml_func
26

    
27
def get_with_prefix(map_, prefixes, key):
28
    '''Gets all entries for the given key with any of the given prefixes'''
29
    values = []
30
    for prefix in ['']+prefixes: # also lookup with no prefix
31
        try: value = map_[prefix+key]
32
        except KeyError, e: continue # keep going
33
        values.append(value)
34
    
35
    if values != []: return values
36
    else: raise e # re-raise last KeyError
37

    
38
def metadata_value(name): return None # this feature has been removed
39

    
40
def cleanup(val):
41
    if val == None: return val
42
    return util.none_if(strings.cleanup(strings.ustr(val)), u'', u'\\N')
43

    
44
def main_():
45
    env_names = []
46
    def usage_err():
47
        raise SystemExit('Usage: '+opts.env_usage(env_names, True)+' '
48
            +sys.argv[0]+' [map_path...] [<input] [>output]')
49
    
50
    ## Get config from env vars
51
    
52
    # Modes
53
    test = opts.env_flag('test', False, env_names)
54
    commit = opts.env_flag('commit', False, env_names) and not test
55
        # never commit in test mode
56
    redo = opts.env_flag('redo', test, env_names) and not commit
57
        # never redo in commit mode (manually run `make empty_db` instead)
58
    end = util.cast(int, opts.get_env_var('n', None, env_names))
59
    
60
    # Ranges
61
    start = util.cast(int, opts.get_env_var('start', '0', env_names))
62
    if end != None: end += start
63
    
64
    # Debugging
65
    debug = opts.env_flag('debug', False, env_names)
66
    sql.run_raw_query.debug = debug
67
    verbose = debug or opts.env_flag('verbose', False, env_names)
68
    opts.get_env_var('profile_to', None, env_names) # add to env_names
69
    
70
    # DB
71
    db_config_names = ['engine', 'host', 'user', 'password', 'database']
72
    def get_db_config(prefix):
73
        return opts.get_env_vars(db_config_names, prefix, env_names)
74
    in_db_config = get_db_config('in')
75
    out_db_config = get_db_config('out')
76
    in_is_db = 'engine' in in_db_config
77
    out_is_db = 'engine' in out_db_config
78
    
79
    ##
80
    
81
    # Logging
82
    def log(msg, on=verbose):
83
        if on: sys.stderr.write(msg)
84
    def log_start(action, on=verbose): log(action+'...\n', on)
85
    
86
    # Parse args
87
    map_paths = sys.argv[1:]
88
    if map_paths == []:
89
        if in_is_db or not out_is_db: usage_err()
90
        else: map_paths = [None]
91
    
92
    def connect_db(db_config):
93
        log_start('Connecting to '+sql.db_config_str(db_config))
94
        return sql.connect(db_config)
95
    
96
    ex_tracker = exc.ExPercentTracker(iter_text='row')
97
    profiler = profiling.ItersProfiler(start_now=True, iter_text='row')
98
    
99
    doc = xml_dom.create_doc()
100
    root = doc.documentElement
101
    out_is_xml_ref = [False]
102
    in_label_ref = [None]
103
    def update_in_label():
104
        if in_label_ref[0] != None:
105
            xpath.get(root, '/_ignore/inLabel="'+in_label_ref[0]+'"', True)
106
    def prep_root():
107
        root.clear()
108
        update_in_label()
109
    prep_root()
110
    
111
    def process_input(root, row_ready, map_path):
112
        '''Inputs datasource to XML tree, mapping if needed'''
113
        # Load map header
114
        in_is_xpaths = True
115
        out_is_xpaths = True
116
        out_label = None
117
        if map_path != None:
118
            metadata = []
119
            mappings = []
120
            stream = open(map_path, 'rb')
121
            reader = csv.reader(stream)
122
            in_label, out_label = reader.next()[:2]
123
            
124
            def split_col_name(name):
125
                label, sep, root = name.partition(':')
126
                label, sep2, prefixes_str = label.partition('[')
127
                prefixes_str = strings.remove_suffix(']', prefixes_str)
128
                prefixes = strings.split(',', prefixes_str)
129
                return label, sep != '', root, prefixes
130
                    # extract datasrc from "datasrc[data_format]"
131
            
132
            in_label, in_is_xpaths, in_root, prefixes = split_col_name(in_label)
133
            in_label_ref[0] = in_label
134
            update_in_label()
135
            out_label, out_is_xpaths, out_root = split_col_name(out_label)[:3]
136
            has_types = out_root.startswith('/*s/') # outer elements are types
137
            
138
            for row in reader:
139
                in_, out = row[:2]
140
                if out != '':
141
                    if out_is_xpaths: out = xpath.parse(out_root+out)
142
                    mappings.append((in_, out))
143
            
144
            stream.close()
145
            
146
            root.ownerDocument.documentElement.tagName = out_label
147
        in_is_xml = in_is_xpaths and not in_is_db
148
        out_is_xml_ref[0] = out_is_xpaths and not out_is_db
149
        
150
        if in_is_xml:
151
            doc0 = minidom.parse(sys.stdin)
152
            doc0_root = doc0.documentElement
153
            if out_label == None: out_label = doc0_root.tagName
154
        
155
        def process_rows(process_row, rows):
156
            '''Processes input rows
157
            @param process_row(in_row, i)
158
            '''
159
            i = -1 # in case for loop does not execute
160
            for i, row in enumerate(rows):
161
                if i < start: continue
162
                if end != None and i >= end: break
163
                process_row(row, i)
164
                row_ready(i, row)
165
            row_ct = i-start+1
166
            return row_ct
167
        
168
        def map_rows(get_value, rows):
169
            '''Maps input rows
170
            @param get_value(in_, row):str
171
            '''
172
            def process_row(row, i):
173
                row_id = str(i)
174
                for in_, out in mappings:
175
                    value = metadata_value(in_)
176
                    if value == None:
177
                        log_start('Getting '+str(in_), debug)
178
                        value = cleanup(get_value(in_, row))
179
                    if value != None:
180
                        log_start('Putting '+str(out), debug)
181
                        xpath.put_obj(root, out, row_id, has_types, value)
182
            return process_rows(process_row, rows)
183
        
184
        def map_table(col_names, rows):
185
            col_names_ct = len(col_names)
186
            col_idxs = util.list_flip(col_names)
187
            
188
            i = 0
189
            while i < len(mappings): # mappings len changes in loop
190
                in_, out = mappings[i]
191
                if metadata_value(in_) == None:
192
                    try: mappings[i] = (
193
                        get_with_prefix(col_idxs, prefixes, in_), out)
194
                    except KeyError:
195
                        del mappings[i]
196
                        continue # keep i the same
197
                i += 1
198
            
199
            def get_value(in_, row):
200
                return util.coalesce(*util.list_subset(row.list, in_))
201
            def wrap_row(row):
202
                return util.ListDict(util.list_as_length(row, col_names_ct),
203
                    col_names, col_idxs) # handle CSV rows of different lengths
204
            
205
            return map_rows(get_value, util.WrapIter(wrap_row, rows))
206
        
207
        if map_path == None:
208
            iter_ = xml_dom.NodeElemIter(doc0_root)
209
            util.skip(iter_, xml_dom.is_text) # skip metadata
210
            row_ct = process_rows(lambda row, i: root.appendChild(row), iter_)
211
        elif in_is_db:
212
            assert in_is_xpaths
213
            
214
            in_db = connect_db(in_db_config)
215
            in_pkeys = {}
216
            cur = sql.select(in_db, table=in_root, fields=None, conds=None,
217
                limit=end, start=0)
218
            row_ct = map_table(list(sql.col_names(cur)), sql.rows(cur))
219
            
220
            in_db.close()
221
        elif in_is_xml:
222
            def get_value(in_, row):
223
                nodes = xpath.get(row, in_, allow_rooted=False)
224
                if nodes != []: return xml_dom.value(nodes[0])
225
                else: return None
226
            rows = xpath.get(doc0_root, in_root, limit=end)
227
            if rows == []: raise SystemExit('Map error: Root "'+in_root
228
                +'" not found in input')
229
            row_ct = map_rows(get_value, rows)
230
        else: # input is CSV
231
            map_ = dict(mappings)
232
            reader, col_names = csvs.reader_and_header(sys.stdin)
233
            row_ct = map_table(col_names, reader)
234
        
235
        return row_ct
236
    
237
    def process_inputs(root, row_ready):
238
        row_ct = 0
239
        for map_path in map_paths:
240
            row_ct += process_input(root, row_ready, map_path)
241
        return row_ct
242
    
243
    if out_is_db:
244
        import db_xml
245
        
246
        out_db = connect_db(out_db_config)
247
        out_pkeys = {}
248
        try:
249
            if redo: sql.empty_db(out_db)
250
            row_ins_ct_ref = [0]
251
            
252
            def row_ready(row_num, input_row):
253
                def on_error(e):
254
                    exc.add_msg(e, term.emph('row #:')+' '+str(row_num))
255
                    exc.add_msg(e, term.emph('input row:')+'\n'+str(input_row))
256
                    exc.add_msg(e, term.emph('output row:')+'\n'+str(root))
257
                    ex_tracker.track(e)
258
                
259
                xml_func.process(root, on_error)
260
                if not xml_dom.is_empty(root):
261
                    assert xml_dom.has_one_child(root)
262
                    try:
263
                        sql.with_savepoint(out_db,
264
                            lambda: db_xml.put(out_db, root.firstChild,
265
                                out_pkeys, row_ins_ct_ref, on_error))
266
                        if commit: out_db.commit()
267
                    except sql.DatabaseErrors, e: on_error(e)
268
                prep_root()
269
            
270
            row_ct = process_inputs(root, row_ready)
271
            sys.stdout.write('Inserted '+str(row_ins_ct_ref[0])+
272
                ' new rows into database\n')
273
        finally:
274
            out_db.rollback()
275
            out_db.close()
276
    else:
277
        def on_error(e): ex_tracker.track(e)
278
        def row_ready(row_num, input_row): pass
279
        row_ct = process_inputs(root, row_ready)
280
        xml_func.process(root, on_error)
281
        if out_is_xml_ref[0]:
282
            doc.writexml(sys.stdout, **xml_dom.prettyxml_config)
283
        else: # output is CSV
284
            raise NotImplementedError('CSV output not supported yet')
285
    
286
    profiler.stop(row_ct)
287
    ex_tracker.add_iters(row_ct)
288
    if verbose:
289
        sys.stderr.write('Processed '+str(row_ct)+' input rows\n')
290
        sys.stderr.write(profiler.msg()+'\n')
291
        sys.stderr.write(ex_tracker.msg()+'\n')
292
    ex_tracker.exit()
293

    
294
def main():
295
    try: main_()
296
    except Parser.SyntaxException, e: raise SystemExit(str(e))
297

    
298
if __name__ == '__main__':
299
    profile_to = opts.get_env_var('profile_to', None)
300
    if profile_to != None:
301
        import cProfile
302
        cProfile.run(main.func_code, profile_to)
303
    else: main()
(18-18/36)