1
2 """
3 Manage service lifetime.
4
5 ServiceHandler operates on a service and a communication mesh to manage
6 the service lifetime. Signals sent to the service are received by the
7 handler and stored until the service is ready. Messages from the service
8 are sent through the handler back to the monitor. The handler keeps track
9 of progress messages, updating the client at the appropriate intervals as
10 progress occurs.
11 """
12
13 __all__ = ['ServiceHandler','load']
14
15 import thread, Queue, time
16
17 import park.core.service
18 from park.core import message
19 from park.util.threads import threaded, daemon
22 """
23 Recover symbol from path.
24 """
25 parts = path.split('.')
26 module_name = ".".join(parts[:-1])
27 symbol_name = parts[-1]
28 __import__(module_name)
29 module = sys.modules[module_name]
30 symbol = getattr(module,symbol_name)
31 return symbol
32
def load(service_name):
    """
    Build and return a new instance of the service named by a dotted path.

    service_name has the form 'package.module.Symbol'. The symbol is
    resolved with _import_symbol (import the module part, then fetch the
    final attribute) and called with no arguments. Import or attribute
    lookup errors propagate to the caller.
    """
    return _import_symbol(service_name)()
39
41 """Exception raised when the service receives an abort message"""
42
44 """
45 Process signals coming into and out of the service.
46
47 The service handler is running in a separate thread from the service
48 itself. These are the service thread and the computation thread.
49
50 Progress messages are sent approximately every progress_delta seconds
51 when the computation is in a ready state.
52
53 Checkpoint messages are sent approximately every checkpoint_delta seconds
54 when the computation is in a ready state.
55
56 Improved messages are possibly sent when they are available, or possibly
57 delayed until the computation is in a ready state.
58 """
59 progress_delta = 1
60 """Send progress updates every second"""
61 checkpoint_delta = 3600
62 """Store checkpoints every hour"""
63 improved_delta = 0
64 """Send improvement messages immediately"""
def __init__(self, client=None, workers=None):
    """
    Create a handler bound to a messaging client and an optional worker pool.

    client is the messaging interface; outgoing messages are delivered
    with client.put(msg). workers holds the parallel kernels used by
    map() and imap().
    """
    # Messaging interface used by send().
    self.__client = client
    # Parallel kernels used by map()/imap().
    self.workers = workers
    # A paused computation blocks until it can acquire this lock.
    self.__pause_lock = thread.allocate_lock()
    # Messages deferred until the computation next reaches a ready state.
    self.__queue = Queue.Queue()

    # Start with fresh per-run bookkeeping (timers, flags, service slot).
    self.__reset()
76
78 t = time.time()
79 self.__aborting = False
80 """Computation should periodically check if signal.abort is True"""
81 self.__pausing = False
82 """Application will freeze at next pause location"""
83 self.__progress_time = t
84 """Time of the last progress update"""
85 self.__checkpoint_time = t
86 """Time of the last checkpoint"""
87 self.__improved_time = t
88 """Time of the last improvement message"""
89 self.__improved_pending = None
90 """Whether there is a pending improvement that should be reported"""
91 self.__service = service
92 """Service running on the handler"""
93 self.__ready = False
94 """True when service is in the 'ready' state"""
95
96
98 """
99 If a handler is not found for processing the message asynchronously
100 then queue the message until the system is in a ready state. The
101 message will then be handled with an onReady handler.
102 """
103 self.delay(msg)
104
106 """
107 If a handler is not found for processing the message synchronously
108 then register a warning in the log.
109 """
110 logging.warn("Unhandled signal: "+str(msg))
111
113 """
114 Set handler attributes.
115
116 Set the associated attributes in the service handler.
117
118 Forward any remaining attributes to the service when the
119 computation is in a ready state.
120 """
121
122
123
124
125 pop = []
126 for k,v in msg.attrs.items():
127 if not k.startswith('_') and hasattr(self,k):
128 oldv = getattr(self,k)
129 if not callable(oldv):
130 setattr(self,k,v)
131 pop.append(k)
132 for k in pop: del msg.attrs[k]
133 if len(msg.attrs) > 0:
134 self.delay(msg)
135
137 """
138 Set service attribute.
139
140 Set the associated attributes in the service.
141
142 Raises AttributeError if there are unused attributes.
143 """
144 pop = []
145 for k,v in msg.attrs.items():
146 if hasattr(self.__service,k):
147 setattr(self.__service,k,v)
148 pop.append(k)
149 for k in pop: del msg.attrs[k]
150 if len(msg.attrs) > 0:
151 raise AttributeError("Unknown attributes for %s - %s"%
152 (str(self.__service),", ".join(msg.keys())))
153
155 """
156 Invoke method in the service.
157 """
158 if hasattr(self.__service, msg.method):
159 fn = getattr(self.__service,msg.method)
160 fn(*msg.args, **msg.kw)
161 else:
162 raise AttributeError("unknown method for %s - %s"
163 %(self.__service,msg.method))
164
166 """
167 We received an abort signal. Cancel the computation
168 when it is convenient. If the computation is paused
169 resume it before handling the abort.
170 """
171 self.__aborting = True
172
173 if self.__pausing:
174
175 self.__pausing = False
176 self.__pause_lock.release()
177
179 """
180 We received a pause signal. Halt the computation
181 at the next convenient point.
182 """
183
184 if not self.__pausing:
185
186 self.__pause_lock.acquire()
187 self.__pausing = True
188
190 """
191 We received a resume signal. Let the computation continue
192 from the halted point.
193 """
194
195
196 if self.__pausing:
197
198 self.__pausing = False
199 self.__pause_lock.release()
200
def send(self, msg):
    """
    Forward msg, unchanged, to the client(s) via the messaging interface.
    """
    client = self.__client
    client.put(msg)
206
def put(self, msg):
    """
    New message has arrived.

    Called from the handler thread when a new message arrives. The
    message is dispatched to an ``onSignal<ClassName>`` handler, looked
    up first on the running service and then on the handler itself. If
    neither defines one, ``onSignalMessage`` is used as the fallback.

    For synchronous operations this returns the result of the operation,
    i.e. whatever the chosen handler returns (None for handlers that
    return nothing).
    """
    method = "onSignal" + msg.__class__.__name__
    # Service-specific handlers take precedence over the handler's own.
    fn = getattr(self.__service, method, None)
    if fn is None:
        fn = getattr(self, method, self.onSignalMessage)
    # Propagate the handler's result as documented instead of dropping it.
    return fn(msg)
221
223 """
224 Delay the associated message until the computation is in a ready state.
225 """
226 self.__queue.put(msg)
227
def start(self, service=None, request=None, resume=False):
    """
    Run a request through a service in a background thread.

    Returns immediately with the thread object produced by run_service;
    the request is processed in that thread. A restore hook registered on
    the thread clears the handler's service slot so another service can
    be started afterwards.

    Raises RuntimeError if a service is already running.
    """
    if self.__service is not None:
        raise RuntimeError("Service already running on handler")

    self.__reset(service=service)
    # Named 'worker' so the module-level 'thread' import is not shadowed.
    worker = run_service(self, service, request, resume=resume)

    # Registered through worker.after(); presumably invoked once the run
    # completes. It clears the slot checked at the top of this method.
    def restore(result):
        self.__service = None

    worker.after(restore)
    return worker
245
247 """
248 Send an improvement message to the job listeners.
249
250 Called from the computation thread when the results on the client
251 should be updated.
252 """
253 t = time.time()
254
255 msg = message.Improved(status=state)
256 if t > self.__improved_time + self.improved_delta:
257 self.send(msg)
258 self.__improved_pending = None
259 self.__improved_time = t
260 else:
261 self.__improved_pending = msg
262
264 state = self.__service.checkpoint()
265 if state is not None:
266 self.send(message.Checkpoint(state))
267 self.__checkpoint_time = time.time()
268
270 state = self.__service.progress()
271 if state is not None:
272 if len(state) == 3:
273 k,n,units = state
274 else:
275 k,n = state
276 units = ""
277 self.send(message.Progress(k,n,units=units))
278 self.__progress_time = time.time()
279
281 self.send(self.__improved_pending)
282 self.__improved_pending = None
283 self.__improved_time = time.time()
284
286 """
287 Put the computation in a ready state.
288
289 Process external signals, update progress, set the checkpoint
290 and send any pending improvement message.
291
292 Signals include pause, abort and resume, plus any
293 signals that could not be handled synchronously.
294
295 Raises AbortService if the application is told to abort.
296
297 This method is run from the service thread.
298 """
299
300 self.__ready = True
301
302
303 t = time.time()
304 if t > self.__checkpoint_time+self.checkpoint_delta:
305 self._send_checkpoint()
306
307
308 if t > self.__progress_time+self.progress_delta:
309 self._send_progress()
310
311
312 if (self.__improved_pending is not None
313 and t > self.__improved_time + self.improved_delta):
314 self._send_improved()
315
316
317 self._process_queue()
318
319
320 if self.__pausing:
321
322
323
324 self.send(message.Paused())
325 self.__pause_lock.acquire()
326 self.__pause_lock.release()
327 if not self.__aborting:
328 self.send(message.Resumed())
329
330 self.__ready = False
331
332
333 if self.__aborting:
334 self.__aborting = False
335 raise AbortService
336
337
339 """
340 Process outstanding messages in the queue using onReady handlers.
341 """
342 for msg in self._messages():
343 method = "onReady"+msg.__class__.__name__
344 fn = getattr(self.__service, method, None)
345 if fn is None:
346 fn = getattr(self, method, None)
347 if fn is None:
348 fn = getattr(self.__service, "onReadyMessage", None)
349 if fn is None:
350 fn = self.onReadyMessage
351 fn(msg)
352
354 """
355 Iterate over the messages in the ready queue.
356 """
357 while True:
358 try:
359 yield self.__queue.get_nowait()
360 self.__queue.task_done()
361 except Queue.Empty:
362 return
363
def map(self, fn, request):
    """
    Return [fn(x) for x in request], evaluated by the worker pool.

    Results are requested from ``self.workers.imap`` in input order and
    without enumeration, so the list lines up with request.
    """
    # imap yields results lazily; materialise them so the caller gets the
    # results themselves, not a one-element list holding the iterator.
    return list(self.workers.imap(fn, request, enumerated=False, ordered=True))
369
def imap(self, fn, request, enumerated=False, ordered=True):
    """
    Return an iterator over fn(x) for x in request, computed by the workers.

    enumerated and ordered are forwarded unchanged to ``self.workers.imap``.
    If enumerated, items are (i, fn(x)) where i is the incoming position.
    NOTE(review): presumably ordered=True yields results in input order and
    ordered=False yields them as they complete. Confirm against the worker
    pool implementation.
    """
    # Return the pool's iterator; previously it was discarded and None returned.
    return self.workers.imap(fn, request, enumerated=enumerated, ordered=ordered)
379
380
381
382 @threaded
383 -def run_service(handler, service, request, resume=False):
406
408 """
409 The service handler needs a transport to send messages to the client.
410 """
411 - def put(self, msg):
412 """
413 Put a message into the pipe
414 """
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439 -class _SimpleService(park.core.service.Service):
def __init__(self, verbose=False, timestep=1):
    """Create the toy counting service used by the self-tests."""
    self.verbose = verbose
    self.timestep = timestep
    # Changed via Set messages in the tests; run() raises once it exceeds 10.
    self.value = 2
    # Names of the hooks invoked so far, inspected by the tests.
    self.called = []
445 """Number of seconds to count"""
446 if self.verbose: print self,"prepare(%s)"%str(request)
447 self.called.append('prepare')
448 self.request = request
449 self.state = 0
450 - def run(self, handler):
451 """Count seconds"""
452 if self.verbose: print self,"run(%d,%d)"%(self.state,self.request)
453 self.called.append('run')
454 import time
455 while self.state < self.request:
456 time.sleep(1*self.timestep)
457 self.state += 1
458 handler.improved(self.state)
459 handler.ready()
460 if self.value > 10: raise ValueError("Service error")
461 return self.state
463 if self.verbose: print self,"cleanup()"
464 self.called.append('cleanup')
466 if self.verbose: print "progress()->%d,%d"%(self.state,self.request)
467 self.called.append('progress')
468 return self.state,self.request,'steps'
470 if self.verbose: print "checkpoint()->%d,%d"%(self.request,self.state)
471 self.called.append('checkpoint')
472 return self.request,self.state
474 if self.verbose: print self,"restore(%s)"%str(state)
475 self.called.append('restore')
476 self.request,self.state = state
478 if self.verbose: print self,"received progress message"
479 self.called.append('onReadyProgress')
481 if self.verbose: print self,"received improved message"
482 self.called.append('onSignalImproved')
484 if self.verbose: print self,"received remote invocation",a,kw
485 self.called.append('remotemethod')
486 self.remote = (a,kw)
487
490 self.verbose = verbose
491 self.sent = {}
492 - def put(self, msg):
493 if self.verbose: print "send(%s)"%msg
494 self.sent[msg.__class__] = msg
495 if isinstance(msg, message.Checkpoint): self.checkstate = msg.state
496
498 transport = _SimpleTransport(verbose=True)
499 handler = ServiceHandler(transport)
500 service = _SimpleService(verbose=True,timestep=0.5)
501 thread = handler.start(service,10)
502 time.sleep(2)
503 handler.signal(message.Abort())
504 thread.join()
505
507
508 n = 30
509 rate = 0.01
510 verbose=False
511 transport = _SimpleTransport(verbose=verbose)
512 handler = ServiceHandler(transport)
513 handler.progress_delta = 2*rate
514 handler.checkpoint_delta = 2*rate
515 service = _SimpleService(verbose=verbose,timestep=rate)
516 thread = handler.start(service,n)
517 time.sleep(rate)
518 handler.put(message.Set(value=5))
519 handler.put(message.Progress(None,None))
520 handler.put(message.Improved(status=None))
521 handler.put(message.Invoke('remotemethod',5,kw='george'))
522 time.sleep(5*rate)
523 handler.put(message.Abort())
524 thread.join()
525 if verbose: print service.called,'\n',transport.sent.keys()
526 assert 'prepare' in service.called
527 assert 'run' in service.called
528 assert 'progress' in service.called
529 assert 'checkpoint' in service.called
530 assert 'onReadyProgress' in service.called
531 assert 'onSignalImproved' in service.called
532 assert service.remote == (5, 'george')
533 assert service.value == 5
534 assert service.state > 0 and service.state < n
535 assert transport.checkstate[0] == n
536 assert service.state == transport.sent[message.Improved].result
537
538
539 service = _SimpleService(verbose=verbose,timestep=rate)
540
541
542 thread = handler.start(service,transport.checkstate,resume=True)
543 thread.join()
544 if verbose: print service.called,'\n',transport.sent.keys()
545 assert 'restore' in service.called
546 assert 'run' in service.called
547 assert 'progress' in service.called
548 assert 'checkpoint' in service.called
549 assert service.state == n
550 assert transport.sent[message.Completed].result == n
551 assert service.state == transport.sent[message.Improved].result
552 assert message.Resumed not in transport.sent
553
554
555
556
557 transport.sent = {}
558 thread = handler.start(service, n)
559 time.sleep(5*rate)
560 handler.put(message.Set(value=15))
561 thread.join()
562 assert transport.sent[message.Error].trace[0] is ValueError
563 assert service.state < n
564
565
566 service.value = 5
567
568
569 thread = handler.start(service, n)
570 time.sleep(2*rate)
571 handler.put(message.Pause())
572 time.sleep(2*rate)
573 assert message.Paused in transport.sent
574 assert handler._ServiceHandler__ready
575 value = service.state
576 time.sleep(2*rate)
577 assert value == service.state
578
579
580 handler.put(message.Resume())
581 time.sleep(4*rate)
582 assert value != service.state
583 assert message.Resumed in transport.sent
584
585
586 transport.sent = {}
587 handler.put(message.Pause())
588 time.sleep(2*rate)
589 handler.put(message.Abort())
590 thread.join()
591 assert service.state < n
592 assert message.Paused in transport.sent
593 assert message.Resumed not in transport.sent
594 assert message.Aborted in transport.sent
595
if __name__ == "__main__":
    # Run the module's self-test when executed as a script.
    test()
598
599