Project

General

Profile

1 3077 aaronmk
# Database import/export
2
3 3534 aaronmk
import copy
4 3431 aaronmk
import operator
5 3714 aaronmk
import warnings
6 3431 aaronmk
7 3077 aaronmk
import exc
8
import dicts
9
import sql
10
import sql_gen
11
import strings
12
import util
13
14 3645 aaronmk
##### Exceptions
15
16
# Can't use built-in SyntaxError because it stringifies to only the first line
17
class SyntaxError(Exception): pass
18
19 3081 aaronmk
##### Data cleanup
20
21 4915 aaronmk
null_strs = ['', '-', r'\N', 'NULL', 'UNKNOWN', 'nulo']
22 4209 aaronmk
23 4447 aaronmk
def cleanup_table(db, table):
24 3081 aaronmk
    table = sql_gen.as_Table(table)
25 4914 aaronmk
    cols = [sql_gen.as_Col(strings.ustr(c), table)
26
        for c in sql.table_cols(db, table)]
27 4407 aaronmk
    cols = filter(lambda c: sql_gen.is_text_col(db, c), cols)
28 4928 aaronmk
    if not cols: return
29 3081 aaronmk
30 4993 aaronmk
    db.log_debug('Cleaning up table', level=1.5)
31
32 4209 aaronmk
    expr = 'trim(both from %s)'
33
    for null in null_strs: expr = 'nullif('+expr+', '+db.esc_value(null)+')'
34
    changes = [(v, sql_gen.CustomCode(expr % v.to_str(db))) for v in cols]
35 3081 aaronmk
36 4444 aaronmk
    while True:
37
        try:
38
            sql.update(db, table, changes, in_place=True, recover=True)
39
            break # successful
40
        except sql.NullValueException, e:
41 4457 aaronmk
            db.log_debug('Caught exception: '+exc.str_(e))
42 4444 aaronmk
            col, = e.cols
43
            sql.drop_not_null(db, col)
44 4992 aaronmk
45
    db.log_debug('Vacuuming and reanalyzing table', level=1.5)
46
    sql.vacuum(db, table)
47 3081 aaronmk
48 3078 aaronmk
##### Error tracking
49
50
def track_data_error(db, errors_table, cols, value, error_code, error):
51
    '''
52
    @param errors_table If None, does nothing.
53
    '''
54
    if errors_table == None or cols == (): return
55
56
    for col in cols:
57
        try:
58
            sql.insert(db, errors_table, dict(column=col.name, value=value,
59
                error_code=error_code, error=error), recover=True,
60
                cacheable=True, log_level=4)
61
        except sql.DuplicateKeyException: pass
62
63 3506 aaronmk
class ExcToErrorsTable(sql_gen.ExcToWarning):
64
    '''Handles an exception by saving it or converting it to a warning.'''
65 3511 aaronmk
    def __init__(self, return_, srcs, errors_table, value=None):
66 3506 aaronmk
        '''
67
        @param return_ See sql_gen.ExcToWarning
68
        @param srcs The column names for the errors table
69
        @param errors_table None|sql_gen.Table
70 3511 aaronmk
        @param value The value (or an expression for it) that caused the error
71 3506 aaronmk
        @pre The invalid value must be in a local variable "value" of type text.
72
        '''
73
        sql_gen.ExcToWarning.__init__(self, return_)
74
75 3511 aaronmk
        value = sql_gen.as_Code(value)
76
77 3506 aaronmk
        self.srcs = srcs
78
        self.errors_table = errors_table
79 3511 aaronmk
        self.value = value
80 3501 aaronmk
81 3506 aaronmk
    def to_str(self, db):
82
        if not self.srcs or self.errors_table == None:
83
            return sql_gen.ExcToWarning.to_str(self, db)
84
85 3459 aaronmk
        errors_table_cols = map(sql_gen.Col,
86
            ['column', 'value', 'error_code', 'error'])
87 3465 aaronmk
        col_names_query = sql.mk_select(db, sql_gen.NamedValues('c', None,
88 3506 aaronmk
            [[c.name] for c in self.srcs]), order_by=None)
89
        insert_query = sql.mk_insert_select(db, self.errors_table,
90
            errors_table_cols,
91 3465 aaronmk
            sql_gen.Values(errors_table_cols).to_str(db))+';\n'
92 3506 aaronmk
        return '''\
93 3459 aaronmk
-- Save error in errors table.
94
DECLARE
95
    error_code text := SQLSTATE;
96
    error text := SQLERRM;
97 3511 aaronmk
    value text := '''+self.value.to_str(db)+''';
98 3529 aaronmk
    "column" text;
99 3459 aaronmk
BEGIN
100
    -- Insert the value and error for *each* source column.
101 3529 aaronmk
'''+strings.indent(sql_gen.RowExcIgnore(None, col_names_query, insert_query,
102 3467 aaronmk
    row_var=errors_table_cols[0]).to_str(db))+'''
103 3459 aaronmk
END;
104 3501 aaronmk
105 3506 aaronmk
'''+self.return_.to_str(db)
106 3459 aaronmk
107 3507 aaronmk
def data_exception_handler(*args, **kw_args):
108 3506 aaronmk
    '''Handles a data_exception by saving it or converting it to a warning.
109
    For params, see ExcToErrorsTable().
110
    '''
111
    return sql_gen.data_exception_handler(ExcToErrorsTable(*args, **kw_args))
112
113 3078 aaronmk
def cast(db, type_, col, errors_table=None):
114
    '''Casts an (unrenamed) column or value.
115
    If errors_table set and col has srcs, saves errors in errors_table (using
116 3360 aaronmk
    col's srcs attr as source columns). Otherwise, converts errors to warnings.
117 3078 aaronmk
    @param col str|sql_gen.Col|sql_gen.Literal
118
    @param errors_table None|sql_gen.Table|str
119
    '''
120
    col = sql_gen.as_Col(col)
121
122 3112 aaronmk
    # Don't convert exceptions to warnings for user-supplied constants
123
    if isinstance(col, sql_gen.Literal): return sql_gen.Cast(type_, col)
124
125 3078 aaronmk
    assert not isinstance(col, sql_gen.NamedCol)
126
127 3460 aaronmk
    function_name = strings.first_word(type_)
128 3459 aaronmk
    srcs = col.srcs
129 3508 aaronmk
    save_errors = errors_table != None and srcs
130
    if save_errors: # function will be unique for the given srcs
131 3750 aaronmk
        function_name = strings.ustr(sql_gen.FunctionCall(function_name,
132 3508 aaronmk
            *map(sql_gen.to_name_only_col, srcs)))
133 3078 aaronmk
    function = db.TempFunction(function_name)
134
135 3464 aaronmk
    # Create function definition
136
    modifiers = 'STRICT'
137
    if not save_errors: modifiers = 'IMMUTABLE '+modifiers
138 3511 aaronmk
    value_param = sql_gen.FunctionParam('value', 'text')
139
    handler = data_exception_handler('RETURN NULL;\n', srcs, errors_table,
140
        value_param.name)
141 3464 aaronmk
    body = sql_gen.CustomCode(handler.to_str(db, '''\
142 3467 aaronmk
/* The explicit cast to the return type is needed to make the cast happen
143
inside the try block. (Implicit casts to the return type happen at the end
144
of the function, outside any block.) */
145
RETURN value::'''+type_+''';
146 3464 aaronmk
'''))
147
    body.lang='plpgsql'
148 3500 aaronmk
    sql.define_func(db, sql_gen.FunctionDef(function, type_, body,
149 3511 aaronmk
        [value_param], modifiers))
150 3464 aaronmk
151 3078 aaronmk
    return sql_gen.FunctionCall(function, col)
152
153 3538 aaronmk
def func_wrapper_exception_handler(db, return_, args, errors_table):
154 3524 aaronmk
    '''Handles a function call's data_exceptions.
155
    Supports PL/Python functions.
156
    @param return_ See data_exception_handler()
157
    @param args [arg...] Function call's args
158
    @param errors_table See data_exception_handler()
159
    '''
160
    args = filter(sql_gen.has_srcs, args)
161
162
    srcs = sql_gen.cross_join_srcs(args)
163 3538 aaronmk
    value = sql_gen.merge_not_null(db, ',', args)
164 3524 aaronmk
    return sql_gen.NestedExcHandler(
165
        data_exception_handler(return_, srcs, errors_table, value)
166
        , sql_gen.plpythonu_error_handler
167
        )
168
169 3078 aaronmk
def cast_temp_col(db, type_, col, errors_table=None):
170
    '''Like cast(), but creates a new column with the cast values if the input
171
    is a column.
172
    @return The new column or cast value
173
    '''
174
    def cast_(col): return cast(db, type_, col, errors_table)
175
176
    try: col = sql_gen.underlying_col(col)
177
    except sql_gen.NoUnderlyingTableException: return sql_gen.wrap(cast_, col)
178
179
    table = col.table
180 3173 aaronmk
    new_col = sql_gen.suffixed_col(col, '::'+strings.first_word(type_))
181 3078 aaronmk
    expr = cast_(col)
182
183
    # Add column
184
    new_typed_col = sql_gen.TypedCol(new_col.name, type_)
185 3750 aaronmk
    sql.add_col(db, table, new_typed_col, comment=strings.urepr(col)+'::'+type_)
186 3078 aaronmk
    new_col.name = new_typed_col.name # propagate any renaming
187
188 3110 aaronmk
    sql.update(db, table, [(new_col, expr)], in_place=True, recover=True)
189 3078 aaronmk
190
    return new_col
191
192
def errors_table(db, table, if_exists=True):
193
    '''
194
    @param if_exists If set, returns None if the errors table doesn't exist
195
    @return None|sql_gen.Table
196
    '''
197
    table = sql_gen.as_Table(table)
198
    if table.srcs != (): table = table.srcs[0]
199
200
    errors_table = sql_gen.suffixed_table(table, '.errors')
201
    if if_exists and not sql.table_exists(db, errors_table): return None
202
    return errors_table
203
204 4436 aaronmk
def mk_errors_table(db, table):
205
    errors_table_ = errors_table(db, table, if_exists=False)
206 4557 aaronmk
    if sql.table_exists(db, errors_table_, cacheable=False): return
207 4436 aaronmk
208
    typed_cols = [
209
        sql_gen.TypedCol('column', 'text', nullable=False),
210
        sql_gen.TypedCol('value', 'text'),
211
        sql_gen.TypedCol('error_code', 'character varying(5)', nullable=False),
212
        sql_gen.TypedCol('error', 'text', nullable=False),
213
        ]
214
    sql.create_table(db, errors_table_, typed_cols, has_pkey=False)
215
    index_cols = ['column', 'value', 'error_code', 'error']
216
    sql.add_index(db, index_cols, errors_table_, unique=True)
217
218 3078 aaronmk
##### Import
219
220 3077 aaronmk
def put(db, table, row, pkey_=None, row_ct_ref=None):
221
    '''Recovers from errors.
222
    Only works under PostgreSQL (uses INSERT RETURNING).
223
    '''
224 3633 aaronmk
    return put_table(db, table, [], row, row_ct_ref)
225 3077 aaronmk
226
def get(db, table, row, pkey, row_ct_ref=None, create=False):
227
    '''Recovers from errors'''
228
    try:
229
        return sql.value(sql.select(db, table, [pkey], row, limit=1,
230
            recover=True))
231
    except StopIteration:
232
        if not create: raise
233
        return put(db, table, row, pkey, row_ct_ref) # insert new row
234
235
def is_func_result(col):
236
    return col.table.name.find('(') >= 0 and col.name == 'result'
237
238
def into_table_name(out_table, in_tables0, mapping, is_func):
239
    def in_col_str(in_col):
240
        in_col = sql_gen.remove_col_rename(in_col)
241
        if isinstance(in_col, sql_gen.Col):
242
            table = in_col.table
243
            if table == in_tables0:
244
                in_col = sql_gen.to_name_only_col(in_col)
245
            elif is_func_result(in_col): in_col = table # omit col name
246 3750 aaronmk
        return strings.ustr(in_col)
247 3077 aaronmk
248 4491 aaronmk
    str_ = strings.ustr(out_table)
249 3077 aaronmk
    if is_func:
250
        str_ += '('
251
252
        try: value_in_col = mapping['value']
253
        except KeyError:
254 4491 aaronmk
            str_ += ', '.join((strings.ustr(k)+'='+in_col_str(v)
255 3077 aaronmk
                for k, v in mapping.iteritems()))
256
        else: str_ += in_col_str(value_in_col)
257
258
        str_ += ')'
259
    else:
260
        out_col = 'rank'
261
        try: in_col = mapping[out_col]
262
        except KeyError: str_ += '_pkeys'
263
        else: # has a rank column, so hierarchical
264 4491 aaronmk
            str_ += '['+strings.ustr(out_col)+'='+in_col_str(in_col)+']'
265 3077 aaronmk
    return str_
266
267 3628 aaronmk
def put_table(db, out_table, in_tables, mapping, row_ct_ref=None, default=None,
268 3660 aaronmk
    col_defaults={}, on_error=exc.reraise):
269 3077 aaronmk
    '''Recovers from errors.
270
    Only works under PostgreSQL (uses INSERT RETURNING).
271
    IMPORTANT: Must be run at the *beginning* of a transaction.
272
    @param in_tables The main input table to select from, followed by a list of
273
        tables to join with it using the main input table's pkey
274
    @param mapping dict(out_table_col=in_table_col, ...)
275
        * out_table_col: str (*not* sql_gen.Col)
276
        * in_table_col: sql_gen.Col|literal-value
277
    @param default The *output* column to use as the pkey for missing rows.
278
        If this output column does not exist in the mapping, uses None.
279 3618 aaronmk
    @param col_defaults Default values for required columns.
280 3077 aaronmk
    @return sql_gen.Col Where the output pkeys are made available
281
    '''
282 3474 aaronmk
    import psycopg2.extensions
283
284 3077 aaronmk
    out_table = sql_gen.as_Table(out_table)
285
286
    def log_debug(msg): db.log_debug(msg, level=1.5)
287
    def col_ustr(str_):
288
        return strings.repr_no_u(sql_gen.remove_col_rename(str_))
289
290
    log_debug('********** New iteration **********')
291
    log_debug('Inserting these input columns into '+strings.as_tt(
292
        out_table.to_str(db))+':\n'+strings.as_table(mapping, ustr=col_ustr))
293
294
    is_function = sql.function_exists(db, out_table)
295
296 4984 aaronmk
    # Warn if inserting empty table rows
297
    if not mapping and not is_function: # functions with no args OK
298
        warnings.warn(UserWarning('Inserting empty table row(s)'))
299
300 3077 aaronmk
    if is_function: out_pkey = 'result'
301
    else: out_pkey = sql.pkey(db, out_table, recover=True)
302
    out_pkey_col = sql_gen.as_Col(out_pkey, out_table)
303
304
    in_tables_ = in_tables[:] # don't modify input!
305 3432 aaronmk
    try: in_tables0 = in_tables_.pop(0) # first table is separate
306
    except IndexError: in_tables0 = None
307
    else:
308
        in_pkey = sql.pkey(db, in_tables0, recover=True)
309
        in_pkey_col = sql_gen.as_Col(in_pkey, in_tables0)
310 3431 aaronmk
311
    # Determine if can use optimization for only literal values
312
    is_literals = not reduce(operator.or_, map(sql_gen.is_table_col,
313 3434 aaronmk
        mapping.values()), False)
314 3431 aaronmk
    is_literals_or_function = is_literals or is_function
315
316 3432 aaronmk
    if in_tables0 == None: errors_table_ = None
317
    else: errors_table_ = errors_table(db, in_tables0)
318 3431 aaronmk
319
    # Create input joins from list of input tables
320 3077 aaronmk
    input_joins = [in_tables0]+[sql_gen.Join(v,
321
        {in_pkey: sql_gen.join_same_not_null}) for v in in_tables_]
322
323 3433 aaronmk
    if mapping == {} and not is_function: # need >= one column for INSERT SELECT
324
        mapping = {out_pkey: None} # ColDict will replace with default value
325
326 3431 aaronmk
    if not is_literals:
327 3628 aaronmk
        into = sql_gen.as_Table(into_table_name(out_table, in_tables0, mapping,
328
            is_function))
329 4484 aaronmk
        # Ensure into's out_pkey is different from in_pkey by prepending table
330 4495 aaronmk
        if is_function: into_out_pkey = out_pkey
331
        else: into_out_pkey = strings.ustr(out_pkey_col)
332 3431 aaronmk
333
        # Set column sources
334
        in_cols = filter(sql_gen.is_table_col, mapping.values())
335
        for col in in_cols:
336
            if col.table == in_tables0: col.set_srcs(sql_gen.src_self)
337
338
        log_debug('Joining together input tables into temp table')
339
        # Place in new table so don't modify input and for speed
340
        in_table = sql_gen.Table('in')
341
        mapping = dicts.join(mapping, sql.flatten(db, in_table, input_joins,
342
            in_cols, preserve=[in_pkey_col]))
343
        input_joins = [in_table]
344
        db.log_debug('Temp table: '+strings.as_tt(in_table.to_str(db)), level=2)
345 3077 aaronmk
346 3692 aaronmk
    # Wrap mapping in a sql_gen.ColDict.
347
    # sql_gen.ColDict sanitizes both keys and values passed into it.
348
    # Do after applying dicts.join() because that returns a plain dict.
349 3077 aaronmk
    mapping = sql_gen.ColDict(db, out_table, mapping)
350
351
    # Resolve default value column
352
    if default != None:
353
        try: default = mapping[default]
354
        except KeyError:
355
            db.log_debug('Default value column '
356
                +strings.as_tt(strings.repr_no_u(default))
357
                +' does not exist in mapping, falling back to None', level=2.1)
358
            default = None
359
360 3287 aaronmk
    # Save default values for all rows since in_table may have rows deleted
361 3431 aaronmk
    if is_literals: pass
362
    elif is_function: full_in_table = in_table
363 3386 aaronmk
    else:
364
        full_in_table = sql_gen.suffixed_table(in_table, '_full')
365
        full_in_table_cols = [in_pkey_col]
366
        if default != None:
367
            full_in_table_cols.append(default)
368
            default = sql_gen.with_table(default, full_in_table)
369
        sql.run_query_into(db, sql.mk_select(db, in_table, full_in_table_cols,
370
            order_by=None), into=full_in_table, add_pkey_=True)
371 3287 aaronmk
372 3077 aaronmk
    pkeys_table_exists_ref = [False]
373 3499 aaronmk
    def insert_into_pkeys(joins, cols=None, limit=None, **kw_args):
374 3477 aaronmk
        query = sql.mk_select(db, joins, cols, order_by=None, limit=limit)
375 3077 aaronmk
        if pkeys_table_exists_ref[0]:
376 4484 aaronmk
            sql.insert_select(db, into, [in_pkey, into_out_pkey], query,
377
                **kw_args)
378 3077 aaronmk
        else:
379 3304 aaronmk
            sql.run_query_into(db, query, into=into, add_pkey_=True, **kw_args)
380 3077 aaronmk
            pkeys_table_exists_ref[0] = True
381
382
    limit_ref = [None]
383 3418 aaronmk
    def mk_main_select(joins, cols):
384
        return sql.mk_select(db, joins, cols, limit=limit_ref[0], order_by=None)
385
386 3431 aaronmk
    if is_literals: insert_in_table = None
387
    else:
388
        insert_in_table = in_table
389
        insert_in_tables = [insert_in_table]
390 3352 aaronmk
    join_cols = sql_gen.ColDict(db, out_table)
391 3077 aaronmk
392
    exc_strs = set()
393
    def log_exc(e):
394
        e_str = exc.str_(e, first_line_only=True)
395
        log_debug('Caught exception: '+e_str)
396 3552 aaronmk
        if e_str in exc_strs: # avoid infinite loops
397
            log_debug('Exception already seen, handler broken')
398
            on_error(e)
399
            remove_all_rows()
400
        else: exc_strs.add(e_str)
401 3077 aaronmk
402
    def remove_all_rows():
403
        log_debug('Ignoring all rows')
404
        limit_ref[0] = 0 # just create an empty pkeys table
405
406 3352 aaronmk
    def ignore_cond(cond, e):
407 3704 aaronmk
        if is_literals: remove_all_rows()
408
        else:
409
            out_table_cols = sql_gen.ColDict(db, out_table)
410
            out_table_cols.update(util.dict_subset_right_join({},
411
                sql.table_cols(db, out_table)))
412
413
            in_cols = []
414
            cond = sql.map_expr(db, cond, mapping, in_cols)
415
            cond = sql.map_expr(db, cond, out_table_cols)
416
417
            track_data_error(db, errors_table_, sql_gen.cols_srcs(in_cols),
418
                None, e.cause.pgcode,
419
                strings.ensure_newl(e.cause.pgerror)+'condition: '+cond)
420
421
            not_cond = sql_gen.NotCond(sql_gen.CustomCode(cond))
422
            log_debug('Ignoring rows where '+strings.as_tt(not_cond.to_str(db)))
423
            sql.delete(db, insert_in_table, not_cond)
424 3352 aaronmk
425 3294 aaronmk
    not_null_cols = set()
426 3077 aaronmk
    def ignore(in_col, value, e):
427 3630 aaronmk
        if sql_gen.is_table_col(in_col):
428
            in_col = sql_gen.with_table(in_col, insert_in_table)
429
430
            track_data_error(db, errors_table_, in_col.srcs, value,
431
                e.cause.pgcode, e.cause.pgerror)
432
433
            sql.add_index(db, in_col, insert_in_table) # enable fast filtering
434
            if value != None and in_col not in not_null_cols:
435 4492 aaronmk
                log_debug('Replacing invalid value '
436
                    +strings.as_tt(strings.urepr(value))+' with NULL in column '
437
                    +strings.as_tt(in_col.to_str(db)))
438 3630 aaronmk
                sql.update(db, insert_in_table, [(in_col, None)],
439
                    sql_gen.ColValueCond(in_col, value))
440
            else:
441 3637 aaronmk
                log_debug('Ignoring rows with '+strings.as_tt(in_col.to_str(db))
442 4492 aaronmk
                    +' = '+strings.as_tt(strings.urepr(value)))
443 3630 aaronmk
                sql.delete(db, insert_in_table,
444
                    sql_gen.ColValueCond(in_col, value))
445
                if value == None: not_null_cols.add(in_col)
446 3293 aaronmk
        else:
447 3630 aaronmk
            assert isinstance(in_col, sql_gen.NamedCol)
448 3684 aaronmk
            in_value = sql_gen.remove_col_rename(in_col)
449
            assert sql_gen.is_literal(in_value)
450
            if value == in_value.value:
451
                if value != None:
452
                    log_debug('Replacing invalid literal '
453
                        +strings.as_tt(in_col.to_str(db))+' with NULL')
454
                    mapping[in_col.name] = None
455
                else:
456
                    remove_all_rows()
457
            # otherwise, all columns were being ignore()d because the specific
458
            # column couldn't be identified, and this was not the invalid column
459 3077 aaronmk
460 3431 aaronmk
    if not is_literals:
461
        def insert_pkeys_table(which):
462
            return sql_gen.Table(sql_gen.concat(in_table.name,
463
                '_insert_'+which+'_pkeys'))
464
        insert_out_pkeys = insert_pkeys_table('out')
465
        insert_in_pkeys = insert_pkeys_table('in')
466 3077 aaronmk
467 3918 aaronmk
    def mk_func_call():
468 3550 aaronmk
        args = dict(((k.name, v) for k, v in mapping.iteritems()))
469 4484 aaronmk
        return sql_gen.FunctionCall(out_table, **args), args
470 3550 aaronmk
471 3918 aaronmk
    if is_function and not is_literals:
472
        log_debug('Defining wrapper function')
473
474
        func_call, args = mk_func_call()
475 4484 aaronmk
        func_call = sql_gen.NamedCol(into_out_pkey, func_call)
476 3918 aaronmk
477
        # Create empty pkeys table so its row type can be used
478
        insert_into_pkeys(input_joins, [in_pkey_col, func_call], limit=0,
479
            recover=True)
480 4484 aaronmk
        result_type = db.col_info(sql_gen.Col(into_out_pkey, into)).type
481 3918 aaronmk
482
        ## Create error handling wrapper function
483
484
        wrapper = db.TempFunction(sql_gen.concat(into.name, '_wrap'))
485
486
        select_cols = [in_pkey_col]+args.values()
487
        row_var = copy.copy(sql_gen.row_var)
488
        row_var.set_srcs([in_table])
489
        in_pkey_var = sql_gen.Col(in_pkey, row_var)
490
491
        args = dict(((k, sql_gen.with_table(v, row_var))
492
            for k, v in args.iteritems()))
493
        func_call = sql_gen.FunctionCall(out_table, **args)
494
495
        def mk_return(result):
496
            return sql_gen.ReturnQuery(sql.mk_select(db,
497
                fields=[in_pkey_var, result], explain=False))
498
        exc_handler = func_wrapper_exception_handler(db,
499
            mk_return(sql_gen.Cast(result_type, None)), args.values(),
500
            errors_table_)
501
502
        sql.define_func(db, sql_gen.FunctionDef(wrapper, sql_gen.SetOf(into),
503
            sql_gen.RowExcIgnore(sql_gen.RowType(in_table),
504
                sql.mk_select(db, input_joins, order_by=None),
505
                mk_return(func_call), exc_handler=exc_handler)
506
            ))
507
        wrapper_table = sql_gen.FunctionCall(wrapper)
508 3550 aaronmk
509 3077 aaronmk
    # Do inserts and selects
510
    while True:
511 3473 aaronmk
        has_joins = join_cols != {}
512
513 3551 aaronmk
        # Handle unrecoverable errors in a special case
514
        if limit_ref[0] == 0:
515 4207 aaronmk
            if is_literals or default == None:
516 3686 aaronmk
                default = sql_gen.remove_col_rename(default)
517 4492 aaronmk
                log_debug('Returning default: '
518
                    +strings.as_tt(strings.urepr(default)))
519 3623 aaronmk
                return default
520 3551 aaronmk
            elif is_function: pass # empty pkeys table already created
521
            else:
522
                log_debug('Creating an empty output pkeys table')
523 3745 aaronmk
                has_joins = False # use the no-joins case
524 3551 aaronmk
                cur = sql.run_query_into(db, sql.mk_select(db, out_table,
525
                    [out_pkey], order_by=None, limit=0), into=insert_out_pkeys)
526
527 3077 aaronmk
            break # don't do main case
528
529
        # Prepare to insert new rows
530 3918 aaronmk
        if is_function:
531
            log_debug('Calling function on input rows')
532
            if is_literals: func_call, args = mk_func_call()
533 3077 aaronmk
        else:
534 3550 aaronmk
            log_debug('Trying to insert new rows')
535 3291 aaronmk
            insert_args = dict(recover=True, cacheable=False)
536
            if has_joins:
537
                insert_args.update(dict(ignore=True))
538
            else:
539 3431 aaronmk
                insert_args.update(dict(returning=out_pkey))
540
                if not is_literals:
541
                    insert_args.update(dict(into=insert_out_pkeys))
542 3291 aaronmk
            main_select = mk_main_select([insert_in_table], [sql_gen.with_table(
543
                c, insert_in_table) for c in mapping.values()])
544 3077 aaronmk
545 3292 aaronmk
        try:
546
            cur = None
547 3077 aaronmk
            if is_function:
548 3917 aaronmk
                if is_literals:
549
                    cur = sql.select(db, fields=[func_call], recover=True,
550
                        cacheable=True)
551 3499 aaronmk
                else: insert_into_pkeys(wrapper_table, recover=True)
552 3077 aaronmk
            else:
553 3292 aaronmk
                cur = sql.insert_select(db, out_table, mapping.keys(),
554 3077 aaronmk
                    main_select, **insert_args)
555
            break # insert successful
556
        except sql.MissingCastException, e:
557
            log_exc(e)
558
559
            type_ = e.type
560 4140 aaronmk
            if e.col == None: out_cols = mapping.keys()
561
            else: out_cols = [e.col]
562 3077 aaronmk
563 4140 aaronmk
            for out_col in out_cols:
564 4171 aaronmk
                log_debug('Casting '+strings.as_tt(strings.repr_no_u(out_col))
565 4140 aaronmk
                    +' input to '+strings.as_tt(type_))
566
                in_col = mapping[out_col]
567
                while True:
568
                    try:
569
                        mapping[out_col] = cast_temp_col(db, type_, in_col,
570
                            errors_table_)
571
                        break # cast successful
572
                    except sql.InvalidValueException, e:
573
                        log_exc(e)
574
575
                        ignore(in_col, e.value, e)
576 3077 aaronmk
        except sql.DuplicateKeyException, e:
577
            log_exc(e)
578
579 3274 aaronmk
            # Different rows violating different unique constraints not
580
            # supported
581
            assert not join_cols
582
583 3077 aaronmk
            join_cols.update(util.dict_subset_right_join(mapping, e.cols))
584
            log_debug('Ignoring existing rows, comparing on these columns:\n'
585
                +strings.as_inline_table(join_cols, ustr=col_ustr))
586 3102 aaronmk
587 3431 aaronmk
            if is_literals:
588
                return sql.value(sql.select(db, out_table, [out_pkey_col],
589 4025 aaronmk
                    join_cols, order_by=None))
590 3431 aaronmk
591 3102 aaronmk
            # Uniquify input table to avoid internal duplicate keys
592
            insert_in_table = sql.distinct_table(db, insert_in_table,
593 3358 aaronmk
                join_cols.values())
594 3144 aaronmk
            insert_in_tables.append(insert_in_table)
595 3077 aaronmk
        except sql.NullValueException, e:
596
            log_exc(e)
597
598
            out_col, = e.cols
599
            try: in_col = mapping[out_col]
600 3618 aaronmk
            except KeyError, e:
601
                try: in_col = mapping[out_col] = col_defaults[out_col]
602
                except KeyError:
603
                    msg = 'Missing mapping for NOT NULL column '+out_col
604
                    log_debug(msg)
605 3713 aaronmk
                    if default == None: warnings.warn(UserWarning(msg))
606
                        # not an error because sometimes the mappings include
607
                        # extra tables which aren't used by the dataset
608 3618 aaronmk
                    remove_all_rows()
609 3294 aaronmk
            else: ignore(in_col, None, e)
610 3352 aaronmk
        except sql.CheckException, e:
611
            log_exc(e)
612
613
            ignore_cond(e.cond, e)
614 3413 aaronmk
        except sql.InvalidValueException, e:
615
            log_exc(e)
616
617
            for in_col in mapping.values(): ignore(in_col, e.value, e)
618 3474 aaronmk
        except psycopg2.extensions.TransactionRollbackError, e:
619
            log_exc(e)
620
            # retry
621 3077 aaronmk
        except sql.DatabaseErrors, e:
622
            log_exc(e)
623
624
            log_debug('No handler for exception')
625
            on_error(e)
626
            remove_all_rows()
627
        # after exception handled, rerun loop with additional constraints
628
629
    if cur != None and row_ct_ref != None and cur.rowcount >= 0:
630
        row_ct_ref[0] += cur.rowcount
631
632 3530 aaronmk
    if is_literals: return sql.value(cur)
633
634
    if is_function: pass # pkeys table already created
635 3077 aaronmk
    elif has_joins:
636
        select_joins = input_joins+[sql_gen.Join(out_table, join_cols)]
637
        log_debug('Getting output table pkeys of existing/inserted rows')
638 4484 aaronmk
        insert_into_pkeys(select_joins, [in_pkey_col,
639
            sql_gen.NamedCol(into_out_pkey, out_pkey_col)])
640 3077 aaronmk
    else:
641
        sql.add_row_num(db, insert_out_pkeys) # for joining with input pkeys
642
643
        log_debug('Getting input table pkeys of inserted rows')
644 3285 aaronmk
        # Note that mk_main_select() does not use ORDER BY. Instead, assume that
645
        # since the SELECT query is identical to the one used in INSERT SELECT,
646
        # its rows will be retrieved in the same order.
647 3077 aaronmk
        sql.run_query_into(db, mk_main_select(input_joins, [in_pkey]),
648
            into=insert_in_pkeys)
649
        sql.add_row_num(db, insert_in_pkeys) # for joining with output pkeys
650
651
        assert sql.table_row_count(db, insert_out_pkeys) == sql.table_row_count(
652
            db, insert_in_pkeys)
653
654
        log_debug('Combining output and input pkeys in inserted order')
655
        pkey_joins = [insert_in_pkeys, sql_gen.Join(insert_out_pkeys,
656
            {sql.row_num_col: sql_gen.join_same_not_null})]
657 4484 aaronmk
        in_col = sql_gen.Col(in_pkey, insert_in_pkeys)
658
        out_col = sql_gen.NamedCol(into_out_pkey,
659
            sql_gen.Col(out_pkey, insert_out_pkeys))
660
        insert_into_pkeys(pkey_joins, [in_col, out_col])
661 3077 aaronmk
662
        sql.empty_temp(db, [insert_out_pkeys, insert_in_pkeys])
663
664 3531 aaronmk
    if limit_ref[0] == 0 or not is_function: # is_function doesn't leave holes
665 3187 aaronmk
        log_debug('Setting pkeys of missing rows to '
666 4492 aaronmk
            +strings.as_tt(strings.urepr(default)))
667 3287 aaronmk
        missing_rows_joins = [full_in_table, sql_gen.Join(into,
668 3187 aaronmk
            {in_pkey: sql_gen.join_same_not_null}, sql_gen.filter_out)]
669
            # must use join_same_not_null or query will take forever
670
        insert_into_pkeys(missing_rows_joins,
671 3287 aaronmk
            [sql_gen.Col(in_pkey, full_in_table),
672 4484 aaronmk
            sql_gen.NamedCol(into_out_pkey, default)])
673 3187 aaronmk
    # otherwise, there is already an entry for every row
674 3077 aaronmk
675 3530 aaronmk
    assert (sql.table_row_count(db, into)
676
        == sql.table_row_count(db, full_in_table))
677
678
    sql.empty_temp(db, insert_in_tables+[full_in_table])
679
680
    srcs = []
681 3619 aaronmk
    if is_function: srcs = sql_gen.cols_srcs(in_cols)
682 4484 aaronmk
    return sql_gen.Col(into_out_pkey, into, srcs)