Revision 3078
Added by Aaron Marcuse-Kubitza over 12 years ago
lib/sql.py | ||
---|---|---|
396 | 396 |
table = sql_gen.Table('columns', 'information_schema') |
397 | 397 |
type_ = sql_gen.Coalesce(sql_gen.Nullif(sql_gen.Col('data_type'), |
398 | 398 |
'USER-DEFINED'), sql_gen.Col('udt_name')) |
399 |
cols = [type_, 'column_default', cast(self, 'boolean', 'is_nullable')] |
|
399 |
cols = [type_, 'column_default', |
|
400 |
sql_gen.Cast('boolean', sql_gen.Col('is_nullable'))] |
|
400 | 401 |
|
401 | 402 |
conds = [('table_name', col.table.name), ('column_name', col.name)] |
402 | 403 |
schema = col.table.schema |
... | ... | |
856 | 857 |
into=into, add_indexes_=True) |
857 | 858 |
return dict(items) |
858 | 859 |
|
859 |
def track_data_error(db, errors_table, cols, value, error_code, error): |
|
860 |
''' |
|
861 |
@param errors_table If None, does nothing. |
|
862 |
''' |
|
863 |
if errors_table == None or cols == (): return |
|
864 |
|
|
865 |
for col in cols: |
|
866 |
try: |
|
867 |
insert(db, errors_table, dict(column=col.name, value=value, |
|
868 |
error_code=error_code, error=error), recover=True, |
|
869 |
cacheable=True, log_level=4) |
|
870 |
except DuplicateKeyException: pass |
|
871 |
|
|
872 |
def cast(db, type_, col, errors_table=None): |
|
873 |
'''Casts an (unrenamed) column or value. |
|
874 |
If errors_table set and col has srcs, saves errors in errors_table (using |
|
875 |
col's srcs attr as the source columns) and converts errors to warnings. |
|
876 |
@param col str|sql_gen.Col|sql_gen.Literal |
|
877 |
@param errors_table None|sql_gen.Table|str |
|
878 |
''' |
|
879 |
col = sql_gen.as_Col(col) |
|
880 |
save_errors = (errors_table != None and isinstance(col, sql_gen.Col) |
|
881 |
and col.srcs != ()) |
|
882 |
if not save_errors: return sql_gen.Cast(type_, col) # can't save errors |
|
883 |
|
|
884 |
assert not isinstance(col, sql_gen.NamedCol) |
|
885 |
|
|
886 |
errors_table = sql_gen.as_Table(errors_table) |
|
887 |
srcs = map(sql_gen.to_name_only_col, col.srcs) |
|
888 |
function_name = str(sql_gen.FunctionCall(type_, *srcs)) |
|
889 |
function = db.TempFunction(function_name) |
|
890 |
|
|
891 |
while True: |
|
892 |
# Create function definition |
|
893 |
errors_table_cols = map(sql_gen.Col, |
|
894 |
['column', 'value', 'error_code', 'error']) |
|
895 |
query = '''\ |
|
896 |
CREATE FUNCTION '''+function.to_str(db)+'''(value text) |
|
897 |
RETURNS '''+type_+''' |
|
898 |
LANGUAGE plpgsql |
|
899 |
STRICT |
|
900 |
AS $$ |
|
901 |
BEGIN |
|
902 |
/* The explicit cast to the return type is needed to make the cast happen |
|
903 |
inside the try block. (Implicit casts to the return type happen at the end |
|
904 |
of the function, outside any block.) */ |
|
905 |
RETURN value::'''+type_+'''; |
|
906 |
EXCEPTION |
|
907 |
WHEN data_exception THEN |
|
908 |
-- Save error in errors table. |
|
909 |
DECLARE |
|
910 |
error_code text := SQLSTATE; |
|
911 |
error text := SQLERRM; |
|
912 |
"column" text; |
|
913 |
BEGIN |
|
914 |
-- Insert the value and error for *each* source column. |
|
915 |
FOR "column" IN |
|
916 |
'''+mk_select(db, sql_gen.NamedValues('c', None, [[c.name] for c in srcs]), |
|
917 |
order_by=None, start=0)+''' |
|
918 |
LOOP |
|
919 |
BEGIN |
|
920 |
'''+mk_insert_select(db, errors_table, errors_table_cols, |
|
921 |
sql_gen.Values(errors_table_cols).to_str(db))+'''; |
|
922 |
EXCEPTION |
|
923 |
WHEN unique_violation THEN NULL; -- continue to next row |
|
924 |
END; |
|
925 |
END LOOP; |
|
926 |
END; |
|
927 |
|
|
928 |
RAISE WARNING '%', SQLERRM; |
|
929 |
RETURN NULL; |
|
930 |
END; |
|
931 |
$$; |
|
932 |
''' |
|
933 |
|
|
934 |
# Create function |
|
935 |
try: |
|
936 |
run_query(db, query, recover=True, cacheable=True, |
|
937 |
log_ignore_excs=(DuplicateException,)) |
|
938 |
break # successful |
|
939 |
except DuplicateException: |
|
940 |
function.name = next_version(function.name) |
|
941 |
# try again with next version of name |
|
942 |
|
|
943 |
return sql_gen.FunctionCall(function, col) |
|
944 |
|
|
945 | 860 |
##### Database structure queries |
946 | 861 |
|
947 | 862 |
def table_row_count(db, table, recover=None): |
... | ... | |
1176 | 1091 |
be the primary key.''' |
1177 | 1092 |
add_col(db, table, row_num_typed_col, log_level=3) |
1178 | 1093 |
|
1179 |
def cast_temp_col(db, type_, col, errors_table=None): |
|
1180 |
'''Like cast(), but creates a new column with the cast values if the input |
|
1181 |
is a column. |
|
1182 |
@return The new column or cast value |
|
1183 |
''' |
|
1184 |
def cast_(col): return cast(db, type_, col, errors_table) |
|
1185 |
|
|
1186 |
try: col = sql_gen.underlying_col(col) |
|
1187 |
except sql_gen.NoUnderlyingTableException: return sql_gen.wrap(cast_, col) |
|
1188 |
|
|
1189 |
table = col.table |
|
1190 |
new_col = sql_gen.Col(sql_gen.concat(col.name, '::'+type_), table, col.srcs) |
|
1191 |
expr = cast_(col) |
|
1192 |
|
|
1193 |
# Add column |
|
1194 |
new_typed_col = sql_gen.TypedCol(new_col.name, type_) |
|
1195 |
add_col(db, table, new_typed_col, comment='src: '+repr(col)) |
|
1196 |
new_col.name = new_typed_col.name # propagate any renaming |
|
1197 |
|
|
1198 |
update(db, table, [(new_col, expr)], in_place=True, cacheable=True) |
|
1199 |
add_index(db, new_col) |
|
1200 |
|
|
1201 |
return new_col |
|
1202 |
|
|
1203 | 1094 |
def drop_table(db, table): |
1204 | 1095 |
table = sql_gen.as_Table(table) |
1205 | 1096 |
return run_query(db, 'DROP TABLE IF EXISTS '+table.to_str(db)+' CASCADE') |
... | ... | |
1285 | 1176 |
order_by='routine_schema', limit=1, log_level=4))) != [] |
1286 | 1177 |
# TODO: order_by search_path schema order |
1287 | 1178 |
|
1288 |
def errors_table(db, table, if_exists=True): |
|
1289 |
''' |
|
1290 |
@param if_exists If set, returns None if the errors table doesn't exist |
|
1291 |
@return None|sql_gen.Table |
|
1292 |
''' |
|
1293 |
table = sql_gen.as_Table(table) |
|
1294 |
if table.srcs != (): table = table.srcs[0] |
|
1295 |
|
|
1296 |
errors_table = sql_gen.suffixed_table(table, '.errors') |
|
1297 |
if if_exists and not table_exists(db, errors_table): return None |
|
1298 |
return errors_table |
|
1299 |
|
|
1300 | 1179 |
##### Database management |
1301 | 1180 |
|
1302 | 1181 |
def empty_db(db, schema='public', **kw_args): |
lib/sql_io.py | ||
---|---|---|
7 | 7 |
import strings |
8 | 8 |
import util |
9 | 9 |
|
10 |
##### Error tracking |
|
11 |
|
|
12 |
def track_data_error(db, errors_table, cols, value, error_code, error): |
|
13 |
''' |
|
14 |
@param errors_table If None, does nothing. |
|
15 |
''' |
|
16 |
if errors_table == None or cols == (): return |
|
17 |
|
|
18 |
for col in cols: |
|
19 |
try: |
|
20 |
sql.insert(db, errors_table, dict(column=col.name, value=value, |
|
21 |
error_code=error_code, error=error), recover=True, |
|
22 |
cacheable=True, log_level=4) |
|
23 |
except sql.DuplicateKeyException: pass |
|
24 |
|
|
25 |
def cast(db, type_, col, errors_table=None): |
|
26 |
'''Casts an (unrenamed) column or value. |
|
27 |
If errors_table set and col has srcs, saves errors in errors_table (using |
|
28 |
col's srcs attr as the source columns) and converts errors to warnings. |
|
29 |
@param col str|sql_gen.Col|sql_gen.Literal |
|
30 |
@param errors_table None|sql_gen.Table|str |
|
31 |
''' |
|
32 |
col = sql_gen.as_Col(col) |
|
33 |
save_errors = (errors_table != None and isinstance(col, sql_gen.Col) |
|
34 |
and col.srcs != ()) |
|
35 |
if not save_errors: return sql_gen.Cast(type_, col) # can't save errors |
|
36 |
|
|
37 |
assert not isinstance(col, sql_gen.NamedCol) |
|
38 |
|
|
39 |
errors_table = sql_gen.as_Table(errors_table) |
|
40 |
srcs = map(sql_gen.to_name_only_col, col.srcs) |
|
41 |
function_name = str(sql_gen.FunctionCall(type_, *srcs)) |
|
42 |
function = db.TempFunction(function_name) |
|
43 |
|
|
44 |
while True: |
|
45 |
# Create function definition |
|
46 |
errors_table_cols = map(sql_gen.Col, |
|
47 |
['column', 'value', 'error_code', 'error']) |
|
48 |
query = '''\ |
|
49 |
CREATE FUNCTION '''+function.to_str(db)+'''(value text) |
|
50 |
RETURNS '''+type_+''' |
|
51 |
LANGUAGE plpgsql |
|
52 |
STRICT |
|
53 |
AS $$ |
|
54 |
BEGIN |
|
55 |
/* The explicit cast to the return type is needed to make the cast happen |
|
56 |
inside the try block. (Implicit casts to the return type happen at the end |
|
57 |
of the function, outside any block.) */ |
|
58 |
RETURN value::'''+type_+'''; |
|
59 |
EXCEPTION |
|
60 |
WHEN data_exception THEN |
|
61 |
-- Save error in errors table. |
|
62 |
DECLARE |
|
63 |
error_code text := SQLSTATE; |
|
64 |
error text := SQLERRM; |
|
65 |
"column" text; |
|
66 |
BEGIN |
|
67 |
-- Insert the value and error for *each* source column. |
|
68 |
FOR "column" IN |
|
69 |
'''+sql.mk_select(db, sql_gen.NamedValues('c', None, [[c.name] for c in srcs]), |
|
70 |
order_by=None, start=0)+''' |
|
71 |
LOOP |
|
72 |
BEGIN |
|
73 |
'''+sql.mk_insert_select(db, errors_table, errors_table_cols, |
|
74 |
sql_gen.Values(errors_table_cols).to_str(db))+'''; |
|
75 |
EXCEPTION |
|
76 |
WHEN unique_violation THEN NULL; -- continue to next row |
|
77 |
END; |
|
78 |
END LOOP; |
|
79 |
END; |
|
80 |
|
|
81 |
RAISE WARNING '%', SQLERRM; |
|
82 |
RETURN NULL; |
|
83 |
END; |
|
84 |
$$; |
|
85 |
''' |
|
86 |
|
|
87 |
# Create function |
|
88 |
try: |
|
89 |
sql.run_query(db, query, recover=True, cacheable=True, |
|
90 |
log_ignore_excs=(sql.DuplicateException,)) |
|
91 |
break # successful |
|
92 |
except sql.DuplicateException: |
|
93 |
function.name = sql.next_version(function.name) |
|
94 |
# try again with next version of name |
|
95 |
|
|
96 |
return sql_gen.FunctionCall(function, col) |
|
97 |
|
|
98 |
def cast_temp_col(db, type_, col, errors_table=None): |
|
99 |
'''Like cast(), but creates a new column with the cast values if the input |
|
100 |
is a column. |
|
101 |
@return The new column or cast value |
|
102 |
''' |
|
103 |
def cast_(col): return cast(db, type_, col, errors_table) |
|
104 |
|
|
105 |
try: col = sql_gen.underlying_col(col) |
|
106 |
except sql_gen.NoUnderlyingTableException: return sql_gen.wrap(cast_, col) |
|
107 |
|
|
108 |
table = col.table |
|
109 |
new_col = sql_gen.Col(sql_gen.concat(col.name, '::'+type_), table, col.srcs) |
|
110 |
expr = cast_(col) |
|
111 |
|
|
112 |
# Add column |
|
113 |
new_typed_col = sql_gen.TypedCol(new_col.name, type_) |
|
114 |
sql.add_col(db, table, new_typed_col, comment='src: '+repr(col)) |
|
115 |
new_col.name = new_typed_col.name # propagate any renaming |
|
116 |
|
|
117 |
sql.update(db, table, [(new_col, expr)], in_place=True, cacheable=True) |
|
118 |
sql.add_index(db, new_col) |
|
119 |
|
|
120 |
return new_col |
|
121 |
|
|
122 |
def errors_table(db, table, if_exists=True): |
|
123 |
''' |
|
124 |
@param if_exists If set, returns None if the errors table doesn't exist |
|
125 |
@return None|sql_gen.Table |
|
126 |
''' |
|
127 |
table = sql_gen.as_Table(table) |
|
128 |
if table.srcs != (): table = table.srcs[0] |
|
129 |
|
|
130 |
errors_table = sql_gen.suffixed_table(table, '.errors') |
|
131 |
if if_exists and not sql.table_exists(db, errors_table): return None |
|
132 |
return errors_table |
|
133 |
|
|
134 |
##### Import |
|
135 |
|
|
10 | 136 |
def put(db, table, row, pkey_=None, row_ct_ref=None): |
11 | 137 |
'''Recovers from errors. |
12 | 138 |
Only works under PostgreSQL (uses INSERT RETURNING). |
... | ... | |
104 | 230 |
# Create input joins from list of input tables |
105 | 231 |
in_tables_ = in_tables[:] # don't modify input! |
106 | 232 |
in_tables0 = in_tables_.pop(0) # first table is separate |
107 |
errors_table_ = sql.errors_table(db, in_tables0)
|
|
233 |
errors_table_ = errors_table(db, in_tables0) |
|
108 | 234 |
in_pkey = sql.pkey(db, in_tables0, recover=True) |
109 | 235 |
in_pkey_col = sql_gen.as_Col(in_pkey, in_tables0) |
110 | 236 |
input_joins = [in_tables0]+[sql_gen.Join(v, |
... | ... | |
175 | 301 |
limit_ref[0] = 0 # just create an empty pkeys table |
176 | 302 |
|
177 | 303 |
def ignore(in_col, value, e): |
178 |
sql.track_data_error(db, errors_table_, in_col.srcs, value,
|
|
304 |
track_data_error(db, errors_table_, in_col.srcs, value, |
|
179 | 305 |
e.cause.pgcode, e.cause.pgerror) |
180 | 306 |
log_debug('Ignoring rows with '+strings.as_tt(repr(in_col))+' = ' |
181 | 307 |
+strings.as_tt(repr(value))) |
... | ... | |
242 | 368 |
|
243 | 369 |
log_debug('Casting '+strings.as_tt(out_col)+' input to ' |
244 | 370 |
+strings.as_tt(type_)) |
245 |
mapping[out_col] = sql.cast_temp_col(db, type_, mapping[out_col],
|
|
371 |
mapping[out_col] = cast_temp_col(db, type_, mapping[out_col], |
|
246 | 372 |
errors_table_) |
247 | 373 |
except sql.DuplicateKeyException, e: |
248 | 374 |
log_exc(e) |
Also available in: Unified diff
Moved error tracking from sql.py to sql_io.py