Project

General

Profile

1
# Parallel processing
2

    
3
import cPickle
4
import Queue
5
import types
6
import warnings
7

    
8
import exc
9
from Runnable import Runnable
10

    
11
def try_pickle(value):
12
    try: cPickle.dumps(value)
13
    except Exception, e:
14
        exc.add_msg(e, 'Tried to pickle: '+repr(value))
15
        raise
16

    
17
def prepickle(value):
18
    if isinstance(value, types.FunctionType): return Runnable(value)
19
    elif isinstance(value, MultiProducerPool): return None
20
    elif isinstance(value, list): return map(prepickle, list_)
21
    elif isinstance(value, dict):
22
        return dict(((k, prepickle(v)) for k, v in value.iteritems()))
23
    else: return value
24

    
25
class SyncPool:
26
    '''A dummy synchronous Pool to use if multiprocessing is not available'''
27
    def __init__(self, processes=None): pass
28
    
29
    class Result:
30
        def __init__(self, value): self.value = value
31
        
32
        def get(timeout=None): return self.value
33
        
34
        def wait(timeout=None): pass
35
        
36
        def ready(): return True
37
        
38
        def successful(): return True # TODO: False if raised exception
39
    
40
    def apply_async(self, func, args=None, kw_args=None, callback=None):
41
        if args == None: args = ()
42
        if kw_args == None: kw_args = {}
43
        if callback == None: callback = lambda v: None
44
        
45
        value = func(*args, **kw_args)
46
        callback(value)
47
        return self.Result(value)
48

    
49
class MultiProducerPool:
50
    '''A multi-producer pool. You must call pool.main_loop() in the thread that
51
    created this to process new tasks.'''
52
    
53
    def __init__(self, processes=None):
54
        '''
55
        @param processes If 0, uses SyncPool
56
        @post The # processes actually used is made available in self.process_ct
57
        '''
58
        try:
59
            if processes == 0: raise ImportError('turned off')
60
            import multiprocessing
61
            import multiprocessing.pool
62
        except ImportError, e:
63
            warnings.warn(UserWarning('Not using parallel processing: '+str(e)))
64
            processes = 1
65
            Pool_ = SyncPool
66
            Queue_ = Queue.Queue
67
        else:
68
            if processes == None: processes = multiprocessing.cpu_count()
69
            Pool_ = multiprocessing.pool.Pool
70
            Queue_ = multiprocessing.Queue
71
        
72
        self.process_ct = processes
73
        self.pool = Pool_(processes)
74
        self.queue = Queue_()
75
    
76
    def main_loop(self):
77
        '''@param pool Must be a pool returned by mk_pool()'''
78
        try:
79
            while True:
80
                # block=False raises Empty immediately if the queue is empty,
81
                # which indicates that the program is done
82
                call = self.queue.get(block=False)
83
                self.pool.apply_async(call.func, call.args, call.kw_args,
84
                    call.callback)
85
        except Queue.Empty: pass
86
    
87
    class Result:
88
        def get(timeout=None): raise NotImplementedError()
89
        
90
        def wait(timeout=None): raise NotImplementedError()
91
        
92
        def ready(): raise NotImplementedError()
93
        
94
        def successful(): raise NotImplementedError()
95
    
96
    def apply_async(self, func, args=None, kw_args=None, callback=None):
97
        if args == None: args = ()
98
        if kw_args == None: kw_args = {}
99
        
100
        call = Runnable(func, *prepickle(args), **prepickle(kw_args))
101
        call.callback = callback # store this inside the Runnable
102
        
103
        # Try pickling the args. If it fails, we'll get a full traceback here,
104
        # which is not provided with pickling errors in multiprocessing's Pool.
105
        try_pickle(call)
106
        
107
        self.queue.put_nowait(call)
108
        return self.Result()
(17-17/31)