Project

General

Profile

« Previous | Next » 

Revision 2035

Renamed parallel.py to parallelproc.py to avoid conflict with new system parallel module on vegbiendev

View differences:

lib/parallel.py
1
# Parallel processing
2

  
3
import cPickle
4
import itertools
5
import Queue
6
import rand
7
import types
8
import warnings
9

  
10
import collection
11
import dicts
12
import exc
13
from Runnable import Runnable
14

  
15
def try_pickle(value):
    '''Checks that value can be pickled. The pickled data itself is discarded.
    @param value The object to test
    @raise Exception Whatever cPickle.dumps() raised, after exc.add_msg() has
        been called on it with the repr() of the offending value
    '''
    import sys # local import: only needed to fetch the active exception
    
    try: cPickle.dumps(value)
    except Exception:
        # Use sys.exc_info() rather than `except Exception, e`, which is a
        # syntax error on Python 3 (`as e` is one before Python 2.6)
        e = sys.exc_info()[1]
        exc.add_msg(e, 'Tried to pickle: '+repr(value))
        raise
20

  
21
def prepickle(value, vars_id_dict_):
    '''Prepares a value for pickling by swapping shared objects for their id().
    @param value The value to prepare; it is walked with collection.rmap()
    @param vars_id_dict_ Container of the id()s of the shared objects
    @return Whatever collection.rmap() builds from the filtered items
    '''
    def swap_shared(value, is_leaf):
        obj_id = id(value)
        if obj_id in vars_id_dict_: return obj_id # send only the id()
        if is_leaf:
            # Pickle the value right now. A failure here comes with a full
            # traceback, which multiprocessing's Pool does not provide for
            # pickling errors.
            try_pickle(value)
        return value
    
    return collection.rmap(swap_shared, value)
30

  
31
def post_unpickle(value, vars_id_dict_):
    '''Reverses prepickle(): swaps known id()s back for the shared objects.
    @param value The unpickled value; it is walked with collection.rmap()
    @param vars_id_dict_ Lookup of shared objects by id()
    @return Whatever collection.rmap() builds from the filtered items
    '''
    def restore_shared(value, is_leaf):
        # NOTE(review): only exact ints are looked up, so an ordinary int
        # argument that happens to equal a shared id() would presumably be
        # replaced too, and an id() of type long would be skipped -- confirm
        if type(value) != int: return value
        # get() returns the value itself if it isn't a known id()
        return vars_id_dict_.get(value, value)
    
    return collection.rmap(restore_shared, value)
37

  
38
class SyncPool:
    '''A dummy synchronous Pool to use if multiprocessing is not available.
    Tasks are run immediately, in the caller, by apply_async().'''
    def __init__(self, processes=None): pass
    
    class Result:
        '''The already-computed result of a task run by SyncPool'''
        def __init__(self, value): self.value = value
        
        def get(self, timeout=None):
            '''@param timeout Ignored, because the value already exists'''
            return self.value
        
        def wait(self, timeout=None): pass # nothing to wait for
        
        def ready(self): return True
        
        def successful(self): return True # TODO: False if raised exception
    
    def apply_async(self, func, args=(), kw_args=None, callback=None):
        '''Calls func right away and wraps its return value in a Result.
        @param func The function to call
        @param args Positional args for func
        @param kw_args Keyword args for func. Defaults to no keyword args.
        @param callback If set, called with func's return value
        @return Result
        Any exception raised by func or callback propagates to the caller.
        '''
        if kw_args is None: kw_args = {}
        
        value = func(*args, **kw_args)
        if callback is not None: callback(value)
        return self.Result(value)
59

  
60
class MultiProducerPool:
    '''A multi-producer pool. You must call pool.main_loop() in the thread that
    created this to process new tasks.'''
    
    def __init__(self, processes=None, locals_=None, *shared):
        '''
        @param processes If 0, uses SyncPool. If None, uses
            multiprocessing.cpu_count() processes.
        @param locals_ {var_name: value} Vars to pass to share_vars().
            Defaults to no vars.
        @param shared Values to pass to share()
        @post The # processes actually used is made available in self.process_ct
        '''
        import sys # local import: only needed to fetch the active exception
        
        if locals_ is None: locals_ = {}
        
        try:
            if processes == 0: raise ImportError('turned off')
            import multiprocessing
            import multiprocessing.pool
        except ImportError:
            # Use sys.exc_info() rather than `except ImportError, e`, which is
            # a syntax error on Python 3 (`as e` is one before Python 2.6)
            e = sys.exc_info()[1]
            warnings.warn(UserWarning('Not using parallel processing: '+str(e)))
            processes = 1
            Pool_ = SyncPool
            Queue_ = Queue.Queue
        else:
            if processes is None: processes = multiprocessing.cpu_count()
            Pool_ = multiprocessing.pool.Pool
            Queue_ = multiprocessing.Queue
        
        self.process_ct = processes
        self.pool = Pool_(processes)
        self.queue = Queue_()
        self.active_tasks = 0 # tasks handed to the pool but not yet finished
        
        # Store a reference to the manager in self, because it will otherwise be
        # shutdown right away when it goes out of scope
        #self.manager = processing.Manager()
        #self.shared_rw = self.manager.Namespace()
        
        # Values that may be pickled by id()
        self.vars_id_dict = dicts.IdDict()
        self.share(self, *shared).share_vars(locals_).share_vars(globals())
    
    def share(self, *values):
        '''Call this on all values that should be shared writably between all
        processes (and be pickled by id())
        @return self, to allow chaining
        '''
        self.vars_id_dict.add(*values)
        return self
    
    def share_vars(self, vars_):
        '''Call this on all vars that should be pickled by id().
        Usage: self.share_vars(locals())
        @param vars_ {var_name: value}
        @return self, to allow chaining
        '''
        self.vars_id_dict.add_vars(vars_)
        return self
    
    def main_loop(self):
        '''Prime the task queue with at least one task before calling this.
        Returns once the queue has stayed empty for the polling timeout while
        no tasks are active.'''
        while True:
            try: call = self.queue.get(timeout=0.1) # sec
            except Queue.Empty:
                if self.active_tasks == 0: break # program done
                else: continue
            
            self.active_tasks += 1
            self.pool.apply_async(call.func, self.post_unpickle(call.args),
                self.post_unpickle(call.kw_args), self._result_handler(call))
    
    def _result_handler(self, call):
        '''Creates the pool callback for one task.
        The task is bound here instead of in a closure defined inside
        main_loop()'s loop, because such a closure would see whichever task the
        loop had most recently dequeued by the time the result arrived.
        @param call The task whose result will be handled
        '''
        def handle_result(*args, **kw_args):
            # NOTE(review): multiprocessing's Pool presumably runs this in a
            # different thread than main_loop(), so this decrement may race
            # with the increment there -- confirm
            self.active_tasks -= 1
            if call.callback is not None: call.callback(*args, **kw_args)
        return handle_result
    
    class Result:
        '''Placeholder returned by apply_async(). Results are not retrievable,
        so every method raises NotImplementedError.'''
        def get(self, timeout=None): raise NotImplementedError()
        
        def wait(self, timeout=None): raise NotImplementedError()
        
        def ready(self): raise NotImplementedError()
        
        def successful(self): raise NotImplementedError()
    
    def apply_async(self, func, args=(), kw_args=None, callback=None):
        '''Queues a task, which main_loop() will later hand to the pool.
        @param func The function to call
        @param args Positional args for func; passed through prepickle()
        @param kw_args Keyword args for func; passed through prepickle().
            Defaults to no keyword args.
        @param callback Must be None
        @return Result
        '''
        assert callback is None, 'Callbacks not supported'
        if kw_args is None: kw_args = {}
        
        call = Runnable(func, *self.prepickle(args), **self.prepickle(kw_args))
        call.callback = callback # store this inside the Runnable
        
        self.queue.put_nowait(call)
        return self.Result()
    
    def prepickle(self, value):
        '''Runs the module-level prepickle() using this pool's shared values'''
        return prepickle(value, self.vars_id_dict)
    
    def post_unpickle(self, value):
        '''Runs the module-level post_unpickle() using this pool's shared
        values'''
        return post_unpickle(value, self.vars_id_dict)
lib/parallelproc.py
1
# Parallel processing
2

  
3
import cPickle
4
import itertools
5
import Queue
6
import rand
7
import types
8
import warnings
9

  
10
import collection
11
import dicts
12
import exc
13
from Runnable import Runnable
14

  
15
def try_pickle(value):
    '''Checks that value can be pickled. The pickled data itself is discarded.
    @param value The object to test
    @raise Exception Whatever cPickle.dumps() raised, after exc.add_msg() has
        been called on it with the repr() of the offending value
    '''
    import sys # local import: only needed to fetch the active exception
    
    try: cPickle.dumps(value)
    except Exception:
        # Use sys.exc_info() rather than `except Exception, e`, which is a
        # syntax error on Python 3 (`as e` is one before Python 2.6)
        e = sys.exc_info()[1]
        exc.add_msg(e, 'Tried to pickle: '+repr(value))
        raise
20

  
21
def prepickle(value, vars_id_dict_):
    '''Prepares a value for pickling by swapping shared objects for their id().
    @param value The value to prepare; it is walked with collection.rmap()
    @param vars_id_dict_ Container of the id()s of the shared objects
    @return Whatever collection.rmap() builds from the filtered items
    '''
    def swap_shared(value, is_leaf):
        obj_id = id(value)
        if obj_id in vars_id_dict_: return obj_id # send only the id()
        if is_leaf:
            # Pickle the value right now. A failure here comes with a full
            # traceback, which multiprocessing's Pool does not provide for
            # pickling errors.
            try_pickle(value)
        return value
    
    return collection.rmap(swap_shared, value)
30

  
31
def post_unpickle(value, vars_id_dict_):
    '''Reverses prepickle(): swaps known id()s back for the shared objects.
    @param value The unpickled value; it is walked with collection.rmap()
    @param vars_id_dict_ Lookup of shared objects by id()
    @return Whatever collection.rmap() builds from the filtered items
    '''
    def restore_shared(value, is_leaf):
        # NOTE(review): only exact ints are looked up, so an ordinary int
        # argument that happens to equal a shared id() would presumably be
        # replaced too, and an id() of type long would be skipped -- confirm
        if type(value) != int: return value
        # get() returns the value itself if it isn't a known id()
        return vars_id_dict_.get(value, value)
    
    return collection.rmap(restore_shared, value)
37

  
38
class SyncPool:
    '''A dummy synchronous Pool to use if multiprocessing is not available.
    Tasks are run immediately, in the caller, by apply_async().'''
    def __init__(self, processes=None): pass
    
    class Result:
        '''The already-computed result of a task run by SyncPool'''
        def __init__(self, value): self.value = value
        
        def get(self, timeout=None):
            '''@param timeout Ignored, because the value already exists'''
            return self.value
        
        def wait(self, timeout=None): pass # nothing to wait for
        
        def ready(self): return True
        
        def successful(self): return True # TODO: False if raised exception
    
    def apply_async(self, func, args=(), kw_args=None, callback=None):
        '''Calls func right away and wraps its return value in a Result.
        @param func The function to call
        @param args Positional args for func
        @param kw_args Keyword args for func. Defaults to no keyword args.
        @param callback If set, called with func's return value
        @return Result
        Any exception raised by func or callback propagates to the caller.
        '''
        if kw_args is None: kw_args = {}
        
        value = func(*args, **kw_args)
        if callback is not None: callback(value)
        return self.Result(value)
59

  
60
class MultiProducerPool:
    '''A multi-producer pool. You must call pool.main_loop() in the thread that
    created this to process new tasks.'''
    
    def __init__(self, processes=None, locals_=None, *shared):
        '''
        @param processes If 0, uses SyncPool. If None, uses
            multiprocessing.cpu_count() processes.
        @param locals_ {var_name: value} Vars to pass to share_vars().
            Defaults to no vars.
        @param shared Values to pass to share()
        @post The # processes actually used is made available in self.process_ct
        '''
        import sys # local import: only needed to fetch the active exception
        
        if locals_ is None: locals_ = {}
        
        try:
            if processes == 0: raise ImportError('turned off')
            import multiprocessing
            import multiprocessing.pool
        except ImportError:
            # Use sys.exc_info() rather than `except ImportError, e`, which is
            # a syntax error on Python 3 (`as e` is one before Python 2.6)
            e = sys.exc_info()[1]
            warnings.warn(UserWarning('Not using parallel processing: '+str(e)))
            processes = 1
            Pool_ = SyncPool
            Queue_ = Queue.Queue
        else:
            if processes is None: processes = multiprocessing.cpu_count()
            Pool_ = multiprocessing.pool.Pool
            Queue_ = multiprocessing.Queue
        
        self.process_ct = processes
        self.pool = Pool_(processes)
        self.queue = Queue_()
        self.active_tasks = 0 # tasks handed to the pool but not yet finished
        
        # Store a reference to the manager in self, because it will otherwise be
        # shutdown right away when it goes out of scope
        #self.manager = processing.Manager()
        #self.shared_rw = self.manager.Namespace()
        
        # Values that may be pickled by id()
        self.vars_id_dict = dicts.IdDict()
        self.share(self, *shared).share_vars(locals_).share_vars(globals())
    
    def share(self, *values):
        '''Call this on all values that should be shared writably between all
        processes (and be pickled by id())
        @return self, to allow chaining
        '''
        self.vars_id_dict.add(*values)
        return self
    
    def share_vars(self, vars_):
        '''Call this on all vars that should be pickled by id().
        Usage: self.share_vars(locals())
        @param vars_ {var_name: value}
        @return self, to allow chaining
        '''
        self.vars_id_dict.add_vars(vars_)
        return self
    
    def main_loop(self):
        '''Prime the task queue with at least one task before calling this.
        Returns once the queue has stayed empty for the polling timeout while
        no tasks are active.'''
        while True:
            try: call = self.queue.get(timeout=0.1) # sec
            except Queue.Empty:
                if self.active_tasks == 0: break # program done
                else: continue
            
            self.active_tasks += 1
            self.pool.apply_async(call.func, self.post_unpickle(call.args),
                self.post_unpickle(call.kw_args), self._result_handler(call))
    
    def _result_handler(self, call):
        '''Creates the pool callback for one task.
        The task is bound here instead of in a closure defined inside
        main_loop()'s loop, because such a closure would see whichever task the
        loop had most recently dequeued by the time the result arrived.
        @param call The task whose result will be handled
        '''
        def handle_result(*args, **kw_args):
            # NOTE(review): multiprocessing's Pool presumably runs this in a
            # different thread than main_loop(), so this decrement may race
            # with the increment there -- confirm
            self.active_tasks -= 1
            if call.callback is not None: call.callback(*args, **kw_args)
        return handle_result
    
    class Result:
        '''Placeholder returned by apply_async(). Results are not retrievable,
        so every method raises NotImplementedError.'''
        def get(self, timeout=None): raise NotImplementedError()
        
        def wait(self, timeout=None): raise NotImplementedError()
        
        def ready(self): raise NotImplementedError()
        
        def successful(self): raise NotImplementedError()
    
    def apply_async(self, func, args=(), kw_args=None, callback=None):
        '''Queues a task, which main_loop() will later hand to the pool.
        @param func The function to call
        @param args Positional args for func; passed through prepickle()
        @param kw_args Keyword args for func; passed through prepickle().
            Defaults to no keyword args.
        @param callback Must be None
        @return Result
        '''
        assert callback is None, 'Callbacks not supported'
        if kw_args is None: kw_args = {}
        
        call = Runnable(func, *self.prepickle(args), **self.prepickle(kw_args))
        call.callback = callback # store this inside the Runnable
        
        self.queue.put_nowait(call)
        return self.Result()
    
    def prepickle(self, value):
        '''Runs the module-level prepickle() using this pool's shared values'''
        return prepickle(value, self.vars_id_dict)
    
    def post_unpickle(self, value):
        '''Runs the module-level post_unpickle() using this pool's shared
        values'''
        return post_unpickle(value, self.vars_id_dict)
bin/map
17 17
import iters
18 18
import maps
19 19
import opts
20
import parallel
20
import parallelproc
21 21
import Parser
22 22
import profiling
23 23
import sql
......
125 125
    profiler = profiling.ItersProfiler(start_now=True, iter_text='row')
126 126
    
127 127
    # Parallel processing
128
    pool = parallel.MultiProducerPool(cpus)
128
    pool = parallelproc.MultiProducerPool(cpus)
129 129
    log('Using '+str(pool.process_ct)+' parallel CPUs')
130 130
    
131 131
    doc = xml_dom.create_doc()

Also available in: Unified diff