Project

General

Profile

1 1858 aaronmk
# Parallel processing
2
3 1873 aaronmk
import cPickle
4 1877 aaronmk
import itertools
5 1862 aaronmk
import Queue
6 1877 aaronmk
import rand
7 1873 aaronmk
import types
8 1858 aaronmk
import warnings
9
10 1877 aaronmk
import collection
11
import dicts
12 1873 aaronmk
import exc
13
from Runnable import Runnable
14 4492 aaronmk
import strings
15 1873 aaronmk
16
def try_pickle(value):
17
    try: cPickle.dumps(value)
18
    except Exception, e:
19 4492 aaronmk
        exc.add_msg(e, 'Tried to pickle: '+strings.urepr(value))
20 1873 aaronmk
        raise
21
22 1877 aaronmk
def prepickle(value, vars_id_dict_):
23 1885 aaronmk
    def filter_(value, is_leaf):
24 1880 aaronmk
        id_ = id(value)
25
        if id_ in vars_id_dict_: value = id_
26 1877 aaronmk
        # Try pickling the value. If it fails, we'll get a full traceback here,
27
        # which is not provided with pickling errors in multiprocessing's Pool.
28 1885 aaronmk
        elif is_leaf: try_pickle(value)
29 1877 aaronmk
        return value
30
    return collection.rmap(filter_, value)
31
32
def post_unpickle(value, vars_id_dict_):
33 1885 aaronmk
    def filter_(value, is_leaf):
34
        if type(value) == int: value = vars_id_dict_.get(value, value)
35
            # get() returns the value itself if it isn't a known id()
36
        return value
37 1877 aaronmk
    return collection.rmap(filter_, value)
38
39 1862 aaronmk
class SyncPool:
40 1861 aaronmk
    '''A dummy synchronous Pool to use if multiprocessing is not available'''
41 1862 aaronmk
    def __init__(self, processes=None): pass
42
43 1860 aaronmk
    class Result:
44
        def __init__(self, value): self.value = value
45
46
        def get(timeout=None): return self.value
47
48
        def wait(timeout=None): pass
49
50
        def ready(): return True
51
52
        def successful(): return True # TODO: False if raised exception
53
54 1877 aaronmk
    def apply_async(self, func, args=(), kw_args={}, callback=None):
55 1858 aaronmk
        if callback == None: callback = lambda v: None
56
57 1860 aaronmk
        value = func(*args, **kw_args)
58
        callback(value)
59 1863 aaronmk
        return self.Result(value)
60 1858 aaronmk
61 1862 aaronmk
class MultiProducerPool:
62
    '''A multi-producer pool. You must call pool.main_loop() in the thread that
63
    created this to process new tasks.'''
64 1873 aaronmk
65 1885 aaronmk
    def __init__(self, processes=None, locals_={}, *shared):
66 1862 aaronmk
        '''
67
        @param processes If 0, uses SyncPool
68
        @post The # processes actually used is made available in self.process_ct
69
        '''
70
        try:
71
            if processes == 0: raise ImportError('turned off')
72
            import multiprocessing
73
            import multiprocessing.pool
74
        except ImportError, e:
75
            processes = 1
76
            Pool_ = SyncPool
77
            Queue_ = Queue.Queue
78
        else:
79
            if processes == None: processes = multiprocessing.cpu_count()
80
            Pool_ = multiprocessing.pool.Pool
81
            Queue_ = multiprocessing.Queue
82
83
        self.process_ct = processes
84
        self.pool = Pool_(processes)
85
        self.queue = Queue_()
86 1885 aaronmk
        self.active_tasks = 0
87
88 1887 aaronmk
        # Store a reference to the manager in self, because it will otherwise be
89
        # shutdown right away when it goes out of scope
90
        #self.manager = processing.Manager()
91
        #self.shared_rw = self.manager.Namespace()
92
93 1877 aaronmk
        # Values that may be pickled by id()
94 1885 aaronmk
        self.vars_id_dict = dicts.IdDict()
95
        self.share(self, *shared).share_vars(locals_).share_vars(globals())
96 1862 aaronmk
97 1885 aaronmk
    def share(self, *values):
98 1887 aaronmk
        '''Call this on all values that should be shared writably between all
99
        processes (and be pickled by id())'''
100 1885 aaronmk
        self.vars_id_dict.add(*values)
101
        return self
102 1877 aaronmk
103 1885 aaronmk
    def share_vars(self, vars_):
104 1887 aaronmk
        '''Call this on all vars that should be pickled by id().
105 1885 aaronmk
        Usage: self.share_vars(locals())
106 1887 aaronmk
        @param vars_ {var_name: value}
107 1885 aaronmk
        '''
108
        self.vars_id_dict.add_vars(vars_)
109
        return self
110
111 1862 aaronmk
    def main_loop(self):
112 4026 aaronmk
        '''Prime the task queue with at least one task before calling this'''
113 1885 aaronmk
        while True:
114
            try: call = self.queue.get(timeout=0.1) # sec
115
            except Queue.Empty:
116
                if self.active_tasks == 0: break # program done
117
                else: continue
118
119
            def handle_result(*args, **kw_args):
120
                self.active_tasks -= 1
121
                if call.callback != None: call.callback(*args, **kw_args)
122
123
            self.active_tasks += 1
124
            self.pool.apply_async(call.func, self.post_unpickle(call.args),
125
                self.post_unpickle(call.kw_args), handle_result)
126 1862 aaronmk
127
    class Result:
128
        def get(timeout=None): raise NotImplementedError()
129
130
        def wait(timeout=None): raise NotImplementedError()
131
132
        def ready(): raise NotImplementedError()
133
134
        def successful(): raise NotImplementedError()
135
136 1877 aaronmk
    def apply_async(self, func, args=(), kw_args={}, callback=None):
137
        assert callback == None, 'Callbacks not supported'
138 1873 aaronmk
139 1877 aaronmk
        call = Runnable(func, *self.prepickle(args), **self.prepickle(kw_args))
140 1873 aaronmk
        call.callback = callback # store this inside the Runnable
141
142 1862 aaronmk
        self.queue.put_nowait(call)
143 1863 aaronmk
        return self.Result()
144 1877 aaronmk
145
    def prepickle(self, value): return prepickle(value, self.vars_id_dict)
146
147
    def post_unpickle(self, value):
148
        return post_unpickle(value, self.vars_id_dict)