Package park :: Package core :: Module servicehandler

Source Code for Module park.core.servicehandler

  1  # This program is public domain 
  2  """ 
  3  Manage service lifetime. 
  4   
  5  ServiceHandler operates on a service and a communication mesh to manage 
  6  the service lifetime.  Signals sent to the service are received by the 
  7  handler and stored until the service is ready.  Messages from the service 
  8  are sent through the handler back to the monitor.  The handler keeps track 
  9  of progress messages, updating the client at the appropriate intervals as 
 10  progress occurs. 
 11  """ 
 12   
 13  __all__ = ['ServiceHandler','load'] 
 14   
 15  import thread, Queue, time 
 16   
 17  import park.core.service 
 18  from park.core import message 
 19  from park.util.threads import threaded, daemon 
20 21 -def _import_symbol(path):
22 """ 23 Recover symbol from path. 24 """ 25 parts = path.split('.') 26 module_name = ".".join(parts[:-1]) 27 symbol_name = parts[-1] 28 __import__(module_name) 29 module = sys.modules[module_name] 30 symbol = getattr(module,symbol_name) 31 return symbol
32
def load(service_name):
    """
    Build a service from its dotted name.

    The name is resolved with _import_symbol and the resulting object is
    called with no arguments; whatever that call returns is handed back
    to the caller.
    """
    return _import_symbol(service_name)()
39
class AbortService(Exception):
    """
    Signal that the running service has been asked to abort.

    Raised by ServiceHandler.ready() and caught in run_service.
    """
    pass
42
class ServiceHandler(object):
    """
    Process signals coming into and out of the service.

    The service handler is running in a separate thread from the service
    itself.  These are the service thread and the computation thread.

    Progress messages are sent approximately every progress_delta seconds
    when the computation is in a ready state.

    Checkpoint messages are sent approximately every checkpoint_delta seconds
    when the computation is in a ready state.

    Improved messages are possibly sent when they are available, or possibly
    delayed until the computation is in a ready state.
    """
    progress_delta = 1
    """Send progress updates every second"""
    checkpoint_delta = 3600
    """Store checkpoints every hour"""
    improved_delta = 0
    """Send improvement messages immediately"""

    def __init__(self, client=None, workers=None):
        """
        Create a handler.

        *client* is the messaging interface; it must provide put(msg).
        *workers* provides imap() for the map/imap methods.
        """
        self.__client = client
        """Messaging interface"""
        self.workers = workers
        """parallel kernels"""
        self.__pause_lock = thread.allocate_lock()
        """Pause will freeze until the lock is acquired"""
        self.__queue = Queue.Queue()
        """Queue of messages to process when next in a ready state"""

        self.__reset()

    def __reset(self, service=None):
        """
        Reset the per-run state and attach *service* to the handler.
        """
        t = time.time()
        self.__aborting = False
        """True once an abort has been requested; acted on in ready()"""
        self.__pausing = False
        """Application will freeze at next pause location"""
        self.__progress_time = t
        """Time of the last progress update"""
        self.__checkpoint_time = t
        """Time of the last checkpoint"""
        self.__improved_time = t
        """Time of the last improvement message"""
        self.__improved_pending = None
        """Pending improvement message that should be reported, or None"""
        self.__service = service
        """Service running on the handler"""
        self.__ready = False
        """True when service is in the 'ready' state"""

    # Signal handlers
    def onSignalMessage(self, msg):
        """
        If a handler is not found for processing the message asynchronously
        then queue the message until the system is in a ready state.  The
        message will then be handled with an onReady handler.
        """
        self.delay(msg)

    def onReadyMessage(self, msg):
        """
        If a handler is not found for processing the message synchronously
        then register a warning in the log.
        """
        # logging is not imported at module level; import it here so the
        # fallback handler does not fail with NameError.
        import logging
        logging.warning("Unhandled signal: "+str(msg))

    def onSignalSet(self, msg):
        """
        Set handler attributes.

        Set the associated attributes in the service handler.

        Forward any remaining attributes to the service when the
        computation is in a ready state.
        """
        # Ignore requests to set private attributes or override method
        # names.  This is primarily to prevent accidents in which an
        # attribute to the computation has the same name as a method
        # in the signal handler.
        consumed = []
        for k, v in list(msg.attrs.items()):
            if not k.startswith('_') and hasattr(self, k):
                oldv = getattr(self, k)
                if not callable(oldv):
                    setattr(self, k, v)
                    consumed.append(k)
        for k in consumed:
            del msg.attrs[k]
        if len(msg.attrs) > 0:
            self.delay(msg)

    def onReadySet(self, msg):
        """
        Set service attribute.

        Set the associated attributes in the service.

        Raises AttributeError if there are unused attributes.
        """
        consumed = []
        for k, v in list(msg.attrs.items()):
            if hasattr(self.__service, k):
                setattr(self.__service, k, v)
                consumed.append(k)
        for k in consumed:
            del msg.attrs[k]
        if len(msg.attrs) > 0:
            # Report the leftover names from msg.attrs, the same mapping
            # that was just processed, rather than from the message object.
            raise AttributeError("Unknown attributes for %s - %s"%
                                 (str(self.__service),
                                  ", ".join(sorted(msg.attrs))))

    def onReadyInvoke(self, msg):
        """
        Invoke method in the service.

        Calls the service method named msg.method with msg.args and msg.kw.

        Raises AttributeError if the service has no such method.
        """
        if hasattr(self.__service, msg.method):
            fn = getattr(self.__service, msg.method)
            fn(*msg.args, **msg.kw)
        else:
            raise AttributeError("unknown method for %s - %s"
                                 %(self.__service, msg.method))

    def onSignalAbort(self, msg):
        """
        We received an abort signal.  Cancel the computation
        when it is convenient.  If the computation is paused
        resume it before handling the abort.
        """
        self.__aborting = True
        # Release the pause so that ready() can wake up and see the abort
        if self.__pausing:
            self.__pausing = False
            self.__pause_lock.release()

    def onSignalPause(self, msg):
        """
        We received a pause signal.  Halt the computation
        at the next convenient point.
        """
        # Ignore multiple requests to pause
        if not self.__pausing:
            self.__pause_lock.acquire()
            self.__pausing = True

    def onSignalResume(self, msg):
        """
        We received a resume signal.  Let the computation continue
        from the halted point.
        """
        # Ignore request to resume when not paused
        if self.__pausing:
            self.__pausing = False
            self.__pause_lock.release()

    def send(self, msg):
        """
        Send a message to the client(s).
        """
        self.__client.put(msg)

    def put(self, msg):
        """
        New message has arrived.

        Called from handler thread when new message has arrived.

        The message is dispatched to onSignal<MessageClass>, looked up
        first on the service and then on the handler, with onSignalMessage
        as the fallback.

        For synchronous operations, this returns the result of the
        operation, otherwise this returns None.
        """
        method = "onSignal"+msg.__class__.__name__
        fn = getattr(self.__service, method, None)
        if fn is None:
            fn = getattr(self, method, self.onSignalMessage)
        # Hand back the handler's result as promised by the docstring.
        return fn(msg)

    def delay(self, msg):
        """
        Delay the associated message until the computation is in a ready state.
        """
        self.__queue.put(msg)

    def start(self, service=None, request=None, resume=False):
        """
        Run a request through a service.

        This function returns immediately and the request is processed in
        a separate thread.

        Returns the object produced by run_service.

        Raises RuntimeError if a service is already running.
        """
        if self.__service is not None:
            raise RuntimeError("Service already running on handler")

        self.__reset(service=service)
        worker = run_service(self, service, request, resume=resume)
        # Detach the service once the run is over so that the handler
        # can be started again.
        def restore(result): self.__service = None
        worker.after(restore)
        return worker

    def improved(self, state):
        """
        Send an improvement message to the job listeners.

        Called from the computation thread when the results on the client
        should be updated.  If improved_delta seconds have not yet passed
        since the last improvement message, the message is held as pending
        and sent from ready() instead.
        """
        t = time.time()

        msg = message.Improved(status=state)
        if t > self.__improved_time + self.improved_delta:
            self.send(msg)
            self.__improved_pending = None
            self.__improved_time = t
        else:
            self.__improved_pending = msg

    def _send_checkpoint(self):
        """
        Ask the service for a checkpoint and forward it if there is one.
        """
        state = self.__service.checkpoint()
        if state is not None:
            self.send(message.Checkpoint(state))
        self.__checkpoint_time = time.time()

    def _send_progress(self):
        """
        Ask the service for its progress and forward it if there is one.

        The service may return (k, n) or (k, n, units).
        """
        state = self.__service.progress()
        if state is not None:
            if len(state) == 3:
                k, n, units = state
            else:
                k, n = state
                units = ""
            self.send(message.Progress(k, n, units=units))
        self.__progress_time = time.time()

    def _send_improved(self):
        """
        Send the pending improvement message and clear it.
        """
        self.send(self.__improved_pending)
        self.__improved_pending = None
        self.__improved_time = time.time()

    def ready(self):
        """
        Put the computation in a ready state.

        Process external signals, update progress, send the checkpoint
        and send any pending improvement message.

        Signals include pause, abort and resume, plus any
        signals that could not be handled synchronously.

        Raises AbortService if the application is told to abort.

        This method is run from the service thread.
        """
        # Let the world know we are in a 'ready' state
        self.__ready = True

        # Capture the computational state so that we can restore it later.
        t = time.time()
        if t > self.__checkpoint_time+self.checkpoint_delta:
            self._send_checkpoint()

        # Note progress
        if t > self.__progress_time+self.progress_delta:
            self._send_progress()

        # Note improvement
        if (self.__improved_pending is not None
            and t > self.__improved_time + self.improved_delta):
            self._send_improved()

        # Process asynchronous messages.
        self._process_queue()

        # Process pause.
        if self.__pausing:
            # if already locked, the following statement will halt until
            # resume releases the lock from another thread.  Once locked
            # we can immediately release and continue on our way.
            self.send(message.Paused())
            self.__pause_lock.acquire()
            self.__pause_lock.release()
            if not self.__aborting:
                self.send(message.Resumed())

        self.__ready = False

        # Check for abort signal
        if self.__aborting:
            self.__aborting = False
            raise AbortService

    def _process_queue(self):
        """
        Process outstanding messages in the queue using onReady handlers.

        For each message the lookup order is: onReady<MessageClass> on the
        service, then on the handler, then onReadyMessage on the service,
        then onReadyMessage on the handler.
        """
        for msg in self._messages():
            method = "onReady"+msg.__class__.__name__
            fn = getattr(self.__service, method, None)
            if fn is None:
                fn = getattr(self, method, None)
            if fn is None:
                fn = getattr(self.__service, "onReadyMessage", None)
            if fn is None:
                fn = self.onReadyMessage
            fn(msg)

    def _messages(self):
        """
        Iterate over the messages in the ready queue.

        Stops as soon as the queue is empty; never blocks.
        """
        while True:
            try:
                yield self.__queue.get_nowait()
                self.__queue.task_done()
            except Queue.Empty:
                return

    def map(self, fn, request):
        """
        Return the [fn(x) for x in request]
        """
        # Collect the results; wrapping the iterator in a list literal
        # would produce a one-element list holding the iterator itself.
        return list(self.workers.imap(fn, request))

    def imap(self, fn, request, enumerated=False, ordered=True):
        """
        Return an iterator over fn(x) for x in request.

        The work is delegated to self.workers.imap; *enumerated* and
        *ordered* are passed through unchanged.

        If enumerated, returns (i,fn(x)) where i is the incoming order.

        NOTE(review): the earlier documentation said that with ordered set
        the results come back "in the order they are completed"; confirm
        the meaning of *ordered* against the workers implementation.
        """
        return self.workers.imap(fn, request,
                                 enumerated=enumerated, ordered=ordered)
379
@threaded
def run_service(handler, service, request, resume=False):
    """
    Drive one request through *service* and report the outcome to *handler*.

    The service is set up with restore(request) when *resume* is true and
    with prepare(request) otherwise, then run(handler) is called.  The
    outcome is reported through handler.send: Aborted when AbortService is
    raised, Completed(result) on normal return, and Error for any other
    exception, including one raised during set-up.  service.cleanup() is
    called once run() has been attempted, but not if set-up fails.
    """
    from park.core import message
    try:
        # Pick the set-up entry point, then hand it the request.
        initialize = service.restore if resume else service.prepare
        initialize(request)

        try:
            outcome = service.run(handler)
        except AbortService:
            handler.send(message.Aborted())
        else:
            handler.send(message.Completed(outcome))
        finally:
            service.cleanup()

    except:
        # Everything that escapes user code is turned into a message.
        # NOTE(review): Error() is built without arguments; presumably it
        # captures the active exception itself -- confirm in park.core.message.
        handler.send(message.Error())
406
class Transport(object):
    """
    Interface through which the service handler reaches the client.

    The handler only ever calls put(msg) on its transport.
    """

    def put(self, msg):
        """
        Accept a message for delivery.  This base version does nothing.
        """
        return None
415
416 # Design idea: use the inheritancy hierarchy for a message? 417 # That is, if EchoA inherits from Echo, do we check for onSignalEchoA and 418 # then onSignalEcho before defaulting to onSignalMessage? 419 # 420 # This is unnecessary complexity. 421 422 # Design idea: allow signals to have a timer attached? 423 # This would allow you to set up a job and by you signal that the 424 # population will be randomized every 20 minutes. 425 # 426 # This is unnecessary complexity. 427 428 # Design idea: if we run the service on a virtual machine then 429 # checkpoint/restore can perhaps be implemented by simply capturing 430 # the machine state. The complication is that parts of the computation 431 # live outside the service, in particular, in the mapping of work 432 # units to other processors. 433 # 434 # For now we insist on cooperative checkpoint/restore. 435 436 437 # ================================================================ 438 # Introspective service and handler to exercise base functionality 439 -class _SimpleService(park.core.service.Service):
440 - def __init__(self, verbose=False, timestep=1):
441 self.verbose,self.timestep = verbose,timestep 442 self.value = 2 443 self.called = []
444 - def prepare(self, request):
445 """Number of seconds to count""" 446 if self.verbose: print self,"prepare(%s)"%str(request) 447 self.called.append('prepare') 448 self.request = request 449 self.state = 0
450 - def run(self, handler):
451 """Count seconds""" 452 if self.verbose: print self,"run(%d,%d)"%(self.state,self.request) 453 self.called.append('run') 454 import time 455 while self.state < self.request: 456 time.sleep(1*self.timestep) 457 self.state += 1 458 handler.improved(self.state) 459 handler.ready() 460 if self.value > 10: raise ValueError("Service error") 461 return self.state
462 - def cleanup(self):
463 if self.verbose: print self,"cleanup()" 464 self.called.append('cleanup')
465 - def progress(self):
466 if self.verbose: print "progress()->%d,%d"%(self.state,self.request) 467 self.called.append('progress') 468 return self.state,self.request,'steps'
469 - def checkpoint(self):
470 if self.verbose: print "checkpoint()->%d,%d"%(self.request,self.state) 471 self.called.append('checkpoint') 472 return self.request,self.state
473 - def restore(self, state):
474 if self.verbose: print self,"restore(%s)"%str(state) 475 self.called.append('restore') 476 self.request,self.state = state
477 - def onReadyProgress(self,msg):
478 if self.verbose: print self,"received progress message" 479 self.called.append('onReadyProgress')
480 - def onSignalImproved(self,msg):
481 if self.verbose: print self,"received improved message" 482 self.called.append('onSignalImproved')
483 - def remotemethod(self, a, kw='kw'):
484 if self.verbose: print self,"received remote invocation",a,kw 485 self.called.append('remotemethod') 486 self.remote = (a,kw)
487
488 -class _SimpleTransport(Transport):
489 - def __init__(self, verbose=False):
490 self.verbose = verbose 491 self.sent = {}
492 - def put(self, msg):
493 if self.verbose: print "send(%s)"%msg 494 self.sent[msg.__class__] = msg 495 if isinstance(msg, message.Checkpoint): self.checkstate = msg.state
496
def demo():
    """
    Run a verbose ten-step service at half-second steps and abort it
    after two seconds.
    """
    transport = _SimpleTransport(verbose=True)
    handler = ServiceHandler(transport)
    service = _SimpleService(verbose=True,timestep=0.5)
    thread = handler.start(service,10)
    time.sleep(2)
    # ServiceHandler has no signal() method; incoming messages are
    # delivered with put().
    handler.put(message.Abort())
    thread.join()
505
def test():
    """
    Exercise ServiceHandler with _SimpleService and _SimpleTransport.

    Covers checkpointing, attribute setting, remote invocation, abort,
    restore from a checkpoint, error trapping, and pause/resume.  The
    checks depend on wall-clock sleeps scaled by ``rate``.
    """
    # Perform checkpoints
    n = 30
    rate = 0.01
    verbose=False
    transport = _SimpleTransport(verbose=verbose)
    handler = ServiceHandler(transport)
    # Shorten the reporting intervals so that progress and checkpoint
    # messages are produced within the short run.
    handler.progress_delta = 2*rate
    handler.checkpoint_delta = 2*rate
    service = _SimpleService(verbose=verbose,timestep=rate)
    thread = handler.start(service,n)
    time.sleep(rate)
    handler.put(message.Set(value=5))  # Set attributes
    handler.put(message.Progress(None,None)) # synchronous signal
    handler.put(message.Improved(status=None)) # Asynchronous signal
    handler.put(message.Invoke('remotemethod',5,kw='george')) # RMI
    time.sleep(5*rate)
    handler.put(message.Abort())
    thread.join()
    if verbose: print service.called,'\n',transport.sent.keys()
    assert 'prepare' in service.called
    assert 'run' in service.called
    assert 'progress' in service.called
    assert 'checkpoint' in service.called
    assert 'onReadyProgress' in service.called
    assert 'onSignalImproved' in service.called
    assert service.remote == (5, 'george')
    assert service.value == 5  # received control signal
    assert service.state > 0 and service.state < n
    assert transport.checkstate[0] == n
    assert service.state == transport.sent[message.Improved].result

    # Try restore into a different service
    service = _SimpleService(verbose=verbose,timestep=rate)
    #service.verbose = True
    #handler.verbose = True
    thread = handler.start(service,transport.checkstate,resume=True)
    thread.join()
    if verbose: print service.called,'\n',transport.sent.keys()
    assert 'restore' in service.called
    assert 'run' in service.called
    assert 'progress' in service.called
    assert 'checkpoint' in service.called
    assert service.state == n
    assert transport.sent[message.Completed].result == n
    assert service.state == transport.sent[message.Improved].result
    assert message.Resumed not in transport.sent

    # Make sure errors are trapped
    #service.verbose = True
    #handler.verbose = True
    transport.sent = {}
    thread = handler.start(service, n)
    time.sleep(5*rate)
    # _SimpleService.run raises ValueError once value exceeds 10
    handler.put(message.Set(value=15))
    thread.join()
    assert transport.sent[message.Error].trace[0] is ValueError
    assert service.state < n

    # Check that pause pauses
    service.value = 5
    #service.verbose = True
    #handler.verbose = True
    thread = handler.start(service, n)
    time.sleep(2*rate)
    handler.put(message.Pause())
    time.sleep(2*rate)
    assert message.Paused in transport.sent
    # The name-mangled private flag is read directly: the handler stays
    # in the ready state while the service is paused.
    assert handler._ServiceHandler__ready
    value = service.state
    time.sleep(2*rate)
    assert value == service.state

    # Make sure we can resume
    handler.put(message.Resume())
    time.sleep(4*rate)
    assert value != service.state
    assert message.Resumed in transport.sent

    # Make sure that we can abort when paused
    transport.sent = {}
    handler.put(message.Pause())
    time.sleep(2*rate)
    handler.put(message.Abort())
    thread.join()
    assert service.state < n
    assert message.Paused in transport.sent
    assert message.Resumed not in transport.sent
    assert message.Aborted in transport.sent
# Run the self-test when the module is executed as a script.
if __name__ == "__main__":
    test()
    #demo()