Project

General

Profile

1
#!/usr/bin/env python
2
# Maps one datasource to another, using a map spreadsheet if needed
3
# Exit status is the # of errors in the import, up to the maximum exit status
4
# For outputting an XML file to a PostgreSQL database, use the general format of
5
# http://vegbank.org/vegdocs/xml/vegbank_example_ver1.0.2.xml
6

    
7
import csv
8
import os.path
9
import sys
10
import xml.dom.minidom as minidom
11

    
12
sys.path.append(os.path.dirname(__file__)+"/../lib")
13

    
14
import csvs
15
import exc
16
import opts
17
import Parser
18
import profiling
19
import sql
20
import strings
21
import term
22
import util
23
import xpath
24
import xml_dom
25
import xml_func
26

    
27
def metadata_value(name):
    '''Stub for the removed metadata feature
    @param name The mapping input name (ignored)
    @return Always None, so callers fall through to reading the input value
    '''
    return None
28

    
29
def cleanup(val):
    '''Normalizes a raw input value before it is put into the output
    @param val The raw value, or None
    @return None if val is None; otherwise the result of converting val with
        strings.ustr(), cleaning it with strings.cleanup(), and passing it
        through util.none_if() with u'' and u'\\N'
        (presumably these two strings are turned into None -- confirm against
        util.none_if)
    '''
    if val is None: return val # identity test: don't invoke val's __eq__
    return util.none_if(strings.cleanup(strings.ustr(val)), u'', u'\\N')
32

    
33
def main_():
    '''Maps each input datasource to the output datasource
    
    Settings come from env vars (read through opts); map spreadsheet paths come
    from the command line. Input is read from stdin or from the "in" database,
    and output is written to stdout as XML or inserted into the "out" database.
    Finishes by calling ex_tracker.exit() (per the file header, the exit status
    reflects the # of errors in the import).
    '''
    env_names = [] # the opts calls below add to this; used by the usage message
    def usage_err():
        raise SystemExit('Usage: '+opts.env_usage(env_names, True)+' '
            +sys.argv[0]+' [map_path...] [<input] [>output]')
    
    # Get db config from env vars
    db_config_names = ['engine', 'host', 'user', 'password', 'database']
    def get_db_config(prefix):
        return opts.get_env_vars(db_config_names, prefix, env_names)
    in_db_config = get_db_config('in')
    out_db_config = get_db_config('out')
    in_is_db = 'engine' in in_db_config
    out_is_db = 'engine' in out_db_config
    
    # Get other config from env vars
    end = util.cast(int, opts.get_env_var('n', None, env_names))
    start = util.cast(int, opts.get_env_var('start', '0', env_names))
    if end is not None: end += start # n is a row count; make it an end index
    test = opts.env_flag('test', False, env_names)
    commit = opts.env_flag('commit', False, env_names) and not test
        # never commit in test mode
    redo = opts.env_flag('redo', test, env_names) and not commit
        # never redo in commit mode (manually run `make empty_db` instead)
    debug = opts.env_flag('debug', False, env_names)
    sql.run_raw_query.debug = debug
    verbose = debug or opts.env_flag('verbose', False, env_names)
    opts.get_env_var('profile_to', None, env_names) # add to env_names
    
    # Logging
    def log(msg, on=verbose):
        if on: sys.stderr.write(msg)
    def log_start(action, on=verbose): log(action+'...\n', on)
    
    # Parse args
    map_paths = sys.argv[1:]
    if not map_paths:
        # Without a map, only unmapped stdin -> database is supported
        if in_is_db or not out_is_db: usage_err()
        else: map_paths = [None]
    
    def connect_db(db_config):
        log_start('Connecting to '+sql.db_config_str(db_config))
        return sql.connect(db_config)
    
    ex_tracker = exc.ExPercentTracker(iter_text='row')
    profiler = profiling.ItersProfiler(start_now=True, iter_text='row')
    
    doc = xml_dom.create_doc()
    root = doc.documentElement
    # One-element lists so the nested functions below can assign to them
    out_is_xml_ref = [False]
    in_label_ref = [None]
    def update_in_label():
        if in_label_ref[0] is not None:
            xpath.get(root, '/_ignore/inLabel="'+in_label_ref[0]+'"', True)
    def prep_root():
        '''Empties the output tree and re-adds the input label to it'''
        root.clear()
        update_in_label()
    prep_root()
    
    def process_input(root, row_ready, map_path):
        '''Inputs datasource to XML tree, mapping if needed
        @param root The output tree's root element
        @param row_ready(row_num, input_row) Called after each row is processed
        @param map_path The map spreadsheet's path, or None to pass the XML
            input through unmapped
        @return The # of input rows processed
        '''
        # Load map header
        in_is_xpaths = True
        out_is_xpaths = True
        out_label = None
        mappings = []
        if map_path is not None:
            def split_col_name(name):
                '''Splits "datasrc[data_format]:root" into
                (datasrc, whether ":" was present, root)'''
                name, sep, root_path = name.partition(':')
                return name.partition('[')[0], sep != '', root_path
                    # extract datasrc from "datasrc[data_format]"
            
            # `with` closes the map file even if parsing it raises
            with open(map_path, 'rb') as stream:
                reader = csv.reader(stream)
                in_label, out_label = next(reader)[:2]
                in_label, in_is_xpaths, in_root = split_col_name(in_label)
                in_label_ref[0] = in_label
                update_in_label()
                out_label, out_is_xpaths, out_root = split_col_name(out_label)
                has_types = out_root.startswith('/*s/')
                    # outer elements are types
                for row in reader:
                    in_, out = row[:2]
                    if out != '': # skip inputs that have no output mapping
                        if out_is_xpaths: out = xpath.parse(out_root+out)
                        mappings.append((in_, out))
            
            root.ownerDocument.documentElement.tagName = out_label
        in_is_xml = in_is_xpaths and not in_is_db
        out_is_xml_ref[0] = out_is_xpaths and not out_is_db
        
        if in_is_xml:
            doc0 = minidom.parse(sys.stdin)
            doc0_root = doc0.documentElement
            if out_label is None: out_label = doc0_root.tagName
        
        def process_rows(process_row, rows):
            '''Processes input rows
            @param process_row(in_row, i)
            @return The # of rows actually processed
            '''
            # Count explicitly: deriving the count from the last index is off
            # by one when the loop stops at `end`, and goes negative when there
            # are fewer rows than `start`
            row_ct = 0
            for i, row in enumerate(rows):
                if i < start: continue
                if end is not None and i >= end: break
                process_row(row, i)
                row_ready(i, row)
                row_ct += 1
            return row_ct
        
        def map_rows(get_value, rows):
            '''Maps input rows
            @param get_value(in_, row):str
            '''
            def process_row(row, i):
                row_id = str(i)
                for in_, out in mappings:
                    value = metadata_value(in_)
                    if value is None:
                        log_start('Getting '+str(in_), debug)
                        value = cleanup(get_value(in_, row))
                    if value is not None:
                        log_start('Putting '+str(out), debug)
                        xpath.put_obj(root, out, row_id, has_types, value)
            return process_rows(process_row, rows)
        
        # Helpers shared by the tabular (database and CSV) inputs
        def use_col_idxs(mappings, col_idxs):
            '''Replaces each mapping's input column name with its index,
            dropping mappings whose column is not in the input'''
            mappings_new = []
            for in_, out in mappings:
                if metadata_value(in_) is None:
                    try: in_ = col_idxs[in_]
                    except KeyError: continue
                mappings_new.append((in_, out))
            return mappings_new
        def get_col_value(in_, row):
            try: return row.list[in_]
            except IndexError: return None # row is shorter than the header
        def wrap_rows(rows, col_names, col_idxs):
            def wrap_row(row): return util.ListDict(row, col_names, col_idxs)
            return util.WrapIter(wrap_row, rows)
        
        if map_path is None:
            iter_ = xml_dom.NodeElemIter(doc0_root)
            util.skip(iter_, xml_dom.is_text) # skip metadata
            row_ct = process_rows(lambda row, i: root.appendChild(row), iter_)
        elif in_is_db:
            assert in_is_xpaths
            
            in_db = connect_db(in_db_config)
            try:
                cur = sql.select(in_db, table=in_root, fields=None, conds=None,
                    limit=end, start=0)
                col_names = list(sql.col_names(cur))
                col_idxs = util.list_flip(col_names)
                mappings = use_col_idxs(mappings, col_idxs)
                row_ct = map_rows(get_col_value,
                    wrap_rows(sql.rows(cur), col_names, col_idxs))
            finally: in_db.close() # also release the connection on error
        elif in_is_xml:
            def get_value(in_, row):
                nodes = xpath.get(row, in_, allow_rooted=False)
                if nodes != []: return xml_dom.value(nodes[0])
                else: return None
            rows = xpath.get(doc0_root, in_root, limit=end)
            if rows == []: raise SystemExit('Map error: Root "'+in_root
                +'" not found in input')
            row_ct = map_rows(get_value, rows)
        else: # input is CSV
            reader, col_names = csvs.reader_and_header(sys.stdin)
            col_idxs = util.list_flip(col_names)
            mappings = use_col_idxs(mappings, col_idxs)
            row_ct = map_rows(get_col_value,
                wrap_rows(reader, col_names, col_idxs))
        
        return row_ct
    
    def process_inputs(root, row_ready):
        '''Processes every map path; returns the total # of rows processed'''
        row_ct = 0
        for map_path in map_paths:
            row_ct += process_input(root, row_ready, map_path)
        return row_ct
    
    if out_is_db:
        import db_xml
        
        out_db = connect_db(out_db_config)
        out_pkeys = {}
        try:
            if redo: sql.empty_db(out_db)
            row_ins_ct_ref = [0]
            
            def row_ready(row_num, input_row):
                '''Inserts the row currently in the output tree, then resets
                the tree for the next row'''
                def on_error(e):
                    exc.add_msg(e, term.emph('row #:')+' '+str(row_num))
                    exc.add_msg(e, term.emph('input row:')+'\n'+str(input_row))
                    exc.add_msg(e, term.emph('output row:')+'\n'+str(root))
                    ex_tracker.track(e)
                
                xml_func.process(root, on_error)
                if not xml_dom.is_empty(root):
                    assert xml_dom.has_one_child(root)
                    try:
                        sql.with_savepoint(out_db,
                            lambda: db_xml.put(out_db, root.firstChild,
                                out_pkeys, row_ins_ct_ref, on_error))
                        if commit: out_db.commit()
                    except sql.DatabaseErrors as e: on_error(e)
                prep_root()
            
            row_ct = process_inputs(root, row_ready)
            sys.stdout.write('Inserted '+str(row_ins_ct_ref[0])+
                ' new rows into database\n')
        finally:
            out_db.rollback() # discards whatever was not committed above
            out_db.close()
    else:
        def on_error(e): ex_tracker.track(e)
        def row_ready(row_num, input_row): pass
        row_ct = process_inputs(root, row_ready)
        xml_func.process(root, on_error)
        if out_is_xml_ref[0]:
            doc.writexml(sys.stdout, **xml_dom.prettyxml_config)
        else: # output is CSV
            raise NotImplementedError('CSV output not supported yet')
    
    profiler.stop(row_ct)
    ex_tracker.add_iters(row_ct)
    if verbose:
        sys.stderr.write('Processed '+str(row_ct)+' input rows\n')
        sys.stderr.write(profiler.msg()+'\n')
        sys.stderr.write(ex_tracker.msg()+'\n')
    ex_tracker.exit()
275

    
276
def main():
    '''Runs main_(), reporting a map syntax error as a plain exit message
    @raise SystemExit With the error's text if main_() raises
        Parser.SyntaxException
    '''
    try: main_()
    # `as` form is valid on Python 2.6+ as well as Python 3
    except Parser.SyntaxException as e: raise SystemExit(str(e))
279

    
280
if __name__ == '__main__':
    # If the profile_to env var is set, run under cProfile and write the stats
    # to that path; otherwise run normally
    profile_to = opts.get_env_var('profile_to', None)
    if profile_to is not None:
        import cProfile
        # __code__ is the same object as the Python-2-only func_code alias
        cProfile.run(main.__code__, profile_to)
    else: main()
(14-14/28)