Project

General

Profile

« Previous | Next » 

Revision 1877

parallel.py: MultiProducerPool: Pickle objects by ID if they're accessible to the main_loop process. This should allow e.g. DB connections and pools to be pickled, if they were defined in the main process.

View differences:

parallel.py
1 1
# Parallel processing
2 2

  
3 3
import cPickle
4
import itertools
4 5
import Queue
6
import rand
5 7
import types
6 8
import warnings
7 9

  
10
import collection
11
import dicts
8 12
import exc
9 13
from Runnable import Runnable
10 14

  
......
14 18
        exc.add_msg(e, 'Tried to pickle: '+repr(value))
15 19
        raise
16 20

  
17
def prepickle(value):
18
    if isinstance(value, types.FunctionType): return Runnable(value)
19
    elif isinstance(value, MultiProducerPool): return None
20
    elif isinstance(value, list): return map(prepickle, list_)
21
    elif isinstance(value, dict):
22
        return dict(((k, prepickle(v)) for k, v in value.iteritems()))
23
    else: return value
21
def vars_id_dict(locals_, globals_, *misc):
22
    '''Usage: vars_id_dict(locals(), globals(), misc...)'''
23
    vars_ = map(lambda v: v.values(), [locals_, globals_]) + list(misc)
24
    return dicts.id_dict(vars_)
24 25

  
26
def prepickle(value, vars_id_dict_):
27
    def filter_(value):
28
        # Try pickling the value. If it fails, we'll get a full traceback here,
29
        # which is not provided with pickling errors in multiprocessing's Pool.
30
        try: try_pickle(value)
31
        except Exception, e:
32
            id_ = id(value)
33
            if id_ in vars_id_dict_: value = id_
34
            else: raise e
35
        return value
36
    return collection.rmap(filter_, value)
37

  
38
def post_unpickle(value, vars_id_dict_):
39
    def filter_(value):
40
        try: return vars_id_dict_[value] # value is an id()
41
        except KeyError: return value
42
    return collection.rmap(filter_, value)
43

  
25 44
class SyncPool:
26 45
    '''A dummy synchronous Pool to use if multiprocessing is not available'''
27 46
    def __init__(self, processes=None): pass
......
37 56
        
38 57
        def successful(): return True # TODO: False if raised exception
39 58
    
40
    def apply_async(self, func, args=None, kw_args=None, callback=None):
41
        if args == None: args = ()
42
        if kw_args == None: kw_args = {}
59
    def apply_async(self, func, args=(), kw_args={}, callback=None):
43 60
        if callback == None: callback = lambda v: None
44 61
        
45 62
        value = func(*args, **kw_args)
......
50 67
    '''A multi-producer pool. You must call pool.main_loop() in the thread that
51 68
    created this to process new tasks.'''
52 69
    
53
    def __init__(self, processes=None):
70
    def __init__(self, processes=None, locals_=None, globals_=None, *shared):
54 71
        '''
55 72
        @param processes If 0, uses SyncPool
56 73
        @post The # processes actually used is made available in self.process_ct
57 74
        '''
75
        if locals_ == None: locals_ = locals()
76
        if globals_ == None: globals_ = globals()
77
        
58 78
        try:
59 79
            if processes == 0: raise ImportError('turned off')
60 80
            import multiprocessing
......
72 92
        self.process_ct = processes
73 93
        self.pool = Pool_(processes)
74 94
        self.queue = Queue_()
95
        # Values that may be pickled by id()
96
        self.vars_id_dict = vars_id_dict(locals_, globals_, *shared)
75 97
    
98
    def share(self, value):
99
    '''Call this on every value that may be pickled by id()'''
100
        self.vars_id_dict[id(value)] = value
101
    
76 102
    def main_loop(self):
77 103
        '''@param pool Must be a pool returned by mk_pool()'''
78 104
        try:
......
80 106
                # block=False raises Empty immediately if the queue is empty,
81 107
                # which indicates that the program is done
82 108
                call = self.queue.get(block=False)
83
                self.pool.apply_async(call.func, call.args, call.kw_args,
84
                    call.callback)
109
                self.pool.apply_async(call.func, self.post_unpickle(call.args),
110
                    self.post_unpickle(call.kw_args), call.callback)
85 111
        except Queue.Empty: pass
86 112
    
87 113
    class Result:
......
93 119
        
94 120
        def successful(): raise NotImplementedError()
95 121
    
96
    def apply_async(self, func, args=None, kw_args=None, callback=None):
97
        if args == None: args = ()
98
        if kw_args == None: kw_args = {}
122
    def apply_async(self, func, args=(), kw_args={}, callback=None):
123
        assert callback == None, 'Callbacks not supported'
99 124
        
100
        call = Runnable(func, *prepickle(args), **prepickle(kw_args))
125
        call = Runnable(func, *self.prepickle(args), **self.prepickle(kw_args))
101 126
        call.callback = callback # store this inside the Runnable
102 127
        
103
        # Try pickling the args. If it fails, we'll get a full traceback here,
104
        # which is not provided with pickling errors in multiprocessing's Pool.
105
        try_pickle(call)
106
        
107 128
        self.queue.put_nowait(call)
108 129
        return self.Result()
130
    
131
    def prepickle(self, value): return prepickle(value, self.vars_id_dict)
132
    
133
    def post_unpickle(self, value):
134
        return post_unpickle(value, self.vars_id_dict)

Also available in: Unified diff