Project

General

Profile

1
#!/usr/bin/env python
2
# Maps one datasource to another, using a map spreadsheet if needed
3
# Exit status is the # of errors in the import, up to the maximum exit status
4
# For outputting an XML file to a PostgreSQL database, use the general format of
5
# http://vegbank.org/vegdocs/xml/vegbank_example_ver1.0.2.xml
6

    
7
import csv
8
import os.path
9
import sys
10
import xml.dom.minidom as minidom
11

    
12
sys.path.append(os.path.dirname(__file__)+"/../lib")
13

    
14
import csvs
15
import exc
16
import opts
17
import Parser
18
import profiling
19
import sql
20
import strings
21
import term
22
import util
23
import xpath
24
import xml_dom
25
import xml_func
26

    
27
def metadata_value(name):
    '''Stub for the removed metadata feature.
    Kept so that callers which still ask for a metadata value keep working.
    @param name Ignored
    @return None, always
    '''
    # this feature has been removed
    return None
28

    
29
def cleanup(val):
    '''Normalizes a raw input value before it is put into the output tree.
    @param val The raw value, or None
    @return None if val is None. Otherwise, the value converted with
        strings.ustr(), cleaned with strings.cleanup(), and then passed through
        util.none_if() together with the empty string and the backslash-N
        marker (presumably those two strings are turned into None -- confirm
        against util.none_if).
    '''
    # Identity test: `== None` could invoke a custom __eq__ on val
    if val is None: return val
    return util.none_if(strings.cleanup(strings.ustr(val)), u'', u'\\N')
32

    
33
def main_():
    '''Maps the input datasource(s) to the output datasource.
    
    Settings are read from env vars through `opts`, and the map spreadsheet
    paths are taken from the command-line args. Input is read from stdin or
    from the `in` database; output is written to stdout as XML or put into the
    `out` database.
    
    Ends by calling ex_tracker.exit() (presumably this sets the exit status
    from the tracked errors -- confirm against exc.ExPercentTracker).
    @raise SystemExit On a usage error, or if a map's input root is not found
    @raise NotImplementedError If the output would be CSV
    '''
    env_names = []
    def usage_err():
        raise SystemExit('Usage: '+opts.env_usage(env_names, True)+' '
            +sys.argv[0]+' [map_path...] [<input] [>output]')
    
    # Get db config from env vars
    db_config_names = ['engine', 'host', 'user', 'password', 'database']
    def get_db_config(prefix):
        return opts.get_env_vars(db_config_names, prefix, env_names)
    in_db_config = get_db_config('in')
    out_db_config = get_db_config('out')
    # A side is a database iff an engine was configured for it
    in_is_db = 'engine' in in_db_config
    out_is_db = 'engine' in out_db_config
    
    # Get other config from env vars
    end = util.cast(int, opts.get_env_var('n', None, env_names))
    start = util.cast(int, opts.get_env_var('start', '0', env_names))
    if end is not None: end += start # n is a row count, end is a row index
    test = opts.env_flag('test', False, env_names)
    commit = opts.env_flag('commit', False, env_names) and not test
        # never commit in test mode
    redo = opts.env_flag('redo', test, env_names) and not commit
        # never redo in commit mode (manually run `make empty_db` instead)
    debug = opts.env_flag('debug', False, env_names)
    sql.run_raw_query.debug = debug
    verbose = debug or opts.env_flag('verbose', False, env_names)
    opts.get_env_var('profile_to', None, env_names) # add to env_names
    
    # Logging
    def log(msg, on=verbose):
        if on: sys.stderr.write(msg)
    def log_start(action, on=verbose): log(action+'...\n', on)
    
    # Parse args
    map_paths = sys.argv[1:]
    if map_paths == []:
        # Without a map, only XML-on-stdin to database is supported
        if in_is_db or not out_is_db: usage_err()
        else: map_paths = [None]
    
    def connect_db(db_config):
        log_start('Connecting to '+sql.db_config_str(db_config))
        return sql.connect(db_config)
    
    ex_tracker = exc.ExPercentTracker(iter_text='row')
    profiler = profiling.ItersProfiler(start_now=True, iter_text='row')
    
    doc = xml_dom.create_doc()
    root = doc.documentElement
    # One-element lists so that nested functions can assign to these values
    out_is_xml_ref = [False]
    in_label_ref = [None]
    def update_in_label():
        if in_label_ref[0] is not None:
            xpath.get(root, '/_ignore/inLabel="'+in_label_ref[0]+'"', True)
    def prep_root():
        '''Empties the output tree and re-adds the input label to it'''
        root.clear()
        update_in_label()
    prep_root()
    
    def process_input(root, row_ready, map_path):
        '''Inputs datasource to XML tree, mapping if needed
        @param root The output tree's root element
        @param row_ready(row_num, input_row) Called after each row is processed
        @param map_path The map spreadsheet's path, or None to use the input
            XML as-is
        @return The number of input rows processed
        '''
        # Load map header
        in_is_xpaths = True
        out_is_xpaths = True
        out_label = None
        if map_path is not None:
            mappings = []
            
            def split_col_name(name):
                '''Splits a map header cell of the form "label[prefixes]:root"
                @return (label, whether a ":" was present, root, prefixes)
                '''
                label, sep, col_root = name.partition(':')
                label, sep2, prefixes_str = label.partition('[')
                prefixes_str = strings.remove_suffix(']', prefixes_str)
                prefixes = strings.split(',', prefixes_str)
                return label, sep != '', col_root, prefixes
                    # extract datasrc from "datasrc[data_format]"
            
            # `with` closes the map file even if reading or parsing it fails
            with open(map_path, 'rb') as stream:
                reader = csv.reader(stream)
                in_label, out_label = next(reader)[:2]
                
                in_label, in_is_xpaths, in_root = split_col_name(in_label)[:3]
                in_label_ref[0] = in_label
                update_in_label()
                out_label, out_is_xpaths, out_root = split_col_name(
                    out_label)[:3]
                has_types = out_root.startswith('/*s/')
                    # outer elements are types
                
                for row in reader:
                    in_, out = row[:2]
                    if out != '': # rows without an output are not mapped
                        if out_is_xpaths: out = xpath.parse(out_root+out)
                        mappings.append((in_, out))
            
            root.ownerDocument.documentElement.tagName = out_label
        in_is_xml = in_is_xpaths and not in_is_db
        out_is_xml_ref[0] = out_is_xpaths and not out_is_db
        
        if in_is_xml:
            doc0 = minidom.parse(sys.stdin)
            doc0_root = doc0.documentElement
            if out_label is None: out_label = doc0_root.tagName
        
        def process_rows(process_row, rows):
            '''Processes input rows
            Rows before `start` are skipped, and processing stops at `end`.
            @param process_row(in_row, i)
            @return The number of rows that were actually processed
            '''
            # Count the rows directly instead of deriving the count from the
            # last index: that over-counted by one when the `end` limit stopped
            # the loop, and went negative with fewer than `start` input rows.
            row_ct = 0
            for i, row in enumerate(rows):
                if i < start: continue
                if end is not None and i >= end: break
                process_row(row, i)
                row_ready(i, row)
                row_ct += 1
            return row_ct
        
        def map_rows(get_value, rows):
            '''Maps input rows
            @param get_value(in_, row):str
            @return The number of rows processed
            '''
            def process_row(row, i):
                row_id = str(i)
                for in_, out in mappings:
                    value = metadata_value(in_)
                    if value is None:
                        log_start('Getting '+str(in_), debug)
                        value = cleanup(get_value(in_, row))
                    if value is not None:
                        log_start('Putting '+str(out), debug)
                        xpath.put_obj(root, out, row_id, has_types, value)
            return process_rows(process_row, rows)
        
        def map_table(col_names, rows):
            '''Maps the rows of a table whose columns are named by col_names
            @return The number of rows processed
            '''
            col_idxs = util.list_flip(col_names)
            
            # Replace each mapping's input column name with its index, and drop
            # the mappings whose input column is not in the table
            i = 0
            while i < len(mappings): # mappings len changes in loop
                in_, out = mappings[i]
                if metadata_value(in_) is None:
                    try: mappings[i] = (col_idxs[in_], out)
                    except KeyError: del mappings[i]; continue # keep i the same
                i += 1
            
            def get_value(in_, row):
                try: return row.list[in_]
                except IndexError: return None # row is shorter than the header
            def wrap_row(row): return util.ListDict(row, col_names, col_idxs)
            
            return map_rows(get_value, util.WrapIter(wrap_row, rows))
        
        if map_path is None:
            iter_ = xml_dom.NodeElemIter(doc0_root)
            util.skip(iter_, xml_dom.is_text) # skip metadata
            row_ct = process_rows(lambda row, i: root.appendChild(row), iter_)
        elif in_is_db:
            assert in_is_xpaths
            
            in_db = connect_db(in_db_config)
            try:
                cur = sql.select(in_db, table=in_root, fields=None, conds=None,
                    limit=end, start=0)
                row_ct = map_table(list(sql.col_names(cur)), sql.rows(cur))
            finally: in_db.close() # also close the connection on error
        elif in_is_xml:
            def get_value(in_, row):
                nodes = xpath.get(row, in_, allow_rooted=False)
                if nodes != []: return xml_dom.value(nodes[0])
                else: return None
            rows = xpath.get(doc0_root, in_root, limit=end)
            if rows == []: raise SystemExit('Map error: Root "'+in_root
                +'" not found in input')
            row_ct = map_rows(get_value, rows)
        else: # input is CSV
            reader, col_names = csvs.reader_and_header(sys.stdin)
            row_ct = map_table(col_names, reader)
        
        return row_ct
    
    def process_inputs(root, row_ready):
        '''Processes every map path in turn
        @return The total number of input rows processed
        '''
        row_ct = 0
        for map_path in map_paths:
            row_ct += process_input(root, row_ready, map_path)
        return row_ct
    
    if out_is_db:
        import db_xml
        
        out_db = connect_db(out_db_config)
        out_pkeys = {}
        try:
            if redo: sql.empty_db(out_db)
            row_ins_ct_ref = [0]
            
            def row_ready(row_num, input_row):
                '''Puts the current output tree into the database, then resets
                the tree for the next row'''
                def on_error(e):
                    exc.add_msg(e, term.emph('row #:')+' '+str(row_num))
                    exc.add_msg(e, term.emph('input row:')+'\n'+str(input_row))
                    exc.add_msg(e, term.emph('output row:')+'\n'+str(root))
                    ex_tracker.track(e)
                
                xml_func.process(root, on_error)
                if not xml_dom.is_empty(root):
                    assert xml_dom.has_one_child(root)
                    try:
                        sql.with_savepoint(out_db,
                            lambda: db_xml.put(out_db, root.firstChild,
                                out_pkeys, row_ins_ct_ref, on_error))
                        if commit: out_db.commit()
                    except sql.DatabaseErrors as e: on_error(e)
                prep_root()
            
            row_ct = process_inputs(root, row_ready)
            sys.stdout.write('Inserted '+str(row_ins_ct_ref[0])+
                ' new rows into database\n')
        finally:
            # Anything not committed above is discarded here
            out_db.rollback()
            out_db.close()
    else:
        def on_error(e): ex_tracker.track(e)
        def row_ready(row_num, input_row): pass
        row_ct = process_inputs(root, row_ready)
        xml_func.process(root, on_error)
        if out_is_xml_ref[0]:
            doc.writexml(sys.stdout, **xml_dom.prettyxml_config)
        else: # output is CSV
            raise NotImplementedError('CSV output not supported yet')
    
    profiler.stop(row_ct)
    ex_tracker.add_iters(row_ct)
    if verbose:
        sys.stderr.write('Processed '+str(row_ct)+' input rows\n')
        sys.stderr.write(profiler.msg()+'\n')
        sys.stderr.write(ex_tracker.msg()+'\n')
    ex_tracker.exit()
269

    
270
def main():
    '''Runs main_(), reporting a Parser.SyntaxException as a plain exit message
    instead of a traceback.
    @raise SystemExit With the error's text, if main_() raises
        Parser.SyntaxException
    '''
    try: main_()
    # `as` form: the older `except X, e` spelling is Python-2-only
    except Parser.SyntaxException as e: raise SystemExit(str(e))
273

    
274
if __name__ == '__main__':
    # Run under the profiler when the profile_to env var is set, passing its
    # value to cProfile.run() as the file to write the stats to
    profile_to = opts.get_env_var('profile_to', None)
    if profile_to is not None:
        import cProfile
        # __code__ is the portable spelling of the Python-2-only func_code
        cProfile.run(main.__code__, profile_to)
    else: main()
(14-14/28)