Revision 4996
Added by Aaron Marcuse-Kubitza over 12 years ago
bin/csv2db | ||
---|---|---|
3 | 3 |
# When no command is specified, just cleans up the specified table. |
4 | 4 |
# The command may be run more than once. |
5 | 5 |
|
6 |
import csv |
|
7 | 6 |
import os.path |
8 |
import re |
|
9 | 7 |
import subprocess |
10 | 8 |
import sys |
11 | 9 |
|
12 | 10 |
sys.path.append(os.path.dirname(__file__)+"/../lib") |
13 | 11 |
|
14 |
import csvs |
|
15 | 12 |
import exc |
16 | 13 |
import opts |
17 | 14 |
import sql |
... | ... | |
49 | 46 |
|
50 | 47 |
table = sql_gen.Table(table, schema) |
51 | 48 |
|
52 |
use_copy_from = [True]
|
|
49 |
use_copy_from = True
|
|
53 | 50 |
|
54 | 51 |
# Loads data into the table using the currently-selected approach. |
55 | 52 |
def load(): |
56 | 53 |
# Open input stream |
57 | 54 |
proc = subprocess.Popen(input_cmd, stdout=subprocess.PIPE, bufsize=-1) |
58 | 55 |
in_ = proc.stdout |
56 |
line_in = streams.ProgressInputStream(in_, sys.stderr, n=1000) |
|
59 | 57 |
|
60 |
# Get format info |
|
61 |
info = csvs.stream_info(in_, parse_header=True) |
|
62 |
dialect = info.dialect |
|
63 |
if csvs.is_tsv(dialect): use_copy_from[0] = False |
|
64 |
col_names = map(strings.to_unicode, info.header) |
|
65 |
for i, col in enumerate(col_names): # replace empty column names |
|
66 |
if col == '': col_names[i] = 'column_'+str(i) |
|
67 |
|
|
68 |
# Select schema and escape names |
|
69 |
def esc_name(name): return db.esc_name(name) |
|
70 |
|
|
71 |
typed_cols = [sql_gen.TypedCol(v, 'text') for v in col_names] |
|
72 |
|
|
73 |
log('Creating table') |
|
74 |
sql.create_table(db, table, typed_cols, has_pkey=False, |
|
75 |
col_indexes=False) |
|
76 |
|
|
77 |
def load_(): |
|
78 |
# Create COPY FROM statement |
|
79 |
if use_copy_from[0]: |
|
80 |
copy_from = ('COPY '+table.to_str(db)+' FROM STDIN DELIMITER ' |
|
81 |
+db.esc_value(dialect.delimiter)+' NULL '+db.esc_value('')) |
|
82 |
assert not csvs.is_tsv(dialect) |
|
83 |
copy_from += ' CSV' |
|
84 |
if dialect.quoting != csv.QUOTE_NONE: |
|
85 |
quote_str = db.esc_value(dialect.quotechar) |
|
86 |
copy_from += ' QUOTE '+quote_str |
|
87 |
if dialect.doublequote: copy_from += ' ESCAPE '+quote_str |
|
88 |
copy_from += ';\n' |
|
89 |
|
|
90 |
# Load the data |
|
91 |
line_in = streams.ProgressInputStream(in_, sys.stderr, n=1000) |
|
92 |
try: |
|
93 |
if use_copy_from[0]: |
|
94 |
log('Using COPY FROM') |
|
95 |
log(copy_from, level=2) |
|
96 |
db.db.cursor().copy_expert(copy_from, line_in) |
|
97 |
else: |
|
98 |
log('Using INSERT') |
|
99 |
cols_ct = len(col_names) |
|
100 |
for row in csvs.make_reader(line_in, dialect): |
|
101 |
row = map(strings.to_unicode, row) |
|
102 |
util.list_set_length(row, cols_ct) # truncate extra cols |
|
103 |
sql.insert(db, table, row, cacheable=False, log_level=5) |
|
104 |
finally: |
|
105 |
line_in.close() # also closes proc.stdout |
|
106 |
proc.wait() |
|
107 |
|
|
108 |
if has_row_num: sql.add_row_num(db, table) |
|
109 |
sql.with_savepoint(db, load_) |
|
58 |
# Import data |
|
59 |
try: sql_io.import_csv(db, table, line_in, use_copy_from, has_row_num) |
|
60 |
finally: |
|
61 |
line_in.close() # also closes proc.stdout |
|
62 |
proc.wait() |
|
110 | 63 |
|
111 | 64 |
if input_cmd != []: |
112 | 65 |
try: load() |
113 | 66 |
except sql.DatabaseErrors, e: |
114 |
if use_copy_from[0]: # first try
|
|
67 |
if use_copy_from: # first try |
|
115 | 68 |
exc.print_ex(e, plain=True) |
116 |
use_copy_from[0] = False
|
|
69 |
use_copy_from = False |
|
117 | 70 |
load() # try again with different approach |
118 | 71 |
else: raise |
119 |
|
|
120 |
sql_io.cleanup_table(db, table) |
|
72 |
else: sql_io.cleanup_table(db, table) |
|
121 | 73 |
|
122 | 74 |
main() |
Also available in: Unified diff
csv2db: Use new sql_io.import_csv()