# Database import/export

import copy
import operator
import warnings

import exc
import dicts
import sql
import sql_gen
import strings
import util

##### Exceptions

# Can't use built-in SyntaxError because it stringifies to only the first line
class SyntaxError(Exception): pass

##### Data cleanup

null_strs = ['', '-', r'\N', 'NULL', 'UNKNOWN', 'nulo']

def cleanup_table(db, table):
    table = sql_gen.as_Table(table)
    cols = [sql_gen.as_Col(strings.ustr(c), table)
        for c in sql.table_cols(db, table)]
    cols = filter(lambda c: sql_gen.is_text_col(db, c), cols)
    if not cols: return

    db.log_debug('Cleaning up table', level=1.5)

    expr = 'trim(both from %s)'
    for null in null_strs: expr = 'nullif('+expr+', '+db.esc_value(null)+')'
    changes = [(v, sql_gen.CustomCode(expr % v.to_str(db))) for v in cols]

    while True:
        try:
            sql.update(db, table, changes, in_place=True, recover=True)
            break # successful
        except sql.NullValueException, e:
            db.log_debug('Caught exception: '+exc.str_(e))
            col, = e.cols
            sql.drop_not_null(db, col)

    db.log_debug('Vacuuming and reanalyzing table', level=1.5)
    sql.vacuum(db, table)
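
# Usage sketch (not part of the original module; the table name is
# hypothetical): cleanup_table() trims whitespace in every text column of the
# given table and converts the null_strs placeholders to SQL NULL, e.g.:
#     cleanup_table(db, 'plots')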

##### Error tracking

def track_data_error(db, errors_table, cols, value, error_code, error):
    '''
    @param errors_table If None, does nothing.
    '''
    if errors_table == None or cols == (): return

    for col in cols:
        try:
            sql.insert(db, errors_table, dict(column=col.name, value=value,
                error_code=error_code, error=error), recover=True,
                cacheable=True, log_level=4)
        except sql.DuplicateKeyException: pass
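
# Illustrative call (mirrors how ignore() uses this further below; the value,
# SQLSTATE, and message shown here are hypothetical):
#     track_data_error(db, errors_table_, in_col.srcs, '1e999', '22003',
#         'value out of range')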

class ExcToErrorsTable(sql_gen.ExcToWarning):
    '''Handles an exception by saving it or converting it to a warning.'''
    def __init__(self, return_, srcs, errors_table, value=None):
        '''
        @param return_ See sql_gen.ExcToWarning
        @param srcs The column names for the errors table
        @param errors_table None|sql_gen.Table
        @param value The value (or an expression for it) that caused the error
        @pre The invalid value must be in a local variable "value" of type text.
        '''
        sql_gen.ExcToWarning.__init__(self, return_)

        value = sql_gen.as_Code(value)

        self.srcs = srcs
        self.errors_table = errors_table
        self.value = value

    def to_str(self, db):
        if not self.srcs or self.errors_table == None:
            return sql_gen.ExcToWarning.to_str(self, db)

        errors_table_cols = map(sql_gen.Col,
            ['column', 'value', 'error_code', 'error'])
        col_names_query = sql.mk_select(db, sql_gen.NamedValues('c', None,
            [[c.name] for c in self.srcs]), order_by=None)
        insert_query = sql.mk_insert_select(db, self.errors_table,
            errors_table_cols,
            sql_gen.Values(errors_table_cols).to_str(db))+';\n'
        return '''\
-- Save error in errors table.
DECLARE
    error_code text := SQLSTATE;
    error text := SQLERRM;
    value text := '''+self.value.to_str(db)+''';
    "column" text;
BEGIN
    -- Insert the value and error for *each* source column.
'''+strings.indent(sql_gen.RowExcIgnore(None, col_names_query, insert_query,
    row_var=errors_table_cols[0]).to_str(db))+'''
END;

'''+self.return_.to_str(db)

def data_exception_handler(*args, **kw_args):
    '''Handles a data_exception by saving it or converting it to a warning.
    For params, see ExcToErrorsTable().
    '''
    return sql_gen.data_exception_handler(ExcToErrorsTable(*args, **kw_args))

def cast(db, type_, col, errors_table=None):
    '''Casts an (unrenamed) column or value.
    If errors_table is set and col has srcs, saves errors in errors_table (using
    col's srcs attr as source columns). Otherwise, converts errors to warnings.
    @param col str|sql_gen.Col|sql_gen.Literal
    @param errors_table None|sql_gen.Table|str
    '''
    col = sql_gen.as_Col(col)

    # Don't convert exceptions to warnings for user-supplied constants
    if isinstance(col, sql_gen.Literal): return sql_gen.Cast(type_, col)

    assert not isinstance(col, sql_gen.NamedCol)

    function_name = strings.first_word(type_)
    srcs = col.srcs
    save_errors = errors_table != None and srcs
    if save_errors: # function will be unique for the given srcs
        function_name = strings.ustr(sql_gen.FunctionCall(function_name,
            *map(sql_gen.to_name_only_col, srcs)))
    function = db.TempFunction(function_name)

    # Create function definition
    modifiers = 'STRICT'
    if not save_errors: modifiers = 'IMMUTABLE '+modifiers
    value_param = sql_gen.FunctionParam('value', 'text')
    handler = data_exception_handler('RETURN NULL;\n', srcs, errors_table,
        value_param.name)
    body = sql_gen.CustomCode(handler.to_str(db, '''\
/* The explicit cast to the return type is needed to make the cast happen
inside the try block. (Implicit casts to the return type happen at the end
of the function, outside any block.) */
RETURN value::'''+type_+''';
'''))
    body.lang='plpgsql'
    sql.define_func(db, sql_gen.FunctionDef(function, type_, body,
        [value_param], modifiers))

    return sql_gen.FunctionCall(function, col)
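
# Usage sketch (the column and type are hypothetical): cast a staging-table
# column to double precision, sending unparseable values to the dataset's
# errors table instead of aborting the import:
#     cast(db, 'double precision', sql_gen.Col('diameter', table),
#         errors_table=errors_table(db, table))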

def func_wrapper_exception_handler(db, return_, args, errors_table):
    '''Handles a function call's data_exceptions.
    Supports PL/Python functions.
    @param return_ See data_exception_handler()
    @param args [arg...] Function call's args
    @param errors_table See data_exception_handler()
    '''
    args = filter(sql_gen.has_srcs, args)

    srcs = sql_gen.cross_join_srcs(args)
    value = sql_gen.merge_not_null(db, ',', args)
    return sql_gen.NestedExcHandler(
        data_exception_handler(return_, srcs, errors_table, value)
        , sql_gen.plpythonu_error_handler
        )

def cast_temp_col(db, type_, col, errors_table=None):
    '''Like cast(), but creates a new column with the cast values if the input
    is a column.
    @return The new column or cast value
    '''
    def cast_(col): return cast(db, type_, col, errors_table)

    try: col = sql_gen.underlying_col(col)
    except sql_gen.NoUnderlyingTableException: return sql_gen.wrap(cast_, col)

    table = col.table
    new_col = sql_gen.suffixed_col(col, '::'+strings.first_word(type_))
    expr = cast_(col)

    # Add column
    new_typed_col = sql_gen.TypedCol(new_col.name, type_)
    sql.add_col(db, table, new_typed_col, comment=strings.urepr(col)+'::'+type_)
    new_col.name = new_typed_col.name # propagate any renaming

    sql.update(db, table, [(new_col, expr)], in_place=True, recover=True)

    return new_col

def errors_table(db, table, if_exists=True):
    '''
    @param if_exists If set, returns None if the errors table doesn't exist
    @return None|sql_gen.Table
    '''
    table = sql_gen.as_Table(table)
    if table.srcs != (): table = table.srcs[0]

    errors_table = sql_gen.suffixed_table(table, '.errors')
    if if_exists and not sql.table_exists(db, errors_table): return None
    return errors_table

def mk_errors_table(db, table):
    errors_table_ = errors_table(db, table, if_exists=False)
    if sql.table_exists(db, errors_table_, cacheable=False): return

    typed_cols = [
        sql_gen.TypedCol('column', 'text', nullable=False),
        sql_gen.TypedCol('value', 'text'),
        sql_gen.TypedCol('error_code', 'character varying(5)', nullable=False),
        sql_gen.TypedCol('error', 'text', nullable=False),
        ]
    sql.create_table(db, errors_table_, typed_cols, has_pkey=False)
    index_cols = ['column', 'value', 'error_code', 'error']
    sql.add_index(db, index_cols, errors_table_, unique=True)
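
# For reference (derived from typed_cols above), the generated errors table is
# roughly equivalent to:
#     CREATE TABLE "<table>.errors" (
#         "column" text NOT NULL,
#         value text,
#         error_code character varying(5) NOT NULL,
#         error text NOT NULL
#     );
# plus a unique index on ("column", value, error_code, error) and no pkey.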

##### Import

def put(db, table, row, pkey_=None, row_ct_ref=None):
    '''Recovers from errors.
    Only works under PostgreSQL (uses INSERT RETURNING).
    '''
    return put_table(db, table, [], row, row_ct_ref)

def get(db, table, row, pkey, row_ct_ref=None, create=False):
    '''Recovers from errors'''
    try:
        return sql.value(sql.select(db, table, [pkey], row, limit=1,
            recover=True))
    except StopIteration:
        if not create: raise
        return put(db, table, row, pkey, row_ct_ref) # insert new row
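
# Usage sketch (the table and column names are hypothetical): look up a row by
# a candidate key, inserting it first if it is missing, and return its pkey
# value:
#     party_id = get(db, 'party', {'organizationname': 'ARIZ'}, 'party_id',
#         create=True)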

def is_func_result(col):
    return col.table.name.find('(') >= 0 and col.name == 'result'

def into_table_name(out_table, in_tables0, mapping, is_func):
    def in_col_str(in_col):
        in_col = sql_gen.remove_col_rename(in_col)
        if isinstance(in_col, sql_gen.Col):
            table = in_col.table
            if table == in_tables0:
                in_col = sql_gen.to_name_only_col(in_col)
            elif is_func_result(in_col): in_col = table # omit col name
        return strings.ustr(in_col)

    str_ = strings.ustr(out_table)
    if is_func:
        str_ += '('

        try: value_in_col = mapping['value']
        except KeyError:
            str_ += ', '.join((strings.ustr(k)+'='+in_col_str(v)
                for k, v in mapping.iteritems()))
        else: str_ += in_col_str(value_in_col)

        str_ += ')'
    else:
        out_col = 'rank'
        try: in_col = mapping[out_col]
        except KeyError: str_ += '_pkeys'
        else: # has a rank column, so hierarchical
            str_ += '['+strings.ustr(out_col)+'='+in_col_str(in_col)+']'
    return str_
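
# Illustrative outputs (names hypothetical): a plain output table "place" with
# no rank mapping yields "place_pkeys"; a function "call" renders the call
# itself, e.g. "func(arg=<input col>)" or "func(<value col>)"; and a mapping
# with a rank column yields "place[rank=<input col>]".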

def put_table(db, out_table, in_tables, mapping, row_ct_ref=None, default=None,
    col_defaults={}, on_error=exc.reraise):
    '''Recovers from errors.
    Only works under PostgreSQL (uses INSERT RETURNING).
    IMPORTANT: Must be run at the *beginning* of a transaction.
    @param in_tables The main input table to select from, followed by a list of
        tables to join with it using the main input table's pkey
    @param mapping dict(out_table_col=in_table_col, ...)
        * out_table_col: str (*not* sql_gen.Col)
        * in_table_col: sql_gen.Col|literal-value
    @param default The *output* column to use as the pkey for missing rows.
        If this output column does not exist in the mapping, uses None.
    @param col_defaults Default values for required columns.
    @return sql_gen.Col Where the output pkeys are made available
    '''
    import psycopg2.extensions

    out_table = sql_gen.as_Table(out_table)

    def log_debug(msg): db.log_debug(msg, level=1.5)
    def col_ustr(str_):
        return strings.repr_no_u(sql_gen.remove_col_rename(str_))

    log_debug('********** New iteration **********')
    log_debug('Inserting these input columns into '+strings.as_tt(
        out_table.to_str(db))+':\n'+strings.as_table(mapping, ustr=col_ustr))

    is_function = sql.function_exists(db, out_table)

    # Warn if inserting empty table rows
    if not mapping and not is_function: # functions with no args OK
        warnings.warn(UserWarning('Inserting empty table row(s)'))

    if is_function: out_pkey = 'result'
    else: out_pkey = sql.pkey(db, out_table, recover=True)
    out_pkey_col = sql_gen.as_Col(out_pkey, out_table)

    in_tables_ = in_tables[:] # don't modify input!
    try: in_tables0 = in_tables_.pop(0) # first table is separate
    except IndexError: in_tables0 = None
    else:
        in_pkey = sql.pkey(db, in_tables0, recover=True)
        in_pkey_col = sql_gen.as_Col(in_pkey, in_tables0)

    # Determine whether we can use the optimization for only-literal values
    is_literals = not reduce(operator.or_, map(sql_gen.is_table_col,
        mapping.values()), False)
    is_literals_or_function = is_literals or is_function

    if in_tables0 == None: errors_table_ = None
    else: errors_table_ = errors_table(db, in_tables0)

    # Create input joins from list of input tables
    input_joins = [in_tables0]+[sql_gen.Join(v,
        {in_pkey: sql_gen.join_same_not_null}) for v in in_tables_]

    if mapping == {} and not is_function: # need >= one column for INSERT SELECT
        mapping = {out_pkey: None} # ColDict will replace with default value

    if not is_literals:
        into = sql_gen.as_Table(into_table_name(out_table, in_tables0, mapping,
            is_function))
        # Ensure into's out_pkey is different from in_pkey by prepending table
        if is_function: into_out_pkey = out_pkey
        else: into_out_pkey = strings.ustr(out_pkey_col)

        # Set column sources
        in_cols = filter(sql_gen.is_table_col, mapping.values())
        for col in in_cols:
            if col.table == in_tables0: col.set_srcs(sql_gen.src_self)

        log_debug('Joining together input tables into temp table')
        # Place in a new table so we don't modify the input, and for speed
        in_table = sql_gen.Table('in')
        mapping = dicts.join(mapping, sql.flatten(db, in_table, input_joins,
            in_cols, preserve=[in_pkey_col]))
        input_joins = [in_table]
        db.log_debug('Temp table: '+strings.as_tt(in_table.to_str(db)), level=2)

    # Wrap mapping in a sql_gen.ColDict.
    # sql_gen.ColDict sanitizes both keys and values passed into it.
    # Do after applying dicts.join() because that returns a plain dict.
    mapping = sql_gen.ColDict(db, out_table, mapping)

    # Resolve default value column
    if default != None:
        try: default = mapping[default]
        except KeyError:
            db.log_debug('Default value column '
                +strings.as_tt(strings.repr_no_u(default))
                +' does not exist in mapping, falling back to None', level=2.1)
            default = None

    # Save default values for all rows since in_table may have rows deleted
    if is_literals: pass
    elif is_function: full_in_table = in_table
    else:
        full_in_table = sql_gen.suffixed_table(in_table, '_full')
        full_in_table_cols = [in_pkey_col]
        if default != None:
            full_in_table_cols.append(default)
            default = sql_gen.with_table(default, full_in_table)
        sql.run_query_into(db, sql.mk_select(db, in_table, full_in_table_cols,
            order_by=None), into=full_in_table, add_pkey_=True)

    pkeys_table_exists_ref = [False]
    def insert_into_pkeys(joins, cols=None, limit=None, **kw_args):
        query = sql.mk_select(db, joins, cols, order_by=None, limit=limit)
        if pkeys_table_exists_ref[0]:
            sql.insert_select(db, into, [in_pkey, into_out_pkey], query,
                **kw_args)
        else:
            sql.run_query_into(db, query, into=into, add_pkey_=True, **kw_args)
            pkeys_table_exists_ref[0] = True

    limit_ref = [None]
    def mk_main_select(joins, cols):
        return sql.mk_select(db, joins, cols, limit=limit_ref[0], order_by=None)

    if is_literals: insert_in_table = None
    else:
        insert_in_table = in_table
        insert_in_tables = [insert_in_table]
    join_cols = sql_gen.ColDict(db, out_table)

    exc_strs = set()
    def log_exc(e):
        e_str = exc.str_(e, first_line_only=True)
        log_debug('Caught exception: '+e_str)
        if e_str in exc_strs: # avoid infinite loops
            log_debug('Exception already seen, handler broken')
            on_error(e)
            remove_all_rows()
        else: exc_strs.add(e_str)

    def remove_all_rows():
        log_debug('Ignoring all rows')
        limit_ref[0] = 0 # just create an empty pkeys table

    def ignore_cond(cond, e):
        if is_literals: remove_all_rows()
        else:
            out_table_cols = sql_gen.ColDict(db, out_table)
            out_table_cols.update(util.dict_subset_right_join({},
                sql.table_cols(db, out_table)))

            in_cols = []
            cond = sql.map_expr(db, cond, mapping, in_cols)
            cond = sql.map_expr(db, cond, out_table_cols)

            track_data_error(db, errors_table_, sql_gen.cols_srcs(in_cols),
                None, e.cause.pgcode,
                strings.ensure_newl(e.cause.pgerror)+'condition: '+cond)

            not_cond = sql_gen.NotCond(sql_gen.CustomCode(cond))
            log_debug('Ignoring rows where '+strings.as_tt(not_cond.to_str(db)))
            sql.delete(db, insert_in_table, not_cond)

    not_null_cols = set()
    def ignore(in_col, value, e):
        if sql_gen.is_table_col(in_col):
            in_col = sql_gen.with_table(in_col, insert_in_table)

            track_data_error(db, errors_table_, in_col.srcs, value,
                e.cause.pgcode, e.cause.pgerror)

            sql.add_index(db, in_col, insert_in_table) # enable fast filtering
            if value != None and in_col not in not_null_cols:
                log_debug('Replacing invalid value '
                    +strings.as_tt(strings.urepr(value))+' with NULL in column '
                    +strings.as_tt(in_col.to_str(db)))
                sql.update(db, insert_in_table, [(in_col, None)],
                    sql_gen.ColValueCond(in_col, value))
            else:
                log_debug('Ignoring rows with '+strings.as_tt(in_col.to_str(db))
                    +' = '+strings.as_tt(strings.urepr(value)))
                sql.delete(db, insert_in_table,
                    sql_gen.ColValueCond(in_col, value))
                if value == None: not_null_cols.add(in_col)
        else:
            assert isinstance(in_col, sql_gen.NamedCol)
            in_value = sql_gen.remove_col_rename(in_col)
            assert sql_gen.is_literal(in_value)
            if value == in_value.value:
                if value != None:
                    log_debug('Replacing invalid literal '
                        +strings.as_tt(in_col.to_str(db))+' with NULL')
                    mapping[in_col.name] = None
                else:
                    remove_all_rows()
            # otherwise, all columns were being ignore()d because the specific
            # column couldn't be identified, and this was not the invalid column

    if not is_literals:
        def insert_pkeys_table(which):
            return sql_gen.Table(sql_gen.concat(in_table.name,
                '_insert_'+which+'_pkeys'))
        insert_out_pkeys = insert_pkeys_table('out')
        insert_in_pkeys = insert_pkeys_table('in')

    def mk_func_call():
        args = dict(((k.name, v) for k, v in mapping.iteritems()))
        return sql_gen.FunctionCall(out_table, **args), args

    if is_function and not is_literals:
        log_debug('Defining wrapper function')

        func_call, args = mk_func_call()
        func_call = sql_gen.NamedCol(into_out_pkey, func_call)

        # Create empty pkeys table so its row type can be used
        insert_into_pkeys(input_joins, [in_pkey_col, func_call], limit=0,
            recover=True)
        result_type = db.col_info(sql_gen.Col(into_out_pkey, into)).type

        ## Create error handling wrapper function

        wrapper = db.TempFunction(sql_gen.concat(into.name, '_wrap'))

        select_cols = [in_pkey_col]+args.values()
        row_var = copy.copy(sql_gen.row_var)
        row_var.set_srcs([in_table])
        in_pkey_var = sql_gen.Col(in_pkey, row_var)

        args = dict(((k, sql_gen.with_table(v, row_var))
            for k, v in args.iteritems()))
        func_call = sql_gen.FunctionCall(out_table, **args)

        def mk_return(result):
            return sql_gen.ReturnQuery(sql.mk_select(db,
                fields=[in_pkey_var, result], explain=False))
        exc_handler = func_wrapper_exception_handler(db,
            mk_return(sql_gen.Cast(result_type, None)), args.values(),
            errors_table_)

        sql.define_func(db, sql_gen.FunctionDef(wrapper, sql_gen.SetOf(into),
            sql_gen.RowExcIgnore(sql_gen.RowType(in_table),
                sql.mk_select(db, input_joins, order_by=None),
                mk_return(func_call), exc_handler=exc_handler)
            ))
        wrapper_table = sql_gen.FunctionCall(wrapper)

    # Do inserts and selects
    while True:
        has_joins = join_cols != {}

        # Handle unrecoverable errors in a special case
        if limit_ref[0] == 0:
            if is_literals or default == None:
                default = sql_gen.remove_col_rename(default)
                log_debug('Returning default: '
                    +strings.as_tt(strings.urepr(default)))
                return default
            elif is_function: pass # empty pkeys table already created
            else:
                log_debug('Creating an empty output pkeys table')
                has_joins = False # use the no-joins case
                cur = sql.run_query_into(db, sql.mk_select(db, out_table,
                    [out_pkey], order_by=None, limit=0), into=insert_out_pkeys)

            break # don't do main case

        # Prepare to insert new rows
        if is_function:
            log_debug('Calling function on input rows')
            if is_literals: func_call, args = mk_func_call()
        else:
            log_debug('Trying to insert new rows')
            insert_args = dict(recover=True, cacheable=False)
            if has_joins:
                insert_args.update(dict(ignore=True))
            else:
                insert_args.update(dict(returning=out_pkey))
                if not is_literals:
                    insert_args.update(dict(into=insert_out_pkeys))
            main_select = mk_main_select([insert_in_table], [sql_gen.with_table(
                c, insert_in_table) for c in mapping.values()])

        try:
            cur = None
            if is_function:
                if is_literals:
                    cur = sql.select(db, fields=[func_call], recover=True,
                        cacheable=True)
                else: insert_into_pkeys(wrapper_table, recover=True)
            else:
                cur = sql.insert_select(db, out_table, mapping.keys(),
                    main_select, **insert_args)
            break # insert successful
        except sql.MissingCastException, e:
            log_exc(e)

            type_ = e.type
            if e.col == None: out_cols = mapping.keys()
            else: out_cols = [e.col]

            for out_col in out_cols:
                log_debug('Casting '+strings.as_tt(strings.repr_no_u(out_col))
                    +' input to '+strings.as_tt(type_))
                in_col = mapping[out_col]
                while True:
                    try:
                        mapping[out_col] = cast_temp_col(db, type_, in_col,
                            errors_table_)
                        break # cast successful
                    except sql.InvalidValueException, e:
                        log_exc(e)

                        ignore(in_col, e.value, e)
        except sql.DuplicateKeyException, e:
            log_exc(e)

            # Different rows violating different unique constraints not
            # supported
            assert not join_cols

            join_cols.update(util.dict_subset_right_join(mapping, e.cols))
            log_debug('Ignoring existing rows, comparing on these columns:\n'
                +strings.as_inline_table(join_cols, ustr=col_ustr))

            if is_literals:
                return sql.value(sql.select(db, out_table, [out_pkey_col],
                    join_cols, order_by=None))

            # Uniquify input table to avoid internal duplicate keys
            insert_in_table = sql.distinct_table(db, insert_in_table,
                join_cols.values())
            insert_in_tables.append(insert_in_table)
        except sql.NullValueException, e:
            log_exc(e)

            out_col, = e.cols
            try: in_col = mapping[out_col]
            except KeyError, e:
                try: in_col = mapping[out_col] = col_defaults[out_col]
                except KeyError:
                    msg = 'Missing mapping for NOT NULL column '+out_col
                    log_debug(msg)
                    if default == None: warnings.warn(UserWarning(msg))
                        # not an error because sometimes the mappings include
                        # extra tables which aren't used by the dataset
                    remove_all_rows()
            else: ignore(in_col, None, e)
        except sql.CheckException, e:
            log_exc(e)

            ignore_cond(e.cond, e)
        except sql.InvalidValueException, e:
            log_exc(e)

            for in_col in mapping.values(): ignore(in_col, e.value, e)
        except psycopg2.extensions.TransactionRollbackError, e:
            log_exc(e)
            # retry
        except sql.DatabaseErrors, e:
            log_exc(e)

            log_debug('No handler for exception')
            on_error(e)
            remove_all_rows()
        # after exception handled, rerun loop with additional constraints

    if cur != None and row_ct_ref != None and cur.rowcount >= 0:
        row_ct_ref[0] += cur.rowcount

    if is_literals: return sql.value(cur)

    if is_function: pass # pkeys table already created
    elif has_joins:
        select_joins = input_joins+[sql_gen.Join(out_table, join_cols)]
        log_debug('Getting output table pkeys of existing/inserted rows')
        insert_into_pkeys(select_joins, [in_pkey_col,
            sql_gen.NamedCol(into_out_pkey, out_pkey_col)])
    else:
        sql.add_row_num(db, insert_out_pkeys) # for joining with input pkeys

        log_debug('Getting input table pkeys of inserted rows')
        # Note that mk_main_select() does not use ORDER BY. Instead, assume that
        # since the SELECT query is identical to the one used in INSERT SELECT,
        # its rows will be retrieved in the same order.
        sql.run_query_into(db, mk_main_select(input_joins, [in_pkey]),
            into=insert_in_pkeys)
        sql.add_row_num(db, insert_in_pkeys) # for joining with output pkeys

        assert sql.table_row_count(db, insert_out_pkeys) == sql.table_row_count(
            db, insert_in_pkeys)

        log_debug('Combining output and input pkeys in inserted order')
        pkey_joins = [insert_in_pkeys, sql_gen.Join(insert_out_pkeys,
            {sql.row_num_col: sql_gen.join_same_not_null})]
        in_col = sql_gen.Col(in_pkey, insert_in_pkeys)
        out_col = sql_gen.NamedCol(into_out_pkey,
            sql_gen.Col(out_pkey, insert_out_pkeys))
        insert_into_pkeys(pkey_joins, [in_col, out_col])

        sql.empty_temp(db, [insert_out_pkeys, insert_in_pkeys])

    if limit_ref[0] == 0 or not is_function: # is_function doesn't leave holes
        log_debug('Setting pkeys of missing rows to '
            +strings.as_tt(strings.urepr(default)))
        missing_rows_joins = [full_in_table, sql_gen.Join(into,
            {in_pkey: sql_gen.join_same_not_null}, sql_gen.filter_out)]
            # must use join_same_not_null or query will take forever
        insert_into_pkeys(missing_rows_joins,
            [sql_gen.Col(in_pkey, full_in_table),
            sql_gen.NamedCol(into_out_pkey, default)])
    # otherwise, there is already an entry for every row

    assert (sql.table_row_count(db, into)
        == sql.table_row_count(db, full_in_table))

    sql.empty_temp(db, insert_in_tables+[full_in_table])

    srcs = []
    if is_function: srcs = sql_gen.cols_srcs(in_cols)
    return sql_gen.Col(into_out_pkey, into, srcs)
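
# Usage sketch (illustrative; the output table, input table, and mapping are
# hypothetical). A typical import maps output columns to staging-table columns
# and reads the generated pkeys back from the returned column:
#     row_ct_ref = [0]
#     pkeys_col = put_table(db, 'taxonoccurrence', [in_table],
#         {'authortaxoncode': sql_gen.Col('taxon_code', in_table)},
#         row_ct_ref=row_ct_ref)
# The returned sql_gen.Col points into a temp pkeys table pairing each input
# pkey with the corresponding output pkey (existing, newly inserted, or the
# default for rows that could not be imported).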