#!/usr/bin/env python
# Scrubs the taxonpaths in VegBIEN using TNRS.
# Runs continuously; exits once no new rows have appeared after waiting max_pause.

import os.path
import StringIO
import sys
import time
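
# Make the project's lib/ directory importable for the project modules below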
sys.path.append(os.path.dirname(__file__)+"/../lib")

import csvs
import opts
import profiling
import sql
import sql_gen
import sql_io
import streams
import strings
import tnrs

# Config
pause = 60 # sec
max_pause = 2*60*60 # sec; = 2 hr; must be >= max import time of one partition
assert pause <= max_pause
max_taxons = 500 # less than the limit to avoid slowing down the TNRS server
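
# Table that accumulates the raw TNRS response rows (see sql_io.append_csv() below)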
tnrs_data = sql_gen.Table('tnrs')

def main():
    # Input
    env_names = []
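    # Read the DB connection settings and verbosity from env vars; the var
    # names are collected in env_names for the usage message below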
    db_config = opts.get_env_vars(sql.db_config_names, None, env_names)
    verbosity = float(opts.get_env_var('verbosity', 3, env_names))
    if 'engine' not in db_config: raise SystemExit('Usage: '
        +opts.env_usage(env_names)+' '+sys.argv[0]+' 2>>log')
    
    def log(msg, level=1):
        '''Higher level -> more verbose'''
        if level <= verbosity:
            sys.stderr.write(strings.to_raw_str(msg.rstrip('\n')+'\n'))
    
    # Connect to DB
    db = sql.connect(db_config, log_debug=log)
    
    tnrs_profiler = profiling.ItersProfiler(iter_text='row')
    try:
        # Iterate over unscrubbed verbatim taxonpaths
        start = 0
        total_pause = 0
        while True:
            # Fetch next set
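            # (a NULL canon_taxonpath_id marks a taxonpath as not yet scrubbed)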
            cur = sql.select(db, 'taxonpath', ['taxonomicnamewithauthor'],
                [('canon_taxonpath_id', None)], limit=max_taxons, start=start,
                cacheable=False)
            this_ct = cur.rowcount
            start += this_ct # advance start to fetch next set
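            # No rows available: wait and retry, but give up once the total
            # wait since the last non-empty set exceeds max_pause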
            if this_ct == 0:
                total_pause += pause
                if total_pause > max_pause: break
                log('Waited '+str(total_pause)+' sec. Waiting...')
                time.sleep(pause) # wait for more rows
                continue # try again
            # otherwise, rows found
            total_pause = 0
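            # Collect the verbatim names (the single selected column) to send to TNRS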
            taxons = list(sql.values(cur))
            
            # Run TNRS
            log('Making TNRS request')
            tnrs_profiler.start()
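            # Time the request; the finally clause records the row count even
            # if the request fails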
            try:
                try: stream = tnrs.repeated_tnrs_request(taxons)
                finally: tnrs_profiler.stop(iter_ct=this_ct)
            except tnrs.InvalidResponse: pass # skip this set in case it caused the error
            else:
                log('Storing TNRS response data')
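                # Parse the CSV header info before wrapping the stream, so that
                # read progress is reported to stderr as the response is stored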
                stream_info = csvs.stream_info(stream, parse_header=True)
                stream = streams.ProgressInputStream(stream, sys.stderr, n=1000)
                sql_io.append_csv(db, tnrs_data, stream_info, stream)
    finally:
        log(tnrs_profiler.msg())

main()