1 """
2 Parallel map-reduce implementation using threads.
3 """
4
5 import traceback
6 import thread
7
9 """
10 Abstract interface to map-reduce accumulator function.
11 """
13 """Receive next part, storing it in the accumulated result"""
15 """Called when all parts have been accumulated"""
def error(self, part, msg):
    """
    Exception seen on executing map or reduce.  The collector
    can adjust the accumulated result appropriately to reflect
    the error.

    This interface method does nothing; a concrete collector overrides it.

    NOTE(review): the worker in this module reports failures as
    ``collector.error(msg)``, passing only the formatted traceback, and
    the demo collector defines ``error(self, msg)``.  The ``part``
    argument declared here is therefore never supplied by this module;
    confirm which signature is intended before relying on it.
    """
22
24 """
25 Abstract interface to map-reduce mapper function.
26 """
31
def pmap(mapper, inputs):
    """
    Apply function mapper to all inputs.

    This is the serial version of a parallel iterator, yielding the next
    sequence value as soon as it is available.  There is no guarantee
    that the order of the inputs will be preserved in the parallel
    version, so don't depend on it!

    This is a generator: ``mapper(item)`` is only called when the consumer
    asks for the next value, and an exception raised by ``mapper``
    propagates to whoever is iterating.
    """
    for item in inputs:
        yield mapper(item)
43
def preduce(collector, outputs):
    """
    Collect all outputs, calling collector(item) for each item in the sequence.

    The items are consumed in iteration order.  Returns the collector that
    was passed in, after it has received every item.
    """
    for item in outputs:
        collector(item)
    return collector
51
53
54
def _pmapreduce_thread(fn, collector, inputs):
    """
    Run one map-reduce job and report its outcome to the collector.

    Every value of ``pmap(fn, inputs)`` is handed to ``collector``; once
    all values have been delivered ``collector.finalize()`` is called.

    On KeyboardInterrupt the mapper is told to stop with ``fn.abort()``
    and the interrupt is forwarded with ``thread.interrupt_main()``.
    Any other exception (from the mapper, the collector or finalize) is
    formatted as a traceback string and passed to ``collector.error(msg)``;
    ``finalize`` is not called in that case.
    """
    try:
        preduce(collector, pmap(fn, inputs))
        collector.finalize()

    except KeyboardInterrupt:
        fn.abort()
        thread.interrupt_main()
    except:
        # Deliberate catch-all: this is the outermost frame of the worker,
        # so the collector is the only place a failure can be reported.
        msg = traceback.format_exc()
        collector.error(msg)
65
def _pmapreduce_profile(fn, collector, inputs):
    """
    Run ``_pmapreduce_thread(fn, collector, inputs)`` under cProfile and
    print the statistics sorted by cumulative time, internal time and
    call count.

    The profile is kept in memory rather than dumped to a fixed file name,
    so concurrent profiled jobs cannot overwrite or delete each other's
    data and nothing is left on disk if reporting fails.
    """
    import cProfile
    import pstats

    profiler = cProfile.Profile()
    profiler.runcall(_pmapreduce_thread, fn, collector, inputs)

    # pstats.Stats accepts a Profile object directly.
    stats = pstats.Stats(profiler)
    stats.sort_stats('cumulative', 'time', 'calls')
    stats.print_stats()
76
profile_mapper = False
"""True if the mapper cost should be profiled."""
79
def pmapreduce(mapper, collector, inputs):
    """
    Apply function mapper to inputs, accumulating the results in collector.

    Collector is a function which accepts the result of mapper(item) for
    each item of inputs.  There is no guarantee that the outputs will be
    received in order.

    The map is executed in a separate thread so the function returns
    to the caller immediately.

    When the module-level flag ``profile_mapper`` is true the job is run
    through the profiling worker instead of the plain one.
    """
    # profile_mapper is only read here, so no ``global`` declaration is needed.
    worker = _pmapreduce_profile if profile_mapper else _pmapreduce_thread
    thread.start_new_thread(worker, (mapper, collector, inputs))
94
95
def main():
    """
    Demo: start two map-reduce jobs at once and wait until both are over.

    The second job's inputs include 8, which makes the mapper raise, so
    that job ends through the collector's ``error`` callback.
    """
    import time
    import numpy

    class TestCollector(object):
        """Collector that prints each event and records when its job ends."""
        def __init__(self):
            self.done = False

        def __call__(self, part):
            print("collecting %s for %s" % (part, id(self)))

        def finalize(self):
            self.done = True
            print("finalizing")

        def abort(self):
            self.done = True
            print("aborting")

        def error(self, msg):
            # The worker stops after reporting an error and never calls
            # finalize, so this is the last event for a failed job.
            self.done = True
            print("error %s" % (msg,))

    class TestMapper(object):
        """Mapper that fails on 8 and otherwise sleeps up to 4 seconds."""
        def __call__(self, x):
            print("mapping %s for %s" % (x, id(self)))
            if x == 8:
                raise Exception('x is 8')
            time.sleep(4*numpy.random.rand())
            return x

    collector1 = TestCollector()
    collector2 = TestCollector()
    pmapreduce(TestMapper(), collector1, [1, 2, 3, 4, 5])
    pmapreduce(TestMapper(), collector2, [1, 2, 3, 8])
    # Keep the main thread alive until BOTH jobs have ended.
    while not (collector1.done and collector2.done):
        time.sleep(1)


if __name__ == "__main__":
    main()
126
127
128 _ = '''
129 # The choice of job to do next is complicated.
130 # 1. Strongly prefer a job of the same type as is already running
131 # on the node. If this job requires significant resources (e.g.,
132 # a large file transfer) increase that preference.
133 # 2. Strongly prefer sending a user's own job to their own machine.
134 # That way at least they can make progress even if the cluster is busy.
135 # 3. Try to start each job as soon as possible. That way if there are
136 # errors, then the user gets feedback early in the process.
137 # 4. Try to balance the load across users. Rather than first come
138 # first serve, jobs use round robin amongst users.
139 # 5. Prefer high priority jobs.
140
141
142 def map(apply,collect,items,priority=1):
143 mapper = MapJob(apply, items, collect, priority)
144
145 class MapJob(object):
146 """
147 Keep track of which jobs have been submitted and which are complete
148 """
149 def __init__(self, workfn, worklist, manager, priority):
150 self.workfn = workfn
151 self.worklist = worklist
152 self.manager = manager
153 self.priority = priority
154 self._priority_edge = 0
155 def next_work(self):
156
157
158 class MapServer(object):
159 """
160 Keep track of work units.
161 """
162 def __init__(self):
163 self.workingset = {}
164
165 def add_work(self, workfn, worklist, manager, priority):
        """
        Add a new work list to be distributed to worker objects.  The manager
        gathers the results of the work.  Work is assigned from the queue
        based on priority.
        """
171 job = MapJob(workfn, worklist, manager, priority)
172
173 # add work to the queue in priority order
174 for i,job in enumerate(self.jobs):
175 if job.priority < priority: break
176 self.jobs.insert(i,job)
177
178 # Create an entry in a persistent store for each job to
179 # capture completed work units and to recover from server
180 # reboot.
181
182 # Assign _priority_edge to cumsum(priority)/total_priority.
183 # This allows us to select the next job according to priority
184 # with a random number generator.
185 # NOTE: scalability is a bit of a concern --- the lookup
186 # operation is linear in the number of active jobs. This
187 # can be mitigated by limiting the number of active jobs.
188 total_priority = 0.
189 for job in self.jobs: total_priority += job.priority
190 edge = 0.
191 for job in self.jobs:
192 edge += job.priority/total_priority
193 self.job._priority_edge = edge
194
195
196 def register(self, pool=None):
197 """
198 Called by a worker when they are registering for work.
199
200 Returns the next piece of work.
201 """
202 P = numpy.random.rand()
203 for job in self.jobs:
204 if P < j._priority_edge:
205 return job.new_work()
206
207 return NoWork
208
209 def update(self, jobid, result):
210 """
211 Called by a worker when the work unit is complete.
212
213 Returns the next piece of work.
214 """
215 current_job = self.lookup(jobid)
216 current_job.reduce(result)
217 if numpy.random.rand() < current_job.switch_probability:
218 return current_job.next_work()
219
220 P = numpy.random.rand()
221 for job in self.jobs:
222 if P < job._priority_edge:
223 if job == current_job:
224 return curent_job.next_work()
225 else:
226 return job.new_work()
227
228 return NoWork
229 '''
230