Revision 2680
Added by Aaron Marcuse-Kubitza over 12 years ago

csv2db
@@ -14,6 +14,7 @@
 import exc
 import opts
 import sql
+import sql_gen
 import streams
 import strings
 import util
@@ -33,16 +34,19 @@
     table = opts.get_env_var('table', None, env_names)
     schema = opts.get_env_var('schema', 'public', env_names)
     db_config = opts.get_env_vars(sql.db_config_names, None, env_names)
-    debug = opts.env_flag('debug', False, env_names)
+    verbosity = util.cast(float, opts.get_env_var('verbosity', 1, env_names))
     if not (table != None and 'engine' in db_config): usage_err()
 
     # Connect to DB
-    db = sql.connect(db_config)
+    def log(msg, level=1):
+        '''Higher level -> more verbose'''
+        if level <= verbosity: sys.stderr.write(msg.rstrip('\n')+'\n')
+    db = sql.connect(db_config, log_debug=log)
 
     use_copy_from = [True]
 
     # Loads data into the table using the currently-selected approach.
-    def load_():
+    def load():
         # Open input stream
         proc = subprocess.Popen(input_cmd, stdout=subprocess.PIPE, bufsize=-1)
         in_ = proc.stdout
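The hunk above replaces the old boolean debug flag with a numeric verbosity: messages carry a level and print only when the configured verbosity is at least that level, and the same log() closure is handed to sql.connect() as its log_debug callback, which is presumably what routes the generated SQL to stderr at the higher levels. A minimal standalone sketch of the pattern (the names mirror the hunk; in the script, verbosity is read from the 'verbosity' env var):

    import sys

    verbosity = 1.0 # the script reads this from the 'verbosity' env var

    def log(msg, level=1):
        '''Higher level -> more verbose: print only if the configured
        verbosity is at least the message's level.'''
        if level <= verbosity: sys.stderr.write(msg.rstrip('\n')+'\n')

    log('shown at the default verbosity') # level 1 <= 1.0, so this prints
    log('shown only when verbosity >= 2', level=2) # suppressed at 1.0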
@@ -51,13 +55,11 @@
         info = csvs.stream_info(in_, parse_header=True)
         dialect = info.dialect
         if csvs.is_tsv(dialect): use_copy_from[0] = False
-        cols = info.header
-        for i, col in enumerate(cols): # replace empty column names
-            if col == '': cols[i] = 'column_'+str(i)
+        col_names = info.header
+        for i, col in enumerate(col_names): # replace empty column names
+            if col == '': col_names[i] = 'column_'+str(i)
 
         # Select schema and escape names
         def esc_name(name): return sql.esc_name(db, name)
         sql.run_query(db, 'SET search_path TO '+esc_name(schema))
-        esc_table = esc_name(table)
-        esc_cols = map(esc_name, cols)
 
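The renamed header list (cols -> col_names) keeps the existing behavior of filling empty CSV header cells with positional fallback names. The same logic in isolation (the example header values are hypothetical):

    col_names = ['genus', '', 'count'] # a header with one blank cell
    for i, col in enumerate(col_names): # replace empty column names
        if col == '': col_names[i] = 'column_'+str(i)
    print col_names # ['genus', 'column_1', 'count']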
@@ -64,11 +66,44 @@
-        # Create CREATE TABLE statement
-        pkey = esc_name(table+'_pkey')
-        create_table = 'CREATE TABLE '+esc_table+' (\n'
-        create_table += '    row_num serial NOT NULL PRIMARY KEY\n'
-        for esc_col in esc_cols: create_table += '    , '+esc_col+' text\n'
-        create_table += ');\n'
-        if debug: sys.stderr.write(create_table)
+        def load_():
+            log('Creating table')
+            typed_cols = [sql_gen.TypedCol('row_num', 'serial')]+[
+                sql_gen.TypedCol(v, 'text') for v in col_names]
+            sql.create_table(db, table, typed_cols)
+
+            # Create COPY FROM statement
+            if use_copy_from[0]:
+                cur = db.db.cursor()
+                copy_from = ('COPY '+esc_name(table)+' ('
+                    +(', '.join(map(esc_name, col_names)))
+                    +') FROM STDIN DELIMITER %(delimiter)s NULL %(null)s')
+                assert not csvs.is_tsv(dialect)
+                copy_from += ' CSV'
+                if dialect.quoting != csv.QUOTE_NONE:
+                    copy_from += ' QUOTE %(quotechar)s'
+                    if dialect.doublequote: copy_from += ' ESCAPE %(quotechar)s'
+                copy_from += ';\n'
+                copy_from = cur.mogrify(copy_from, dict(delimiter=
+                    dialect.delimiter, null='', quotechar=dialect.quotechar))
+
+            # Load the data
+            line_in = streams.ProgressInputStream(in_, sys.stderr,
+                'Processed %d row(s)', n=1000)
+            try:
+                if use_copy_from[0]:
+                    log('Using COPY FROM')
+                    log(copy_from, level=2)
+                    db.db.cursor().copy_expert(copy_from, line_in)
+                else:
+                    log('Using INSERT')
+                    cols_ct = len(col_names)+1 # +1 for row_num
+                    for row in csvs.make_reader(line_in, dialect):
+                        row = map(strings.to_unicode, row)
+                        row.insert(0, sql.default) # row_num is autogen
+                        util.list_set_length(row, cols_ct) # truncate extra cols
+                        sql.insert(db, table, row, log_level=4)
+            finally:
+                line_in.close() # also closes proc.stdout
+                proc.wait()
+        sql.with_savepoint(db, load_)
 
-        # Create table
-        sql.run_query(db, create_table)
+        db.db.commit()
 
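sql.create_table() with sql_gen.TypedCol descriptors replaces the hand-built CREATE TABLE string. Judging from the removed lines, the generated DDL should be equivalent to something like the following sketch (the table and column names are hypothetical, the real code escapes every name through esc_name(), and sql.create_table()'s exact formatting may differ):

    col_names = ['genus', 'species'] # would come from the CSV header

    create_table = 'CREATE TABLE my_table (\n'
    create_table += '    row_num serial NOT NULL PRIMARY KEY\n'
    for col in col_names: create_table += '    , '+col+' text\n'
    create_table += ');\n'

    print create_table
    # CREATE TABLE my_table (
    #     row_num serial NOT NULL PRIMARY KEY
    #     , genus text
    #     , species text
    # );

For a typical comma-separated input (psycopg2's mogrify() binds the delimiter, null, and quotechar parameters as SQL literals), the COPY FROM statement assembled just above would come out roughly as COPY my_table (genus, species) FROM STDIN DELIMITER ',' NULL '' CSV QUOTE '"' ESCAPE '"'; again with all names escaped.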
@@ -75,41 +110,12 @@
-        # Create COPY FROM statement
-        if use_copy_from[0]:
-            cur = db.db.cursor()
-            copy_from = ('COPY '+esc_table+' ('+(', '.join(esc_cols))
-                +') FROM STDIN DELIMITER %(delimiter)s NULL %(null)s')
-            assert not csvs.is_tsv(dialect)
-            copy_from += ' CSV'
-            if dialect.quoting != csv.QUOTE_NONE:
-                copy_from += ' QUOTE %(quotechar)s'
-                if dialect.doublequote: copy_from += ' ESCAPE %(quotechar)s'
-            copy_from += ';\n'
-            copy_from = cur.mogrify(copy_from, dict(delimiter=dialect.delimiter,
-                null='', quotechar=dialect.quotechar))
-        if debug: sys.stderr.write(copy_from)
+        log('Cleaning up table')
+        sql.cleanup_table(db, table, col_names)
+        db.db.commit()
 
-        # Load the data
-        line_in = streams.ProgressInputStream(in_, sys.stderr,
-            'Processed %d row(s)', n=1000)
-        try:
-            if use_copy_from[0]:
-                sys.stderr.write('Using COPY FROM\n')
-                db.db.cursor().copy_expert(copy_from, line_in)
-            else:
-                sys.stderr.write('Using INSERT\n')
-                cols_ct = len(cols)+1 # +1 for row_num
-                for row in csvs.make_reader(line_in, dialect):
-                    row = map(strings.to_unicode, row)
-                    row.insert(0, sql.default) # leave space for autogen row_num
-                    util.list_set_length(row, cols_ct) # truncate extra cols
-                    sql.insert(db, table, row)
-        finally:
-            line_in.close() # also closes proc.stdout
-            proc.wait()
-
-        # Clean up the data
-        sys.stderr.write('Cleaning up table\n')
-        sql.cleanup_table(db, table, cols)
-    load = lambda: sql.with_savepoint(db, load_)
+        log('Adding indexes')
+        for name in col_names:
+            log('Adding index on '+name)
+            sql.add_index(db, sql_gen.Col(name, table))
+        db.db.commit()
 
     try: load()
     except sql.DatabaseErrors, e:
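The new loop at the end of the hunk is what the commit message refers to: one index per imported column, so that both the column-based import and ad-hoc searches for particular values can use index scans instead of sequential scans over the staging table. Assuming sql.add_index() issues a plain per-column CREATE INDEX (its implementation is not shown in this diff), the effect would be roughly:

    col_names = ['genus', 'species'] # hypothetical header
    for name in col_names:
        # Hypothetical DDL; the statement sql.add_index() actually builds
        # is not shown here and may differ (e.g. in index naming and quoting).
        print 'CREATE INDEX '+name+'_idx ON my_table ('+name+')'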
@@ -118,6 +124,5 @@
             use_copy_from[0] = False
             load() # try again with different approach
         else: raise e
-    db.db.commit()
 
 main()
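The trailing hunk keeps the existing two-attempt strategy: load() first streams the file via COPY FROM, and if that raises a DatabaseError, the handler (partially elided above) flips use_copy_from and reruns load() with per-row INSERTs. The final db.db.commit() is removed because load() now commits after each stage (table creation and load, cleanup, indexes). use_copy_from is a one-element list because Python 2 has no nonlocal: mutating the list lets the retry change what the nested function does. A reduced, self-contained sketch of the control flow, with stand-ins for the two loaders (the script really uses psycopg2's copy_expert() and sql.insert()):

    # Stand-ins for the real loaders.
    def copy_from_load(): raise Exception('COPY FROM rejected the input')
    def insert_load(): print 'loading row by row'

    use_copy_from = [True] # 1-element list: mutable shared state

    def load():
        if use_copy_from[0]: copy_from_load() # fast path
        else: insert_load() # slower but more tolerant path

    try: load()
    except Exception, e:
        if use_copy_from[0]: # the fast path failed
            use_copy_from[0] = False
            load() # try again with the other approach
        else: raise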
csv2db: Use verbosity-based logging like bin/map. Use sql.create_table(). Add indexes on the columns to speed up column-based import and to speed up searching the table for particular values.