Project

General

Profile

1 1858 aaronmk
# Parallel processing
2
3 1873 aaronmk
import cPickle
4 1862 aaronmk
import Queue
5 1873 aaronmk
import types
6 1858 aaronmk
import warnings
7
8 1873 aaronmk
import exc
9
from Runnable import Runnable
10
11
def try_pickle(value):
12
    try: cPickle.dumps(value)
13
    except Exception, e:
14
        exc.add_msg(e, 'Tried to pickle: '+repr(value))
15
        raise
16
17
def prepickle(value):
18
    if isinstance(value, types.FunctionType): return Runnable(value)
19
    elif isinstance(value, MultiProducerPool): return None
20
    elif isinstance(value, list): return map(prepickle, list_)
21
    elif isinstance(value, dict):
22
        return dict(((k, prepickle(v)) for k, v in value.iteritems()))
23
    else: return value
24
25 1862 aaronmk
class SyncPool:
26 1861 aaronmk
    '''A dummy synchronous Pool to use if multiprocessing is not available'''
27 1862 aaronmk
    def __init__(self, processes=None): pass
28
29 1860 aaronmk
    class Result:
30
        def __init__(self, value): self.value = value
31
32
        def get(timeout=None): return self.value
33
34
        def wait(timeout=None): pass
35
36
        def ready(): return True
37
38
        def successful(): return True # TODO: False if raised exception
39
40 1863 aaronmk
    def apply_async(self, func, args=None, kw_args=None, callback=None):
41 1858 aaronmk
        if args == None: args = ()
42 1863 aaronmk
        if kw_args == None: kw_args = {}
43 1858 aaronmk
        if callback == None: callback = lambda v: None
44
45 1860 aaronmk
        value = func(*args, **kw_args)
46
        callback(value)
47 1863 aaronmk
        return self.Result(value)
48 1858 aaronmk
49 1862 aaronmk
class MultiProducerPool:
50
    '''A multi-producer pool. You must call pool.main_loop() in the thread that
51
    created this to process new tasks.'''
52 1873 aaronmk
53 1862 aaronmk
    def __init__(self, processes=None):
54
        '''
55
        @param processes If 0, uses SyncPool
56
        @post The # processes actually used is made available in self.process_ct
57
        '''
58
        try:
59
            if processes == 0: raise ImportError('turned off')
60
            import multiprocessing
61
            import multiprocessing.pool
62
        except ImportError, e:
63
            warnings.warn(UserWarning('Not using parallel processing: '+str(e)))
64
            processes = 1
65
            Pool_ = SyncPool
66
            Queue_ = Queue.Queue
67
        else:
68
            if processes == None: processes = multiprocessing.cpu_count()
69
            Pool_ = multiprocessing.pool.Pool
70
            Queue_ = multiprocessing.Queue
71
72
        self.process_ct = processes
73
        self.pool = Pool_(processes)
74
        self.queue = Queue_()
75
76
    def main_loop(self):
77
        '''@param pool Must be a pool returned by mk_pool()'''
78
        try:
79 1873 aaronmk
            while True:
80
                # block=False raises Empty immediately if the queue is empty,
81
                # which indicates that the program is done
82
                call = self.queue.get(block=False)
83
                self.pool.apply_async(call.func, call.args, call.kw_args,
84
                    call.callback)
85 1862 aaronmk
        except Queue.Empty: pass
86
87
    class Result:
88
        def get(timeout=None): raise NotImplementedError()
89
90
        def wait(timeout=None): raise NotImplementedError()
91
92
        def ready(): raise NotImplementedError()
93
94
        def successful(): raise NotImplementedError()
95
96 1873 aaronmk
    def apply_async(self, func, args=None, kw_args=None, callback=None):
97
        if args == None: args = ()
98
        if kw_args == None: kw_args = {}
99
100
        call = Runnable(func, *prepickle(args), **prepickle(kw_args))
101
        call.callback = callback # store this inside the Runnable
102
103
        # Try pickling the args. If it fails, we'll get a full traceback here,
104
        # which is not provided with pickling errors in multiprocessing's Pool.
105
        try_pickle(call)
106
107 1862 aaronmk
        self.queue.put_nowait(call)
108 1863 aaronmk
        return self.Result()