Package park :: Package optim :: Module pmap

Source Code for Module park.optim.pmap

  1  """ 
  2  Parallel map-reduce implementation using threads. 
  3  """ 
  4   
  5  import traceback 
  6  import thread 
  7   
class Collector(object):
    """
    Abstract map-reduce accumulator.

    A collector is handed every mapped part through ``__call__``, is told
    when the run completes through ``finalize``, and is told about a
    failure through ``error``.  Every method on this base class is a
    no-op that returns None.
    """

    def __call__(self, part):
        """Accept the next mapped part and fold it into the accumulated result."""
        return None

    def finalize(self):
        """Hook invoked once every part has been accumulated."""
        return None

    def error(self, part, msg):
        """
        Hook invoked when the map or reduce step raised an exception.

        The collector may adjust its accumulated result to reflect the
        failure.

        NOTE(review): _pmapreduce_thread in this module calls
        ``collector.error(msg)`` with a single argument, which does not
        match this ``(part, msg)`` signature -- confirm which is intended.
        """
        return None
22
class Mapper(object):
    """
    Abstract map-reduce mapper.

    An instance is called once per input value and returns the mapped
    result; ``abort`` asks it to stop.  Every method on this base class
    is a no-op that returns None.
    """

    def __call__(self, value):
        """Evaluate one part (*value*) and return its mapped result."""
        return None

    def abort(self):
        """Request that the mapper stop."""
        return None
31
def pmap(mapper, inputs):
    """
    Lazily apply *mapper* to each element of *inputs*.

    This is the serial stand-in for a parallel iterator: each result is
    yielded as soon as it has been computed.  A parallel implementation
    need not preserve input order, so callers must not rely on it.
    """
    for value in inputs:
        result = mapper(value)
        yield result
43
def preduce(collector, outputs):
    """
    Feed every element of *outputs*, in iteration order, to *collector*.

    Returns the same *collector* object that was passed in.
    """
    for output in outputs:
        collector(output)
    return collector
51
def _pmapreduce_thread(fn, collector, inputs):
    """
    Worker-thread body for pmapreduce.

    Maps *inputs* through *fn*, feeds each result to *collector*, then
    calls ``collector.finalize()``.

    On KeyboardInterrupt the mapper is asked to stop via ``fn.abort()`` and
    the main thread is interrupted.  Any other exception is reported to the
    collector as a formatted traceback via ``collector.error(msg)``.
    """
    try:
        preduce(collector, pmap(fn, inputs))
        collector.finalize()
    except KeyboardInterrupt:
        # Forward the interrupt to the main thread even if the mapper has
        # no abort() or abort() itself raises; otherwise the interrupt is
        # lost inside this worker thread.
        try:
            fn.abort()
        finally:
            thread.interrupt_main()
    except:
        # Deliberately bare: this is the top of a worker thread, and the
        # collector must be told about every failure so that a caller
        # polling it (as main() does with collector.done) is not left
        # waiting.
        msg = traceback.format_exc()
        # NOTE(review): Collector.error is declared as error(part, msg) but
        # is called here with msg only; TestCollector.error(msg) in main()
        # matches this call.  Confirm the intended signature.
        collector.error(msg)
65
def _pmapreduce_profile(fn, collector, inputs):
    """
    Run _pmapreduce_thread under cProfile and print its statistics.

    Statistics are sorted by cumulative time, then internal time, then
    call count, and printed to stdout.

    The profile is kept in memory instead of a fixed 'mapr.out' file in
    the current directory.  Concurrent profiled runs (pmapreduce starts a
    new thread per call) therefore cannot overwrite or delete each other's
    data, and no stray file is left behind if profiling fails.
    """
    import cProfile
    import pstats
    profiler = cProfile.Profile()
    profiler.runcall(_pmapreduce_thread, fn, collector, inputs)
    stats = pstats.Stats(profiler)
    stats.sort_stats('cumulative', 'time', 'calls')
    stats.print_stats()


profile_mapper = False
"""True if the mapper cost should be profiled."""
def pmapreduce(mapper, collector, inputs):
    """
    Map *mapper* over *inputs* on a new thread, feeding results to *collector*.

    The collector receives mapper(item) for each item of *inputs*, in no
    guaranteed order.  The call returns to the caller immediately while
    the work proceeds on a separate thread.  When the module flag
    ``profile_mapper`` is true, the run is wrapped in cProfile.
    """
    if profile_mapper:
        worker = _pmapreduce_profile
    else:
        worker = _pmapreduce_thread
    thread.start_new_thread(worker, (mapper, collector, inputs))
94 95
def main():
    """
    Smoke test: run two map-reduce jobs concurrently and wait for both.

    The first job maps [1, 2, 3, 4, 5] and should finalize normally.  The
    second maps [1, 2, 3, 8] and should report an error, because the test
    mapper raises on 8.  Every other value sleeps for a random delay of up
    to 4 seconds before being returned.
    """
    import time
    import numpy

    class TestCollector(object):
        """Collector that prints each event and flags when its job is over."""
        def __init__(self):
            self.done = False

        def __call__(self, part):
            print("collecting %s for %s" % (part, id(self)))

        def finalize(self):
            self.done = True
            print("finalizing")

        def abort(self):
            self.done = True
            print("aborting")

        def error(self, msg):
            # A failed job is over too; without this the wait loop below
            # would never see the failing job as done.
            self.done = True
            print("error %s" % (msg,))

    class TestMapper(object):
        """Mapper that echoes its input after a random delay; fails on 8."""
        def __call__(self, x):
            print("mapping %s for %s" % (x, id(self)))
            if x == 8:
                raise Exception('x is 8')
            time.sleep(4 * numpy.random.rand())
            return x

        def abort(self):
            # Called by _pmapreduce_thread on KeyboardInterrupt; there is
            # nothing to cancel here.
            pass

    collector1 = TestCollector()
    collector2 = TestCollector()
    pmapreduce(TestMapper(), collector1, [1, 2, 3, 4, 5])
    pmapreduce(TestMapper(), collector2, [1, 2, 3, 8])
    # Wait until *both* jobs are over (finalized or errored), not just
    # whichever finishes first.
    while not (collector1.done and collector2.done):
        time.sleep(1)


if __name__ == "__main__":
    main()


# Design notes for a future distributed map server, kept as an inert
# string so they are never executed.
# NOTE(review): the sketch is unfinished and has errors to fix before it
# becomes code.  MapJob.next_work has no body, and self.jobs is never
# initialised (__init__ only sets workingset).  add_work reuses `job` as
# its loop variable and writes self.job._priority_edge instead of
# job._priority_edge.  register() tests `j` instead of `job`, and update()
# calls curent_job instead of current_job.
_ = '''
# The choice of job to do next is complicated.
# 1. Strongly prefer a job of the same type as is already running
#    on the node. If this job requires significant resources (e.g.,
#    a large file transfer) increase that preference.
# 2. Strongly prefer sending a user's own job to their own machine.
#    That way at least they can make progress even if the cluster is busy.
# 3. Try to start each job as soon as possible. That way if there are
#    errors, then the user gets feedback early in the process.
# 4. Try to balance the load across users. Rather than first come
#    first serve, jobs use round robin amongst users.
# 5. Prefer high priority jobs.


def map(apply,collect,items,priority=1):
    mapper = MapJob(apply, items, collect, priority)

class MapJob(object):
    """
    Keep track of which jobs have been submitted and which are complete
    """
    def __init__(self, workfn, worklist, manager, priority):
        self.workfn = workfn
        self.worklist = worklist
        self.manager = manager
        self.priority = priority
        self._priority_edge = 0
    def next_work(self):


class MapServer(object):
    """
    Keep track of work units.
    """
    def __init__(self):
        self.workingset = {}

    def add_work(self, workfn, worklist, manager, priority):
        """
        Add a new work list to distributed to worker objects. The manager
        gathers the results of the work. Work is assigned from the queue
        based on priority.
        """
        job = MapJob(workfn, worklist, manager, priority)

        # add work to the queue in priority order
        for i,job in enumerate(self.jobs):
            if job.priority < priority: break
        self.jobs.insert(i,job)

        # Create an entry in a persistent store for each job to
        # capture completed work units and to recover from server
        # reboot.

        # Assign _priority_edge to cumsum(priority)/total_priority.
        # This allows us to select the next job according to priority
        # with a random number generator.
        # NOTE: scalability is a bit of a concern --- the lookup
        # operation is linear in the number of active jobs. This
        # can be mitigated by limiting the number of active jobs.
        total_priority = 0.
        for job in self.jobs: total_priority += job.priority
        edge = 0.
        for job in self.jobs:
            edge += job.priority/total_priority
            self.job._priority_edge = edge


    def register(self, pool=None):
        """
        Called by a worker when they are registering for work.

        Returns the next piece of work.
        """
        P = numpy.random.rand()
        for job in self.jobs:
            if P < j._priority_edge:
                return job.new_work()

        return NoWork

    def update(self, jobid, result):
        """
        Called by a worker when the work unit is complete.

        Returns the next piece of work.
        """
        current_job = self.lookup(jobid)
        current_job.reduce(result)
        if numpy.random.rand() < current_job.switch_probability:
            return current_job.next_work()

        P = numpy.random.rand()
        for job in self.jobs:
            if P < job._priority_edge:
                if job == current_job:
                    return curent_job.next_work()
                else:
                    return job.new_work()

        return NoWork
'''