# Database import/export

import copy
import operator
import warnings

import exc
import dicts
import sql
import sql_gen
import strings
import util

##### Exceptions

# Can't use built-in SyntaxError because it stringifies to only the first line
class SyntaxError(Exception): pass

##### Data cleanup

null_strs = ['', r'\N', 'NULL', 'UNKNOWN']

def cleanup_table(db, table, cols):
    table = sql_gen.as_Table(table)
    cols = [sql_gen.as_Col(c, table) for c in cols]
    cols = filter(lambda c: sql_gen.is_text_col(db, c), cols)
    
    expr = 'trim(both from %s)'
    for null in null_strs: expr = 'nullif('+expr+', '+db.esc_value(null)+')'
    changes = [(v, sql_gen.CustomCode(expr % v.to_str(db))) for v in cols]
    
    sql.update(db, table, changes, in_place=True)
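
# Usage sketch (hypothetical table and column names; needs a live db connection):
#     cleanup_table(db, 'plot', ['name', 'notes'])
# For each *text* column this builds a nested expression roughly like
#     nullif(nullif(nullif(nullif(trim(both from "name"), ''), '\N'), 'NULL'), 'UNKNOWN')
# and applies it in a single in-place UPDATE, so surrounding whitespace is
# stripped and null-equivalent placeholder strings become real NULLs.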

##### Error tracking

def track_data_error(db, errors_table, cols, value, error_code, error):
    '''
    @param errors_table If None, does nothing.
    '''
    if errors_table == None or cols == (): return
    
    for col in cols:
        try:
            sql.insert(db, errors_table, dict(column=col.name, value=value,
                error_code=error_code, error=error), recover=True,
                cacheable=True, log_level=4)
        except sql.DuplicateKeyException: pass
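
# Note: each source column in `cols` gets its own row in the errors table, so a
# value that came from several mapped input columns is recorded once per column;
# duplicate (column, value, error) rows are silently skipped via
# sql.DuplicateKeyException.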

class ExcToErrorsTable(sql_gen.ExcToWarning):
    '''Handles an exception by saving it or converting it to a warning.'''
    def __init__(self, return_, srcs, errors_table, value=None):
        '''
        @param return_ See sql_gen.ExcToWarning
        @param srcs The column names for the errors table
        @param errors_table None|sql_gen.Table
        @param value The value (or an expression for it) that caused the error
        @pre The invalid value must be in a local variable "value" of type text.
        '''
        sql_gen.ExcToWarning.__init__(self, return_)
        
        value = sql_gen.as_Code(value)
        
        self.srcs = srcs
        self.errors_table = errors_table
        self.value = value
    
    def to_str(self, db):
        if not self.srcs or self.errors_table == None:
            return sql_gen.ExcToWarning.to_str(self, db)
        
        errors_table_cols = map(sql_gen.Col,
            ['column', 'value', 'error_code', 'error'])
        col_names_query = sql.mk_select(db, sql_gen.NamedValues('c', None,
            [[c.name] for c in self.srcs]), order_by=None)
        insert_query = sql.mk_insert_select(db, self.errors_table,
            errors_table_cols,
            sql_gen.Values(errors_table_cols).to_str(db))+';\n'
        return '''\
-- Save error in errors table.
DECLARE
    error_code text := SQLSTATE;
    error text := SQLERRM;
    value text := '''+self.value.to_str(db)+''';
    "column" text;
BEGIN
    -- Insert the value and error for *each* source column.
'''+strings.indent(sql_gen.RowExcIgnore(None, col_names_query, insert_query,
    row_var=errors_table_cols[0]).to_str(db))+'''
END;

'''+self.return_.to_str(db)

def data_exception_handler(*args, **kw_args):
    '''Handles a data_exception by saving it or converting it to a warning.
    For params, see ExcToErrorsTable().
    '''
    return sql_gen.data_exception_handler(ExcToErrorsTable(*args, **kw_args))

def cast(db, type_, col, errors_table=None):
    '''Casts an (unrenamed) column or value.
    If errors_table set and col has srcs, saves errors in errors_table (using
    col's srcs attr as source columns). Otherwise, converts errors to warnings.
    @param col str|sql_gen.Col|sql_gen.Literal
    @param errors_table None|sql_gen.Table|str
    '''
    col = sql_gen.as_Col(col)
    
    # Don't convert exceptions to warnings for user-supplied constants
    if isinstance(col, sql_gen.Literal): return sql_gen.Cast(type_, col)
    
    assert not isinstance(col, sql_gen.NamedCol)
    
    function_name = strings.first_word(type_)
    srcs = col.srcs
    save_errors = errors_table != None and srcs
    if save_errors: # function will be unique for the given srcs
        function_name = strings.ustr(sql_gen.FunctionCall(function_name,
            *map(sql_gen.to_name_only_col, srcs)))
    function = db.TempFunction(function_name)
    
    # Create function definition
    modifiers = 'STRICT'
    if not save_errors: modifiers = 'IMMUTABLE '+modifiers
    value_param = sql_gen.FunctionParam('value', 'text')
    handler = data_exception_handler('RETURN NULL;\n', srcs, errors_table,
        value_param.name)
    body = sql_gen.CustomCode(handler.to_str(db, '''\
/* The explicit cast to the return type is needed to make the cast happen
inside the try block. (Implicit casts to the return type happen at the end
of the function, outside any block.) */
RETURN value::'''+type_+''';
'''))
    body.lang = 'plpgsql'
    sql.define_func(db, sql_gen.FunctionDef(function, type_, body,
        [value_param], modifiers))
    
    return sql_gen.FunctionCall(function, col)
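
# In effect, cast(db, 'int', some_col, errors_table) (hypothetical column) wraps
# the cast in a temp PL/pgSQL function shaped roughly like (a sketch, not the
# exact generated SQL):
#     CREATE FUNCTION pg_temp."int"(value text) RETURNS int AS $$
#     BEGIN
#         RETURN value::int;
#     EXCEPTION WHEN data_exception THEN
#         -- log to the errors table (if one was given), then discard the value
#         RETURN NULL;
#     END $$ LANGUAGE plpgsql;
# and returns a sql_gen.FunctionCall applying that function to the column.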

def func_wrapper_exception_handler(db, return_, args, errors_table):
    '''Handles a function call's data_exceptions.
    Supports PL/Python functions.
    @param return_ See data_exception_handler()
    @param args [arg...] Function call's args
    @param errors_table See data_exception_handler()
    '''
    args = filter(sql_gen.has_srcs, args)
    
    srcs = sql_gen.cross_join_srcs(args)
    value = sql_gen.merge_not_null(db, ',', args)
    return sql_gen.NestedExcHandler(
        data_exception_handler(return_, srcs, errors_table, value)
        , sql_gen.plpythonu_error_handler
        )

def cast_temp_col(db, type_, col, errors_table=None):
    '''Like cast(), but creates a new column with the cast values if the input
    is a column.
    @return The new column or cast value
    '''
    def cast_(col): return cast(db, type_, col, errors_table)
    
    try: col = sql_gen.underlying_col(col)
    except sql_gen.NoUnderlyingTableException: return sql_gen.wrap(cast_, col)
    
    table = col.table
    new_col = sql_gen.suffixed_col(col, '::'+strings.first_word(type_))
    expr = cast_(col)
    
    # Add column
    new_typed_col = sql_gen.TypedCol(new_col.name, type_)
    sql.add_col(db, table, new_typed_col, comment=strings.urepr(col)+'::'+type_)
    new_col.name = new_typed_col.name # propagate any renaming
    
    sql.update(db, table, [(new_col, expr)], in_place=True, recover=True)
    
    return new_col
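
# Usage sketch (hypothetical column name):
#     cast_temp_col(db, 'int', sql_gen.Col('plot_id', in_table), errors_table_)
# adds a sibling column named roughly 'plot_id::int' to the column's underlying
# table, fills it via the error-tolerant cast() function above, and returns the
# new column; the original text column is left untouched.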

def errors_table(db, table, if_exists=True):
    '''
    @param if_exists If set, returns None if the errors table doesn't exist
    @return None|sql_gen.Table
    '''
    table = sql_gen.as_Table(table)
    if table.srcs != (): table = table.srcs[0]
    
    errors_table = sql_gen.suffixed_table(table, '.errors')
    if if_exists and not sql.table_exists(db, errors_table): return None
    return errors_table
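
# Naming convention sketch: for an input table named e.g. "plots" (hypothetical),
# the associated errors table is expected to be its ".errors"-suffixed sibling,
# i.e. roughly "plots.errors"; if that table hasn't been created, None is
# returned and error tracking is skipped.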

##### Import

def put(db, table, row, pkey_=None, row_ct_ref=None):
    '''Recovers from errors.
    Only works under PostgreSQL (uses INSERT RETURNING).
    '''
    return put_table(db, table, [], row, row_ct_ref)

def get(db, table, row, pkey, row_ct_ref=None, create=False):
    '''Recovers from errors'''
    try:
        return sql.value(sql.select(db, table, [pkey], row, limit=1,
            recover=True))
    except StopIteration:
        if not create: raise
        return put(db, table, row, pkey, row_ct_ref) # insert new row
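
# get()/put() together give a simple get-or-create for single rows, e.g.
# (hypothetical table and columns):
#     party_id = get(db, 'party', dict(organizationname='NYBG'), 'party_id',
#         create=True)
# which selects the existing pkey if the row is already present and otherwise
# inserts the row via put_table().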

def is_func_result(col):
    return col.table.name.find('(') >= 0 and col.name == 'result'

def into_table_name(out_table, in_tables0, mapping, is_func):
    def in_col_str(in_col):
        in_col = sql_gen.remove_col_rename(in_col)
        if isinstance(in_col, sql_gen.Col):
            table = in_col.table
            if table == in_tables0:
                in_col = sql_gen.to_name_only_col(in_col)
            elif is_func_result(in_col): in_col = table # omit col name
        return strings.ustr(in_col)
    
    str_ = str(out_table)
    if is_func:
        str_ += '('
        
        try: value_in_col = mapping['value']
        except KeyError:
            str_ += ', '.join((str(k)+'='+in_col_str(v)
                for k, v in mapping.iteritems()))
        else: str_ += in_col_str(value_in_col)
        
        str_ += ')'
    else:
        out_col = 'rank'
        try: in_col = mapping[out_col]
        except KeyError: str_ += '_pkeys'
        else: # has a rank column, so hierarchical
            str_ += '['+str(out_col)+'='+in_col_str(in_col)+']'
    return str_
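
# Name sketches (hypothetical tables/columns): a plain table import such as
# out_table='taxonoccurrence' yields a pkeys table named roughly
# 'taxonoccurrence_pkeys'; a function call like out_table='_merge' with
# mapping={'value': <col>} yields roughly '_merge(<col>)'; a hierarchical table
# with a 'rank' mapping yields roughly 'taxonlabel[rank=<col>]'.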

def put_table(db, out_table, in_tables, mapping, row_ct_ref=None, default=None,
    col_defaults={}, on_error=exc.reraise):
    '''Recovers from errors.
    Only works under PostgreSQL (uses INSERT RETURNING).
    IMPORTANT: Must be run at the *beginning* of a transaction.
    @param in_tables The main input table to select from, followed by a list of
        tables to join with it using the main input table's pkey
    @param mapping dict(out_table_col=in_table_col, ...)
        * out_table_col: str (*not* sql_gen.Col)
        * in_table_col: sql_gen.Col|literal-value
    @param default The *output* column to use as the pkey for missing rows.
        If this output column does not exist in the mapping, uses None.
    @param col_defaults Default values for required columns.
    @return sql_gen.Col Where the output pkeys are made available
    '''
    import psycopg2.extensions
    
    out_table = sql_gen.as_Table(out_table)
    
    def log_debug(msg): db.log_debug(msg, level=1.5)
    def col_ustr(str_):
        return strings.repr_no_u(sql_gen.remove_col_rename(str_))
    
    log_debug('********** New iteration **********')
    log_debug('Inserting these input columns into '+strings.as_tt(
        out_table.to_str(db))+':\n'+strings.as_table(mapping, ustr=col_ustr))

    is_function = sql.function_exists(db, out_table)
    
    if is_function: out_pkey = 'result'
    else: out_pkey = sql.pkey(db, out_table, recover=True)
    out_pkey_col = sql_gen.as_Col(out_pkey, out_table)
    
    in_tables_ = in_tables[:] # don't modify input!
    try: in_tables0 = in_tables_.pop(0) # first table is separate
    except IndexError: in_tables0 = None
    else:
        in_pkey = sql.pkey(db, in_tables0, recover=True)
        in_pkey_col = sql_gen.as_Col(in_pkey, in_tables0)
    
    # Determine if can use optimization for only literal values
    is_literals = not reduce(operator.or_, map(sql_gen.is_table_col,
        mapping.values()), False)
    is_literals_or_function = is_literals or is_function
    
    if in_tables0 == None: errors_table_ = None
    else: errors_table_ = errors_table(db, in_tables0)
    
    # Create input joins from list of input tables
    input_joins = [in_tables0]+[sql_gen.Join(v,
        {in_pkey: sql_gen.join_same_not_null}) for v in in_tables_]
    
    if mapping == {} and not is_function: # need >= one column for INSERT SELECT
        mapping = {out_pkey: None} # ColDict will replace with default value
    
    if not is_literals:
        into = sql_gen.as_Table(into_table_name(out_table, in_tables0, mapping,
            is_function))
        
        # Set column sources
        in_cols = filter(sql_gen.is_table_col, mapping.values())
        for col in in_cols:
            if col.table == in_tables0: col.set_srcs(sql_gen.src_self)
        
        log_debug('Joining together input tables into temp table')
        # Place in new table so don't modify input and for speed
        in_table = sql_gen.Table('in')
        mapping = dicts.join(mapping, sql.flatten(db, in_table, input_joins,
            in_cols, preserve=[in_pkey_col]))
        input_joins = [in_table]
        db.log_debug('Temp table: '+strings.as_tt(in_table.to_str(db)), level=2)

    # Wrap mapping in a sql_gen.ColDict.
    # sql_gen.ColDict sanitizes both keys and values passed into it.
    # Do after applying dicts.join() because that returns a plain dict.
    mapping = sql_gen.ColDict(db, out_table, mapping)
    
    # Resolve default value column
    if default != None:
        try: default = mapping[default]
        except KeyError:
            db.log_debug('Default value column '
                +strings.as_tt(strings.repr_no_u(default))
                +' does not exist in mapping, falling back to None', level=2.1)
            default = None
    
    # Save default values for all rows since in_table may have rows deleted
    if is_literals: pass
    elif is_function: full_in_table = in_table
    else:
        full_in_table = sql_gen.suffixed_table(in_table, '_full')
        full_in_table_cols = [in_pkey_col]
        if default != None:
            full_in_table_cols.append(default)
            default = sql_gen.with_table(default, full_in_table)
        sql.run_query_into(db, sql.mk_select(db, in_table, full_in_table_cols,
            order_by=None), into=full_in_table, add_pkey_=True)
    
    if not is_literals:
        pkeys_names = [in_pkey, out_pkey]
        pkeys_cols = [in_pkey_col, out_pkey_col]
    
    pkeys_table_exists_ref = [False]
    def insert_into_pkeys(joins, cols=None, limit=None, **kw_args):
        query = sql.mk_select(db, joins, cols, order_by=None, limit=limit)
        if pkeys_table_exists_ref[0]:
            sql.insert_select(db, into, pkeys_names, query, **kw_args)
        else:
            sql.run_query_into(db, query, into=into, add_pkey_=True, **kw_args)
            pkeys_table_exists_ref[0] = True

    limit_ref = [None]
    def mk_main_select(joins, cols):
        return sql.mk_select(db, joins, cols, limit=limit_ref[0], order_by=None)
    
    if is_literals: insert_in_table = None
    else:
        insert_in_table = in_table
        insert_in_tables = [insert_in_table]
    join_cols = sql_gen.ColDict(db, out_table)
    
    exc_strs = set()
    def log_exc(e):
        e_str = exc.str_(e, first_line_only=True)
        log_debug('Caught exception: '+e_str)
        if e_str in exc_strs: # avoid infinite loops
            log_debug('Exception already seen, handler broken')
            on_error(e)
            remove_all_rows()
        else: exc_strs.add(e_str)
    
    def remove_all_rows():
        log_debug('Ignoring all rows')
        limit_ref[0] = 0 # just create an empty pkeys table
    
    def ignore_cond(cond, e):
        if is_literals: remove_all_rows()
        else:
            out_table_cols = sql_gen.ColDict(db, out_table)
            out_table_cols.update(util.dict_subset_right_join({},
                sql.table_cols(db, out_table)))
            
            in_cols = []
            cond = sql.map_expr(db, cond, mapping, in_cols)
            cond = sql.map_expr(db, cond, out_table_cols)
            
            track_data_error(db, errors_table_, sql_gen.cols_srcs(in_cols),
                None, e.cause.pgcode,
                strings.ensure_newl(e.cause.pgerror)+'condition: '+cond)
            
            not_cond = sql_gen.NotCond(sql_gen.CustomCode(cond))
            log_debug('Ignoring rows where '+strings.as_tt(not_cond.to_str(db)))
            sql.delete(db, insert_in_table, not_cond)
    
    not_null_cols = set()
    def ignore(in_col, value, e):
        if sql_gen.is_table_col(in_col):
            in_col = sql_gen.with_table(in_col, insert_in_table)
            
            track_data_error(db, errors_table_, in_col.srcs, value,
                e.cause.pgcode, e.cause.pgerror)
            
            sql.add_index(db, in_col, insert_in_table) # enable fast filtering
            if value != None and in_col not in not_null_cols:
                log_debug('Replacing invalid value '+strings.as_tt(repr(value))
                    +' with NULL in column '+strings.as_tt(in_col.to_str(db)))
                sql.update(db, insert_in_table, [(in_col, None)],
                    sql_gen.ColValueCond(in_col, value))
            else:
                log_debug('Ignoring rows with '+strings.as_tt(in_col.to_str(db))
                    +' = '+strings.as_tt(repr(value)))
                sql.delete(db, insert_in_table,
                    sql_gen.ColValueCond(in_col, value))
                if value == None: not_null_cols.add(in_col)
        else:
            assert isinstance(in_col, sql_gen.NamedCol)
            in_value = sql_gen.remove_col_rename(in_col)
            assert sql_gen.is_literal(in_value)
            if value == in_value.value:
                if value != None:
                    log_debug('Replacing invalid literal '
                        +strings.as_tt(in_col.to_str(db))+' with NULL')
                    mapping[in_col.name] = None
                else:
                    remove_all_rows()
            # otherwise, all columns were being ignore()d because the specific
            # column couldn't be identified, and this was not the invalid column

    if not is_literals:
        def insert_pkeys_table(which):
            return sql_gen.Table(sql_gen.concat(in_table.name,
                '_insert_'+which+'_pkeys'))
        insert_out_pkeys = insert_pkeys_table('out')
        insert_in_pkeys = insert_pkeys_table('in')
    
    def mk_func_call():
        args = dict(((k.name, v) for k, v in mapping.iteritems()))
        func_call = sql_gen.NamedCol(out_pkey,
            sql_gen.FunctionCall(out_table, **args))
        return func_call, args
    
    if is_function and not is_literals:
        log_debug('Defining wrapper function')
        
        func_call, args = mk_func_call()
        
        # Create empty pkeys table so its row type can be used
        insert_into_pkeys(input_joins, [in_pkey_col, func_call], limit=0,
            recover=True)
        result_type = db.col_info(sql_gen.Col(out_pkey, into)).type
        
        ## Create error handling wrapper function
        
        wrapper = db.TempFunction(sql_gen.concat(into.name, '_wrap'))
        
        select_cols = [in_pkey_col]+args.values()
        row_var = copy.copy(sql_gen.row_var)
        row_var.set_srcs([in_table])
        in_pkey_var = sql_gen.Col(in_pkey, row_var)
        
        args = dict(((k, sql_gen.with_table(v, row_var))
            for k, v in args.iteritems()))
        func_call = sql_gen.FunctionCall(out_table, **args)
        
        def mk_return(result):
            return sql_gen.ReturnQuery(sql.mk_select(db,
                fields=[in_pkey_var, result], explain=False))
        exc_handler = func_wrapper_exception_handler(db,
            mk_return(sql_gen.Cast(result_type, None)), args.values(),
            errors_table_)
        
        sql.define_func(db, sql_gen.FunctionDef(wrapper, sql_gen.SetOf(into),
            sql_gen.RowExcIgnore(sql_gen.RowType(in_table),
                sql.mk_select(db, input_joins, order_by=None),
                mk_return(func_call), exc_handler=exc_handler)
            ))
        wrapper_table = sql_gen.FunctionCall(wrapper)
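
    # Retry loop: attempt the INSERT SELECT (or function call), and on each
    # recoverable error (missing cast, duplicate key, NULL/check/invalid-value
    # violations) adjust the mapping or delete the offending input rows, then
    # rerun; the loop exits when an attempt succeeds or all rows are ignored.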

    # Do inserts and selects
    while True:
        has_joins = join_cols != {}
        
        # Handle unrecoverable errors in a special case
        if limit_ref[0] == 0:
            if is_literals or default == None:
                default = sql_gen.remove_col_rename(default)
                log_debug('Returning default: '+strings.as_tt(repr(default)))
                return default
            elif is_function: pass # empty pkeys table already created
            else:
                log_debug('Creating an empty output pkeys table')
                has_joins = False # use the no-joins case
                cur = sql.run_query_into(db, sql.mk_select(db, out_table,
                    [out_pkey], order_by=None, limit=0), into=insert_out_pkeys)
            
            break # don't do main case
        
        # Prepare to insert new rows
        if is_function:
            log_debug('Calling function on input rows')
            if is_literals: func_call, args = mk_func_call()
        else:
            log_debug('Trying to insert new rows')
            insert_args = dict(recover=True, cacheable=False)
            if has_joins:
                insert_args.update(dict(ignore=True))
            else:
                insert_args.update(dict(returning=out_pkey))
                if not is_literals:
                    insert_args.update(dict(into=insert_out_pkeys))
            main_select = mk_main_select([insert_in_table], [sql_gen.with_table(
                c, insert_in_table) for c in mapping.values()])
        
        try:
            cur = None
            if is_function:
                if is_literals:
                    cur = sql.select(db, fields=[func_call], recover=True,
                        cacheable=True)
                else: insert_into_pkeys(wrapper_table, recover=True)
            else:
                cur = sql.insert_select(db, out_table, mapping.keys(),
                    main_select, **insert_args)
            break # insert successful
        except sql.MissingCastException, e:
            log_exc(e)
            
            type_ = e.type
            if e.col == None: out_cols = mapping.keys()
            else: out_cols = [e.col]
            
            for out_col in out_cols:
                log_debug('Casting '+strings.as_tt(strings.repr_no_u(out_col))
                    +' input to '+strings.as_tt(type_))
                in_col = mapping[out_col]
                while True:
                    try:
                        mapping[out_col] = cast_temp_col(db, type_, in_col,
                            errors_table_)
                        break # cast successful
                    except sql.InvalidValueException, e:
                        log_exc(e)
                        
                        ignore(in_col, e.value, e)
        except sql.DuplicateKeyException, e:
            log_exc(e)
            
            # Different rows violating different unique constraints not
            # supported
            assert not join_cols
            
            join_cols.update(util.dict_subset_right_join(mapping, e.cols))
            log_debug('Ignoring existing rows, comparing on these columns:\n'
                +strings.as_inline_table(join_cols, ustr=col_ustr))
            
            if is_literals:
                return sql.value(sql.select(db, out_table, [out_pkey_col],
                    join_cols, order_by=None))
            
            # Uniquify input table to avoid internal duplicate keys
            insert_in_table = sql.distinct_table(db, insert_in_table,
                join_cols.values())
            insert_in_tables.append(insert_in_table)
        except sql.NullValueException, e:
            log_exc(e)
            
            out_col, = e.cols
            try: in_col = mapping[out_col]
            except KeyError, e:
                try: in_col = mapping[out_col] = col_defaults[out_col]
                except KeyError:
                    msg = 'Missing mapping for NOT NULL column '+out_col
                    log_debug(msg)
                    if default == None: warnings.warn(UserWarning(msg))
                        # not an error because sometimes the mappings include
                        # extra tables which aren't used by the dataset
                    remove_all_rows()
            else: ignore(in_col, None, e)
        except sql.CheckException, e:
            log_exc(e)
            
            ignore_cond(e.cond, e)
        except sql.InvalidValueException, e:
            log_exc(e)
            
            for in_col in mapping.values(): ignore(in_col, e.value, e)
        except psycopg2.extensions.TransactionRollbackError, e:
            log_exc(e)
            # retry
        except sql.DatabaseErrors, e:
            log_exc(e)
            
            log_debug('No handler for exception')
            on_error(e)
            remove_all_rows()
        # after exception handled, rerun loop with additional constraints

    if cur != None and row_ct_ref != None and cur.rowcount >= 0:
        row_ct_ref[0] += cur.rowcount
    
    if is_literals: return sql.value(cur)
    
    if is_function: pass # pkeys table already created
    elif has_joins:
        select_joins = input_joins+[sql_gen.Join(out_table, join_cols)]
        log_debug('Getting output table pkeys of existing/inserted rows')
        insert_into_pkeys(select_joins, pkeys_cols)
    else:
        sql.add_row_num(db, insert_out_pkeys) # for joining with input pkeys
        
        log_debug('Getting input table pkeys of inserted rows')
        # Note that mk_main_select() does not use ORDER BY. Instead, assume that
        # since the SELECT query is identical to the one used in INSERT SELECT,
        # its rows will be retrieved in the same order.
        sql.run_query_into(db, mk_main_select(input_joins, [in_pkey]),
            into=insert_in_pkeys)
        sql.add_row_num(db, insert_in_pkeys) # for joining with output pkeys
        
        assert sql.table_row_count(db, insert_out_pkeys) == sql.table_row_count(
            db, insert_in_pkeys)
        
        log_debug('Combining output and input pkeys in inserted order')
        pkey_joins = [insert_in_pkeys, sql_gen.Join(insert_out_pkeys,
            {sql.row_num_col: sql_gen.join_same_not_null})]
        insert_into_pkeys(pkey_joins, pkeys_names)
        
        sql.empty_temp(db, [insert_out_pkeys, insert_in_pkeys])
    
    if limit_ref[0] == 0 or not is_function: # is_function doesn't leave holes
        log_debug('Setting pkeys of missing rows to '
            +strings.as_tt(repr(default)))
        missing_rows_joins = [full_in_table, sql_gen.Join(into,
            {in_pkey: sql_gen.join_same_not_null}, sql_gen.filter_out)]
            # must use join_same_not_null or query will take forever
        insert_into_pkeys(missing_rows_joins,
            [sql_gen.Col(in_pkey, full_in_table),
            sql_gen.NamedCol(out_pkey, default)])
    # otherwise, there is already an entry for every row
    
    assert (sql.table_row_count(db, into)
        == sql.table_row_count(db, full_in_table))
    
    sql.empty_temp(db, insert_in_tables+[full_in_table])
    
    srcs = []
    if is_function: srcs = sql_gen.cols_srcs(in_cols)
    return sql_gen.Col(out_pkey, into, srcs)
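
# Usage sketch (hypothetical tables/columns; must run at the start of a
# transaction): map input columns onto an output table and collect the pkeys:
#     pkey_col = put_table(db, 'taxonoccurrence', [in_table],
#         {'plot_id': sql_gen.Col('plot_id', in_table),
#         'collector': sql_gen.Col('observer', in_table)})
# The return value is a sql_gen.Col pointing into a temp "..._pkeys" table that
# pairs each input pkey with the matching output pkey (inserted or existing).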