Project

General

Profile

1 1858 aaronmk
# Parallel processing
2
3 1862 aaronmk
import Queue
4 1858 aaronmk
import warnings
5
6 1862 aaronmk
class SyncPool:
7 1861 aaronmk
    '''A dummy synchronous Pool to use if multiprocessing is not available'''
8 1862 aaronmk
    def __init__(self, processes=None): pass
9
10 1860 aaronmk
    class Result:
11
        def __init__(self, value): self.value = value
12
13
        def get(timeout=None): return self.value
14
15
        def wait(timeout=None): pass
16
17
        def ready(): return True
18
19
        def successful(): return True # TODO: False if raised exception
20
21 1863 aaronmk
    def apply_async(self, func, args=None, kw_args=None, callback=None):
22 1858 aaronmk
        if args == None: args = ()
23 1863 aaronmk
        if kw_args == None: kw_args = {}
24 1858 aaronmk
        if callback == None: callback = lambda v: None
25
26 1860 aaronmk
        value = func(*args, **kw_args)
27
        callback(value)
28 1863 aaronmk
        return self.Result(value)
29 1858 aaronmk
30 1862 aaronmk
class MultiProducerPool:
31
    '''A multi-producer pool. You must call pool.main_loop() in the thread that
32
    created this to process new tasks.'''
33
    def __init__(self, processes=None):
34
        '''
35
        @param processes If 0, uses SyncPool
36
        @post The # processes actually used is made available in self.process_ct
37
        '''
38
        try:
39
            if processes == 0: raise ImportError('turned off')
40
            import multiprocessing
41
            import multiprocessing.pool
42
        except ImportError, e:
43
            warnings.warn(UserWarning('Not using parallel processing: '+str(e)))
44
            processes = 1
45
            Pool_ = SyncPool
46
            Queue_ = Queue.Queue
47
        else:
48
            if processes == None: processes = multiprocessing.cpu_count()
49
            Pool_ = multiprocessing.pool.Pool
50
            Queue_ = multiprocessing.Queue
51
52
        self.process_ct = processes
53
        self.pool = Pool_(processes)
54
        self.queue = Queue_()
55
56
    def main_loop(self):
57
        '''@param pool Must be a pool returned by mk_pool()'''
58
        try:
59
            # block=False raises Empty immediately if the queue is empty,
60
            # which indicates that the program is done
61
            while True: self.queue.get(block=False)() # each elem is a function
62
        except Queue.Empty: pass
63
64
    class Result:
65
        def get(timeout=None): raise NotImplementedError()
66
67
        def wait(timeout=None): raise NotImplementedError()
68
69
        def ready(): raise NotImplementedError()
70
71
        def successful(): raise NotImplementedError()
72
73 1863 aaronmk
    def apply_async(self, *args, **kw_args):
74 1862 aaronmk
        def call(): self.pool.apply_async(*args, **kw_args)
75
        self.queue.put_nowait(call)
76 1863 aaronmk
        return self.Result()