Project

General

Profile

« Previous | Next » 

Revision 4996

csv2db: Use new sql_io.import_csv()

View differences:

bin/csv2db
3 3
# When no command is specified, just cleans up the specified table.
4 4
# The command may be run more than once.
5 5

  
6
import csv
7 6
import os.path
8
import re
9 7
import subprocess
10 8
import sys
11 9

  
12 10
sys.path.append(os.path.dirname(__file__)+"/../lib")
13 11

  
14
import csvs
15 12
import exc
16 13
import opts
17 14
import sql
......
49 46
    
50 47
    table = sql_gen.Table(table, schema)
51 48
    
52
    use_copy_from = [True]
49
    use_copy_from = True
53 50
    
54 51
    # Loads data into the table using the currently-selected approach.
55 52
    def load():
56 53
        # Open input stream
57 54
        proc = subprocess.Popen(input_cmd, stdout=subprocess.PIPE, bufsize=-1)
58 55
        in_ = proc.stdout
56
        line_in = streams.ProgressInputStream(in_, sys.stderr, n=1000)
59 57
        
60
        # Get format info
61
        info = csvs.stream_info(in_, parse_header=True)
62
        dialect = info.dialect
63
        if csvs.is_tsv(dialect): use_copy_from[0] = False
64
        col_names = map(strings.to_unicode, info.header)
65
        for i, col in enumerate(col_names): # replace empty column names
66
            if col == '': col_names[i] = 'column_'+str(i)
67
        
68
        # Select schema and escape names
69
        def esc_name(name): return db.esc_name(name)
70
        
71
        typed_cols = [sql_gen.TypedCol(v, 'text') for v in col_names]
72
        
73
        log('Creating table')
74
        sql.create_table(db, table, typed_cols, has_pkey=False,
75
            col_indexes=False)
76
        
77
        def load_():
78
            # Create COPY FROM statement
79
            if use_copy_from[0]:
80
                copy_from = ('COPY '+table.to_str(db)+' FROM STDIN DELIMITER '
81
                    +db.esc_value(dialect.delimiter)+' NULL '+db.esc_value(''))
82
                assert not csvs.is_tsv(dialect)
83
                copy_from += ' CSV'
84
                if dialect.quoting != csv.QUOTE_NONE:
85
                    quote_str = db.esc_value(dialect.quotechar)
86
                    copy_from += ' QUOTE '+quote_str
87
                    if dialect.doublequote: copy_from += ' ESCAPE '+quote_str
88
                copy_from += ';\n'
89
            
90
            # Load the data
91
            line_in = streams.ProgressInputStream(in_, sys.stderr, n=1000)
92
            try:
93
                if use_copy_from[0]:
94
                    log('Using COPY FROM')
95
                    log(copy_from, level=2)
96
                    db.db.cursor().copy_expert(copy_from, line_in)
97
                else:
98
                    log('Using INSERT')
99
                    cols_ct = len(col_names)
100
                    for row in csvs.make_reader(line_in, dialect):
101
                        row = map(strings.to_unicode, row)
102
                        util.list_set_length(row, cols_ct) # truncate extra cols
103
                        sql.insert(db, table, row, cacheable=False, log_level=5)
104
            finally:
105
                line_in.close() # also closes proc.stdout
106
                proc.wait()
107
            
108
            if has_row_num: sql.add_row_num(db, table)
109
        sql.with_savepoint(db, load_)
58
        # Import data
59
        try: sql_io.import_csv(db, table, line_in, use_copy_from, has_row_num)
60
        finally:
61
            line_in.close() # also closes proc.stdout
62
            proc.wait()
110 63
    
111 64
    if input_cmd != []:
112 65
        try: load()
113 66
        except sql.DatabaseErrors, e:
114
            if use_copy_from[0]: # first try
67
            if use_copy_from: # first try
115 68
                exc.print_ex(e, plain=True)
116
                use_copy_from[0] = False
69
                use_copy_from = False
117 70
                load() # try again with different approach
118 71
            else: raise
119
    
120
    sql_io.cleanup_table(db, table)
72
    else: sql_io.cleanup_table(db, table)
121 73

  
122 74
main()

Also available in: Unified diff