Project

General

Profile

1
# Database import/export
2

    
3
import operator
4

    
5
import exc
6
import dicts
7
import sql
8
import sql_gen
9
import strings
10
import util
11

    
12
##### Data cleanup
13

    
14
def cleanup_table(db, table, cols):
15
    table = sql_gen.as_Table(table)
16
    cols = map(sql_gen.as_Col, cols)
17
    
18
    expr = ('nullif(nullif(trim(both from %s), '+db.esc_value('')+'), '
19
        +db.esc_value(r'\N')+')')
20
    changes = [(v, sql_gen.CustomCode(expr % v.to_str(db)))
21
        for v in cols]
22
    
23
    sql.update(db, table, changes, in_place=True)
24

    
25
##### Error tracking
26

    
27
def track_data_error(db, errors_table, cols, value, error_code, error):
28
    '''
29
    @param errors_table If None, does nothing.
30
    '''
31
    if errors_table == None or cols == (): return
32
    
33
    for col in cols:
34
        try:
35
            sql.insert(db, errors_table, dict(column=col.name, value=value,
36
                error_code=error_code, error=error), recover=True,
37
                cacheable=True, log_level=4)
38
        except sql.DuplicateKeyException: pass
39

    
40
class ExcToErrorsTable(sql_gen.ExcToWarning):
41
    '''Handles an exception by saving it or converting it to a warning.'''
42
    def __init__(self, return_, srcs, errors_table, value=None):
43
        '''
44
        @param return_ See sql_gen.ExcToWarning
45
        @param srcs The column names for the errors table
46
        @param errors_table None|sql_gen.Table
47
        @param value The value (or an expression for it) that caused the error
48
        @pre The invalid value must be in a local variable "value" of type text.
49
        '''
50
        sql_gen.ExcToWarning.__init__(self, return_)
51
        
52
        value = sql_gen.as_Code(value)
53
        
54
        self.srcs = srcs
55
        self.errors_table = errors_table
56
        self.value = value
57
    
58
    def to_str(self, db):
59
        if not self.srcs or self.errors_table == None:
60
            return sql_gen.ExcToWarning.to_str(self, db)
61
        
62
        errors_table_cols = map(sql_gen.Col,
63
            ['column', 'value', 'error_code', 'error'])
64
        col_names_query = sql.mk_select(db, sql_gen.NamedValues('c', None,
65
            [[c.name] for c in self.srcs]), order_by=None)
66
        insert_query = sql.mk_insert_select(db, self.errors_table,
67
            errors_table_cols,
68
            sql_gen.Values(errors_table_cols).to_str(db))+';\n'
69
        return '''\
70
-- Save error in errors table.
71
DECLARE
72
    error_code text := SQLSTATE;
73
    error text := SQLERRM;
74
    value text := '''+self.value.to_str(db)+''';
75
BEGIN
76
    -- Insert the value and error for *each* source column.
77
'''+strings.indent(sql_gen.RowExcIgnore('text', col_names_query, insert_query,
78
    row_var=errors_table_cols[0]).to_str(db))+'''
79
END;
80

    
81
'''+self.return_.to_str(db)
82

    
83
def data_exception_handler(*args, **kw_args):
84
    '''Handles a data_exception by saving it or converting it to a warning.
85
    For params, see ExcToErrorsTable().
86
    '''
87
    return sql_gen.data_exception_handler(ExcToErrorsTable(*args, **kw_args))
88

    
89
def cast(db, type_, col, errors_table=None):
90
    '''Casts an (unrenamed) column or value.
91
    If errors_table set and col has srcs, saves errors in errors_table (using
92
    col's srcs attr as source columns). Otherwise, converts errors to warnings.
93
    @param col str|sql_gen.Col|sql_gen.Literal
94
    @param errors_table None|sql_gen.Table|str
95
    '''
96
    col = sql_gen.as_Col(col)
97
    
98
    # Don't convert exceptions to warnings for user-supplied constants
99
    if isinstance(col, sql_gen.Literal): return sql_gen.Cast(type_, col)
100
    
101
    assert not isinstance(col, sql_gen.NamedCol)
102
    
103
    function_name = strings.first_word(type_)
104
    srcs = col.srcs
105
    save_errors = errors_table != None and srcs
106
    if save_errors: # function will be unique for the given srcs
107
        function_name = str(sql_gen.FunctionCall(function_name,
108
            *map(sql_gen.to_name_only_col, srcs)))
109
    function = db.TempFunction(function_name)
110
    
111
    # Create function definition
112
    modifiers = 'STRICT'
113
    if not save_errors: modifiers = 'IMMUTABLE '+modifiers
114
    value_param = sql_gen.FunctionParam('value', 'text')
115
    handler = data_exception_handler('RETURN NULL;\n', srcs, errors_table,
116
        value_param.name)
117
    body = sql_gen.CustomCode(handler.to_str(db, '''\
118
/* The explicit cast to the return type is needed to make the cast happen
119
inside the try block. (Implicit casts to the return type happen at the end
120
of the function, outside any block.) */
121
RETURN value::'''+type_+''';
122
'''))
123
    body.lang='plpgsql'
124
    sql.define_func(db, sql_gen.FunctionDef(function, type_, body,
125
        [value_param], modifiers))
126
    
127
    return sql_gen.FunctionCall(function, col)
128

    
129
def cast_temp_col(db, type_, col, errors_table=None):
130
    '''Like cast(), but creates a new column with the cast values if the input
131
    is a column.
132
    @return The new column or cast value
133
    '''
134
    def cast_(col): return cast(db, type_, col, errors_table)
135
    
136
    try: col = sql_gen.underlying_col(col)
137
    except sql_gen.NoUnderlyingTableException: return sql_gen.wrap(cast_, col)
138
    
139
    table = col.table
140
    new_col = sql_gen.suffixed_col(col, '::'+strings.first_word(type_))
141
    expr = cast_(col)
142
    
143
    # Add column
144
    new_typed_col = sql_gen.TypedCol(new_col.name, type_)
145
    sql.add_col(db, table, new_typed_col, comment=repr(col)+'::'+type_)
146
    new_col.name = new_typed_col.name # propagate any renaming
147
    
148
    sql.update(db, table, [(new_col, expr)], in_place=True, recover=True)
149
    
150
    return new_col
151

    
152
def errors_table(db, table, if_exists=True):
153
    '''
154
    @param if_exists If set, returns None if the errors table doesn't exist
155
    @return None|sql_gen.Table
156
    '''
157
    table = sql_gen.as_Table(table)
158
    if table.srcs != (): table = table.srcs[0]
159
    
160
    errors_table = sql_gen.suffixed_table(table, '.errors')
161
    if if_exists and not sql.table_exists(db, errors_table): return None
162
    return errors_table
163

    
164
##### Import
165

    
166
def put(db, table, row, pkey_=None, row_ct_ref=None):
167
    '''Recovers from errors.
168
    Only works under PostgreSQL (uses INSERT RETURNING).
169
    '''
170
    row = sql_gen.ColDict(db, table, row)
171
    if pkey_ == None: pkey_ = sql.pkey(db, table, recover=True)
172
    
173
    try:
174
        cur = sql.insert(db, table, row, pkey_, recover=True, log_level=3.5)
175
        if row_ct_ref != None and cur.rowcount >= 0:
176
            row_ct_ref[0] += cur.rowcount
177
        return sql.value(cur)
178
    except sql.DuplicateKeyException, e:
179
        row = sql_gen.ColDict(db, table,
180
            util.dict_subset_right_join(row, e.cols))
181
        return sql.value(sql.select(db, table, [pkey_], row, recover=True,
182
            log_level=3.5))
183
    except sql.NullValueException: return None
184

    
185
def get(db, table, row, pkey, row_ct_ref=None, create=False):
186
    '''Recovers from errors'''
187
    try:
188
        return sql.value(sql.select(db, table, [pkey], row, limit=1,
189
            recover=True))
190
    except StopIteration:
191
        if not create: raise
192
        return put(db, table, row, pkey, row_ct_ref) # insert new row
193

    
194
def is_func_result(col):
195
    return col.table.name.find('(') >= 0 and col.name == 'result'
196

    
197
def into_table_name(out_table, in_tables0, mapping, is_func):
198
    def in_col_str(in_col):
199
        in_col = sql_gen.remove_col_rename(in_col)
200
        if isinstance(in_col, sql_gen.Col):
201
            table = in_col.table
202
            if table == in_tables0:
203
                in_col = sql_gen.to_name_only_col(in_col)
204
            elif is_func_result(in_col): in_col = table # omit col name
205
        return str(in_col)
206
    
207
    str_ = str(out_table)
208
    if is_func:
209
        str_ += '('
210
        
211
        try: value_in_col = mapping['value']
212
        except KeyError:
213
            str_ += ', '.join((str(k)+'='+in_col_str(v)
214
                for k, v in mapping.iteritems()))
215
        else: str_ += in_col_str(value_in_col)
216
        
217
        str_ += ')'
218
    else:
219
        out_col = 'rank'
220
        try: in_col = mapping[out_col]
221
        except KeyError: str_ += '_pkeys'
222
        else: # has a rank column, so hierarchical
223
            str_ += '['+str(out_col)+'='+in_col_str(in_col)+']'
224
    return str_
225

    
226
def put_table(db, out_table, in_tables, mapping, row_ct_ref=None, into=None,
227
    default=None, is_func=False, on_error=exc.raise_):
228
    '''Recovers from errors.
229
    Only works under PostgreSQL (uses INSERT RETURNING).
230
    IMPORTANT: Must be run at the *beginning* of a transaction.
231
    @param in_tables The main input table to select from, followed by a list of
232
        tables to join with it using the main input table's pkey
233
    @param mapping dict(out_table_col=in_table_col, ...)
234
        * out_table_col: str (*not* sql_gen.Col)
235
        * in_table_col: sql_gen.Col|literal-value
236
    @param into The table to contain the output and input pkeys.
237
        Defaults to `out_table.name+'_pkeys'`.
238
    @param default The *output* column to use as the pkey for missing rows.
239
        If this output column does not exist in the mapping, uses None.
240
    @param is_func Whether out_table is the name of a SQL function, not a table
241
    @return sql_gen.Col Where the output pkeys are made available
242
    '''
243
    import psycopg2.extensions
244
    
245
    out_table = sql_gen.as_Table(out_table)
246
    
247
    def log_debug(msg): db.log_debug(msg, level=1.5)
248
    def col_ustr(str_):
249
        return strings.repr_no_u(sql_gen.remove_col_rename(str_))
250
    
251
    log_debug('********** New iteration **********')
252
    log_debug('Inserting these input columns into '+strings.as_tt(
253
        out_table.to_str(db))+':\n'+strings.as_table(mapping, ustr=col_ustr))
254
    
255
    is_function = sql.function_exists(db, out_table)
256
    
257
    if is_function: out_pkey = 'result'
258
    else: out_pkey = sql.pkey(db, out_table, recover=True)
259
    out_pkey_col = sql_gen.as_Col(out_pkey, out_table)
260
    
261
    in_tables_ = in_tables[:] # don't modify input!
262
    try: in_tables0 = in_tables_.pop(0) # first table is separate
263
    except IndexError: in_tables0 = None
264
    else:
265
        in_pkey = sql.pkey(db, in_tables0, recover=True)
266
        in_pkey_col = sql_gen.as_Col(in_pkey, in_tables0)
267
    
268
    # Determine if can use optimization for only literal values
269
    is_literals = not reduce(operator.or_, map(sql_gen.is_table_col,
270
        mapping.values()), False)
271
    is_literals_or_function = is_literals or is_function
272
    
273
    if in_tables0 == None: errors_table_ = None
274
    else: errors_table_ = errors_table(db, in_tables0)
275
    
276
    # Create input joins from list of input tables
277
    input_joins = [in_tables0]+[sql_gen.Join(v,
278
        {in_pkey: sql_gen.join_same_not_null}) for v in in_tables_]
279
    
280
    if mapping == {} and not is_function: # need >= one column for INSERT SELECT
281
        mapping = {out_pkey: None} # ColDict will replace with default value
282
    
283
    if not is_literals:
284
        if into == None:
285
            into = into_table_name(out_table, in_tables0, mapping, is_func)
286
        into = sql_gen.as_Table(into)
287
        
288
        # Set column sources
289
        in_cols = filter(sql_gen.is_table_col, mapping.values())
290
        for col in in_cols:
291
            if col.table == in_tables0: col.set_srcs(sql_gen.src_self)
292
        
293
        log_debug('Joining together input tables into temp table')
294
        # Place in new table so don't modify input and for speed
295
        in_table = sql_gen.Table('in')
296
        mapping = dicts.join(mapping, sql.flatten(db, in_table, input_joins,
297
            in_cols, preserve=[in_pkey_col]))
298
        input_joins = [in_table]
299
        db.log_debug('Temp table: '+strings.as_tt(in_table.to_str(db)), level=2)
300
    
301
    mapping = sql_gen.ColDict(db, out_table, mapping)
302
        # after applying dicts.join() because that returns a plain dict
303
    
304
    # Resolve default value column
305
    if default != None:
306
        try: default = mapping[default]
307
        except KeyError:
308
            db.log_debug('Default value column '
309
                +strings.as_tt(strings.repr_no_u(default))
310
                +' does not exist in mapping, falling back to None', level=2.1)
311
            default = None
312
    
313
    # Save default values for all rows since in_table may have rows deleted
314
    if is_literals: pass
315
    elif is_function: full_in_table = in_table
316
    else:
317
        full_in_table = sql_gen.suffixed_table(in_table, '_full')
318
        full_in_table_cols = [in_pkey_col]
319
        if default != None:
320
            full_in_table_cols.append(default)
321
            default = sql_gen.with_table(default, full_in_table)
322
        sql.run_query_into(db, sql.mk_select(db, in_table, full_in_table_cols,
323
            order_by=None), into=full_in_table, add_pkey_=True)
324
    
325
    if not is_literals:
326
        pkeys_names = [in_pkey, out_pkey]
327
        pkeys_cols = [in_pkey_col, out_pkey_col]
328
    
329
    pkeys_table_exists_ref = [False]
330
    def insert_into_pkeys(joins, cols=None, limit=None, **kw_args):
331
        query = sql.mk_select(db, joins, cols, order_by=None, limit=limit)
332
        if pkeys_table_exists_ref[0]:
333
            sql.insert_select(db, into, pkeys_names, query, **kw_args)
334
        else:
335
            sql.run_query_into(db, query, into=into, add_pkey_=True, **kw_args)
336
            pkeys_table_exists_ref[0] = True
337
    
338
    limit_ref = [None]
339
    def mk_main_select(joins, cols):
340
        return sql.mk_select(db, joins, cols, limit=limit_ref[0], order_by=None)
341
    
342
    if is_literals: insert_in_table = None
343
    else:
344
        insert_in_table = in_table
345
        insert_in_tables = [insert_in_table]
346
    join_cols = sql_gen.ColDict(db, out_table)
347
    
348
    exc_strs = set()
349
    def log_exc(e):
350
        e_str = exc.str_(e, first_line_only=True)
351
        log_debug('Caught exception: '+e_str)
352
        assert e_str not in exc_strs # avoid infinite loops
353
        exc_strs.add(e_str)
354
    
355
    def remove_all_rows():
356
        log_debug('Ignoring all rows')
357
        limit_ref[0] = 0 # just create an empty pkeys table
358
    
359
    def ignore_cond(cond, e):
360
        out_table_cols = sql_gen.ColDict(db, out_table)
361
        out_table_cols.update(util.dict_subset_right_join({},
362
            sql.table_cols(db, out_table)))
363
        
364
        in_cols = []
365
        cond = sql.map_expr(db, cond, mapping, in_cols)
366
        cond = sql.map_expr(db, cond, out_table_cols)
367
        
368
        track_data_error(db, errors_table_, sql_gen.cols_srcs(in_cols), None,
369
            e.cause.pgcode,
370
            strings.ensure_newl(e.cause.pgerror)+'condition: '+cond)
371
        
372
        not_cond = sql_gen.NotCond(sql_gen.CustomCode(cond))
373
        log_debug('Ignoring rows where '+strings.as_tt(not_cond.to_str(db)))
374
        sql.delete(db, insert_in_table, not_cond)
375
    
376
    not_null_cols = set()
377
    def ignore(in_col, value, e):
378
        in_col = sql_gen.with_table(in_col, insert_in_table)
379
        
380
        track_data_error(db, errors_table_, in_col.srcs, value,
381
            e.cause.pgcode, e.cause.pgerror)
382
        log_debug('Ignoring rows with '+strings.as_tt(repr(in_col))+' = '
383
            +strings.as_tt(repr(value)))
384
        
385
        sql.add_index(db, in_col, insert_in_table) # enable fast filtering
386
        if value != None and in_col not in not_null_cols:
387
            # Try just mapping the value to NULL
388
            sql.update(db, insert_in_table, [(in_col, None)],
389
                sql_gen.ColValueCond(in_col, value))
390
        else:
391
            sql.delete(db, insert_in_table, sql_gen.ColValueCond(in_col, value))
392
            if value == None: not_null_cols.add(in_col)
393
    
394
    if not is_literals:
395
        def insert_pkeys_table(which):
396
            return sql_gen.Table(sql_gen.concat(in_table.name,
397
                '_insert_'+which+'_pkeys'))
398
        insert_out_pkeys = insert_pkeys_table('out')
399
        insert_in_pkeys = insert_pkeys_table('in')
400
    
401
    # Do inserts and selects
402
    while True:
403
        has_joins = join_cols != {}
404
        
405
        if limit_ref[0] == 0: # special case
406
            assert not has_joins
407
            
408
            if is_literals: return None
409
            log_debug('Creating an empty output pkeys table')
410
            cur = sql.run_query_into(db, sql.mk_select(db, out_table,
411
                [out_pkey], order_by=None, limit=0), into=insert_out_pkeys)
412
            break # don't do main case
413
        
414
        log_debug('Trying to insert new rows')
415
        
416
        # Prepare to insert new rows
417
        if is_function:
418
            log_debug('Calling function on input rows')
419
            args = dict(((k.name, v) for k, v in mapping.iteritems()))
420
            func_call = sql_gen.NamedCol(out_pkey,
421
                sql_gen.FunctionCall(out_table, **args))
422
            
423
            if not is_literals:
424
                # Create empty pkeys table so its row type can be used
425
                insert_into_pkeys(input_joins, [in_pkey_col, func_call],
426
                    limit=0, recover=True)
427
                
428
                # Create error handling wrapper function
429
                select_cols = [in_pkey_col]+args.values()
430
                args = dict(((k, sql_gen.with_table(v, sql_gen.Table('row')))
431
                    for k, v in args.iteritems()))
432
                func_call = sql_gen.FunctionCall(out_table, **args)
433
                wrapper = db.TempFunction(sql_gen.concat(into.name, '_wrap'))
434
                sql.define_func(db, sql_gen.FunctionDef(wrapper,
435
                    sql_gen.SetOf(into),
436
                    sql_gen.RowExcIgnore(sql_gen.RowType(in_table),
437
                        sql.mk_select(db, input_joins, order_by=None),
438
                        sql_gen.ReturnQuery(sql.mk_select(db,
439
                            fields=[sql_gen.Col(in_pkey, 'row'), func_call],
440
                            explain=False)),
441
                        exc_handler=sql_gen.plpythonu_error_handler)
442
                    ))
443
                wrapper_table = sql_gen.FunctionCall(wrapper)
444
        else:
445
            insert_args = dict(recover=True, cacheable=False)
446
            if has_joins:
447
                insert_args.update(dict(ignore=True))
448
            else:
449
                insert_args.update(dict(returning=out_pkey))
450
                if not is_literals:
451
                    insert_args.update(dict(into=insert_out_pkeys))
452
            main_select = mk_main_select([insert_in_table], [sql_gen.with_table(
453
                c, insert_in_table) for c in mapping.values()])
454
        
455
        try:
456
            cur = None
457
            if is_function:
458
                if is_literals: cur = sql.select(db, fields=[func_call])
459
                else: insert_into_pkeys(wrapper_table, recover=True)
460
            else:
461
                cur = sql.insert_select(db, out_table, mapping.keys(),
462
                    main_select, **insert_args)
463
            break # insert successful
464
        except sql.MissingCastException, e:
465
            log_exc(e)
466
            
467
            out_col = e.col
468
            type_ = e.type
469
            
470
            log_debug('Casting '+strings.as_tt(out_col)+' input to '
471
                +strings.as_tt(type_))
472
            in_col = mapping[out_col]
473
            while True:
474
                try:
475
                    mapping[out_col] = cast_temp_col(db, type_, in_col,
476
                        errors_table_)
477
                    break # cast successful
478
                except sql.InvalidValueException, e:
479
                    log_exc(e)
480
                    
481
                    ignore(in_col, e.value, e)
482
        except sql.DuplicateKeyException, e:
483
            log_exc(e)
484
            
485
            # Different rows violating different unique constraints not
486
            # supported
487
            assert not join_cols
488
            
489
            join_cols.update(util.dict_subset_right_join(mapping, e.cols))
490
            log_debug('Ignoring existing rows, comparing on these columns:\n'
491
                +strings.as_inline_table(join_cols, ustr=col_ustr))
492
            
493
            if is_literals:
494
                return sql.value(sql.select(db, out_table, [out_pkey_col],
495
                    mapping, order_by=None))
496
            
497
            # Uniquify input table to avoid internal duplicate keys
498
            insert_in_table = sql.distinct_table(db, insert_in_table,
499
                join_cols.values())
500
            insert_in_tables.append(insert_in_table)
501
        except sql.NullValueException, e:
502
            log_exc(e)
503
            
504
            out_col, = e.cols
505
            try: in_col = mapping[out_col]
506
            except KeyError:
507
                msg = 'Missing mapping for NOT NULL column '+out_col
508
                log_debug(msg)
509
                if default == None: on_error(SyntaxError(msg)) # required col
510
                remove_all_rows()
511
            else: ignore(in_col, None, e)
512
        except sql.CheckException, e:
513
            log_exc(e)
514
            
515
            ignore_cond(e.cond, e)
516
        except sql.InvalidValueException, e:
517
            log_exc(e)
518
            
519
            for in_col in mapping.values(): ignore(in_col, e.value, e)
520
        except psycopg2.extensions.TransactionRollbackError, e:
521
            log_exc(e)
522
            # retry
523
        except sql.DatabaseErrors, e:
524
            log_exc(e)
525
            
526
            log_debug('No handler for exception')
527
            on_error(e)
528
            remove_all_rows()
529
        # after exception handled, rerun loop with additional constraints
530
    
531
    if cur != None and row_ct_ref != None and cur.rowcount >= 0:
532
        row_ct_ref[0] += cur.rowcount
533
    
534
    if is_literals_or_function: pass # pkeys table already created
535
    elif has_joins:
536
        select_joins = input_joins+[sql_gen.Join(out_table, join_cols)]
537
        log_debug('Getting output table pkeys of existing/inserted rows')
538
        insert_into_pkeys(select_joins, pkeys_cols)
539
    else:
540
        sql.add_row_num(db, insert_out_pkeys) # for joining with input pkeys
541
        
542
        log_debug('Getting input table pkeys of inserted rows')
543
        # Note that mk_main_select() does not use ORDER BY. Instead, assume that
544
        # since the SELECT query is identical to the one used in INSERT SELECT,
545
        # its rows will be retrieved in the same order.
546
        sql.run_query_into(db, mk_main_select(input_joins, [in_pkey]),
547
            into=insert_in_pkeys)
548
        sql.add_row_num(db, insert_in_pkeys) # for joining with output pkeys
549
        
550
        assert sql.table_row_count(db, insert_out_pkeys) == sql.table_row_count(
551
            db, insert_in_pkeys)
552
        
553
        log_debug('Combining output and input pkeys in inserted order')
554
        pkey_joins = [insert_in_pkeys, sql_gen.Join(insert_out_pkeys,
555
            {sql.row_num_col: sql_gen.join_same_not_null})]
556
        insert_into_pkeys(pkey_joins, pkeys_names)
557
        
558
        sql.empty_temp(db, [insert_out_pkeys, insert_in_pkeys])
559
    
560
    if not is_literals_or_function:
561
        log_debug('Setting pkeys of missing rows to '
562
            +strings.as_tt(repr(default)))
563
        missing_rows_joins = [full_in_table, sql_gen.Join(into,
564
            {in_pkey: sql_gen.join_same_not_null}, sql_gen.filter_out)]
565
            # must use join_same_not_null or query will take forever
566
        insert_into_pkeys(missing_rows_joins,
567
            [sql_gen.Col(in_pkey, full_in_table),
568
            sql_gen.NamedCol(out_pkey, default)])
569
    # otherwise, there is already an entry for every row
570
    
571
    if is_literals: return sql.value(cur)
572
    else:
573
        assert (sql.table_row_count(db, into)
574
            == sql.table_row_count(db, full_in_table))
575
        
576
        sql.empty_temp(db, insert_in_tables+[full_in_table])
577
        
578
        srcs = []
579
        if is_func: srcs = sql_gen.cols_srcs(in_cols)
580
        return sql_gen.Col(out_pkey, into, srcs)
(26-26/37)