Project

General

Profile

« Previous | Next » 

Revision 1862

parallel.py: Changed to use multi-producer pool, which requires calling pool.main_loop()

View differences:

lib/parallel.py
1 1
# Parallel processing
2 2

  
3
import Queue
3 4
import warnings
4 5

  
5
class Pool:
6
class SyncPool:
6 7
    '''A dummy synchronous Pool to use if multiprocessing is not available'''
8
    def __init__(self, processes=None): pass
9
    
7 10
    class Result:
8 11
        def __init__(self, value): self.value = value
9 12
        
......
24 27
        callback(value)
25 28
        return Result(value)
26 29

  
27
def mk_pool(cpus=None):
28
    '''@return tuple(pool, cpus)'''
29
    try:
30
        if cpus == 0: raise ImportError('Parallel processing turned off')
31
        import multiprocessing
32
        import multiprocessing.pool
33
    except ImportError, e:
34
        warnings.warn(UserWarning('Not using parallel processing: '+str(e)))
35
        pool = Pool()
36
    else:
37
        if cpus == None: cpus = multiprocessing.cpu_count()
38
        pool = multiprocessing.pool.Pool(processes=cpus)
39
    return (pool, cpus)
30
class MultiProducerPool:
31
    '''A multi-producer pool. You must call pool.main_loop() in the thread that
32
    created this to process new tasks.'''
33
    def __init__(self, processes=None):
34
        '''
35
        @param processes If 0, uses SyncPool
36
        @post The # processes actually used is made available in self.process_ct
37
        '''
38
        try:
39
            if processes == 0: raise ImportError('turned off')
40
            import multiprocessing
41
            import multiprocessing.pool
42
        except ImportError, e:
43
            warnings.warn(UserWarning('Not using parallel processing: '+str(e)))
44
            processes = 1
45
            Pool_ = SyncPool
46
            Queue_ = Queue.Queue
47
        else:
48
            if processes == None: processes = multiprocessing.cpu_count()
49
            Pool_ = multiprocessing.pool.Pool
50
            Queue_ = multiprocessing.Queue
51
        
52
        self.process_ct = processes
53
        self.pool = Pool_(processes)
54
        self.queue = Queue_()
55
    
56
    def main_loop(self):
57
        '''@param pool Must be a pool returned by mk_pool()'''
58
        try:
59
            # block=False raises Empty immediately if the queue is empty,
60
            # which indicates that the program is done
61
            while True: self.queue.get(block=False)() # each elem is a function
62
        except Queue.Empty: pass
63
    
64
    class Result:
65
        def get(timeout=None): raise NotImplementedError()
66
        
67
        def wait(timeout=None): raise NotImplementedError()
68
        
69
        def ready(): raise NotImplementedError()
70
        
71
        def successful(): raise NotImplementedError()
72
    
73
    def apply_async(*args, **kw_args):
74
        def call(): self.pool.apply_async(*args, **kw_args)
75
        self.queue.put_nowait(call)
76
        return Result()
bin/map
116 116
    profiler = profiling.ItersProfiler(start_now=True, iter_text='row')
117 117
    
118 118
    # Parallel processing
119
    pool, cpu_ct = parallel.mk_pool(cpus)
120
    log_start('Using '+str(cpu_ct)+' parallel CPUs')
119
    pool = parallel.MultiProducerPool(cpus)
120
    log_start('Using '+str(pool.process_ct)+' parallel CPUs')
121 121
    
122 122
    doc = xml_dom.create_doc()
123 123
    root = doc.documentElement
......
325 325
        else: # output is CSV
326 326
            raise NotImplementedError('CSV output not supported yet')
327 327
    
328
    # Parallel processing
329
    pool.main_loop()
330
    
328 331
    profiler.stop(row_ct)
329 332
    ex_tracker.add_iters(row_ct)
330 333
    if verbose:

Also available in: Unified diff