# Parallel processing

import cPickle
import itertools
import Queue
import rand
import types
import warnings

import collection
import dicts
import exc
from Runnable import Runnable

def try_pickle(value):
    try: cPickle.dumps(value)
    except Exception, e:
        exc.add_msg(e, 'Tried to pickle: '+repr(value))
        raise

def prepickle(value, vars_id_dict_):
    def filter_(value, is_leaf):
        id_ = id(value)
        if id_ in vars_id_dict_: value = id_
        # Try pickling the value. If it fails, we'll get a full traceback here,
        # which is not provided with pickling errors in multiprocessing's Pool.
        elif is_leaf: try_pickle(value)
        return value
    return collection.rmap(filter_, value)

def post_unpickle(value, vars_id_dict_):
    def filter_(value, is_leaf):
        # get() returns the value itself if it isn't a known id()
        if type(value) == int: value = vars_id_dict_.get(value, value)
        return value
    return collection.rmap(filter_, value)
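
# A rough sketch of the prepickle/post_unpickle round-trip (hypothetical 'model'
# value; assumes dicts.IdDict maps id(obj) -> obj and collection.rmap applies
# filter_ across nested containers, as the code above implies):
#
#     shared = dicts.IdDict(); shared.add(model)     # register model by id()
#     args = prepickle((model, 42), shared)          # -> (id(model), 42)
#     args = post_unpickle(args, shared)             # -> (model, 42)
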
37

    
38
class SyncPool:
    '''A dummy synchronous Pool to use if multiprocessing is not available'''
    def __init__(self, processes=None): pass
    
    class Result:
        def __init__(self, value): self.value = value
        
        def get(self, timeout=None): return self.value
        
        def wait(self, timeout=None): pass
        
        def ready(self): return True
        
        def successful(self): return True # TODO: False if raised exception
    
    def apply_async(self, func, args=(), kw_args={}, callback=None):
        if callback == None: callback = lambda v: None
        
        value = func(*args, **kw_args)
        callback(value)
        return self.Result(value)
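
# E.g. (a minimal sketch of SyncPool on its own): SyncPool().apply_async(len,
# (['a', 'b'],)).get() returns 2, computed synchronously inside apply_async().
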
class MultiProducerPool:
    '''A multi-producer pool. You must call pool.main_loop() in the thread that
    created this to process new tasks.'''
    
    def __init__(self, processes=None, locals_={}, *shared):
        '''
        @param processes If 0, uses SyncPool
        @post The # of processes actually used is made available in self.process_ct
        '''
        try:
            if processes == 0: raise ImportError('turned off')
            import multiprocessing
            import multiprocessing.pool
        except ImportError, e:
            warnings.warn(UserWarning('Not using parallel processing: '+str(e)))
            processes = 1
            Pool_ = SyncPool
            Queue_ = Queue.Queue
        else:
            if processes == None: processes = multiprocessing.cpu_count()
            Pool_ = multiprocessing.pool.Pool
            Queue_ = multiprocessing.Queue
        
        self.process_ct = processes
        self.pool = Pool_(processes)
        self.queue = Queue_()
        self.active_tasks = 0
        
        # Values that may be pickled by id()
        self.vars_id_dict = dicts.IdDict()
        self.share(self, *shared).share_vars(locals_).share_vars(globals())
    
    def share(self, *values):
        '''Call this on all values that should be pickled by id()'''
        self.vars_id_dict.add(*values)
        return self
    
    def share_vars(self, vars_):
        '''Call this on all vars that should be pickled by id().
        Usage: self.share_vars(locals())
        '''
        self.vars_id_dict.add_vars(vars_)
        return self
    
    def main_loop(self):
        '''Prime the task queue with at least one task before calling this'''
        while True:
            try: call = self.queue.get(timeout=0.1) # sec
            except Queue.Empty:
                if self.active_tasks == 0: break # program done
                else: continue
            
            def handle_result(*args, **kw_args):
                self.active_tasks -= 1
                if call.callback != None: call.callback(*args, **kw_args)
            
            self.active_tasks += 1
            self.pool.apply_async(call.func, self.post_unpickle(call.args),
                self.post_unpickle(call.kw_args), handle_result)
    
    class Result:
        def get(self, timeout=None): raise NotImplementedError()
        
        def wait(self, timeout=None): raise NotImplementedError()
        
        def ready(self): raise NotImplementedError()
        
        def successful(self): raise NotImplementedError()
    
    def apply_async(self, func, args=(), kw_args={}, callback=None):
        assert callback == None, 'Callbacks not supported'
        
        call = Runnable(func, *self.prepickle(args), **self.prepickle(kw_args))
        call.callback = callback # store this inside the Runnable
        
        self.queue.put_nowait(call)
        return self.Result()
    
    def prepickle(self, value): return prepickle(value, self.vars_id_dict)
    
    def post_unpickle(self, value):
        return post_unpickle(value, self.vars_id_dict)
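
# A minimal usage sketch: per the class docstring, prime the queue with
# apply_async() and then run main_loop() in the creating thread. 'work' is a
# hypothetical task function; processes=0 selects the synchronous SyncPool
# fallback, and the local helper modules (collection, dicts, exc, Runnable)
# are assumed importable.
if __name__ == '__main__':
    def work(x):
        print x * x # runs in a worker process when multiprocessing is enabled
    
    pool = MultiProducerPool(processes=0)
    pool.apply_async(work, (3,)) # queue at least one task before main_loop()
    pool.main_loop()             # exits once the queue drains and no task is active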