Project

General

Profile

1
#!/usr/bin/env python
2
# Loads a command's CSV output stream into a PostgreSQL table.
3
# When no command is specified, just cleans up the specified table.
4
# The command may be run more than once: each retry after a failed load re-runs it.
5

    
6
import os.path
7
import subprocess
8
import sys
9

    
10
sys.path.append(os.path.dirname(__file__)+"/../lib")
11

    
12
import exc
13
import opts
14
import sql
15
import sql_io
16
import sql_gen
17
import streams
18
import strings
19
import util
20

    
21
def main():
    '''Loads the CSV output of the command in sys.argv[1:] into a table, or
    cleans up the table when no command is given.

    Config is read from env vars: table (required), schema (default
    'public'), the vars named in sql.db_config_names ('engine' is required),
    and verbosity (default 3).

    Raises SystemExit with a usage message if table or the DB engine is unset.
    '''
    # Usage
    env_names = []  # filled in by the opts.get_env_var*() calls below
    def usage_err():
        raise SystemExit('Usage: '+opts.env_usage(env_names)+' '+sys.argv[0]
            +' [input_cmd args...]')

    # Parse args
    input_cmd = sys.argv[1:]

    # Get config from env vars
    table = opts.get_env_var('table', None, env_names)
    schema = opts.get_env_var('schema', 'public', env_names)
    db_config = opts.get_env_vars(sql.db_config_names, None, env_names)
    verbosity = util.cast(float, opts.get_env_var('verbosity', 3, env_names))

    if table is None or 'engine' not in db_config: usage_err()

    # Connect to DB
    def log(msg, level=1):
        '''Writes msg to stderr if level <= verbosity.
        Higher level -> more verbose'''
        if level <= verbosity:
            sys.stderr.write(strings.to_raw_str(msg.rstrip('\n')+'\n'))
    db = sql.connect(db_config, log_debug=log)

    table = sql_gen.Table(table, schema)

    # Read by load() at call time, so changing it below affects later retries
    use_copy_from = True

    # Loads data into the table using the currently-selected approach.
    # Each call starts a fresh run of input_cmd.
    def load():
        # Open input stream
        proc = subprocess.Popen(input_cmd, stdout=subprocess.PIPE, bufsize=-1)
        in_ = proc.stdout
        line_in = streams.ProgressInputStream(in_, sys.stderr, n=1000)

        # Import data
        try:
            sql_io.import_csv(db, table, line_in, use_copy_from)
        finally:
            # Always reap the child, even if closing the stream fails
            try:
                line_in.close() # also closes proc.stdout
            finally:
                proc.wait()

    if input_cmd:
        try:
            load()
        except sql.EncodingException as e:
            exc.print_ex(e, plain=True)
            assert e.name == 'UTF8'

            db.set_encoding('LATIN1')
            # NOTE(review): an error raised by this retry is not caught by the
            # sql.DatabaseErrors handler below, so there is no use_copy_from
            # fallback after an encoding switch -- confirm this is intended
            load() # try again with new encoding
        except sql.DatabaseErrors as e:
            if use_copy_from: # first try
                exc.print_ex(e, plain=True)
                use_copy_from = False
                load() # try again with different approach
            else:
                raise
    else:
        sql_io.cleanup_table(db, table)
78

    
79
# Runs unconditionally when this file is loaded (there is no __main__ guard)
main()
(10-10/61)