Revision 3550
Added by Aaron Marcuse-Kubitza over 12 years ago
lib/sql_io.py | ||
---|---|---|
416 | 416 |
insert_out_pkeys = insert_pkeys_table('out') |
417 | 417 |
insert_in_pkeys = insert_pkeys_table('in') |
418 | 418 |
|
419 |
if is_function: |
|
420 |
args = dict(((k.name, v) for k, v in mapping.iteritems())) |
|
421 |
func_call = sql_gen.NamedCol(out_pkey, |
|
422 |
sql_gen.FunctionCall(out_table, **args)) |
|
423 |
|
|
424 |
if not is_literals: |
|
425 |
log_debug('Defining wrapper function') |
|
426 |
|
|
427 |
# Create empty pkeys table so its row type can be used |
|
428 |
insert_into_pkeys(input_joins, [in_pkey_col, func_call], limit=0, |
|
429 |
recover=True) |
|
430 |
result_type = db.col_info(sql_gen.Col(out_pkey, into)).type |
|
431 |
|
|
432 |
## Create error handling wrapper function |
|
433 |
|
|
434 |
wrapper = db.TempFunction(sql_gen.concat(into.name, '_wrap')) |
|
435 |
|
|
436 |
select_cols = [in_pkey_col]+args.values() |
|
437 |
row_var = copy.copy(sql_gen.row_var) |
|
438 |
row_var.set_srcs([in_table]) |
|
439 |
in_pkey_var = sql_gen.Col(in_pkey, row_var) |
|
440 |
|
|
441 |
args = dict(((k, sql_gen.with_table(v, row_var)) |
|
442 |
for k, v in args.iteritems())) |
|
443 |
func_call = sql_gen.FunctionCall(out_table, **args) |
|
444 |
|
|
445 |
def mk_return(result): |
|
446 |
return sql_gen.ReturnQuery(sql.mk_select(db, |
|
447 |
fields=[in_pkey_var, result], explain=False)) |
|
448 |
exc_handler = func_wrapper_exception_handler(db, |
|
449 |
mk_return(sql_gen.Cast(result_type, None)), args.values(), |
|
450 |
errors_table_) |
|
451 |
|
|
452 |
sql.define_func(db, sql_gen.FunctionDef(wrapper, |
|
453 |
sql_gen.SetOf(into), |
|
454 |
sql_gen.RowExcIgnore(sql_gen.RowType(in_table), |
|
455 |
sql.mk_select(db, input_joins, order_by=None), |
|
456 |
mk_return(func_call), exc_handler=exc_handler) |
|
457 |
)) |
|
458 |
wrapper_table = sql_gen.FunctionCall(wrapper) |
|
459 |
|
|
419 | 460 |
# Do inserts and selects |
420 | 461 |
while True: |
421 | 462 |
has_joins = join_cols != {} |
... | ... | |
429 | 470 |
[out_pkey], order_by=None, limit=0), into=insert_out_pkeys) |
430 | 471 |
break # don't do main case |
431 | 472 |
|
432 |
log_debug('Trying to insert new rows') |
|
433 |
|
|
434 | 473 |
# Prepare to insert new rows |
435 |
if is_function: |
|
436 |
log_debug('Calling function on input rows') |
|
437 |
args = dict(((k.name, v) for k, v in mapping.iteritems())) |
|
438 |
func_call = sql_gen.NamedCol(out_pkey, |
|
439 |
sql_gen.FunctionCall(out_table, **args)) |
|
440 |
|
|
441 |
if not is_literals: |
|
442 |
# Create empty pkeys table so its row type can be used |
|
443 |
insert_into_pkeys(input_joins, [in_pkey_col, func_call], |
|
444 |
limit=0, recover=True) |
|
445 |
result_type = db.col_info(sql_gen.Col(out_pkey, into)).type |
|
446 |
|
|
447 |
## Create error handling wrapper function |
|
448 |
|
|
449 |
wrapper = db.TempFunction(sql_gen.concat(into.name, '_wrap')) |
|
450 |
|
|
451 |
select_cols = [in_pkey_col]+args.values() |
|
452 |
row_var = copy.copy(sql_gen.row_var) |
|
453 |
row_var.set_srcs([in_table]) |
|
454 |
in_pkey_var = sql_gen.Col(in_pkey, row_var) |
|
455 |
|
|
456 |
args = dict(((k, sql_gen.with_table(v, row_var)) |
|
457 |
for k, v in args.iteritems())) |
|
458 |
func_call = sql_gen.FunctionCall(out_table, **args) |
|
459 |
|
|
460 |
def mk_return(result): |
|
461 |
return sql_gen.ReturnQuery(sql.mk_select(db, |
|
462 |
fields=[in_pkey_var, result], explain=False)) |
|
463 |
exc_handler = func_wrapper_exception_handler(db, |
|
464 |
mk_return(sql_gen.Cast(result_type, None)), args.values(), |
|
465 |
errors_table_) |
|
466 |
|
|
467 |
sql.define_func(db, sql_gen.FunctionDef(wrapper, |
|
468 |
sql_gen.SetOf(into), |
|
469 |
sql_gen.RowExcIgnore(sql_gen.RowType(in_table), |
|
470 |
sql.mk_select(db, input_joins, order_by=None), |
|
471 |
mk_return(func_call), exc_handler=exc_handler) |
|
472 |
)) |
|
473 |
wrapper_table = sql_gen.FunctionCall(wrapper) |
|
474 |
if is_function: log_debug('Calling function on input rows') |
|
474 | 475 |
else: |
476 |
log_debug('Trying to insert new rows') |
|
475 | 477 |
insert_args = dict(recover=True, cacheable=False) |
476 | 478 |
if has_joins: |
477 | 479 |
insert_args.update(dict(ignore=True)) |
Also available in: Unified diff
sql_io.py: put_table(): is_function: Factored defining the error handling wrapper function out of the main loop because it only needs to run once. Don't log "Trying to insert new rows" in function mode because it's inaccurate.