Project

General

Profile

1 3077 aaronmk
# Database import/export
2
3 3431 aaronmk
import operator
4
5 3077 aaronmk
import exc
6
import dicts
7
import sql
8
import sql_gen
9
import strings
10
import util
11
12 3081 aaronmk
##### Data cleanup
13
14
def cleanup_table(db, table, cols):
15
    table = sql_gen.as_Table(table)
16
    cols = map(sql_gen.as_Col, cols)
17
18
    expr = ('nullif(nullif(trim(both from %s), '+db.esc_value('')+'), '
19
        +db.esc_value(r'\N')+')')
20
    changes = [(v, sql_gen.CustomCode(expr % v.to_str(db)))
21
        for v in cols]
22
23
    sql.update(db, table, changes, in_place=True)
24
25 3078 aaronmk
##### Error tracking
26
27
def track_data_error(db, errors_table, cols, value, error_code, error):
28
    '''
29
    @param errors_table If None, does nothing.
30
    '''
31
    if errors_table == None or cols == (): return
32
33
    for col in cols:
34
        try:
35
            sql.insert(db, errors_table, dict(column=col.name, value=value,
36
                error_code=error_code, error=error), recover=True,
37
                cacheable=True, log_level=4)
38
        except sql.DuplicateKeyException: pass
39
40 3506 aaronmk
class ExcToErrorsTable(sql_gen.ExcToWarning):
41
    '''Handles an exception by saving it or converting it to a warning.'''
42 3511 aaronmk
    def __init__(self, return_, srcs, errors_table, value=None):
43 3506 aaronmk
        '''
44
        @param return_ See sql_gen.ExcToWarning
45
        @param srcs The column names for the errors table
46
        @param errors_table None|sql_gen.Table
47 3511 aaronmk
        @param value The value (or an expression for it) that caused the error
48 3506 aaronmk
        @pre The invalid value must be in a local variable "value" of type text.
49
        '''
50
        sql_gen.ExcToWarning.__init__(self, return_)
51
52 3511 aaronmk
        value = sql_gen.as_Code(value)
53
54 3506 aaronmk
        self.srcs = srcs
55
        self.errors_table = errors_table
56 3511 aaronmk
        self.value = value
57 3501 aaronmk
58 3506 aaronmk
    def to_str(self, db):
59
        if not self.srcs or self.errors_table == None:
60
            return sql_gen.ExcToWarning.to_str(self, db)
61
62 3459 aaronmk
        errors_table_cols = map(sql_gen.Col,
63
            ['column', 'value', 'error_code', 'error'])
64 3465 aaronmk
        col_names_query = sql.mk_select(db, sql_gen.NamedValues('c', None,
65 3506 aaronmk
            [[c.name] for c in self.srcs]), order_by=None)
66
        insert_query = sql.mk_insert_select(db, self.errors_table,
67
            errors_table_cols,
68 3465 aaronmk
            sql_gen.Values(errors_table_cols).to_str(db))+';\n'
69 3506 aaronmk
        return '''\
70 3459 aaronmk
-- Save error in errors table.
71
DECLARE
72
    error_code text := SQLSTATE;
73
    error text := SQLERRM;
74 3511 aaronmk
    value text := '''+self.value.to_str(db)+''';
75 3459 aaronmk
BEGIN
76
    -- Insert the value and error for *each* source column.
77 3467 aaronmk
'''+strings.indent(sql_gen.RowExcIgnore('text', col_names_query, insert_query,
78
    row_var=errors_table_cols[0]).to_str(db))+'''
79 3459 aaronmk
END;
80 3501 aaronmk
81 3506 aaronmk
'''+self.return_.to_str(db)
82 3459 aaronmk
83 3507 aaronmk
def data_exception_handler(*args, **kw_args):
84 3506 aaronmk
    '''Handles a data_exception by saving it or converting it to a warning.
85
    For params, see ExcToErrorsTable().
86
    '''
87
    return sql_gen.data_exception_handler(ExcToErrorsTable(*args, **kw_args))
88
89 3078 aaronmk
def cast(db, type_, col, errors_table=None):
90
    '''Casts an (unrenamed) column or value.
91
    If errors_table set and col has srcs, saves errors in errors_table (using
92 3360 aaronmk
    col's srcs attr as source columns). Otherwise, converts errors to warnings.
93 3078 aaronmk
    @param col str|sql_gen.Col|sql_gen.Literal
94
    @param errors_table None|sql_gen.Table|str
95
    '''
96
    col = sql_gen.as_Col(col)
97
98 3112 aaronmk
    # Don't convert exceptions to warnings for user-supplied constants
99
    if isinstance(col, sql_gen.Literal): return sql_gen.Cast(type_, col)
100
101 3078 aaronmk
    assert not isinstance(col, sql_gen.NamedCol)
102
103 3460 aaronmk
    function_name = strings.first_word(type_)
104 3459 aaronmk
    srcs = col.srcs
105 3508 aaronmk
    save_errors = errors_table != None and srcs
106
    if save_errors: # function will be unique for the given srcs
107
        function_name = str(sql_gen.FunctionCall(function_name,
108
            *map(sql_gen.to_name_only_col, srcs)))
109 3078 aaronmk
    function = db.TempFunction(function_name)
110
111 3464 aaronmk
    # Create function definition
112
    modifiers = 'STRICT'
113
    if not save_errors: modifiers = 'IMMUTABLE '+modifiers
114 3511 aaronmk
    value_param = sql_gen.FunctionParam('value', 'text')
115
    handler = data_exception_handler('RETURN NULL;\n', srcs, errors_table,
116
        value_param.name)
117 3464 aaronmk
    body = sql_gen.CustomCode(handler.to_str(db, '''\
118 3467 aaronmk
/* The explicit cast to the return type is needed to make the cast happen
119
inside the try block. (Implicit casts to the return type happen at the end
120
of the function, outside any block.) */
121
RETURN value::'''+type_+''';
122 3464 aaronmk
'''))
123
    body.lang='plpgsql'
124 3500 aaronmk
    sql.define_func(db, sql_gen.FunctionDef(function, type_, body,
125 3511 aaronmk
        [value_param], modifiers))
126 3464 aaronmk
127 3078 aaronmk
    return sql_gen.FunctionCall(function, col)
128
129
def cast_temp_col(db, type_, col, errors_table=None):
130
    '''Like cast(), but creates a new column with the cast values if the input
131
    is a column.
132
    @return The new column or cast value
133
    '''
134
    def cast_(col): return cast(db, type_, col, errors_table)
135
136
    try: col = sql_gen.underlying_col(col)
137
    except sql_gen.NoUnderlyingTableException: return sql_gen.wrap(cast_, col)
138
139
    table = col.table
140 3173 aaronmk
    new_col = sql_gen.suffixed_col(col, '::'+strings.first_word(type_))
141 3078 aaronmk
    expr = cast_(col)
142
143
    # Add column
144
    new_typed_col = sql_gen.TypedCol(new_col.name, type_)
145 3174 aaronmk
    sql.add_col(db, table, new_typed_col, comment=repr(col)+'::'+type_)
146 3078 aaronmk
    new_col.name = new_typed_col.name # propagate any renaming
147
148 3110 aaronmk
    sql.update(db, table, [(new_col, expr)], in_place=True, recover=True)
149 3078 aaronmk
150
    return new_col
151
152
def errors_table(db, table, if_exists=True):
153
    '''
154
    @param if_exists If set, returns None if the errors table doesn't exist
155
    @return None|sql_gen.Table
156
    '''
157
    table = sql_gen.as_Table(table)
158
    if table.srcs != (): table = table.srcs[0]
159
160
    errors_table = sql_gen.suffixed_table(table, '.errors')
161
    if if_exists and not sql.table_exists(db, errors_table): return None
162
    return errors_table
163
164
##### Import
165
166 3077 aaronmk
def put(db, table, row, pkey_=None, row_ct_ref=None):
167
    '''Recovers from errors.
168
    Only works under PostgreSQL (uses INSERT RETURNING).
169
    '''
170
    row = sql_gen.ColDict(db, table, row)
171
    if pkey_ == None: pkey_ = sql.pkey(db, table, recover=True)
172
173
    try:
174 3131 aaronmk
        cur = sql.insert(db, table, row, pkey_, recover=True, log_level=3.5)
175 3077 aaronmk
        if row_ct_ref != None and cur.rowcount >= 0:
176
            row_ct_ref[0] += cur.rowcount
177
        return sql.value(cur)
178
    except sql.DuplicateKeyException, e:
179
        row = sql_gen.ColDict(db, table,
180
            util.dict_subset_right_join(row, e.cols))
181 3131 aaronmk
        return sql.value(sql.select(db, table, [pkey_], row, recover=True,
182
            log_level=3.5))
183 3208 aaronmk
    except sql.NullValueException: return None
184 3077 aaronmk
185
def get(db, table, row, pkey, row_ct_ref=None, create=False):
186
    '''Recovers from errors'''
187
    try:
188
        return sql.value(sql.select(db, table, [pkey], row, limit=1,
189
            recover=True))
190
    except StopIteration:
191
        if not create: raise
192
        return put(db, table, row, pkey, row_ct_ref) # insert new row
193
194
def is_func_result(col):
195
    return col.table.name.find('(') >= 0 and col.name == 'result'
196
197
def into_table_name(out_table, in_tables0, mapping, is_func):
198
    def in_col_str(in_col):
199
        in_col = sql_gen.remove_col_rename(in_col)
200
        if isinstance(in_col, sql_gen.Col):
201
            table = in_col.table
202
            if table == in_tables0:
203
                in_col = sql_gen.to_name_only_col(in_col)
204
            elif is_func_result(in_col): in_col = table # omit col name
205
        return str(in_col)
206
207
    str_ = str(out_table)
208
    if is_func:
209
        str_ += '('
210
211
        try: value_in_col = mapping['value']
212
        except KeyError:
213
            str_ += ', '.join((str(k)+'='+in_col_str(v)
214
                for k, v in mapping.iteritems()))
215
        else: str_ += in_col_str(value_in_col)
216
217
        str_ += ')'
218
    else:
219
        out_col = 'rank'
220
        try: in_col = mapping[out_col]
221
        except KeyError: str_ += '_pkeys'
222
        else: # has a rank column, so hierarchical
223
            str_ += '['+str(out_col)+'='+in_col_str(in_col)+']'
224
    return str_
225
226
def put_table(db, out_table, in_tables, mapping, row_ct_ref=None, into=None,
227
    default=None, is_func=False, on_error=exc.raise_):
228
    '''Recovers from errors.
229
    Only works under PostgreSQL (uses INSERT RETURNING).
230
    IMPORTANT: Must be run at the *beginning* of a transaction.
231
    @param in_tables The main input table to select from, followed by a list of
232
        tables to join with it using the main input table's pkey
233
    @param mapping dict(out_table_col=in_table_col, ...)
234
        * out_table_col: str (*not* sql_gen.Col)
235
        * in_table_col: sql_gen.Col|literal-value
236
    @param into The table to contain the output and input pkeys.
237
        Defaults to `out_table.name+'_pkeys'`.
238
    @param default The *output* column to use as the pkey for missing rows.
239
        If this output column does not exist in the mapping, uses None.
240
    @param is_func Whether out_table is the name of a SQL function, not a table
241
    @return sql_gen.Col Where the output pkeys are made available
242
    '''
243 3474 aaronmk
    import psycopg2.extensions
244
245 3077 aaronmk
    out_table = sql_gen.as_Table(out_table)
246
247
    def log_debug(msg): db.log_debug(msg, level=1.5)
248
    def col_ustr(str_):
249
        return strings.repr_no_u(sql_gen.remove_col_rename(str_))
250
251
    log_debug('********** New iteration **********')
252
    log_debug('Inserting these input columns into '+strings.as_tt(
253
        out_table.to_str(db))+':\n'+strings.as_table(mapping, ustr=col_ustr))
254
255
    is_function = sql.function_exists(db, out_table)
256
257
    if is_function: out_pkey = 'result'
258
    else: out_pkey = sql.pkey(db, out_table, recover=True)
259
    out_pkey_col = sql_gen.as_Col(out_pkey, out_table)
260
261
    in_tables_ = in_tables[:] # don't modify input!
262 3432 aaronmk
    try: in_tables0 = in_tables_.pop(0) # first table is separate
263
    except IndexError: in_tables0 = None
264
    else:
265
        in_pkey = sql.pkey(db, in_tables0, recover=True)
266
        in_pkey_col = sql_gen.as_Col(in_pkey, in_tables0)
267 3431 aaronmk
268
    # Determine if can use optimization for only literal values
269
    is_literals = not reduce(operator.or_, map(sql_gen.is_table_col,
270 3434 aaronmk
        mapping.values()), False)
271 3431 aaronmk
    is_literals_or_function = is_literals or is_function
272
273 3432 aaronmk
    if in_tables0 == None: errors_table_ = None
274
    else: errors_table_ = errors_table(db, in_tables0)
275 3431 aaronmk
276
    # Create input joins from list of input tables
277 3077 aaronmk
    input_joins = [in_tables0]+[sql_gen.Join(v,
278
        {in_pkey: sql_gen.join_same_not_null}) for v in in_tables_]
279
280 3433 aaronmk
    if mapping == {} and not is_function: # need >= one column for INSERT SELECT
281
        mapping = {out_pkey: None} # ColDict will replace with default value
282
283 3431 aaronmk
    if not is_literals:
284
        if into == None:
285
            into = into_table_name(out_table, in_tables0, mapping, is_func)
286
        into = sql_gen.as_Table(into)
287
288
        # Set column sources
289
        in_cols = filter(sql_gen.is_table_col, mapping.values())
290
        for col in in_cols:
291
            if col.table == in_tables0: col.set_srcs(sql_gen.src_self)
292
293
        log_debug('Joining together input tables into temp table')
294
        # Place in new table so don't modify input and for speed
295
        in_table = sql_gen.Table('in')
296
        mapping = dicts.join(mapping, sql.flatten(db, in_table, input_joins,
297
            in_cols, preserve=[in_pkey_col]))
298
        input_joins = [in_table]
299
        db.log_debug('Temp table: '+strings.as_tt(in_table.to_str(db)), level=2)
300 3077 aaronmk
301
    mapping = sql_gen.ColDict(db, out_table, mapping)
302
        # after applying dicts.join() because that returns a plain dict
303
304
    # Resolve default value column
305
    if default != None:
306
        try: default = mapping[default]
307
        except KeyError:
308
            db.log_debug('Default value column '
309
                +strings.as_tt(strings.repr_no_u(default))
310
                +' does not exist in mapping, falling back to None', level=2.1)
311
            default = None
312
313 3287 aaronmk
    # Save default values for all rows since in_table may have rows deleted
314 3431 aaronmk
    if is_literals: pass
315
    elif is_function: full_in_table = in_table
316 3386 aaronmk
    else:
317
        full_in_table = sql_gen.suffixed_table(in_table, '_full')
318
        full_in_table_cols = [in_pkey_col]
319
        if default != None:
320
            full_in_table_cols.append(default)
321
            default = sql_gen.with_table(default, full_in_table)
322
        sql.run_query_into(db, sql.mk_select(db, in_table, full_in_table_cols,
323
            order_by=None), into=full_in_table, add_pkey_=True)
324 3287 aaronmk
325 3431 aaronmk
    if not is_literals:
326
        pkeys_names = [in_pkey, out_pkey]
327
        pkeys_cols = [in_pkey_col, out_pkey_col]
328 3077 aaronmk
329
    pkeys_table_exists_ref = [False]
330 3499 aaronmk
    def insert_into_pkeys(joins, cols=None, limit=None, **kw_args):
331 3477 aaronmk
        query = sql.mk_select(db, joins, cols, order_by=None, limit=limit)
332 3077 aaronmk
        if pkeys_table_exists_ref[0]:
333 3289 aaronmk
            sql.insert_select(db, into, pkeys_names, query, **kw_args)
334 3077 aaronmk
        else:
335 3304 aaronmk
            sql.run_query_into(db, query, into=into, add_pkey_=True, **kw_args)
336 3077 aaronmk
            pkeys_table_exists_ref[0] = True
337
338
    limit_ref = [None]
339 3418 aaronmk
    def mk_main_select(joins, cols):
340
        return sql.mk_select(db, joins, cols, limit=limit_ref[0], order_by=None)
341
342 3431 aaronmk
    if is_literals: insert_in_table = None
343
    else:
344
        insert_in_table = in_table
345
        insert_in_tables = [insert_in_table]
346 3352 aaronmk
    join_cols = sql_gen.ColDict(db, out_table)
347 3077 aaronmk
348
    exc_strs = set()
349
    def log_exc(e):
350
        e_str = exc.str_(e, first_line_only=True)
351
        log_debug('Caught exception: '+e_str)
352
        assert e_str not in exc_strs # avoid infinite loops
353
        exc_strs.add(e_str)
354
355
    def remove_all_rows():
356
        log_debug('Ignoring all rows')
357
        limit_ref[0] = 0 # just create an empty pkeys table
358
359 3352 aaronmk
    def ignore_cond(cond, e):
360
        out_table_cols = sql_gen.ColDict(db, out_table)
361
        out_table_cols.update(util.dict_subset_right_join({},
362
            sql.table_cols(db, out_table)))
363
364
        in_cols = []
365
        cond = sql.map_expr(db, cond, mapping, in_cols)
366
        cond = sql.map_expr(db, cond, out_table_cols)
367
368
        track_data_error(db, errors_table_, sql_gen.cols_srcs(in_cols), None,
369
            e.cause.pgcode,
370
            strings.ensure_newl(e.cause.pgerror)+'condition: '+cond)
371
372
        not_cond = sql_gen.NotCond(sql_gen.CustomCode(cond))
373
        log_debug('Ignoring rows where '+strings.as_tt(not_cond.to_str(db)))
374
        sql.delete(db, insert_in_table, not_cond)
375
376 3294 aaronmk
    not_null_cols = set()
377 3077 aaronmk
    def ignore(in_col, value, e):
378 3311 aaronmk
        in_col = sql_gen.with_table(in_col, insert_in_table)
379
380 3078 aaronmk
        track_data_error(db, errors_table_, in_col.srcs, value,
381 3077 aaronmk
            e.cause.pgcode, e.cause.pgerror)
382
        log_debug('Ignoring rows with '+strings.as_tt(repr(in_col))+' = '
383
            +strings.as_tt(repr(value)))
384 3294 aaronmk
385 3310 aaronmk
        sql.add_index(db, in_col, insert_in_table) # enable fast filtering
386 3294 aaronmk
        if value != None and in_col not in not_null_cols:
387
            # Try just mapping the value to NULL
388
            sql.update(db, insert_in_table, [(in_col, None)],
389
                sql_gen.ColValueCond(in_col, value))
390 3293 aaronmk
        else:
391 3309 aaronmk
            sql.delete(db, insert_in_table, sql_gen.ColValueCond(in_col, value))
392 3294 aaronmk
            if value == None: not_null_cols.add(in_col)
393 3077 aaronmk
394 3431 aaronmk
    if not is_literals:
395
        def insert_pkeys_table(which):
396
            return sql_gen.Table(sql_gen.concat(in_table.name,
397
                '_insert_'+which+'_pkeys'))
398
        insert_out_pkeys = insert_pkeys_table('out')
399
        insert_in_pkeys = insert_pkeys_table('in')
400 3077 aaronmk
401
    # Do inserts and selects
402
    while True:
403 3473 aaronmk
        has_joins = join_cols != {}
404
405 3077 aaronmk
        if limit_ref[0] == 0: # special case
406 3473 aaronmk
            assert not has_joins
407
408 3431 aaronmk
            if is_literals: return None
409 3472 aaronmk
            log_debug('Creating an empty output pkeys table')
410 3077 aaronmk
            cur = sql.run_query_into(db, sql.mk_select(db, out_table,
411 3312 aaronmk
                [out_pkey], order_by=None, limit=0), into=insert_out_pkeys)
412 3077 aaronmk
            break # don't do main case
413
414
        log_debug('Trying to insert new rows')
415
416
        # Prepare to insert new rows
417 3291 aaronmk
        if is_function:
418
            log_debug('Calling function on input rows')
419
            args = dict(((k.name, v) for k, v in mapping.iteritems()))
420
            func_call = sql_gen.NamedCol(out_pkey,
421
                sql_gen.FunctionCall(out_table, **args))
422 3477 aaronmk
423 3478 aaronmk
            if not is_literals:
424
                # Create empty pkeys table so its row type can be used
425 3499 aaronmk
                insert_into_pkeys(input_joins, [in_pkey_col, func_call],
426
                    limit=0, recover=True)
427
428
                # Create error handling wrapper function
429
                select_cols = [in_pkey_col]+args.values()
430
                args = dict(((k, sql_gen.with_table(v, sql_gen.Table('row')))
431
                    for k, v in args.iteritems()))
432
                func_call = sql_gen.FunctionCall(out_table, **args)
433
                wrapper = db.TempFunction(sql_gen.concat(into.name, '_wrap'))
434
                sql.define_func(db, sql_gen.FunctionDef(wrapper,
435
                    sql_gen.SetOf(into),
436
                    sql_gen.RowExcIgnore(sql_gen.RowType(in_table),
437
                        sql.mk_select(db, input_joins, order_by=None),
438
                        sql_gen.ReturnQuery(sql.mk_select(db,
439
                            fields=[sql_gen.Col(in_pkey, 'row'), func_call],
440
                            explain=False)),
441
                        exc_handler=sql_gen.plpythonu_error_handler)
442
                    ))
443
                wrapper_table = sql_gen.FunctionCall(wrapper)
444 3077 aaronmk
        else:
445 3291 aaronmk
            insert_args = dict(recover=True, cacheable=False)
446
            if has_joins:
447
                insert_args.update(dict(ignore=True))
448
            else:
449 3431 aaronmk
                insert_args.update(dict(returning=out_pkey))
450
                if not is_literals:
451
                    insert_args.update(dict(into=insert_out_pkeys))
452 3291 aaronmk
            main_select = mk_main_select([insert_in_table], [sql_gen.with_table(
453
                c, insert_in_table) for c in mapping.values()])
454 3077 aaronmk
455 3292 aaronmk
        try:
456
            cur = None
457 3077 aaronmk
            if is_function:
458 3431 aaronmk
                if is_literals: cur = sql.select(db, fields=[func_call])
459 3499 aaronmk
                else: insert_into_pkeys(wrapper_table, recover=True)
460 3077 aaronmk
            else:
461 3292 aaronmk
                cur = sql.insert_select(db, out_table, mapping.keys(),
462 3077 aaronmk
                    main_select, **insert_args)
463
            break # insert successful
464
        except sql.MissingCastException, e:
465
            log_exc(e)
466
467
            out_col = e.col
468
            type_ = e.type
469
470
            log_debug('Casting '+strings.as_tt(out_col)+' input to '
471
                +strings.as_tt(type_))
472 3111 aaronmk
            in_col = mapping[out_col]
473
            while True:
474
                try:
475
                    mapping[out_col] = cast_temp_col(db, type_, in_col,
476
                        errors_table_)
477
                    break # cast successful
478
                except sql.InvalidValueException, e:
479
                    log_exc(e)
480
481 3294 aaronmk
                    ignore(in_col, e.value, e)
482 3077 aaronmk
        except sql.DuplicateKeyException, e:
483
            log_exc(e)
484
485 3274 aaronmk
            # Different rows violating different unique constraints not
486
            # supported
487
            assert not join_cols
488
489 3077 aaronmk
            join_cols.update(util.dict_subset_right_join(mapping, e.cols))
490
            log_debug('Ignoring existing rows, comparing on these columns:\n'
491
                +strings.as_inline_table(join_cols, ustr=col_ustr))
492 3102 aaronmk
493 3431 aaronmk
            if is_literals:
494
                return sql.value(sql.select(db, out_table, [out_pkey_col],
495
                    mapping, order_by=None))
496
497 3102 aaronmk
            # Uniquify input table to avoid internal duplicate keys
498
            insert_in_table = sql.distinct_table(db, insert_in_table,
499 3358 aaronmk
                join_cols.values())
500 3144 aaronmk
            insert_in_tables.append(insert_in_table)
501 3077 aaronmk
        except sql.NullValueException, e:
502
            log_exc(e)
503
504
            out_col, = e.cols
505
            try: in_col = mapping[out_col]
506
            except KeyError:
507 3323 aaronmk
                msg = 'Missing mapping for NOT NULL column '+out_col
508
                log_debug(msg)
509
                if default == None: on_error(SyntaxError(msg)) # required col
510 3077 aaronmk
                remove_all_rows()
511 3294 aaronmk
            else: ignore(in_col, None, e)
512 3352 aaronmk
        except sql.CheckException, e:
513
            log_exc(e)
514
515
            ignore_cond(e.cond, e)
516 3413 aaronmk
        except sql.InvalidValueException, e:
517
            log_exc(e)
518
519
            for in_col in mapping.values(): ignore(in_col, e.value, e)
520 3474 aaronmk
        except psycopg2.extensions.TransactionRollbackError, e:
521
            log_exc(e)
522
            # retry
523 3077 aaronmk
        except sql.DatabaseErrors, e:
524
            log_exc(e)
525
526
            log_debug('No handler for exception')
527
            on_error(e)
528
            remove_all_rows()
529
        # after exception handled, rerun loop with additional constraints
530
531
    if cur != None and row_ct_ref != None and cur.rowcount >= 0:
532
        row_ct_ref[0] += cur.rowcount
533
534 3431 aaronmk
    if is_literals_or_function: pass # pkeys table already created
535 3077 aaronmk
    elif has_joins:
536
        select_joins = input_joins+[sql_gen.Join(out_table, join_cols)]
537
        log_debug('Getting output table pkeys of existing/inserted rows')
538 3306 aaronmk
        insert_into_pkeys(select_joins, pkeys_cols)
539 3077 aaronmk
    else:
540
        sql.add_row_num(db, insert_out_pkeys) # for joining with input pkeys
541
542
        log_debug('Getting input table pkeys of inserted rows')
543 3285 aaronmk
        # Note that mk_main_select() does not use ORDER BY. Instead, assume that
544
        # since the SELECT query is identical to the one used in INSERT SELECT,
545
        # its rows will be retrieved in the same order.
546 3077 aaronmk
        sql.run_query_into(db, mk_main_select(input_joins, [in_pkey]),
547
            into=insert_in_pkeys)
548
        sql.add_row_num(db, insert_in_pkeys) # for joining with output pkeys
549
550
        assert sql.table_row_count(db, insert_out_pkeys) == sql.table_row_count(
551
            db, insert_in_pkeys)
552
553
        log_debug('Combining output and input pkeys in inserted order')
554
        pkey_joins = [insert_in_pkeys, sql_gen.Join(insert_out_pkeys,
555
            {sql.row_num_col: sql_gen.join_same_not_null})]
556
        insert_into_pkeys(pkey_joins, pkeys_names)
557
558
        sql.empty_temp(db, [insert_out_pkeys, insert_in_pkeys])
559
560 3431 aaronmk
    if not is_literals_or_function:
561 3187 aaronmk
        log_debug('Setting pkeys of missing rows to '
562
            +strings.as_tt(repr(default)))
563 3287 aaronmk
        missing_rows_joins = [full_in_table, sql_gen.Join(into,
564 3187 aaronmk
            {in_pkey: sql_gen.join_same_not_null}, sql_gen.filter_out)]
565
            # must use join_same_not_null or query will take forever
566
        insert_into_pkeys(missing_rows_joins,
567 3287 aaronmk
            [sql_gen.Col(in_pkey, full_in_table),
568
            sql_gen.NamedCol(out_pkey, default)])
569 3187 aaronmk
    # otherwise, there is already an entry for every row
570 3077 aaronmk
571 3431 aaronmk
    if is_literals: return sql.value(cur)
572
    else:
573
        assert (sql.table_row_count(db, into)
574
            == sql.table_row_count(db, full_in_table))
575
576
        sql.empty_temp(db, insert_in_tables+[full_in_table])
577
578
        srcs = []
579
        if is_func: srcs = sql_gen.cols_srcs(in_cols)
580
        return sql_gen.Col(out_pkey, into, srcs)