Revision 982
Added by Aaron Marcuse-Kubitza almost 13 years ago
map | ||
---|---|---|
12 | 12 |
import exc |
13 | 13 |
import opts |
14 | 14 |
import Parser |
15 |
import profiling |
|
15 | 16 |
import sql |
16 | 17 |
import strings |
17 | 18 |
import term |
... | ... | |
120 | 121 |
if end != None and i >= end: break |
121 | 122 |
process_row(row, i) |
122 | 123 |
row_ready(i, row) |
123 |
|
|
124 | 124 |
row_ct = i-start+1 |
125 |
ex_tracker.add_iters(row_ct) |
|
126 |
sys.stderr.write('Processed '+str(row_ct)+' input rows\n') |
|
125 |
return row_ct |
|
127 | 126 |
|
128 | 127 |
def map_rows(get_value, rows): |
129 | 128 |
'''Maps input rows |
... | ... | |
138 | 137 |
value = get_value(in_, row) |
139 | 138 |
if value != None: xpath.put_obj(root, out, row_id, |
140 | 139 |
has_types, strings.cleanup(value)) |
141 |
process_rows(process_row, rows) |
|
140 |
return process_rows(process_row, rows)
|
|
142 | 141 |
|
143 | 142 |
if map_path == None: |
144 | 143 |
iter_ = xml_dom.NodeElemIter(doc0.documentElement) |
145 | 144 |
util.skip(iter_, xml_dom.is_text) # skip metadata |
146 |
process_rows(lambda row, i: root.appendChild(row), iter_) |
|
145 |
row_ct = process_rows(lambda row, i: root.appendChild(row), iter_)
|
|
147 | 146 |
elif in_is_db: |
148 | 147 |
assert in_is_xpaths |
149 | 148 |
|
... | ... | |
164 | 163 |
value = sql.value_or_none(db_xml.get(in_db, in_, in_pkeys)) |
165 | 164 |
if value != None: return str(value) |
166 | 165 |
else: return None |
167 |
map_rows(get_value, sql.rows(db_xml.get(in_db, in_root_xml, |
|
166 |
row_ct = map_rows(get_value, sql.rows(db_xml.get(in_db, in_root_xml,
|
|
168 | 167 |
in_pkeys, end, 0))) |
169 | 168 |
in_db.close() |
170 | 169 |
elif in_is_xml: |
... | ... | |
175 | 174 |
rows = xpath.get(doc0.documentElement, in_root, limit=end) |
176 | 175 |
if rows == []: raise SystemExit('Map error: Root "'+in_root |
177 | 176 |
+'" not found in input') |
178 |
map_rows(get_value, rows) |
|
177 |
row_ct = map_rows(get_value, rows)
|
|
179 | 178 |
else: # input is CSV |
180 | 179 |
map_ = dict(mappings) |
181 | 180 |
reader = csv.reader(sys.stdin) |
... | ... | |
191 | 190 |
value = row[in_] |
192 | 191 |
if value != '': return value |
193 | 192 |
else: return None |
194 |
map_rows(get_value, reader) |
|
193 |
row_ct = map_rows(get_value, reader) |
|
194 |
|
|
195 |
return row_ct |
|
195 | 196 |
|
196 | 197 |
def process_inputs(root, row_ready): |
197 |
for map_path in map_paths: process_input(root, row_ready, map_path) |
|
198 |
row_ct = 0 |
|
199 |
for map_path in map_paths: |
|
200 |
row_ct += process_input(root, row_ready, map_path) |
|
201 |
return row_ct |
|
198 | 202 |
|
199 |
# Output XML tree
|
|
203 |
profiler = profiling.ItersProfiler(start_now=True, iter_text='row')
|
|
200 | 204 |
doc = xml_dom.create_doc() |
201 | 205 |
root = doc.documentElement |
202 | 206 |
if out_is_db: |
... | ... | |
206 | 210 |
out_pkeys = {} |
207 | 211 |
try: |
208 | 212 |
if redo: sql.empty_db(out_db) |
209 |
row_ct_ref = [0] |
|
213 |
row_ins_ct_ref = [0]
|
|
210 | 214 |
|
211 | 215 |
def row_ready(row_num, input_row): |
212 | 216 |
def on_error(e): |
... | ... | |
219 | 223 |
if not xml_dom.is_empty(root): |
220 | 224 |
assert xml_dom.has_one_child(root) |
221 | 225 |
try: |
222 |
sql.with_savepoint(out_db, lambda: db_xml.put(out_db, |
|
223 |
root.firstChild, out_pkeys, row_ct_ref, on_error)) |
|
226 |
sql.with_savepoint(out_db, |
|
227 |
lambda: db_xml.put(out_db, root.firstChild, |
|
228 |
out_pkeys, row_ins_ct_ref, on_error)) |
|
224 | 229 |
if commit: out_db.commit() |
225 | 230 |
except sql.DatabaseErrors, e: on_error(e) |
226 | 231 |
root.clear() |
227 | 232 |
|
228 |
process_inputs(root, row_ready) |
|
229 |
sys.stdout.write('Inserted '+str(row_ct_ref[0])+ |
|
233 |
row_ct = process_inputs(root, row_ready)
|
|
234 |
sys.stdout.write('Inserted '+str(row_ins_ct_ref[0])+
|
|
230 | 235 |
' new rows into database\n') |
231 | 236 |
finally: |
232 | 237 |
out_db.rollback() |
... | ... | |
234 | 239 |
else: |
235 | 240 |
def on_error(e): ex_tracker.track(e) |
236 | 241 |
def row_ready(row_num, input_row): pass |
237 |
process_inputs(root, row_ready) |
|
242 |
row_ct = process_inputs(root, row_ready)
|
|
238 | 243 |
xml_func.process(root, on_error) |
239 | 244 |
if out_is_xml_ref[0]: |
240 | 245 |
doc.writexml(sys.stdout, **xml_dom.prettyxml_config) |
241 | 246 |
else: # output is CSV |
242 | 247 |
raise NotImplementedError('CSV output not supported yet') |
248 |
profiler.stop(row_ct) |
|
249 |
ex_tracker.add_iters(row_ct) |
|
250 |
sys.stderr.write('Processed '+str(row_ct)+' input rows\n') |
|
251 |
sys.stderr.write(profiler.msg()+'\n') |
|
243 | 252 |
|
244 | 253 |
def main(): |
245 | 254 |
try: main_() |
Also available in: Unified diff
bin/map: Use profiling.ItersProfiler. Refactored input row count calculation to have each function aggregate and return the row count, and then display the row count and statistics that depend on it at the end of the program.