Project

General

Profile

« Previous | Next » 

Revision 3077

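Moved the heuristic queries (put, get, is_func_result, into_table_name, put_table) from lib/sql.py to the new lib/sql_io.py, and updated the callers in lib/db_xml.py and lib/xml_func.py to use sql_io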

View differences:

lib/sql.py
1 1
# Database access
2 2

  
3 3
import copy
4
import operator
5 4
import re
6 5
import warnings
7 6

  
......
1304 1303
    '''For kw_args, see tables()'''
1305 1304
    for table in tables(db, schema, **kw_args): truncate(db, table, schema)
1306 1305

  
1307
##### Heuristic queries
1308

  
1309
def put(db, table, row, pkey_=None, row_ct_ref=None):
1310
    '''Recovers from errors.
1311
    Only works under PostgreSQL (uses INSERT RETURNING).
1312
    '''
1313
    row = sql_gen.ColDict(db, table, row)
1314
    if pkey_ == None: pkey_ = pkey(db, table, recover=True)
1315
    
1316
    try:
1317
        cur = insert(db, table, row, pkey_, recover=True)
1318
        if row_ct_ref != None and cur.rowcount >= 0:
1319
            row_ct_ref[0] += cur.rowcount
1320
        return value(cur)
1321
    except DuplicateKeyException, e:
1322
        row = sql_gen.ColDict(db, table,
1323
            util.dict_subset_right_join(row, e.cols))
1324
        return value(select(db, table, [pkey_], row, recover=True))
1325

  
1326
def get(db, table, row, pkey, row_ct_ref=None, create=False):
1327
    '''Recovers from errors'''
1328
    try: return value(select(db, table, [pkey], row, limit=1, recover=True))
1329
    except StopIteration:
1330
        if not create: raise
1331
        return put(db, table, row, pkey, row_ct_ref) # insert new row
1332

  
1333
def is_func_result(col):
1334
    return col.table.name.find('(') >= 0 and col.name == 'result'
1335

  
1336
def into_table_name(out_table, in_tables0, mapping, is_func):
1337
    def in_col_str(in_col):
1338
        in_col = sql_gen.remove_col_rename(in_col)
1339
        if isinstance(in_col, sql_gen.Col):
1340
            table = in_col.table
1341
            if table == in_tables0:
1342
                in_col = sql_gen.to_name_only_col(in_col)
1343
            elif is_func_result(in_col): in_col = table # omit col name
1344
        return str(in_col)
1345
    
1346
    str_ = str(out_table)
1347
    if is_func:
1348
        str_ += '('
1349
        
1350
        try: value_in_col = mapping['value']
1351
        except KeyError:
1352
            str_ += ', '.join((str(k)+'='+in_col_str(v)
1353
                for k, v in mapping.iteritems()))
1354
        else: str_ += in_col_str(value_in_col)
1355
        
1356
        str_ += ')'
1357
    else:
1358
        out_col = 'rank'
1359
        try: in_col = mapping[out_col]
1360
        except KeyError: str_ += '_pkeys'
1361
        else: # has a rank column, so hierarchical
1362
            str_ += '['+str(out_col)+'='+in_col_str(in_col)+']'
1363
    return str_
1364

  
1365
def put_table(db, out_table, in_tables, mapping, row_ct_ref=None, into=None,
1366
    default=None, is_func=False, on_error=exc.raise_):
1367
    '''Recovers from errors.
1368
    Only works under PostgreSQL (uses INSERT RETURNING).
1369
    IMPORTANT: Must be run at the *beginning* of a transaction.
1370
    @param in_tables The main input table to select from, followed by a list of
1371
        tables to join with it using the main input table's pkey
1372
    @param mapping dict(out_table_col=in_table_col, ...)
1373
        * out_table_col: str (*not* sql_gen.Col)
1374
        * in_table_col: sql_gen.Col|literal-value
1375
    @param into The table to contain the output and input pkeys.
1376
        Defaults to `out_table.name+'_pkeys'`.
1377
    @param default The *output* column to use as the pkey for missing rows.
1378
        If this output column does not exist in the mapping, uses None.
1379
    @param is_func Whether out_table is the name of a SQL function, not a table
1380
    @return sql_gen.Col Where the output pkeys are made available
1381
    '''
1382
    out_table = sql_gen.as_Table(out_table)
1383
    
1384
    def log_debug(msg): db.log_debug(msg, level=1.5)
1385
    def col_ustr(str_):
1386
        return strings.repr_no_u(sql_gen.remove_col_rename(str_))
1387
    
1388
    log_debug('********** New iteration **********')
1389
    log_debug('Inserting these input columns into '+strings.as_tt(
1390
        out_table.to_str(db))+':\n'+strings.as_table(mapping, ustr=col_ustr))
1391
    
1392
    is_function = function_exists(db, out_table)
1393
    
1394
    if is_function: out_pkey = 'result'
1395
    else: out_pkey = pkey(db, out_table, recover=True)
1396
    out_pkey_col = sql_gen.as_Col(out_pkey, out_table)
1397
    
1398
    if mapping == {}: # need at least one column for INSERT SELECT
1399
        mapping = {out_pkey: None} # ColDict will replace with default value
1400
    
1401
    # Create input joins from list of input tables
1402
    in_tables_ = in_tables[:] # don't modify input!
1403
    in_tables0 = in_tables_.pop(0) # first table is separate
1404
    errors_table_ = errors_table(db, in_tables0)
1405
    in_pkey = pkey(db, in_tables0, recover=True)
1406
    in_pkey_col = sql_gen.as_Col(in_pkey, in_tables0)
1407
    input_joins = [in_tables0]+[sql_gen.Join(v,
1408
        {in_pkey: sql_gen.join_same_not_null}) for v in in_tables_]
1409
    
1410
    if into == None:
1411
        into = into_table_name(out_table, in_tables0, mapping, is_func)
1412
    into = sql_gen.as_Table(into)
1413
    
1414
    # Set column sources
1415
    in_cols = filter(sql_gen.is_table_col, mapping.values())
1416
    for col in in_cols:
1417
        if col.table == in_tables0: col.set_srcs(sql_gen.src_self)
1418
    
1419
    log_debug('Joining together input tables into temp table')
1420
    # Place in new table for speed and so don't modify input if values edited
1421
    in_table = sql_gen.Table('in')
1422
    mapping = dicts.join(mapping, flatten(db, in_table, input_joins, in_cols,
1423
        preserve=[in_pkey_col], start=0))
1424
    input_joins = [in_table]
1425
    db.log_debug('Temp table: '+strings.as_tt(in_table.to_str(db)), level=2)
1426
    
1427
    mapping = sql_gen.ColDict(db, out_table, mapping)
1428
        # after applying dicts.join() because that returns a plain dict
1429
    
1430
    # Resolve default value column
1431
    if default != None:
1432
        try: default = mapping[default]
1433
        except KeyError:
1434
            db.log_debug('Default value column '
1435
                +strings.as_tt(strings.repr_no_u(default))
1436
                +' does not exist in mapping, falling back to None', level=2.1)
1437
            default = None
1438
    
1439
    pkeys_names = [in_pkey, out_pkey]
1440
    pkeys_cols = [in_pkey_col, out_pkey_col]
1441
    
1442
    pkeys_table_exists_ref = [False]
1443
    def insert_into_pkeys(joins, cols, distinct=False):
1444
        kw_args = {}
1445
        if distinct: kw_args.update(dict(distinct_on=[in_pkey_col]))
1446
        query = mk_select(db, joins, cols, order_by=None, start=0, **kw_args)
1447
        
1448
        if pkeys_table_exists_ref[0]:
1449
            insert_select(db, into, pkeys_names, query)
1450
        else:
1451
            run_query_into(db, query, into=into)
1452
            pkeys_table_exists_ref[0] = True
1453
    
1454
    limit_ref = [None]
1455
    conds = set()
1456
    distinct_on = sql_gen.ColDict(db, out_table)
1457
    def mk_main_select(joins, cols):
1458
        distinct_on_cols = [c.to_Col() for c in distinct_on.values()]
1459
        return mk_select(db, joins, cols, conds, distinct_on_cols,
1460
            limit=limit_ref[0], start=0)
1461
    
1462
    exc_strs = set()
1463
    def log_exc(e):
1464
        e_str = exc.str_(e, first_line_only=True)
1465
        log_debug('Caught exception: '+e_str)
1466
        assert e_str not in exc_strs # avoid infinite loops
1467
        exc_strs.add(e_str)
1468
    
1469
    def remove_all_rows():
1470
        log_debug('Ignoring all rows')
1471
        limit_ref[0] = 0 # just create an empty pkeys table
1472
    
1473
    def ignore(in_col, value, e):
1474
        track_data_error(db, errors_table_, in_col.srcs, value, e.cause.pgcode,
1475
            e.cause.pgerror)
1476
        log_debug('Ignoring rows with '+strings.as_tt(repr(in_col))+' = '
1477
            +strings.as_tt(repr(value)))
1478
    
1479
    def remove_rows(in_col, value, e):
1480
        ignore(in_col, value, e)
1481
        cond = (in_col, sql_gen.CompareCond(value, '!='))
1482
        assert cond not in conds # avoid infinite loops
1483
        conds.add(cond)
1484
    
1485
    def invalid2null(in_col, value, e):
1486
        ignore(in_col, value, e)
1487
        update(db, in_table, [(in_col, None)],
1488
            sql_gen.ColValueCond(in_col, value))
1489
    
1490
    def insert_pkeys_table(which):
1491
        return sql_gen.Table(sql_gen.concat(in_table.name,
1492
            '_insert_'+which+'_pkeys'))
1493
    insert_out_pkeys = insert_pkeys_table('out')
1494
    insert_in_pkeys = insert_pkeys_table('in')
1495
    
1496
    # Do inserts and selects
1497
    join_cols = sql_gen.ColDict(db, out_table)
1498
    while True:
1499
        if limit_ref[0] == 0: # special case
1500
            log_debug('Creating an empty pkeys table')
1501
            cur = run_query_into(db, mk_select(db, out_table, [out_pkey],
1502
                limit=limit_ref[0]), into=insert_out_pkeys)
1503
            break # don't do main case
1504
        
1505
        has_joins = join_cols != {}
1506
        
1507
        log_debug('Trying to insert new rows')
1508
        
1509
        # Prepare to insert new rows
1510
        insert_joins = input_joins[:] # don't modify original!
1511
        insert_args = dict(recover=True, cacheable=False)
1512
        if has_joins:
1513
            insert_args.update(dict(ignore=True))
1514
        else:
1515
            insert_args.update(dict(returning=out_pkey, into=insert_out_pkeys))
1516
        main_select = mk_main_select(insert_joins, mapping.values())
1517
        
1518
        def main_insert():
1519
            if is_function:
1520
                log_debug('Calling function on input rows')
1521
                args = dict(((k.name, v) for k, v in mapping.iteritems()))
1522
                func_call = sql_gen.NamedCol(out_pkey,
1523
                    sql_gen.FunctionCall(out_table, **args))
1524
                insert_into_pkeys(input_joins, [in_pkey_col, func_call])
1525
                return None
1526
            else:
1527
                return insert_select(db, out_table, mapping.keys(), main_select,
1528
                    **insert_args)
1529
        
1530
        try:
1531
            cur = with_savepoint(db, main_insert)
1532
            break # insert successful
1533
        except MissingCastException, e:
1534
            log_exc(e)
1535
            
1536
            out_col = e.col
1537
            type_ = e.type
1538
            
1539
            log_debug('Casting '+strings.as_tt(out_col)+' input to '
1540
                +strings.as_tt(type_))
1541
            mapping[out_col] = cast_temp_col(db, type_, mapping[out_col],
1542
                errors_table_)
1543
        except DuplicateKeyException, e:
1544
            log_exc(e)
1545
            
1546
            old_join_cols = join_cols.copy()
1547
            distinct_on.update(util.dict_subset(mapping, e.cols))
1548
            join_cols.update(util.dict_subset_right_join(mapping, e.cols))
1549
            log_debug('Ignoring existing rows, comparing on these columns:\n'
1550
                +strings.as_inline_table(join_cols, ustr=col_ustr))
1551
            assert join_cols != old_join_cols # avoid infinite loops
1552
        except NullValueException, e:
1553
            log_exc(e)
1554
            
1555
            out_col, = e.cols
1556
            try: in_col = mapping[out_col]
1557
            except KeyError:
1558
                log_debug('Missing mapping for NOT NULL column '+out_col)
1559
                remove_all_rows()
1560
            else: remove_rows(in_col, None, e)
1561
        except FunctionValueException, e:
1562
            log_exc(e)
1563
            
1564
            func_name = e.name
1565
            value = e.value
1566
            for out_col, in_col in mapping.iteritems():
1567
                in_col = sql_gen.unwrap_func_call(in_col, func_name)
1568
                invalid2null(in_col, value, e)
1569
        except DatabaseErrors, e:
1570
            log_exc(e)
1571
            
1572
            log_debug('No handler for exception')
1573
            on_error(e)
1574
            remove_all_rows()
1575
        # after exception handled, rerun loop with additional constraints
1576
    
1577
    if cur != None and row_ct_ref != None and cur.rowcount >= 0:
1578
        row_ct_ref[0] += cur.rowcount
1579
    
1580
    if is_function: pass # pkeys table already created
1581
    elif has_joins:
1582
        select_joins = input_joins+[sql_gen.Join(out_table, join_cols)]
1583
        log_debug('Getting output table pkeys of existing/inserted rows')
1584
        insert_into_pkeys(select_joins, pkeys_cols, distinct=True)
1585
    else:
1586
        add_row_num(db, insert_out_pkeys) # for joining with input pkeys
1587
        
1588
        log_debug('Getting input table pkeys of inserted rows')
1589
        run_query_into(db, mk_main_select(input_joins, [in_pkey]),
1590
            into=insert_in_pkeys)
1591
        add_row_num(db, insert_in_pkeys) # for joining with output pkeys
1592
        
1593
        assert table_row_count(db, insert_out_pkeys) == table_row_count(db,
1594
            insert_in_pkeys)
1595
        
1596
        log_debug('Combining output and input pkeys in inserted order')
1597
        pkey_joins = [insert_in_pkeys, sql_gen.Join(insert_out_pkeys,
1598
            {row_num_col: sql_gen.join_same_not_null})]
1599
        insert_into_pkeys(pkey_joins, pkeys_names)
1600
        
1601
        empty_temp(db, [insert_out_pkeys, insert_in_pkeys])
1602
    
1603
    db.log_debug('Adding pkey on pkeys table to enable fast joins', level=2.5)
1604
    add_pkey(db, into)
1605
    
1606
    log_debug('Setting pkeys of missing rows to '+strings.as_tt(repr(default)))
1607
    missing_rows_joins = input_joins+[sql_gen.Join(into,
1608
        {in_pkey: sql_gen.join_same_not_null}, sql_gen.filter_out)]
1609
        # must use join_same_not_null or query will take forever
1610
    insert_into_pkeys(missing_rows_joins,
1611
        [in_pkey_col, sql_gen.NamedCol(out_pkey, default)])
1612
    
1613
    assert table_row_count(db, into) == table_row_count(db, in_table)
1614
    
1615
    empty_temp(db, in_table)
1616
    
1617
    srcs = []
1618
    if is_func: srcs = sql_gen.cols_srcs(in_cols)
1619
    return sql_gen.Col(out_pkey, into, srcs)
1620

  
1621 1306
##### Data cleanup
1622 1307

  
1623 1308
def cleanup_table(db, table, cols):
lib/db_xml.py
7 7
import exc
8 8
import Parser
9 9
import sql
10
import sql_io
10 11
import sql_gen
11 12
import strings
12 13
import util
......
101 102
    try:
102 103
        for try_num in xrange(2):
103 104
            try:
104
                id_ = sql.put(db, table, row, pkey_, row_ct_ref)
105
                id_ = sql_io.put(db, table, row, pkey_, row_ct_ref)
105 106
                if store_ids: xml_dom.set_id(node, id_)
106 107
                break
107 108
            except sql.NullValueException, e:
......
226 227
            row[out_col] = sql_gen.NamedCol(out_col, value)
227 228
    
228 229
    # Insert node
229
    pkeys_loc = sql.put_table(db, out_table, in_tables, row, row_ins_ct_ref,
230
    pkeys_loc = sql_io.put_table(db, out_table, in_tables, row, row_ins_ct_ref,
230 231
        None, next, is_func, on_error)
231 232
    
232 233
    sql.empty_temp(db, set(in_tables) - no_empty)
lib/xml_func.py
10 10
import exc
11 11
import format
12 12
import maps
13
import sql
13
import sql_io
14 14
import strings
15 15
import term
16 16
import units
......
100 100
        # pass-through optimization for aggregating functions with one arg
101 101
        value = items[0][1] # pass through first arg
102 102
    elif row_mode and name in rel_funcs: # row-based mode: evaluate using DB
103
        value = sql.put(db, name, dict(items))
103
        value = sql_io.put(db, name, dict(items))
104 104
    elif column_mode and not name in structural_funcs: # column-based mode
105 105
        if name in rel_funcs: return # preserve relational functions
106 106
        # otherwise XML-only, so just replace with last param
lib/sql_io.py
1
# Database import/export
2

  
3
import exc
4
import dicts
5
import sql
6
import sql_gen
7
import strings
8
import util
9

  
10
def put(db, table, row, pkey_=None, row_ct_ref=None):
11
    '''Recovers from errors.
12
    Only works under PostgreSQL (uses INSERT RETURNING).
13
    '''
14
    row = sql_gen.ColDict(db, table, row)
15
    if pkey_ == None: pkey_ = sql.pkey(db, table, recover=True)
16
    
17
    try:
18
        cur = sql.insert(db, table, row, pkey_, recover=True)
19
        if row_ct_ref != None and cur.rowcount >= 0:
20
            row_ct_ref[0] += cur.rowcount
21
        return sql.value(cur)
22
    except sql.DuplicateKeyException, e:
23
        row = sql_gen.ColDict(db, table,
24
            util.dict_subset_right_join(row, e.cols))
25
        return sql.value(sql.select(db, table, [pkey_], row, recover=True))
26

  
27
def get(db, table, row, pkey, row_ct_ref=None, create=False):
28
    '''Recovers from errors'''
29
    try:
30
        return sql.value(sql.select(db, table, [pkey], row, limit=1,
31
            recover=True))
32
    except StopIteration:
33
        if not create: raise
34
        return put(db, table, row, pkey, row_ct_ref) # insert new row
35

  
36
def is_func_result(col):
37
    return col.table.name.find('(') >= 0 and col.name == 'result'
38

  
39
def into_table_name(out_table, in_tables0, mapping, is_func):
40
    def in_col_str(in_col):
41
        in_col = sql_gen.remove_col_rename(in_col)
42
        if isinstance(in_col, sql_gen.Col):
43
            table = in_col.table
44
            if table == in_tables0:
45
                in_col = sql_gen.to_name_only_col(in_col)
46
            elif is_func_result(in_col): in_col = table # omit col name
47
        return str(in_col)
48
    
49
    str_ = str(out_table)
50
    if is_func:
51
        str_ += '('
52
        
53
        try: value_in_col = mapping['value']
54
        except KeyError:
55
            str_ += ', '.join((str(k)+'='+in_col_str(v)
56
                for k, v in mapping.iteritems()))
57
        else: str_ += in_col_str(value_in_col)
58
        
59
        str_ += ')'
60
    else:
61
        out_col = 'rank'
62
        try: in_col = mapping[out_col]
63
        except KeyError: str_ += '_pkeys'
64
        else: # has a rank column, so hierarchical
65
            str_ += '['+str(out_col)+'='+in_col_str(in_col)+']'
66
    return str_
67

  
68
def put_table(db, out_table, in_tables, mapping, row_ct_ref=None, into=None,
69
    default=None, is_func=False, on_error=exc.raise_):
70
    '''Recovers from errors.
71
    Only works under PostgreSQL (uses INSERT RETURNING).
72
    IMPORTANT: Must be run at the *beginning* of a transaction.
73
    @param in_tables The main input table to select from, followed by a list of
74
        tables to join with it using the main input table's pkey
75
    @param mapping dict(out_table_col=in_table_col, ...)
76
        * out_table_col: str (*not* sql_gen.Col)
77
        * in_table_col: sql_gen.Col|literal-value
78
    @param into The table to contain the output and input pkeys.
79
        Defaults to `out_table.name+'_pkeys'`.
80
    @param default The *output* column to use as the pkey for missing rows.
81
        If this output column does not exist in the mapping, uses None.
82
    @param is_func Whether out_table is the name of a SQL function, not a table
83
    @return sql_gen.Col Where the output pkeys are made available
84
    '''
85
    out_table = sql_gen.as_Table(out_table)
86
    
87
    def log_debug(msg): db.log_debug(msg, level=1.5)
88
    def col_ustr(str_):
89
        return strings.repr_no_u(sql_gen.remove_col_rename(str_))
90
    
91
    log_debug('********** New iteration **********')
92
    log_debug('Inserting these input columns into '+strings.as_tt(
93
        out_table.to_str(db))+':\n'+strings.as_table(mapping, ustr=col_ustr))
94
    
95
    is_function = sql.function_exists(db, out_table)
96
    
97
    if is_function: out_pkey = 'result'
98
    else: out_pkey = sql.pkey(db, out_table, recover=True)
99
    out_pkey_col = sql_gen.as_Col(out_pkey, out_table)
100
    
101
    if mapping == {}: # need at least one column for INSERT SELECT
102
        mapping = {out_pkey: None} # ColDict will replace with default value
103
    
104
    # Create input joins from list of input tables
105
    in_tables_ = in_tables[:] # don't modify input!
106
    in_tables0 = in_tables_.pop(0) # first table is separate
107
    errors_table_ = sql.errors_table(db, in_tables0)
108
    in_pkey = sql.pkey(db, in_tables0, recover=True)
109
    in_pkey_col = sql_gen.as_Col(in_pkey, in_tables0)
110
    input_joins = [in_tables0]+[sql_gen.Join(v,
111
        {in_pkey: sql_gen.join_same_not_null}) for v in in_tables_]
112
    
113
    if into == None:
114
        into = into_table_name(out_table, in_tables0, mapping, is_func)
115
    into = sql_gen.as_Table(into)
116
    
117
    # Set column sources
118
    in_cols = filter(sql_gen.is_table_col, mapping.values())
119
    for col in in_cols:
120
        if col.table == in_tables0: col.set_srcs(sql_gen.src_self)
121
    
122
    log_debug('Joining together input tables into temp table')
123
    # Place in new table for speed and so don't modify input if values edited
124
    in_table = sql_gen.Table('in')
125
    mapping = dicts.join(mapping, sql.flatten(db, in_table, input_joins,
126
        in_cols, preserve=[in_pkey_col], start=0))
127
    input_joins = [in_table]
128
    db.log_debug('Temp table: '+strings.as_tt(in_table.to_str(db)), level=2)
129
    
130
    mapping = sql_gen.ColDict(db, out_table, mapping)
131
        # after applying dicts.join() because that returns a plain dict
132
    
133
    # Resolve default value column
134
    if default != None:
135
        try: default = mapping[default]
136
        except KeyError:
137
            db.log_debug('Default value column '
138
                +strings.as_tt(strings.repr_no_u(default))
139
                +' does not exist in mapping, falling back to None', level=2.1)
140
            default = None
141
    
142
    pkeys_names = [in_pkey, out_pkey]
143
    pkeys_cols = [in_pkey_col, out_pkey_col]
144
    
145
    pkeys_table_exists_ref = [False]
146
    def insert_into_pkeys(joins, cols, distinct=False):
147
        kw_args = {}
148
        if distinct: kw_args.update(dict(distinct_on=[in_pkey_col]))
149
        query = sql.mk_select(db, joins, cols, order_by=None, start=0,
150
            **kw_args)
151
        
152
        if pkeys_table_exists_ref[0]:
153
            sql.insert_select(db, into, pkeys_names, query)
154
        else:
155
            sql.run_query_into(db, query, into=into)
156
            pkeys_table_exists_ref[0] = True
157
    
158
    limit_ref = [None]
159
    conds = set()
160
    distinct_on = sql_gen.ColDict(db, out_table)
161
    def mk_main_select(joins, cols):
162
        distinct_on_cols = [c.to_Col() for c in distinct_on.values()]
163
        return sql.mk_select(db, joins, cols, conds, distinct_on_cols,
164
            limit=limit_ref[0], start=0)
165
    
166
    exc_strs = set()
167
    def log_exc(e):
168
        e_str = exc.str_(e, first_line_only=True)
169
        log_debug('Caught exception: '+e_str)
170
        assert e_str not in exc_strs # avoid infinite loops
171
        exc_strs.add(e_str)
172
    
173
    def remove_all_rows():
174
        log_debug('Ignoring all rows')
175
        limit_ref[0] = 0 # just create an empty pkeys table
176
    
177
    def ignore(in_col, value, e):
178
        sql.track_data_error(db, errors_table_, in_col.srcs, value,
179
            e.cause.pgcode, e.cause.pgerror)
180
        log_debug('Ignoring rows with '+strings.as_tt(repr(in_col))+' = '
181
            +strings.as_tt(repr(value)))
182
    
183
    def remove_rows(in_col, value, e):
184
        ignore(in_col, value, e)
185
        cond = (in_col, sql_gen.CompareCond(value, '!='))
186
        assert cond not in conds # avoid infinite loops
187
        conds.add(cond)
188
    
189
    def invalid2null(in_col, value, e):
190
        ignore(in_col, value, e)
191
        sql.update(db, in_table, [(in_col, None)],
192
            sql_gen.ColValueCond(in_col, value))
193
    
194
    def insert_pkeys_table(which):
195
        return sql_gen.Table(sql_gen.concat(in_table.name,
196
            '_insert_'+which+'_pkeys'))
197
    insert_out_pkeys = insert_pkeys_table('out')
198
    insert_in_pkeys = insert_pkeys_table('in')
199
    
200
    # Do inserts and selects
201
    join_cols = sql_gen.ColDict(db, out_table)
202
    while True:
203
        if limit_ref[0] == 0: # special case
204
            log_debug('Creating an empty pkeys table')
205
            cur = sql.run_query_into(db, sql.mk_select(db, out_table,
206
                [out_pkey], limit=limit_ref[0]), into=insert_out_pkeys)
207
            break # don't do main case
208
        
209
        has_joins = join_cols != {}
210
        
211
        log_debug('Trying to insert new rows')
212
        
213
        # Prepare to insert new rows
214
        insert_joins = input_joins[:] # don't modify original!
215
        insert_args = dict(recover=True, cacheable=False)
216
        if has_joins:
217
            insert_args.update(dict(ignore=True))
218
        else:
219
            insert_args.update(dict(returning=out_pkey, into=insert_out_pkeys))
220
        main_select = mk_main_select(insert_joins, mapping.values())
221
        
222
        def main_insert():
223
            if is_function:
224
                log_debug('Calling function on input rows')
225
                args = dict(((k.name, v) for k, v in mapping.iteritems()))
226
                func_call = sql_gen.NamedCol(out_pkey,
227
                    sql_gen.FunctionCall(out_table, **args))
228
                insert_into_pkeys(input_joins, [in_pkey_col, func_call])
229
                return None
230
            else:
231
                return sql.insert_select(db, out_table, mapping.keys(),
232
                    main_select, **insert_args)
233
        
234
        try:
235
            cur = sql.with_savepoint(db, main_insert)
236
            break # insert successful
237
        except sql.MissingCastException, e:
238
            log_exc(e)
239
            
240
            out_col = e.col
241
            type_ = e.type
242
            
243
            log_debug('Casting '+strings.as_tt(out_col)+' input to '
244
                +strings.as_tt(type_))
245
            mapping[out_col] = sql.cast_temp_col(db, type_, mapping[out_col],
246
                errors_table_)
247
        except sql.DuplicateKeyException, e:
248
            log_exc(e)
249
            
250
            old_join_cols = join_cols.copy()
251
            distinct_on.update(util.dict_subset(mapping, e.cols))
252
            join_cols.update(util.dict_subset_right_join(mapping, e.cols))
253
            log_debug('Ignoring existing rows, comparing on these columns:\n'
254
                +strings.as_inline_table(join_cols, ustr=col_ustr))
255
            assert join_cols != old_join_cols # avoid infinite loops
256
        except sql.NullValueException, e:
257
            log_exc(e)
258
            
259
            out_col, = e.cols
260
            try: in_col = mapping[out_col]
261
            except KeyError:
262
                log_debug('Missing mapping for NOT NULL column '+out_col)
263
                remove_all_rows()
264
            else: remove_rows(in_col, None, e)
265
        except sql.FunctionValueException, e:
266
            log_exc(e)
267
            
268
            func_name = e.name
269
            value = e.value
270
            for out_col, in_col in mapping.iteritems():
271
                in_col = sql_gen.unwrap_func_call(in_col, func_name)
272
                invalid2null(in_col, value, e)
273
        except sql.DatabaseErrors, e:
274
            log_exc(e)
275
            
276
            log_debug('No handler for exception')
277
            on_error(e)
278
            remove_all_rows()
279
        # after exception handled, rerun loop with additional constraints
280
    
281
    if cur != None and row_ct_ref != None and cur.rowcount >= 0:
282
        row_ct_ref[0] += cur.rowcount
283
    
284
    if is_function: pass # pkeys table already created
285
    elif has_joins:
286
        select_joins = input_joins+[sql_gen.Join(out_table, join_cols)]
287
        log_debug('Getting output table pkeys of existing/inserted rows')
288
        insert_into_pkeys(select_joins, pkeys_cols, distinct=True)
289
    else:
290
        sql.add_row_num(db, insert_out_pkeys) # for joining with input pkeys
291
        
292
        log_debug('Getting input table pkeys of inserted rows')
293
        sql.run_query_into(db, mk_main_select(input_joins, [in_pkey]),
294
            into=insert_in_pkeys)
295
        sql.add_row_num(db, insert_in_pkeys) # for joining with output pkeys
296
        
297
        assert sql.table_row_count(db, insert_out_pkeys) == sql.table_row_count(
298
            db, insert_in_pkeys)
299
        
300
        log_debug('Combining output and input pkeys in inserted order')
301
        pkey_joins = [insert_in_pkeys, sql_gen.Join(insert_out_pkeys,
302
            {sql.row_num_col: sql_gen.join_same_not_null})]
303
        insert_into_pkeys(pkey_joins, pkeys_names)
304
        
305
        sql.empty_temp(db, [insert_out_pkeys, insert_in_pkeys])
306
    
307
    db.log_debug('Adding pkey on pkeys table to enable fast joins', level=2.5)
308
    sql.add_pkey(db, into)
309
    
310
    log_debug('Setting pkeys of missing rows to '+strings.as_tt(repr(default)))
311
    missing_rows_joins = input_joins+[sql_gen.Join(into,
312
        {in_pkey: sql_gen.join_same_not_null}, sql_gen.filter_out)]
313
        # must use join_same_not_null or query will take forever
314
    insert_into_pkeys(missing_rows_joins,
315
        [in_pkey_col, sql_gen.NamedCol(out_pkey, default)])
316
    
317
    assert sql.table_row_count(db, into) == sql.table_row_count(db, in_table)
318
    
319
    sql.empty_temp(db, in_table)
320
    
321
    srcs = []
322
    if is_func: srcs = sql_gen.cols_srcs(in_cols)
323
    return sql_gen.Col(out_pkey, into, srcs)

Also available in: Unified diff