# Database import/export

import exc
import dicts
import sql
import sql_gen
import strings
import util

def put(db, table, row, pkey_=None, row_ct_ref=None):
    '''Recovers from errors.
    Only works under PostgreSQL (uses INSERT RETURNING).
    '''
    row = sql_gen.ColDict(db, table, row)
    if pkey_ == None: pkey_ = sql.pkey(db, table, recover=True)
    
    try:
        cur = sql.insert(db, table, row, pkey_, recover=True)
        if row_ct_ref != None and cur.rowcount >= 0:
            row_ct_ref[0] += cur.rowcount
        return sql.value(cur)
    except sql.DuplicateKeyException, e:
        row = sql_gen.ColDict(db, table,
            util.dict_subset_right_join(row, e.cols))
        return sql.value(sql.select(db, table, [pkey_], row, recover=True))
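
# Example (sketch only; the table and column names are hypothetical, and `db`
# is assumed to be an already-open database connection as used throughout this
# module):
#
#     row_ct = [0]
#     party_id = put(db, 'party', {'organizationname': 'ACME'},
#         row_ct_ref=row_ct)
#     # on a duplicate key, put() returns the existing row's pkey instead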

def get(db, table, row, pkey, row_ct_ref=None, create=False):
    '''Recovers from errors'''
    try:
        return sql.value(sql.select(db, table, [pkey], row, limit=1,
            recover=True))
    except StopIteration:
        if not create: raise
        return put(db, table, row, pkey, row_ct_ref) # insert new row
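
# Example (sketch only; hypothetical names): get-or-create in one call.
#
#     party_id = get(db, 'party', {'organizationname': 'ACME'}, 'party_id',
#         create=True) # falls back to put() if the SELECT returns no rows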

def is_func_result(col):
    # a function call is wrapped in a table whose name contains '(' and whose
    # single output column is named 'result'
    return col.table.name.find('(') >= 0 and col.name == 'result'

def into_table_name(out_table, in_tables0, mapping, is_func):
    def in_col_str(in_col):
        in_col = sql_gen.remove_col_rename(in_col)
        if isinstance(in_col, sql_gen.Col):
            table = in_col.table
            if table == in_tables0:
                in_col = sql_gen.to_name_only_col(in_col)
            elif is_func_result(in_col): in_col = table # omit col name
        return str(in_col)
    
    str_ = str(out_table)
    if is_func:
        str_ += '('
        
        try: value_in_col = mapping['value']
        except KeyError:
            str_ += ', '.join((str(k)+'='+in_col_str(v)
                for k, v in mapping.iteritems()))
        else: str_ += in_col_str(value_in_col)
        
        str_ += ')'
    else:
        out_col = 'rank'
        try: in_col = mapping[out_col]
        except KeyError: str_ += '_pkeys'
        else: # has a rank column, so hierarchical
            str_ += '['+str(out_col)+'='+in_col_str(in_col)+']'
    return str_
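
# Illustrative patterns of the names this produces (actual names depend on the
# out_table, mapping, and input columns passed in):
# * plain table, no 'rank' mapping:   '<out_table>_pkeys'
# * 'rank' in mapping (hierarchical): '<out_table>[rank=<in_col>]'
# * is_func with a 'value' mapping:   '<out_table>(<in_col>)'
# * is_func, other mappings:          '<out_table>(<key>=<in_col>, ...)'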

def put_table(db, out_table, in_tables, mapping, row_ct_ref=None, into=None,
    default=None, is_func=False, on_error=exc.raise_):
    '''Recovers from errors.
    Only works under PostgreSQL (uses INSERT RETURNING).
    IMPORTANT: Must be run at the *beginning* of a transaction.
    @param in_tables The main input table to select from, followed by a list of
        tables to join with it using the main input table's pkey
    @param mapping dict(out_table_col=in_table_col, ...)
        * out_table_col: str (*not* sql_gen.Col)
        * in_table_col: sql_gen.Col|literal-value
    @param into The table to contain the output and input pkeys.
        Defaults to `out_table.name+'_pkeys'`.
    @param default The *output* column to use as the pkey for missing rows.
        If this output column does not exist in the mapping, uses None.
    @param is_func Whether out_table is the name of a SQL function, not a table
    @return sql_gen.Col Where the output pkeys are made available
    '''
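    # Overview: the input tables are first flattened into a single temp table;
    # an INSERT SELECT (or function call) on it is then attempted in a retry
    # loop whose exception handlers add casts, filters, and join columns as
    # needed; finally the output pkeys of inserted, existing, and missing rows
    # are collected into the `into` pkeys table, whose pkey column is returned.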
    out_table = sql_gen.as_Table(out_table)
    
    def log_debug(msg): db.log_debug(msg, level=1.5)
    def col_ustr(str_):
        return strings.repr_no_u(sql_gen.remove_col_rename(str_))
    
    log_debug('********** New iteration **********')
    log_debug('Inserting these input columns into '+strings.as_tt(
        out_table.to_str(db))+':\n'+strings.as_table(mapping, ustr=col_ustr))
    
    is_function = sql.function_exists(db, out_table)
    
    if is_function: out_pkey = 'result'
    else: out_pkey = sql.pkey(db, out_table, recover=True)
    out_pkey_col = sql_gen.as_Col(out_pkey, out_table)
    
    if mapping == {}: # need at least one column for INSERT SELECT
        mapping = {out_pkey: None} # ColDict will replace with default value
    
    # Create input joins from list of input tables
    in_tables_ = in_tables[:] # don't modify input!
    in_tables0 = in_tables_.pop(0) # first table is separate
    errors_table_ = sql.errors_table(db, in_tables0)
    in_pkey = sql.pkey(db, in_tables0, recover=True)
    in_pkey_col = sql_gen.as_Col(in_pkey, in_tables0)
    input_joins = [in_tables0]+[sql_gen.Join(v,
        {in_pkey: sql_gen.join_same_not_null}) for v in in_tables_]
    
    if into == None:
        into = into_table_name(out_table, in_tables0, mapping, is_func)
    into = sql_gen.as_Table(into)
    
    # Set column sources
    in_cols = filter(sql_gen.is_table_col, mapping.values())
    for col in in_cols:
        if col.table == in_tables0: col.set_srcs(sql_gen.src_self)
    
    log_debug('Joining together input tables into temp table')
    # Place in a new table for speed, and so that the input isn't modified
    # when values are edited
    in_table = sql_gen.Table('in')
    mapping = dicts.join(mapping, sql.flatten(db, in_table, input_joins,
        in_cols, preserve=[in_pkey_col], start=0))
    input_joins = [in_table]
    db.log_debug('Temp table: '+strings.as_tt(in_table.to_str(db)), level=2)
    
    mapping = sql_gen.ColDict(db, out_table, mapping)
        # after applying dicts.join() because that returns a plain dict
    
    # Resolve default value column
    if default != None:
        try: default = mapping[default]
        except KeyError:
            db.log_debug('Default value column '
                +strings.as_tt(strings.repr_no_u(default))
                +' does not exist in mapping, falling back to None', level=2.1)
            default = None
    
    pkeys_names = [in_pkey, out_pkey]
    pkeys_cols = [in_pkey_col, out_pkey_col]
    
    pkeys_table_exists_ref = [False]
    def insert_into_pkeys(joins, cols, distinct=False):
        kw_args = {}
        if distinct: kw_args.update(dict(distinct_on=[in_pkey_col]))
        query = sql.mk_select(db, joins, cols, order_by=None, start=0,
            **kw_args)
        
        if pkeys_table_exists_ref[0]:
            sql.insert_select(db, into, pkeys_names, query)
        else: # the first call creates the pkeys table
            sql.run_query_into(db, query, into=into)
            pkeys_table_exists_ref[0] = True
    
    limit_ref = [None]
    conds = set()
    distinct_on = sql_gen.ColDict(db, out_table)
    def mk_main_select(joins, cols):
        distinct_on_cols = [c.to_Col() for c in distinct_on.values()]
        return sql.mk_select(db, joins, cols, conds, distinct_on_cols,
            limit=limit_ref[0], start=0)
    
    exc_strs = set()
    def log_exc(e):
        e_str = exc.str_(e, first_line_only=True)
        log_debug('Caught exception: '+e_str)
        assert e_str not in exc_strs # avoid infinite loops
        exc_strs.add(e_str)
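
    # The helpers below implement the recovery strategies used by the
    # exception handlers in the main insert loop.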

    def remove_all_rows():
        log_debug('Ignoring all rows')
        limit_ref[0] = 0 # just create an empty pkeys table
    
    def ignore(in_col, value, e):
        sql.track_data_error(db, errors_table_, in_col.srcs, value,
            e.cause.pgcode, e.cause.pgerror)
        log_debug('Ignoring rows with '+strings.as_tt(repr(in_col))+' = '
            +strings.as_tt(repr(value)))
    
    def remove_rows(in_col, value, e):
        ignore(in_col, value, e)
        cond = (in_col, sql_gen.CompareCond(value, '!='))
        assert cond not in conds # avoid infinite loops
        conds.add(cond)
    
    def invalid2null(in_col, value, e):
        ignore(in_col, value, e)
        sql.update(db, in_table, [(in_col, None)],
            sql_gen.ColValueCond(in_col, value))
    
    def insert_pkeys_table(which):
        return sql_gen.Table(sql_gen.concat(in_table.name,
            '_insert_'+which+'_pkeys'))
    insert_out_pkeys = insert_pkeys_table('out')
    insert_in_pkeys = insert_pkeys_table('in')
    
    # Do inserts and selects
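    # Each pass attempts the insert; when a recognized exception occurs, the
    # handlers below adjust the mapping or the row set and the loop reruns.
    # log_exc() asserts that the same error is never seen twice, so the loop
    # cannot repeat indefinitely.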
    join_cols = sql_gen.ColDict(db, out_table)
    while True:
        if limit_ref[0] == 0: # special case
            log_debug('Creating an empty pkeys table')
            cur = sql.run_query_into(db, sql.mk_select(db, out_table,
                [out_pkey], limit=limit_ref[0]), into=insert_out_pkeys)
            break # don't do main case
        
        has_joins = join_cols != {}
        
        log_debug('Trying to insert new rows')
        
        # Prepare to insert new rows
        insert_joins = input_joins[:] # don't modify original!
        insert_args = dict(recover=True, cacheable=False)
        if has_joins:
            insert_args.update(dict(ignore=True))
        else:
            insert_args.update(dict(returning=out_pkey, into=insert_out_pkeys))
        main_select = mk_main_select(insert_joins, mapping.values())
        
        def main_insert():
            if is_function:
                log_debug('Calling function on input rows')
                args = dict(((k.name, v) for k, v in mapping.iteritems()))
                func_call = sql_gen.NamedCol(out_pkey,
                    sql_gen.FunctionCall(out_table, **args))
                insert_into_pkeys(input_joins, [in_pkey_col, func_call])
                return None
            else:
                return sql.insert_select(db, out_table, mapping.keys(),
                    main_select, **insert_args)
        
        try:
            cur = sql.with_savepoint(db, main_insert)
            break # insert successful
        except sql.MissingCastException, e:
            log_exc(e)
            
            out_col = e.col
            type_ = e.type
            
            log_debug('Casting '+strings.as_tt(out_col)+' input to '
                +strings.as_tt(type_))
            mapping[out_col] = sql.cast_temp_col(db, type_, mapping[out_col],
                errors_table_)
        except sql.DuplicateKeyException, e:
            log_exc(e)
            
            old_join_cols = join_cols.copy()
            distinct_on.update(util.dict_subset(mapping, e.cols))
            join_cols.update(util.dict_subset_right_join(mapping, e.cols))
            log_debug('Ignoring existing rows, comparing on these columns:\n'
                +strings.as_inline_table(join_cols, ustr=col_ustr))
            assert join_cols != old_join_cols # avoid infinite loops
        except sql.NullValueException, e:
            log_exc(e)
            
            out_col, = e.cols
            try: in_col = mapping[out_col]
            except KeyError:
                log_debug('Missing mapping for NOT NULL column '+out_col)
                remove_all_rows()
            else: remove_rows(in_col, None, e)
        except sql.FunctionValueException, e:
            log_exc(e)
            
            func_name = e.name
            value = e.value
            for out_col, in_col in mapping.iteritems():
                in_col = sql_gen.unwrap_func_call(in_col, func_name)
                invalid2null(in_col, value, e)
        except sql.DatabaseErrors, e:
            log_exc(e)
            
            log_debug('No handler for exception')
            on_error(e)
            remove_all_rows()
        # after exception handled, rerun loop with additional constraints
    
    if cur != None and row_ct_ref != None and cur.rowcount >= 0:
        row_ct_ref[0] += cur.rowcount
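
    # Collect the output pkeys of existing/inserted rows, using whichever
    # method matches how the insert above was performed.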

    if is_function: pass # pkeys table already created
    elif has_joins:
        select_joins = input_joins+[sql_gen.Join(out_table, join_cols)]
        log_debug('Getting output table pkeys of existing/inserted rows')
        insert_into_pkeys(select_joins, pkeys_cols, distinct=True)
    else:
        sql.add_row_num(db, insert_out_pkeys) # for joining with input pkeys
        
        log_debug('Getting input table pkeys of inserted rows')
        sql.run_query_into(db, mk_main_select(input_joins, [in_pkey]),
            into=insert_in_pkeys)
        sql.add_row_num(db, insert_in_pkeys) # for joining with output pkeys
        
        assert sql.table_row_count(db, insert_out_pkeys) == sql.table_row_count(
            db, insert_in_pkeys)
        
        log_debug('Combining output and input pkeys in inserted order')
        pkey_joins = [insert_in_pkeys, sql_gen.Join(insert_out_pkeys,
            {sql.row_num_col: sql_gen.join_same_not_null})]
        insert_into_pkeys(pkey_joins, pkeys_names)
        
        sql.empty_temp(db, [insert_out_pkeys, insert_in_pkeys])
    
    db.log_debug('Adding pkey on pkeys table to enable fast joins', level=2.5)
    sql.add_pkey(db, into)
    
    log_debug('Setting pkeys of missing rows to '+strings.as_tt(repr(default)))
    missing_rows_joins = input_joins+[sql_gen.Join(into,
        {in_pkey: sql_gen.join_same_not_null}, sql_gen.filter_out)]
        # must use join_same_not_null or query will take forever
    insert_into_pkeys(missing_rows_joins,
        [in_pkey_col, sql_gen.NamedCol(out_pkey, default)])
    
    assert sql.table_row_count(db, into) == sql.table_row_count(db, in_table)
    
    sql.empty_temp(db, in_table)
    
    srcs = []
    if is_func: srcs = sql_gen.cols_srcs(in_cols)
    return sql_gen.Col(out_pkey, into, srcs)
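
# Example call (sketch only; the table and column names are hypothetical):
#
#     row_ct = [0]
#     pkeys_col = put_table(db, 'party', [sql_gen.Table('in_party')],
#         {'organizationname': sql_gen.Col('org', sql_gen.Table('in_party'))},
#         row_ct_ref=row_ct)
#     # pkeys_col references a temp table that maps each input row's pkey to
#     # the corresponding party pkey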