Project

General

Profile

« Previous | Next » 

Revision 3078

Moved error tracking from sql.py to sql_io.py

View differences:

lib/sql.py
396 396
        table = sql_gen.Table('columns', 'information_schema')
397 397
        type_ = sql_gen.Coalesce(sql_gen.Nullif(sql_gen.Col('data_type'),
398 398
            'USER-DEFINED'), sql_gen.Col('udt_name'))
399
        cols = [type_, 'column_default', cast(self, 'boolean', 'is_nullable')]
399
        cols = [type_, 'column_default',
400
            sql_gen.Cast('boolean', sql_gen.Col('is_nullable'))]
400 401
        
401 402
        conds = [('table_name', col.table.name), ('column_name', col.name)]
402 403
        schema = col.table.schema
......
856 857
        into=into, add_indexes_=True)
857 858
    return dict(items)
858 859

  
859
def track_data_error(db, errors_table, cols, value, error_code, error):
860
    '''
861
    @param errors_table If None, does nothing.
862
    '''
863
    if errors_table == None or cols == (): return
864
    
865
    for col in cols:
866
        try:
867
            insert(db, errors_table, dict(column=col.name, value=value,
868
                error_code=error_code, error=error), recover=True,
869
                cacheable=True, log_level=4)
870
        except DuplicateKeyException: pass
871

  
872
def cast(db, type_, col, errors_table=None):
873
    '''Casts an (unrenamed) column or value.
874
    If errors_table set and col has srcs, saves errors in errors_table (using
875
    col's srcs attr as the source columns) and converts errors to warnings.
876
    @param col str|sql_gen.Col|sql_gen.Literal
877
    @param errors_table None|sql_gen.Table|str
878
    '''
879
    col = sql_gen.as_Col(col)
880
    save_errors = (errors_table != None and isinstance(col, sql_gen.Col)
881
        and col.srcs != ())
882
    if not save_errors: return sql_gen.Cast(type_, col) # can't save errors
883
    
884
    assert not isinstance(col, sql_gen.NamedCol)
885
    
886
    errors_table = sql_gen.as_Table(errors_table)
887
    srcs = map(sql_gen.to_name_only_col, col.srcs)
888
    function_name = str(sql_gen.FunctionCall(type_, *srcs))
889
    function = db.TempFunction(function_name)
890
    
891
    while True:
892
        # Create function definition
893
        errors_table_cols = map(sql_gen.Col,
894
            ['column', 'value', 'error_code', 'error'])
895
        query = '''\
896
CREATE FUNCTION '''+function.to_str(db)+'''(value text)
897
RETURNS '''+type_+'''
898
LANGUAGE plpgsql
899
STRICT
900
AS $$
901
BEGIN
902
    /* The explicit cast to the return type is needed to make the cast happen
903
    inside the try block. (Implicit casts to the return type happen at the end
904
    of the function, outside any block.) */
905
    RETURN value::'''+type_+''';
906
EXCEPTION
907
    WHEN data_exception THEN
908
        -- Save error in errors table.
909
        DECLARE
910
            error_code text := SQLSTATE;
911
            error text := SQLERRM;
912
            "column" text;
913
        BEGIN
914
            -- Insert the value and error for *each* source column.
915
            FOR "column" IN
916
'''+mk_select(db, sql_gen.NamedValues('c', None, [[c.name] for c in srcs]),
917
    order_by=None, start=0)+'''
918
            LOOP
919
                BEGIN
920
'''+mk_insert_select(db, errors_table, errors_table_cols,
921
    sql_gen.Values(errors_table_cols).to_str(db))+''';
922
                EXCEPTION
923
                    WHEN unique_violation THEN NULL; -- continue to next row
924
                END;
925
            END LOOP;
926
        END;
927
        
928
        RAISE WARNING '%', SQLERRM;
929
        RETURN NULL;
930
END;
931
$$;
932
'''
933
        
934
        # Create function
935
        try:
936
            run_query(db, query, recover=True, cacheable=True,
937
                log_ignore_excs=(DuplicateException,))
938
            break # successful
939
        except DuplicateException:
940
            function.name = next_version(function.name)
941
            # try again with next version of name
942
    
943
    return sql_gen.FunctionCall(function, col)
944

  
945 860
##### Database structure queries
946 861

  
947 862
def table_row_count(db, table, recover=None):
......
1176 1091
    be the primary key.'''
1177 1092
    add_col(db, table, row_num_typed_col, log_level=3)
1178 1093

  
1179
def cast_temp_col(db, type_, col, errors_table=None):
1180
    '''Like cast(), but creates a new column with the cast values if the input
1181
    is a column.
1182
    @return The new column or cast value
1183
    '''
1184
    def cast_(col): return cast(db, type_, col, errors_table)
1185
    
1186
    try: col = sql_gen.underlying_col(col)
1187
    except sql_gen.NoUnderlyingTableException: return sql_gen.wrap(cast_, col)
1188
    
1189
    table = col.table
1190
    new_col = sql_gen.Col(sql_gen.concat(col.name, '::'+type_), table, col.srcs)
1191
    expr = cast_(col)
1192
    
1193
    # Add column
1194
    new_typed_col = sql_gen.TypedCol(new_col.name, type_)
1195
    add_col(db, table, new_typed_col, comment='src: '+repr(col))
1196
    new_col.name = new_typed_col.name # propagate any renaming
1197
    
1198
    update(db, table, [(new_col, expr)], in_place=True, cacheable=True)
1199
    add_index(db, new_col)
1200
    
1201
    return new_col
1202

  
1203 1094
def drop_table(db, table):
1204 1095
    table = sql_gen.as_Table(table)
1205 1096
    return run_query(db, 'DROP TABLE IF EXISTS '+table.to_str(db)+' CASCADE')
......
1285 1176
        order_by='routine_schema', limit=1, log_level=4))) != []
1286 1177
        # TODO: order_by search_path schema order
1287 1178

  
1288
def errors_table(db, table, if_exists=True):
1289
    '''
1290
    @param if_exists If set, returns None if the errors table doesn't exist
1291
    @return None|sql_gen.Table
1292
    '''
1293
    table = sql_gen.as_Table(table)
1294
    if table.srcs != (): table = table.srcs[0]
1295
    
1296
    errors_table = sql_gen.suffixed_table(table, '.errors')
1297
    if if_exists and not table_exists(db, errors_table): return None
1298
    return errors_table
1299

  
1300 1179
##### Database management
1301 1180

  
1302 1181
def empty_db(db, schema='public', **kw_args):
lib/sql_io.py
7 7
import strings
8 8
import util
9 9

  
10
##### Error tracking
11

  
12
def track_data_error(db, errors_table, cols, value, error_code, error):
13
    '''
14
    @param errors_table If None, does nothing.
15
    '''
16
    if errors_table == None or cols == (): return
17
    
18
    for col in cols:
19
        try:
20
            sql.insert(db, errors_table, dict(column=col.name, value=value,
21
                error_code=error_code, error=error), recover=True,
22
                cacheable=True, log_level=4)
23
        except sql.DuplicateKeyException: pass
24

  
25
def cast(db, type_, col, errors_table=None):
26
    '''Casts an (unrenamed) column or value.
27
    If errors_table set and col has srcs, saves errors in errors_table (using
28
    col's srcs attr as the source columns) and converts errors to warnings.
29
    @param col str|sql_gen.Col|sql_gen.Literal
30
    @param errors_table None|sql_gen.Table|str
31
    '''
32
    col = sql_gen.as_Col(col)
33
    save_errors = (errors_table != None and isinstance(col, sql_gen.Col)
34
        and col.srcs != ())
35
    if not save_errors: return sql_gen.Cast(type_, col) # can't save errors
36
    
37
    assert not isinstance(col, sql_gen.NamedCol)
38
    
39
    errors_table = sql_gen.as_Table(errors_table)
40
    srcs = map(sql_gen.to_name_only_col, col.srcs)
41
    function_name = str(sql_gen.FunctionCall(type_, *srcs))
42
    function = db.TempFunction(function_name)
43
    
44
    while True:
45
        # Create function definition
46
        errors_table_cols = map(sql_gen.Col,
47
            ['column', 'value', 'error_code', 'error'])
48
        query = '''\
49
CREATE FUNCTION '''+function.to_str(db)+'''(value text)
50
RETURNS '''+type_+'''
51
LANGUAGE plpgsql
52
STRICT
53
AS $$
54
BEGIN
55
    /* The explicit cast to the return type is needed to make the cast happen
56
    inside the try block. (Implicit casts to the return type happen at the end
57
    of the function, outside any block.) */
58
    RETURN value::'''+type_+''';
59
EXCEPTION
60
    WHEN data_exception THEN
61
        -- Save error in errors table.
62
        DECLARE
63
            error_code text := SQLSTATE;
64
            error text := SQLERRM;
65
            "column" text;
66
        BEGIN
67
            -- Insert the value and error for *each* source column.
68
            FOR "column" IN
69
'''+sql.mk_select(db, sql_gen.NamedValues('c', None, [[c.name] for c in srcs]),
70
    order_by=None, start=0)+'''
71
            LOOP
72
                BEGIN
73
'''+sql.mk_insert_select(db, errors_table, errors_table_cols,
74
    sql_gen.Values(errors_table_cols).to_str(db))+''';
75
                EXCEPTION
76
                    WHEN unique_violation THEN NULL; -- continue to next row
77
                END;
78
            END LOOP;
79
        END;
80
        
81
        RAISE WARNING '%', SQLERRM;
82
        RETURN NULL;
83
END;
84
$$;
85
'''
86
        
87
        # Create function
88
        try:
89
            sql.run_query(db, query, recover=True, cacheable=True,
90
                log_ignore_excs=(sql.DuplicateException,))
91
            break # successful
92
        except sql.DuplicateException:
93
            function.name = sql.next_version(function.name)
94
            # try again with next version of name
95
    
96
    return sql_gen.FunctionCall(function, col)
97

  
98
def cast_temp_col(db, type_, col, errors_table=None):
99
    '''Like cast(), but creates a new column with the cast values if the input
100
    is a column.
101
    @return The new column or cast value
102
    '''
103
    def cast_(col): return cast(db, type_, col, errors_table)
104
    
105
    try: col = sql_gen.underlying_col(col)
106
    except sql_gen.NoUnderlyingTableException: return sql_gen.wrap(cast_, col)
107
    
108
    table = col.table
109
    new_col = sql_gen.Col(sql_gen.concat(col.name, '::'+type_), table, col.srcs)
110
    expr = cast_(col)
111
    
112
    # Add column
113
    new_typed_col = sql_gen.TypedCol(new_col.name, type_)
114
    sql.add_col(db, table, new_typed_col, comment='src: '+repr(col))
115
    new_col.name = new_typed_col.name # propagate any renaming
116
    
117
    sql.update(db, table, [(new_col, expr)], in_place=True, cacheable=True)
118
    sql.add_index(db, new_col)
119
    
120
    return new_col
121

  
122
def errors_table(db, table, if_exists=True):
123
    '''
124
    @param if_exists If set, returns None if the errors table doesn't exist
125
    @return None|sql_gen.Table
126
    '''
127
    table = sql_gen.as_Table(table)
128
    if table.srcs != (): table = table.srcs[0]
129
    
130
    errors_table = sql_gen.suffixed_table(table, '.errors')
131
    if if_exists and not sql.table_exists(db, errors_table): return None
132
    return errors_table
133

  
134
##### Import
135

  
10 136
def put(db, table, row, pkey_=None, row_ct_ref=None):
11 137
    '''Recovers from errors.
12 138
    Only works under PostgreSQL (uses INSERT RETURNING).
......
104 230
    # Create input joins from list of input tables
105 231
    in_tables_ = in_tables[:] # don't modify input!
106 232
    in_tables0 = in_tables_.pop(0) # first table is separate
107
    errors_table_ = sql.errors_table(db, in_tables0)
233
    errors_table_ = errors_table(db, in_tables0)
108 234
    in_pkey = sql.pkey(db, in_tables0, recover=True)
109 235
    in_pkey_col = sql_gen.as_Col(in_pkey, in_tables0)
110 236
    input_joins = [in_tables0]+[sql_gen.Join(v,
......
175 301
        limit_ref[0] = 0 # just create an empty pkeys table
176 302
    
177 303
    def ignore(in_col, value, e):
178
        sql.track_data_error(db, errors_table_, in_col.srcs, value,
304
        track_data_error(db, errors_table_, in_col.srcs, value,
179 305
            e.cause.pgcode, e.cause.pgerror)
180 306
        log_debug('Ignoring rows with '+strings.as_tt(repr(in_col))+' = '
181 307
            +strings.as_tt(repr(value)))
......
242 368
            
243 369
            log_debug('Casting '+strings.as_tt(out_col)+' input to '
244 370
                +strings.as_tt(type_))
245
            mapping[out_col] = sql.cast_temp_col(db, type_, mapping[out_col],
371
            mapping[out_col] = cast_temp_col(db, type_, mapping[out_col],
246 372
                errors_table_)
247 373
        except sql.DuplicateKeyException, e:
248 374
            log_exc(e)

Also available in: Unified diff