Project

General

Profile

1
# Parallel processing
2

    
3
import Queue
4
import warnings
5

    
6
class SyncPool:
7
    '''A dummy synchronous Pool to use if multiprocessing is not available'''
8
    def __init__(self, processes=None): pass
9
    
10
    class Result:
11
        def __init__(self, value): self.value = value
12
        
13
        def get(timeout=None): return self.value
14
        
15
        def wait(timeout=None): pass
16
        
17
        def ready(): return True
18
        
19
        def successful(): return True # TODO: False if raised exception
20
    
21
    def apply_async(self, func, args=None, kw_args=None, callback=None):
22
        if args == None: args = ()
23
        if kw_args == None: kw_args = {}
24
        if callback == None: callback = lambda v: None
25
        
26
        value = func(*args, **kw_args)
27
        callback(value)
28
        return self.Result(value)
29

    
30
class MultiProducerPool:
31
    '''A multi-producer pool. You must call pool.main_loop() in the thread that
32
    created this to process new tasks.'''
33
    def __init__(self, processes=None):
34
        '''
35
        @param processes If 0, uses SyncPool
36
        @post The # processes actually used is made available in self.process_ct
37
        '''
38
        try:
39
            if processes == 0: raise ImportError('turned off')
40
            import multiprocessing
41
            import multiprocessing.pool
42
        except ImportError, e:
43
            warnings.warn(UserWarning('Not using parallel processing: '+str(e)))
44
            processes = 1
45
            Pool_ = SyncPool
46
            Queue_ = Queue.Queue
47
        else:
48
            if processes == None: processes = multiprocessing.cpu_count()
49
            Pool_ = multiprocessing.pool.Pool
50
            Queue_ = multiprocessing.Queue
51
        
52
        self.process_ct = processes
53
        self.pool = Pool_(processes)
54
        self.queue = Queue_()
55
    
56
    def main_loop(self):
57
        '''@param pool Must be a pool returned by mk_pool()'''
58
        try:
59
            # block=False raises Empty immediately if the queue is empty,
60
            # which indicates that the program is done
61
            while True: self.queue.get(block=False)() # each elem is a function
62
        except Queue.Empty: pass
63
    
64
    class Result:
65
        def get(timeout=None): raise NotImplementedError()
66
        
67
        def wait(timeout=None): raise NotImplementedError()
68
        
69
        def ready(): raise NotImplementedError()
70
        
71
        def successful(): raise NotImplementedError()
72
    
73
    def apply_async(self, *args, **kw_args):
74
        def call(): self.pool.apply_async(*args, **kw_args)
75
        self.queue.put_nowait(call)
76
        return self.Result()
(15-15/28)