Project

General

Profile

1 1942 aaronmk
#!/usr/bin/env python
2
# Loads a command's CSV output stream into a PostgreSQL table.
3 4446 aaronmk
# When no command is specified, just cleans up the specified table.
4 1942 aaronmk
# The command may be run more than once.
5
6
import os.path
7
import subprocess
8
import sys
9
10
sys.path.append(os.path.dirname(__file__)+"/../lib")
11
12 5591 aaronmk
import csvs
13 1942 aaronmk
import exc
14
import opts
15
import sql
16 3080 aaronmk
import sql_io
17 2680 aaronmk
import sql_gen
18 1942 aaronmk
import streams
19 1963 aaronmk
import strings
20 1965 aaronmk
import util
21 1942 aaronmk
22
def main():
23
    # Usage
24
    env_names = []
25
    def usage_err():
26
        raise SystemExit('Usage: '+opts.env_usage(env_names)+' '+sys.argv[0]
27 4446 aaronmk
            +' [input_cmd args...]')
28 1942 aaronmk
29
    # Parse args
30
    input_cmd = sys.argv[1:]
31
32
    # Get config from env vars
33
    table = opts.get_env_var('table', None, env_names)
34
    schema = opts.get_env_var('schema', 'public', env_names)
35
    db_config = opts.get_env_vars(sql.db_config_names, None, env_names)
36 3271 aaronmk
    verbosity = util.cast(float, opts.get_env_var('verbosity', 3, env_names))
37 1942 aaronmk
38 4446 aaronmk
    if not (table != None and 'engine' in db_config): usage_err()
39 2890 aaronmk
40 1942 aaronmk
    # Connect to DB
41 2680 aaronmk
    def log(msg, level=1):
42
        '''Higher level -> more verbose'''
43 3610 aaronmk
        if level <= verbosity:
44
            sys.stderr.write(strings.to_raw_str(msg.rstrip('\n')+'\n'))
45 2680 aaronmk
    db = sql.connect(db_config, log_debug=log)
46 1942 aaronmk
47 3138 aaronmk
    table = sql_gen.Table(table, schema)
48
49 1963 aaronmk
    # Loads data into the table using the currently-selected approach.
50 2680 aaronmk
    def load():
51 1942 aaronmk
        # Open input stream
52 7652 aaronmk
        proc = subprocess.Popen(input_cmd, stdout=subprocess.PIPE, bufsize=-1,
53
            universal_newlines=True)
54 1942 aaronmk
        in_ = proc.stdout
55
56 4996 aaronmk
        # Import data
57 5591 aaronmk
        try: sql_io.import_csv(db, table, *csvs.reader_and_header(in_))
58 4996 aaronmk
        finally:
59 5589 aaronmk
            in_.close() # also closes proc.stdout
60 4996 aaronmk
            proc.wait()
61 1942 aaronmk
62 4446 aaronmk
    if input_cmd != []:
63
        try: load()
64 5580 aaronmk
        except sql.EncodingException, e:
65
            exc.print_ex(e, plain=True)
66
            assert e.name == 'UTF8'
67
68
            db.set_encoding('LATIN1')
69
            load() # try again with new encoding
70 4996 aaronmk
    else: sql_io.cleanup_table(db, table)
71 1942 aaronmk
72
main()