Project

General

Profile

« Previous | Next » 

Revision 1873

parallel.py: MultiProducerPool.apply_async(): Prepickle all function args. Try pickling the args before the queue pickles them, to get better debugging output.

View differences:

parallel.py
1 1
# Parallel processing
2 2

  
3
import cPickle
3 4
import Queue
5
import types
4 6
import warnings
5 7

  
8
import exc
9
from Runnable import Runnable
10

  
11
def try_pickle(value):
12
    try: cPickle.dumps(value)
13
    except Exception, e:
14
        exc.add_msg(e, 'Tried to pickle: '+repr(value))
15
        raise
16

  
17
def prepickle(value):
18
    if isinstance(value, types.FunctionType): return Runnable(value)
19
    elif isinstance(value, MultiProducerPool): return None
20
    elif isinstance(value, list): return map(prepickle, list_)
21
    elif isinstance(value, dict):
22
        return dict(((k, prepickle(v)) for k, v in value.iteritems()))
23
    else: return value
24

  
6 25
class SyncPool:
7 26
    '''A dummy synchronous Pool to use if multiprocessing is not available'''
8 27
    def __init__(self, processes=None): pass
......
30 49
class MultiProducerPool:
31 50
    '''A multi-producer pool. You must call pool.main_loop() in the thread that
32 51
    created this to process new tasks.'''
52
    
33 53
    def __init__(self, processes=None):
34 54
        '''
35 55
        @param processes If 0, uses SyncPool
......
56 76
    def main_loop(self):
57 77
        '''@param pool Must be a pool returned by mk_pool()'''
58 78
        try:
59
            # block=False raises Empty immediately if the queue is empty,
60
            # which indicates that the program is done
61
            while True: self.queue.get(block=False)() # each elem is a function
79
            while True:
80
                # block=False raises Empty immediately if the queue is empty,
81
                # which indicates that the program is done
82
                call = self.queue.get(block=False)
83
                self.pool.apply_async(call.func, call.args, call.kw_args,
84
                    call.callback)
62 85
        except Queue.Empty: pass
63 86
    
64 87
    class Result:
......
70 93
        
71 94
        def successful(): raise NotImplementedError()
72 95
    
73
    def apply_async(self, *args, **kw_args):
74
        def call(): self.pool.apply_async(*args, **kw_args)
96
    def apply_async(self, func, args=None, kw_args=None, callback=None):
97
        if args == None: args = ()
98
        if kw_args == None: kw_args = {}
99
        
100
        call = Runnable(func, *prepickle(args), **prepickle(kw_args))
101
        call.callback = callback # store this inside the Runnable
102
        
103
        # Try pickling the args. If it fails, we'll get a full traceback here,
104
        # which is not provided with pickling errors in multiprocessing's Pool.
105
        try_pickle(call)
106
        
75 107
        self.queue.put_nowait(call)
76 108
        return self.Result()

Also available in: Unified diff