Revision 1877
Added by Aaron Marcuse-Kubitza over 12 years ago
parallel.py | ||
---|---|---|
1 | 1 |
# Parallel processing |
2 | 2 |
|
3 | 3 |
import cPickle |
4 |
import itertools |
|
4 | 5 |
import Queue |
6 |
import rand |
|
5 | 7 |
import types |
6 | 8 |
import warnings |
7 | 9 |
|
10 |
import collection |
|
11 |
import dicts |
|
8 | 12 |
import exc |
9 | 13 |
from Runnable import Runnable |
10 | 14 |
|
... | ... | |
14 | 18 |
exc.add_msg(e, 'Tried to pickle: '+repr(value)) |
15 | 19 |
raise |
16 | 20 |
|
17 |
def prepickle(value): |
|
18 |
if isinstance(value, types.FunctionType): return Runnable(value) |
|
19 |
elif isinstance(value, MultiProducerPool): return None |
|
20 |
elif isinstance(value, list): return map(prepickle, list_) |
|
21 |
elif isinstance(value, dict): |
|
22 |
return dict(((k, prepickle(v)) for k, v in value.iteritems())) |
|
23 |
else: return value |
|
21 |
def vars_id_dict(locals_, globals_, *misc):
    '''Usage: vars_id_dict(locals(), globals(), misc...)
    Builds a dicts.id_dict() over every value in the given scopes plus any
    extra values in misc.
    '''
    scopes = [locals_, globals_]
    vars_ = [scope.values() for scope in scopes] + list(misc)
    return dicts.id_dict(vars_)
|
24 | 25 |
|
26 |
def prepickle(value, vars_id_dict_):
    '''Recursively prepares value for pickling.
    Values that cannot be pickled themselves, but whose id() is registered in
    vars_id_dict_, are replaced by that id(); post_unpickle() maps the ids
    back to the original objects on the other side.
    @param vars_id_dict_ mapping of id(obj) -> obj for shareable values
    @raise Exception whatever try_pickle() raised, if the value is neither
        picklable nor registered in vars_id_dict_
    '''
    def filter_(value):
        # Try pickling the value. If it fails, we'll get a full traceback here,
        # which is not provided with pickling errors in multiprocessing's Pool.
        try: try_pickle(value)
        except Exception:
            id_ = id(value)
            if id_ in vars_id_dict_: value = id_
            # Bare raise re-raises with the original traceback intact;
            # `raise e` would have replaced it with this frame's location.
            else: raise
        return value
    return collection.rmap(filter_, value)
|
37 |
|
|
38 |
def post_unpickle(value, vars_id_dict_):
    '''Undoes prepickle(): restores objects that were pickled as their id().
    @param vars_id_dict_ mapping of id(obj) -> obj for shareable values
    '''
    def filter_(elem):
        try:
            return vars_id_dict_[elem] # elem is an id()
        except KeyError:
            return elem # not a registered id(); pass through unchanged
    return collection.rmap(filter_, value)
|
43 |
|
|
25 | 44 |
class SyncPool: |
26 | 45 |
'''A dummy synchronous Pool to use if multiprocessing is not available''' |
27 | 46 |
def __init__(self, processes=None): pass |
... | ... | |
37 | 56 |
|
38 | 57 |
def successful(): return True # TODO: False if raised exception |
39 | 58 |
|
40 |
def apply_async(self, func, args=None, kw_args=None, callback=None): |
|
41 |
if args == None: args = () |
|
42 |
if kw_args == None: kw_args = {} |
|
59 |
def apply_async(self, func, args=(), kw_args={}, callback=None): |
|
43 | 60 |
if callback == None: callback = lambda v: None |
44 | 61 |
|
45 | 62 |
value = func(*args, **kw_args) |
... | ... | |
50 | 67 |
'''A multi-producer pool. You must call pool.main_loop() in the thread that |
51 | 68 |
created this to process new tasks.''' |
52 | 69 |
|
53 |
def __init__(self, processes=None): |
|
70 |
def __init__(self, processes=None, locals_=None, globals_=None, *shared):
|
|
54 | 71 |
''' |
55 | 72 |
@param processes If 0, uses SyncPool |
56 | 73 |
@post The # processes actually used is made available in self.process_ct |
57 | 74 |
''' |
75 |
if locals_ == None: locals_ = locals() |
|
76 |
if globals_ == None: globals_ = globals() |
|
77 |
|
|
58 | 78 |
try: |
59 | 79 |
if processes == 0: raise ImportError('turned off') |
60 | 80 |
import multiprocessing |
... | ... | |
72 | 92 |
self.process_ct = processes |
73 | 93 |
self.pool = Pool_(processes) |
74 | 94 |
self.queue = Queue_() |
95 |
# Values that may be pickled by id() |
|
96 |
self.vars_id_dict = vars_id_dict(locals_, globals_, *shared) |
|
75 | 97 |
|
98 |
def share(self, value): |
|
99 |
'''Call this on every value that that may be pickled by id()''' |
|
100 |
self.vars_id_dict[id(value)] = value |
|
101 |
|
|
76 | 102 |
def main_loop(self): |
77 | 103 |
'''@param pool Must be a pool returned by mk_pool()''' |
78 | 104 |
try: |
... | ... | |
80 | 106 |
# block=False raises Empty immediately if the queue is empty, |
81 | 107 |
# which indicates that the program is done |
82 | 108 |
call = self.queue.get(block=False) |
83 |
self.pool.apply_async(call.func, call.args, call.kw_args,
|
|
84 |
call.callback) |
|
109 |
self.pool.apply_async(call.func, self.post_unpickle(call.args),
|
|
110 |
self.post_unpickle(call.kw_args), call.callback)
|
|
85 | 111 |
except Queue.Empty: pass |
86 | 112 |
|
87 | 113 |
class Result: |
... | ... | |
93 | 119 |
|
94 | 120 |
def successful(): raise NotImplementedError() |
95 | 121 |
|
96 |
def apply_async(self, func, args=None, kw_args=None, callback=None): |
|
97 |
if args == None: args = () |
|
98 |
if kw_args == None: kw_args = {} |
|
122 |
def apply_async(self, func, args=(), kw_args=None, callback=None):
    '''Queues func to be run by the main_loop() process.
    @param args Positional args; prepickled before being queued
    @param kw_args Keyword args; prepickled before being queued
    @param callback Not supported; must be None
    @return a Result object (see self.Result)
    '''
    assert callback == None, 'Callbacks not supported'
    # None default instead of a mutable {} default, which would be one dict
    # object shared across every call to this method
    if kw_args == None: kw_args = {}
    
    call = Runnable(func, *self.prepickle(args), **self.prepickle(kw_args))
    call.callback = callback # store this inside the Runnable
    
    self.queue.put_nowait(call)
    return self.Result()
130 |
|
|
131 |
def prepickle(self, value):
    '''Prepickles value using this pool's shared-variables id dict.'''
    return prepickle(value, self.vars_id_dict)
|
132 |
|
|
133 |
def post_unpickle(self, value):
    '''Post-unpickles value using this pool's shared-variables id dict.'''
    return post_unpickle(value, self.vars_id_dict)
Also available in: Unified diff
parallel.py: MultiProducerPool: Pickle objects by ID if they're accessible to the main_loop process. This should allow e.g. DB connections and pools to be pickled, if they were defined in the main process.