Revision 1873
Added by Aaron Marcuse-Kubitza over 12 years ago
parallel.py | ||
---|---|---|
1 | 1 |
# Parallel processing |
2 | 2 |
|
3 |
import cPickle |
|
3 | 4 |
import Queue |
5 |
import types |
|
4 | 6 |
import warnings |
5 | 7 |
|
8 |
import exc |
|
9 |
from Runnable import Runnable |
|
10 |
|
|
11 |
def try_pickle(value): |
|
12 |
try: cPickle.dumps(value) |
|
13 |
except Exception, e: |
|
14 |
exc.add_msg(e, 'Tried to pickle: '+repr(value)) |
|
15 |
raise |
|
16 |
|
|
17 |
def prepickle(value): |
|
18 |
if isinstance(value, types.FunctionType): return Runnable(value) |
|
19 |
elif isinstance(value, MultiProducerPool): return None |
|
20 |
elif isinstance(value, list): return map(prepickle, list_) |
|
21 |
elif isinstance(value, dict): |
|
22 |
return dict(((k, prepickle(v)) for k, v in value.iteritems())) |
|
23 |
else: return value |
|
24 |
|
|
6 | 25 |
class SyncPool: |
7 | 26 |
'''A dummy synchronous Pool to use if multiprocessing is not available''' |
8 | 27 |
def __init__(self, processes=None): pass |
... | ... | |
30 | 49 |
class MultiProducerPool: |
31 | 50 |
'''A multi-producer pool. You must call pool.main_loop() in the thread that |
32 | 51 |
created this to process new tasks.''' |
52 |
|
|
33 | 53 |
def __init__(self, processes=None): |
34 | 54 |
''' |
35 | 55 |
@param processes If 0, uses SyncPool |
... | ... | |
56 | 76 |
def main_loop(self): |
57 | 77 |
'''@param pool Must be a pool returned by mk_pool()''' |
58 | 78 |
try: |
59 |
# block=False raises Empty immediately if the queue is empty, |
|
60 |
# which indicates that the program is done |
|
61 |
while True: self.queue.get(block=False)() # each elem is a function |
|
79 |
while True: |
|
80 |
# block=False raises Empty immediately if the queue is empty, |
|
81 |
# which indicates that the program is done |
|
82 |
call = self.queue.get(block=False) |
|
83 |
self.pool.apply_async(call.func, call.args, call.kw_args, |
|
84 |
call.callback) |
|
62 | 85 |
except Queue.Empty: pass |
63 | 86 |
|
64 | 87 |
class Result: |
... | ... | |
70 | 93 |
|
71 | 94 |
def successful(): raise NotImplementedError() |
72 | 95 |
|
73 |
def apply_async(self, *args, **kw_args): |
|
74 |
def call(): self.pool.apply_async(*args, **kw_args) |
|
96 |
def apply_async(self, func, args=None, kw_args=None, callback=None): |
|
97 |
if args == None: args = () |
|
98 |
if kw_args == None: kw_args = {} |
|
99 |
|
|
100 |
call = Runnable(func, *prepickle(args), **prepickle(kw_args)) |
|
101 |
call.callback = callback # store this inside the Runnable |
|
102 |
|
|
103 |
# Try pickling the args. If it fails, we'll get a full traceback here, |
|
104 |
# which is not provided with pickling errors in multiprocessing's Pool. |
|
105 |
try_pickle(call) |
|
106 |
|
|
75 | 107 |
self.queue.put_nowait(call) |
76 | 108 |
return self.Result() |
Also available in: Unified diff
parallel.py: MultiProducerPool.apply_async(): Prepickle all function args. Try pickling the args before the queue pickles them, to get better debugging output.