Revision 3077
Added by Aaron Marcuse-Kubitza over 12 years ago
lib/sql.py | ||
---|---|---|
1 | 1 |
# Database access |
2 | 2 |
|
3 | 3 |
import copy |
4 |
import operator |
|
5 | 4 |
import re |
6 | 5 |
import warnings |
7 | 6 |
|
... | ... | |
1304 | 1303 |
'''For kw_args, see tables()''' |
1305 | 1304 |
for table in tables(db, schema, **kw_args): truncate(db, table, schema) |
1306 | 1305 |
|
1307 |
##### Heuristic queries |
|
1308 |
|
|
1309 |
def put(db, table, row, pkey_=None, row_ct_ref=None): |
|
1310 |
'''Recovers from errors. |
|
1311 |
Only works under PostgreSQL (uses INSERT RETURNING). |
|
1312 |
''' |
|
1313 |
row = sql_gen.ColDict(db, table, row) |
|
1314 |
if pkey_ == None: pkey_ = pkey(db, table, recover=True) |
|
1315 |
|
|
1316 |
try: |
|
1317 |
cur = insert(db, table, row, pkey_, recover=True) |
|
1318 |
if row_ct_ref != None and cur.rowcount >= 0: |
|
1319 |
row_ct_ref[0] += cur.rowcount |
|
1320 |
return value(cur) |
|
1321 |
except DuplicateKeyException, e: |
|
1322 |
row = sql_gen.ColDict(db, table, |
|
1323 |
util.dict_subset_right_join(row, e.cols)) |
|
1324 |
return value(select(db, table, [pkey_], row, recover=True)) |
|
1325 |
|
|
1326 |
def get(db, table, row, pkey, row_ct_ref=None, create=False): |
|
1327 |
'''Recovers from errors''' |
|
1328 |
try: return value(select(db, table, [pkey], row, limit=1, recover=True)) |
|
1329 |
except StopIteration: |
|
1330 |
if not create: raise |
|
1331 |
return put(db, table, row, pkey, row_ct_ref) # insert new row |
|
1332 |
|
|
1333 |
def is_func_result(col): |
|
1334 |
return col.table.name.find('(') >= 0 and col.name == 'result' |
|
1335 |
|
|
1336 |
def into_table_name(out_table, in_tables0, mapping, is_func): |
|
1337 |
def in_col_str(in_col): |
|
1338 |
in_col = sql_gen.remove_col_rename(in_col) |
|
1339 |
if isinstance(in_col, sql_gen.Col): |
|
1340 |
table = in_col.table |
|
1341 |
if table == in_tables0: |
|
1342 |
in_col = sql_gen.to_name_only_col(in_col) |
|
1343 |
elif is_func_result(in_col): in_col = table # omit col name |
|
1344 |
return str(in_col) |
|
1345 |
|
|
1346 |
str_ = str(out_table) |
|
1347 |
if is_func: |
|
1348 |
str_ += '(' |
|
1349 |
|
|
1350 |
try: value_in_col = mapping['value'] |
|
1351 |
except KeyError: |
|
1352 |
str_ += ', '.join((str(k)+'='+in_col_str(v) |
|
1353 |
for k, v in mapping.iteritems())) |
|
1354 |
else: str_ += in_col_str(value_in_col) |
|
1355 |
|
|
1356 |
str_ += ')' |
|
1357 |
else: |
|
1358 |
out_col = 'rank' |
|
1359 |
try: in_col = mapping[out_col] |
|
1360 |
except KeyError: str_ += '_pkeys' |
|
1361 |
else: # has a rank column, so hierarchical |
|
1362 |
str_ += '['+str(out_col)+'='+in_col_str(in_col)+']' |
|
1363 |
return str_ |
|
1364 |
|
|
1365 |
def put_table(db, out_table, in_tables, mapping, row_ct_ref=None, into=None, |
|
1366 |
default=None, is_func=False, on_error=exc.raise_): |
|
1367 |
'''Recovers from errors. |
|
1368 |
Only works under PostgreSQL (uses INSERT RETURNING). |
|
1369 |
IMPORTANT: Must be run at the *beginning* of a transaction. |
|
1370 |
@param in_tables The main input table to select from, followed by a list of |
|
1371 |
tables to join with it using the main input table's pkey |
|
1372 |
@param mapping dict(out_table_col=in_table_col, ...) |
|
1373 |
* out_table_col: str (*not* sql_gen.Col) |
|
1374 |
* in_table_col: sql_gen.Col|literal-value |
|
1375 |
@param into The table to contain the output and input pkeys. |
|
1376 |
Defaults to `out_table.name+'_pkeys'`. |
|
1377 |
@param default The *output* column to use as the pkey for missing rows. |
|
1378 |
If this output column does not exist in the mapping, uses None. |
|
1379 |
@param is_func Whether out_table is the name of a SQL function, not a table |
|
1380 |
@return sql_gen.Col Where the output pkeys are made available |
|
1381 |
''' |
|
1382 |
out_table = sql_gen.as_Table(out_table) |
|
1383 |
|
|
1384 |
def log_debug(msg): db.log_debug(msg, level=1.5) |
|
1385 |
def col_ustr(str_): |
|
1386 |
return strings.repr_no_u(sql_gen.remove_col_rename(str_)) |
|
1387 |
|
|
1388 |
log_debug('********** New iteration **********') |
|
1389 |
log_debug('Inserting these input columns into '+strings.as_tt( |
|
1390 |
out_table.to_str(db))+':\n'+strings.as_table(mapping, ustr=col_ustr)) |
|
1391 |
|
|
1392 |
is_function = function_exists(db, out_table) |
|
1393 |
|
|
1394 |
if is_function: out_pkey = 'result' |
|
1395 |
else: out_pkey = pkey(db, out_table, recover=True) |
|
1396 |
out_pkey_col = sql_gen.as_Col(out_pkey, out_table) |
|
1397 |
|
|
1398 |
if mapping == {}: # need at least one column for INSERT SELECT |
|
1399 |
mapping = {out_pkey: None} # ColDict will replace with default value |
|
1400 |
|
|
1401 |
# Create input joins from list of input tables |
|
1402 |
in_tables_ = in_tables[:] # don't modify input! |
|
1403 |
in_tables0 = in_tables_.pop(0) # first table is separate |
|
1404 |
errors_table_ = errors_table(db, in_tables0) |
|
1405 |
in_pkey = pkey(db, in_tables0, recover=True) |
|
1406 |
in_pkey_col = sql_gen.as_Col(in_pkey, in_tables0) |
|
1407 |
input_joins = [in_tables0]+[sql_gen.Join(v, |
|
1408 |
{in_pkey: sql_gen.join_same_not_null}) for v in in_tables_] |
|
1409 |
|
|
1410 |
if into == None: |
|
1411 |
into = into_table_name(out_table, in_tables0, mapping, is_func) |
|
1412 |
into = sql_gen.as_Table(into) |
|
1413 |
|
|
1414 |
# Set column sources |
|
1415 |
in_cols = filter(sql_gen.is_table_col, mapping.values()) |
|
1416 |
for col in in_cols: |
|
1417 |
if col.table == in_tables0: col.set_srcs(sql_gen.src_self) |
|
1418 |
|
|
1419 |
log_debug('Joining together input tables into temp table') |
|
1420 |
# Place in new table for speed and so don't modify input if values edited |
|
1421 |
in_table = sql_gen.Table('in') |
|
1422 |
mapping = dicts.join(mapping, flatten(db, in_table, input_joins, in_cols, |
|
1423 |
preserve=[in_pkey_col], start=0)) |
|
1424 |
input_joins = [in_table] |
|
1425 |
db.log_debug('Temp table: '+strings.as_tt(in_table.to_str(db)), level=2) |
|
1426 |
|
|
1427 |
mapping = sql_gen.ColDict(db, out_table, mapping) |
|
1428 |
# after applying dicts.join() because that returns a plain dict |
|
1429 |
|
|
1430 |
# Resolve default value column |
|
1431 |
if default != None: |
|
1432 |
try: default = mapping[default] |
|
1433 |
except KeyError: |
|
1434 |
db.log_debug('Default value column ' |
|
1435 |
+strings.as_tt(strings.repr_no_u(default)) |
|
1436 |
+' does not exist in mapping, falling back to None', level=2.1) |
|
1437 |
default = None |
|
1438 |
|
|
1439 |
pkeys_names = [in_pkey, out_pkey] |
|
1440 |
pkeys_cols = [in_pkey_col, out_pkey_col] |
|
1441 |
|
|
1442 |
pkeys_table_exists_ref = [False] |
|
1443 |
def insert_into_pkeys(joins, cols, distinct=False): |
|
1444 |
kw_args = {} |
|
1445 |
if distinct: kw_args.update(dict(distinct_on=[in_pkey_col])) |
|
1446 |
query = mk_select(db, joins, cols, order_by=None, start=0, **kw_args) |
|
1447 |
|
|
1448 |
if pkeys_table_exists_ref[0]: |
|
1449 |
insert_select(db, into, pkeys_names, query) |
|
1450 |
else: |
|
1451 |
run_query_into(db, query, into=into) |
|
1452 |
pkeys_table_exists_ref[0] = True |
|
1453 |
|
|
1454 |
limit_ref = [None] |
|
1455 |
conds = set() |
|
1456 |
distinct_on = sql_gen.ColDict(db, out_table) |
|
1457 |
def mk_main_select(joins, cols): |
|
1458 |
distinct_on_cols = [c.to_Col() for c in distinct_on.values()] |
|
1459 |
return mk_select(db, joins, cols, conds, distinct_on_cols, |
|
1460 |
limit=limit_ref[0], start=0) |
|
1461 |
|
|
1462 |
exc_strs = set() |
|
1463 |
def log_exc(e): |
|
1464 |
e_str = exc.str_(e, first_line_only=True) |
|
1465 |
log_debug('Caught exception: '+e_str) |
|
1466 |
assert e_str not in exc_strs # avoid infinite loops |
|
1467 |
exc_strs.add(e_str) |
|
1468 |
|
|
1469 |
def remove_all_rows(): |
|
1470 |
log_debug('Ignoring all rows') |
|
1471 |
limit_ref[0] = 0 # just create an empty pkeys table |
|
1472 |
|
|
1473 |
def ignore(in_col, value, e): |
|
1474 |
track_data_error(db, errors_table_, in_col.srcs, value, e.cause.pgcode, |
|
1475 |
e.cause.pgerror) |
|
1476 |
log_debug('Ignoring rows with '+strings.as_tt(repr(in_col))+' = ' |
|
1477 |
+strings.as_tt(repr(value))) |
|
1478 |
|
|
1479 |
def remove_rows(in_col, value, e): |
|
1480 |
ignore(in_col, value, e) |
|
1481 |
cond = (in_col, sql_gen.CompareCond(value, '!=')) |
|
1482 |
assert cond not in conds # avoid infinite loops |
|
1483 |
conds.add(cond) |
|
1484 |
|
|
1485 |
def invalid2null(in_col, value, e): |
|
1486 |
ignore(in_col, value, e) |
|
1487 |
update(db, in_table, [(in_col, None)], |
|
1488 |
sql_gen.ColValueCond(in_col, value)) |
|
1489 |
|
|
1490 |
def insert_pkeys_table(which): |
|
1491 |
return sql_gen.Table(sql_gen.concat(in_table.name, |
|
1492 |
'_insert_'+which+'_pkeys')) |
|
1493 |
insert_out_pkeys = insert_pkeys_table('out') |
|
1494 |
insert_in_pkeys = insert_pkeys_table('in') |
|
1495 |
|
|
1496 |
# Do inserts and selects |
|
1497 |
join_cols = sql_gen.ColDict(db, out_table) |
|
1498 |
while True: |
|
1499 |
if limit_ref[0] == 0: # special case |
|
1500 |
log_debug('Creating an empty pkeys table') |
|
1501 |
cur = run_query_into(db, mk_select(db, out_table, [out_pkey], |
|
1502 |
limit=limit_ref[0]), into=insert_out_pkeys) |
|
1503 |
break # don't do main case |
|
1504 |
|
|
1505 |
has_joins = join_cols != {} |
|
1506 |
|
|
1507 |
log_debug('Trying to insert new rows') |
|
1508 |
|
|
1509 |
# Prepare to insert new rows |
|
1510 |
insert_joins = input_joins[:] # don't modify original! |
|
1511 |
insert_args = dict(recover=True, cacheable=False) |
|
1512 |
if has_joins: |
|
1513 |
insert_args.update(dict(ignore=True)) |
|
1514 |
else: |
|
1515 |
insert_args.update(dict(returning=out_pkey, into=insert_out_pkeys)) |
|
1516 |
main_select = mk_main_select(insert_joins, mapping.values()) |
|
1517 |
|
|
1518 |
def main_insert(): |
|
1519 |
if is_function: |
|
1520 |
log_debug('Calling function on input rows') |
|
1521 |
args = dict(((k.name, v) for k, v in mapping.iteritems())) |
|
1522 |
func_call = sql_gen.NamedCol(out_pkey, |
|
1523 |
sql_gen.FunctionCall(out_table, **args)) |
|
1524 |
insert_into_pkeys(input_joins, [in_pkey_col, func_call]) |
|
1525 |
return None |
|
1526 |
else: |
|
1527 |
return insert_select(db, out_table, mapping.keys(), main_select, |
|
1528 |
**insert_args) |
|
1529 |
|
|
1530 |
try: |
|
1531 |
cur = with_savepoint(db, main_insert) |
|
1532 |
break # insert successful |
|
1533 |
except MissingCastException, e: |
|
1534 |
log_exc(e) |
|
1535 |
|
|
1536 |
out_col = e.col |
|
1537 |
type_ = e.type |
|
1538 |
|
|
1539 |
log_debug('Casting '+strings.as_tt(out_col)+' input to ' |
|
1540 |
+strings.as_tt(type_)) |
|
1541 |
mapping[out_col] = cast_temp_col(db, type_, mapping[out_col], |
|
1542 |
errors_table_) |
|
1543 |
except DuplicateKeyException, e: |
|
1544 |
log_exc(e) |
|
1545 |
|
|
1546 |
old_join_cols = join_cols.copy() |
|
1547 |
distinct_on.update(util.dict_subset(mapping, e.cols)) |
|
1548 |
join_cols.update(util.dict_subset_right_join(mapping, e.cols)) |
|
1549 |
log_debug('Ignoring existing rows, comparing on these columns:\n' |
|
1550 |
+strings.as_inline_table(join_cols, ustr=col_ustr)) |
|
1551 |
assert join_cols != old_join_cols # avoid infinite loops |
|
1552 |
except NullValueException, e: |
|
1553 |
log_exc(e) |
|
1554 |
|
|
1555 |
out_col, = e.cols |
|
1556 |
try: in_col = mapping[out_col] |
|
1557 |
except KeyError: |
|
1558 |
log_debug('Missing mapping for NOT NULL column '+out_col) |
|
1559 |
remove_all_rows() |
|
1560 |
else: remove_rows(in_col, None, e) |
|
1561 |
except FunctionValueException, e: |
|
1562 |
log_exc(e) |
|
1563 |
|
|
1564 |
func_name = e.name |
|
1565 |
value = e.value |
|
1566 |
for out_col, in_col in mapping.iteritems(): |
|
1567 |
in_col = sql_gen.unwrap_func_call(in_col, func_name) |
|
1568 |
invalid2null(in_col, value, e) |
|
1569 |
except DatabaseErrors, e: |
|
1570 |
log_exc(e) |
|
1571 |
|
|
1572 |
log_debug('No handler for exception') |
|
1573 |
on_error(e) |
|
1574 |
remove_all_rows() |
|
1575 |
# after exception handled, rerun loop with additional constraints |
|
1576 |
|
|
1577 |
if cur != None and row_ct_ref != None and cur.rowcount >= 0: |
|
1578 |
row_ct_ref[0] += cur.rowcount |
|
1579 |
|
|
1580 |
if is_function: pass # pkeys table already created |
|
1581 |
elif has_joins: |
|
1582 |
select_joins = input_joins+[sql_gen.Join(out_table, join_cols)] |
|
1583 |
log_debug('Getting output table pkeys of existing/inserted rows') |
|
1584 |
insert_into_pkeys(select_joins, pkeys_cols, distinct=True) |
|
1585 |
else: |
|
1586 |
add_row_num(db, insert_out_pkeys) # for joining with input pkeys |
|
1587 |
|
|
1588 |
log_debug('Getting input table pkeys of inserted rows') |
|
1589 |
run_query_into(db, mk_main_select(input_joins, [in_pkey]), |
|
1590 |
into=insert_in_pkeys) |
|
1591 |
add_row_num(db, insert_in_pkeys) # for joining with output pkeys |
|
1592 |
|
|
1593 |
assert table_row_count(db, insert_out_pkeys) == table_row_count(db, |
|
1594 |
insert_in_pkeys) |
|
1595 |
|
|
1596 |
log_debug('Combining output and input pkeys in inserted order') |
|
1597 |
pkey_joins = [insert_in_pkeys, sql_gen.Join(insert_out_pkeys, |
|
1598 |
{row_num_col: sql_gen.join_same_not_null})] |
|
1599 |
insert_into_pkeys(pkey_joins, pkeys_names) |
|
1600 |
|
|
1601 |
empty_temp(db, [insert_out_pkeys, insert_in_pkeys]) |
|
1602 |
|
|
1603 |
db.log_debug('Adding pkey on pkeys table to enable fast joins', level=2.5) |
|
1604 |
add_pkey(db, into) |
|
1605 |
|
|
1606 |
log_debug('Setting pkeys of missing rows to '+strings.as_tt(repr(default))) |
|
1607 |
missing_rows_joins = input_joins+[sql_gen.Join(into, |
|
1608 |
{in_pkey: sql_gen.join_same_not_null}, sql_gen.filter_out)] |
|
1609 |
# must use join_same_not_null or query will take forever |
|
1610 |
insert_into_pkeys(missing_rows_joins, |
|
1611 |
[in_pkey_col, sql_gen.NamedCol(out_pkey, default)]) |
|
1612 |
|
|
1613 |
assert table_row_count(db, into) == table_row_count(db, in_table) |
|
1614 |
|
|
1615 |
empty_temp(db, in_table) |
|
1616 |
|
|
1617 |
srcs = [] |
|
1618 |
if is_func: srcs = sql_gen.cols_srcs(in_cols) |
|
1619 |
return sql_gen.Col(out_pkey, into, srcs) |
|
1620 |
|
|
1621 | 1306 |
##### Data cleanup |
1622 | 1307 |
|
1623 | 1308 |
def cleanup_table(db, table, cols): |
lib/db_xml.py | ||
---|---|---|
7 | 7 |
import exc |
8 | 8 |
import Parser |
9 | 9 |
import sql |
10 |
import sql_io |
|
10 | 11 |
import sql_gen |
11 | 12 |
import strings |
12 | 13 |
import util |
... | ... | |
101 | 102 |
try: |
102 | 103 |
for try_num in xrange(2): |
103 | 104 |
try: |
104 |
id_ = sql.put(db, table, row, pkey_, row_ct_ref) |
|
105 |
id_ = sql_io.put(db, table, row, pkey_, row_ct_ref)
|
|
105 | 106 |
if store_ids: xml_dom.set_id(node, id_) |
106 | 107 |
break |
107 | 108 |
except sql.NullValueException, e: |
... | ... | |
226 | 227 |
row[out_col] = sql_gen.NamedCol(out_col, value) |
227 | 228 |
|
228 | 229 |
# Insert node |
229 |
pkeys_loc = sql.put_table(db, out_table, in_tables, row, row_ins_ct_ref, |
|
230 |
pkeys_loc = sql_io.put_table(db, out_table, in_tables, row, row_ins_ct_ref,
|
|
230 | 231 |
None, next, is_func, on_error) |
231 | 232 |
|
232 | 233 |
sql.empty_temp(db, set(in_tables) - no_empty) |
lib/xml_func.py | ||
---|---|---|
10 | 10 |
import exc |
11 | 11 |
import format |
12 | 12 |
import maps |
13 |
import sql |
|
13 |
import sql_io
|
|
14 | 14 |
import strings |
15 | 15 |
import term |
16 | 16 |
import units |
... | ... | |
100 | 100 |
# pass-through optimization for aggregating functions with one arg |
101 | 101 |
value = items[0][1] # pass through first arg |
102 | 102 |
elif row_mode and name in rel_funcs: # row-based mode: evaluate using DB |
103 |
value = sql.put(db, name, dict(items)) |
|
103 |
value = sql_io.put(db, name, dict(items))
|
|
104 | 104 |
elif column_mode and not name in structural_funcs: # column-based mode |
105 | 105 |
if name in rel_funcs: return # preserve relational functions |
106 | 106 |
# otherwise XML-only, so just replace with last param |
lib/sql_io.py | ||
---|---|---|
1 |
# Database import/export |
|
2 |
|
|
3 |
import exc |
|
4 |
import dicts |
|
5 |
import sql |
|
6 |
import sql_gen |
|
7 |
import strings |
|
8 |
import util |
|
9 |
|
|
10 |
def put(db, table, row, pkey_=None, row_ct_ref=None): |
|
11 |
'''Recovers from errors. |
|
12 |
Only works under PostgreSQL (uses INSERT RETURNING). |
|
13 |
''' |
|
14 |
row = sql_gen.ColDict(db, table, row) |
|
15 |
if pkey_ == None: pkey_ = sql.pkey(db, table, recover=True) |
|
16 |
|
|
17 |
try: |
|
18 |
cur = sql.insert(db, table, row, pkey_, recover=True) |
|
19 |
if row_ct_ref != None and cur.rowcount >= 0: |
|
20 |
row_ct_ref[0] += cur.rowcount |
|
21 |
return sql.value(cur) |
|
22 |
except sql.DuplicateKeyException, e: |
|
23 |
row = sql_gen.ColDict(db, table, |
|
24 |
util.dict_subset_right_join(row, e.cols)) |
|
25 |
return sql.value(sql.select(db, table, [pkey_], row, recover=True)) |
|
26 |
|
|
27 |
def get(db, table, row, pkey, row_ct_ref=None, create=False): |
|
28 |
'''Recovers from errors''' |
|
29 |
try: |
|
30 |
return sql.value(sql.select(db, table, [pkey], row, limit=1, |
|
31 |
recover=True)) |
|
32 |
except StopIteration: |
|
33 |
if not create: raise |
|
34 |
return put(db, table, row, pkey, row_ct_ref) # insert new row |
|
35 |
|
|
36 |
def is_func_result(col): |
|
37 |
return col.table.name.find('(') >= 0 and col.name == 'result' |
|
38 |
|
|
39 |
def into_table_name(out_table, in_tables0, mapping, is_func): |
|
40 |
def in_col_str(in_col): |
|
41 |
in_col = sql_gen.remove_col_rename(in_col) |
|
42 |
if isinstance(in_col, sql_gen.Col): |
|
43 |
table = in_col.table |
|
44 |
if table == in_tables0: |
|
45 |
in_col = sql_gen.to_name_only_col(in_col) |
|
46 |
elif is_func_result(in_col): in_col = table # omit col name |
|
47 |
return str(in_col) |
|
48 |
|
|
49 |
str_ = str(out_table) |
|
50 |
if is_func: |
|
51 |
str_ += '(' |
|
52 |
|
|
53 |
try: value_in_col = mapping['value'] |
|
54 |
except KeyError: |
|
55 |
str_ += ', '.join((str(k)+'='+in_col_str(v) |
|
56 |
for k, v in mapping.iteritems())) |
|
57 |
else: str_ += in_col_str(value_in_col) |
|
58 |
|
|
59 |
str_ += ')' |
|
60 |
else: |
|
61 |
out_col = 'rank' |
|
62 |
try: in_col = mapping[out_col] |
|
63 |
except KeyError: str_ += '_pkeys' |
|
64 |
else: # has a rank column, so hierarchical |
|
65 |
str_ += '['+str(out_col)+'='+in_col_str(in_col)+']' |
|
66 |
return str_ |
|
67 |
|
|
68 |
def put_table(db, out_table, in_tables, mapping, row_ct_ref=None, into=None, |
|
69 |
default=None, is_func=False, on_error=exc.raise_): |
|
70 |
'''Recovers from errors. |
|
71 |
Only works under PostgreSQL (uses INSERT RETURNING). |
|
72 |
IMPORTANT: Must be run at the *beginning* of a transaction. |
|
73 |
@param in_tables The main input table to select from, followed by a list of |
|
74 |
tables to join with it using the main input table's pkey |
|
75 |
@param mapping dict(out_table_col=in_table_col, ...) |
|
76 |
* out_table_col: str (*not* sql_gen.Col) |
|
77 |
* in_table_col: sql_gen.Col|literal-value |
|
78 |
@param into The table to contain the output and input pkeys. |
|
79 |
Defaults to `out_table.name+'_pkeys'`. |
|
80 |
@param default The *output* column to use as the pkey for missing rows. |
|
81 |
If this output column does not exist in the mapping, uses None. |
|
82 |
@param is_func Whether out_table is the name of a SQL function, not a table |
|
83 |
@return sql_gen.Col Where the output pkeys are made available |
|
84 |
''' |
|
85 |
out_table = sql_gen.as_Table(out_table) |
|
86 |
|
|
87 |
def log_debug(msg): db.log_debug(msg, level=1.5) |
|
88 |
def col_ustr(str_): |
|
89 |
return strings.repr_no_u(sql_gen.remove_col_rename(str_)) |
|
90 |
|
|
91 |
log_debug('********** New iteration **********') |
|
92 |
log_debug('Inserting these input columns into '+strings.as_tt( |
|
93 |
out_table.to_str(db))+':\n'+strings.as_table(mapping, ustr=col_ustr)) |
|
94 |
|
|
95 |
is_function = sql.function_exists(db, out_table) |
|
96 |
|
|
97 |
if is_function: out_pkey = 'result' |
|
98 |
else: out_pkey = sql.pkey(db, out_table, recover=True) |
|
99 |
out_pkey_col = sql_gen.as_Col(out_pkey, out_table) |
|
100 |
|
|
101 |
if mapping == {}: # need at least one column for INSERT SELECT |
|
102 |
mapping = {out_pkey: None} # ColDict will replace with default value |
|
103 |
|
|
104 |
# Create input joins from list of input tables |
|
105 |
in_tables_ = in_tables[:] # don't modify input! |
|
106 |
in_tables0 = in_tables_.pop(0) # first table is separate |
|
107 |
errors_table_ = sql.errors_table(db, in_tables0) |
|
108 |
in_pkey = sql.pkey(db, in_tables0, recover=True) |
|
109 |
in_pkey_col = sql_gen.as_Col(in_pkey, in_tables0) |
|
110 |
input_joins = [in_tables0]+[sql_gen.Join(v, |
|
111 |
{in_pkey: sql_gen.join_same_not_null}) for v in in_tables_] |
|
112 |
|
|
113 |
if into == None: |
|
114 |
into = into_table_name(out_table, in_tables0, mapping, is_func) |
|
115 |
into = sql_gen.as_Table(into) |
|
116 |
|
|
117 |
# Set column sources |
|
118 |
in_cols = filter(sql_gen.is_table_col, mapping.values()) |
|
119 |
for col in in_cols: |
|
120 |
if col.table == in_tables0: col.set_srcs(sql_gen.src_self) |
|
121 |
|
|
122 |
log_debug('Joining together input tables into temp table') |
|
123 |
# Place in new table for speed and so don't modify input if values edited |
|
124 |
in_table = sql_gen.Table('in') |
|
125 |
mapping = dicts.join(mapping, sql.flatten(db, in_table, input_joins, |
|
126 |
in_cols, preserve=[in_pkey_col], start=0)) |
|
127 |
input_joins = [in_table] |
|
128 |
db.log_debug('Temp table: '+strings.as_tt(in_table.to_str(db)), level=2) |
|
129 |
|
|
130 |
mapping = sql_gen.ColDict(db, out_table, mapping) |
|
131 |
# after applying dicts.join() because that returns a plain dict |
|
132 |
|
|
133 |
# Resolve default value column |
|
134 |
if default != None: |
|
135 |
try: default = mapping[default] |
|
136 |
except KeyError: |
|
137 |
db.log_debug('Default value column ' |
|
138 |
+strings.as_tt(strings.repr_no_u(default)) |
|
139 |
+' does not exist in mapping, falling back to None', level=2.1) |
|
140 |
default = None |
|
141 |
|
|
142 |
pkeys_names = [in_pkey, out_pkey] |
|
143 |
pkeys_cols = [in_pkey_col, out_pkey_col] |
|
144 |
|
|
145 |
pkeys_table_exists_ref = [False] |
|
146 |
def insert_into_pkeys(joins, cols, distinct=False): |
|
147 |
kw_args = {} |
|
148 |
if distinct: kw_args.update(dict(distinct_on=[in_pkey_col])) |
|
149 |
query = sql.mk_select(db, joins, cols, order_by=None, start=0, |
|
150 |
**kw_args) |
|
151 |
|
|
152 |
if pkeys_table_exists_ref[0]: |
|
153 |
sql.insert_select(db, into, pkeys_names, query) |
|
154 |
else: |
|
155 |
sql.run_query_into(db, query, into=into) |
|
156 |
pkeys_table_exists_ref[0] = True |
|
157 |
|
|
158 |
limit_ref = [None] |
|
159 |
conds = set() |
|
160 |
distinct_on = sql_gen.ColDict(db, out_table) |
|
161 |
def mk_main_select(joins, cols): |
|
162 |
distinct_on_cols = [c.to_Col() for c in distinct_on.values()] |
|
163 |
return sql.mk_select(db, joins, cols, conds, distinct_on_cols, |
|
164 |
limit=limit_ref[0], start=0) |
|
165 |
|
|
166 |
exc_strs = set() |
|
167 |
def log_exc(e): |
|
168 |
e_str = exc.str_(e, first_line_only=True) |
|
169 |
log_debug('Caught exception: '+e_str) |
|
170 |
assert e_str not in exc_strs # avoid infinite loops |
|
171 |
exc_strs.add(e_str) |
|
172 |
|
|
173 |
def remove_all_rows(): |
|
174 |
log_debug('Ignoring all rows') |
|
175 |
limit_ref[0] = 0 # just create an empty pkeys table |
|
176 |
|
|
177 |
def ignore(in_col, value, e): |
|
178 |
sql.track_data_error(db, errors_table_, in_col.srcs, value, |
|
179 |
e.cause.pgcode, e.cause.pgerror) |
|
180 |
log_debug('Ignoring rows with '+strings.as_tt(repr(in_col))+' = ' |
|
181 |
+strings.as_tt(repr(value))) |
|
182 |
|
|
183 |
def remove_rows(in_col, value, e): |
|
184 |
ignore(in_col, value, e) |
|
185 |
cond = (in_col, sql_gen.CompareCond(value, '!=')) |
|
186 |
assert cond not in conds # avoid infinite loops |
|
187 |
conds.add(cond) |
|
188 |
|
|
189 |
def invalid2null(in_col, value, e): |
|
190 |
ignore(in_col, value, e) |
|
191 |
sql.update(db, in_table, [(in_col, None)], |
|
192 |
sql_gen.ColValueCond(in_col, value)) |
|
193 |
|
|
194 |
def insert_pkeys_table(which): |
|
195 |
return sql_gen.Table(sql_gen.concat(in_table.name, |
|
196 |
'_insert_'+which+'_pkeys')) |
|
197 |
insert_out_pkeys = insert_pkeys_table('out') |
|
198 |
insert_in_pkeys = insert_pkeys_table('in') |
|
199 |
|
|
200 |
# Do inserts and selects |
|
201 |
join_cols = sql_gen.ColDict(db, out_table) |
|
202 |
while True: |
|
203 |
if limit_ref[0] == 0: # special case |
|
204 |
log_debug('Creating an empty pkeys table') |
|
205 |
cur = sql.run_query_into(db, sql.mk_select(db, out_table, |
|
206 |
[out_pkey], limit=limit_ref[0]), into=insert_out_pkeys) |
|
207 |
break # don't do main case |
|
208 |
|
|
209 |
has_joins = join_cols != {} |
|
210 |
|
|
211 |
log_debug('Trying to insert new rows') |
|
212 |
|
|
213 |
# Prepare to insert new rows |
|
214 |
insert_joins = input_joins[:] # don't modify original! |
|
215 |
insert_args = dict(recover=True, cacheable=False) |
|
216 |
if has_joins: |
|
217 |
insert_args.update(dict(ignore=True)) |
|
218 |
else: |
|
219 |
insert_args.update(dict(returning=out_pkey, into=insert_out_pkeys)) |
|
220 |
main_select = mk_main_select(insert_joins, mapping.values()) |
|
221 |
|
|
222 |
def main_insert(): |
|
223 |
if is_function: |
|
224 |
log_debug('Calling function on input rows') |
|
225 |
args = dict(((k.name, v) for k, v in mapping.iteritems())) |
|
226 |
func_call = sql_gen.NamedCol(out_pkey, |
|
227 |
sql_gen.FunctionCall(out_table, **args)) |
|
228 |
insert_into_pkeys(input_joins, [in_pkey_col, func_call]) |
|
229 |
return None |
|
230 |
else: |
|
231 |
return sql.insert_select(db, out_table, mapping.keys(), |
|
232 |
main_select, **insert_args) |
|
233 |
|
|
234 |
try: |
|
235 |
cur = sql.with_savepoint(db, main_insert) |
|
236 |
break # insert successful |
|
237 |
except sql.MissingCastException, e: |
|
238 |
log_exc(e) |
|
239 |
|
|
240 |
out_col = e.col |
|
241 |
type_ = e.type |
|
242 |
|
|
243 |
log_debug('Casting '+strings.as_tt(out_col)+' input to ' |
|
244 |
+strings.as_tt(type_)) |
|
245 |
mapping[out_col] = sql.cast_temp_col(db, type_, mapping[out_col], |
|
246 |
errors_table_) |
|
247 |
except sql.DuplicateKeyException, e: |
|
248 |
log_exc(e) |
|
249 |
|
|
250 |
old_join_cols = join_cols.copy() |
|
251 |
distinct_on.update(util.dict_subset(mapping, e.cols)) |
|
252 |
join_cols.update(util.dict_subset_right_join(mapping, e.cols)) |
|
253 |
log_debug('Ignoring existing rows, comparing on these columns:\n' |
|
254 |
+strings.as_inline_table(join_cols, ustr=col_ustr)) |
|
255 |
assert join_cols != old_join_cols # avoid infinite loops |
|
256 |
except sql.NullValueException, e: |
|
257 |
log_exc(e) |
|
258 |
|
|
259 |
out_col, = e.cols |
|
260 |
try: in_col = mapping[out_col] |
|
261 |
except KeyError: |
|
262 |
log_debug('Missing mapping for NOT NULL column '+out_col) |
|
263 |
remove_all_rows() |
|
264 |
else: remove_rows(in_col, None, e) |
|
265 |
except sql.FunctionValueException, e: |
|
266 |
log_exc(e) |
|
267 |
|
|
268 |
func_name = e.name |
|
269 |
value = e.value |
|
270 |
for out_col, in_col in mapping.iteritems(): |
|
271 |
in_col = sql_gen.unwrap_func_call(in_col, func_name) |
|
272 |
invalid2null(in_col, value, e) |
|
273 |
except sql.DatabaseErrors, e: |
|
274 |
log_exc(e) |
|
275 |
|
|
276 |
log_debug('No handler for exception') |
|
277 |
on_error(e) |
|
278 |
remove_all_rows() |
|
279 |
# after exception handled, rerun loop with additional constraints |
|
280 |
|
|
281 |
if cur != None and row_ct_ref != None and cur.rowcount >= 0: |
|
282 |
row_ct_ref[0] += cur.rowcount |
|
283 |
|
|
284 |
if is_function: pass # pkeys table already created |
|
285 |
elif has_joins: |
|
286 |
select_joins = input_joins+[sql_gen.Join(out_table, join_cols)] |
|
287 |
log_debug('Getting output table pkeys of existing/inserted rows') |
|
288 |
insert_into_pkeys(select_joins, pkeys_cols, distinct=True) |
|
289 |
else: |
|
290 |
sql.add_row_num(db, insert_out_pkeys) # for joining with input pkeys |
|
291 |
|
|
292 |
log_debug('Getting input table pkeys of inserted rows') |
|
293 |
sql.run_query_into(db, mk_main_select(input_joins, [in_pkey]), |
|
294 |
into=insert_in_pkeys) |
|
295 |
sql.add_row_num(db, insert_in_pkeys) # for joining with output pkeys |
|
296 |
|
|
297 |
assert sql.table_row_count(db, insert_out_pkeys) == sql.table_row_count( |
|
298 |
db, insert_in_pkeys) |
|
299 |
|
|
300 |
log_debug('Combining output and input pkeys in inserted order') |
|
301 |
pkey_joins = [insert_in_pkeys, sql_gen.Join(insert_out_pkeys, |
|
302 |
{sql.row_num_col: sql_gen.join_same_not_null})] |
|
303 |
insert_into_pkeys(pkey_joins, pkeys_names) |
|
304 |
|
|
305 |
sql.empty_temp(db, [insert_out_pkeys, insert_in_pkeys]) |
|
306 |
|
|
307 |
db.log_debug('Adding pkey on pkeys table to enable fast joins', level=2.5) |
|
308 |
sql.add_pkey(db, into) |
|
309 |
|
|
310 |
log_debug('Setting pkeys of missing rows to '+strings.as_tt(repr(default))) |
|
311 |
missing_rows_joins = input_joins+[sql_gen.Join(into, |
|
312 |
{in_pkey: sql_gen.join_same_not_null}, sql_gen.filter_out)] |
|
313 |
# must use join_same_not_null or query will take forever |
|
314 |
insert_into_pkeys(missing_rows_joins, |
|
315 |
[in_pkey_col, sql_gen.NamedCol(out_pkey, default)]) |
|
316 |
|
|
317 |
assert sql.table_row_count(db, into) == sql.table_row_count(db, in_table) |
|
318 |
|
|
319 |
sql.empty_temp(db, in_table) |
|
320 |
|
|
321 |
srcs = [] |
|
322 |
if is_func: srcs = sql_gen.cols_srcs(in_cols) |
|
323 |
return sql_gen.Col(out_pkey, into, srcs) |
Also available in: Unified diff
Moved Heuristic queries from sql.py to new sql_io.py