Package park :: Package util :: Module msgstream

Source Code for Module park.util.msgstream

  1  # This program is public domain 
  2  """ 
  3  Asynchronous message streams. 
  4   
  5  When you have multiple listeners to a process, some of which can connect and 
  6  disconnect at different times, you need to dispatch the incoming messages to 
  7  all the listeners.  When a listener joins a running stream, they need to get 
  8  an immediate status update for the computation.  This is stored as the 
  9  stream header. Then, without dropping any messages, the listeners should 
 10  receive all subsequent messages in the stream in order. 
 11   
 12  The message stream is multi-channelled, and indexed by a key.  Within 
 13  the service framework the key is likely to be the jobid associated with 
 14  the message stream. 
 15   
 16  The contents of the message stream can be anything.  When a new listener 
 17  is registered, the header is immediately put on the queue (if there is one), 
  18  then all subsequent messages are sent until the listener calls hangup(). 
 19   
 20  To attach to a message stream you need an object which accepts put(). 
 21  An asynchronous queue object is a good choice since it allows you to 
 22  buffer the messages in one thread and pull them off as needed in another. 
 23   
 24  Wire protocol 
 25  ============= 
 26   
 27  Messages are served using TCP transport on the port specified when starting 
 28  the message server. 
 29   
  30  The wire format for publishing messages is JSON, which looks like:: 
 31   
 32      { 
  33          'action':'header|delete|put|listen|hangup|endqueue|close', 
 34          'stream':'streamname', 
 35          'message': content 
 36      } 
 37   
 38  The content itself will be application dependent.  Base types include:: 
 39   
 40      string:  'string' 
 41      integer:  digits 
 42      float:    digits.digits 
 43      lists:    [value, value, ...] 
 44      structs:  {'key':value, 'key':value, ...} 
 45   
 46  Objects will be encoded as dictionaries:: 
 47   
 48      { 
 49          '.class': 'my.message', 
 50          '.version': '0.0', 
 51          '.state': { 
  52              'key1': value1, 
  53              'key2': value2, 
 54              ... 
 55          } 
 56      } 
 57   
 58  Subscribers to the message service will receive actions with put or close. 
 59   
 60  Note that for processing convenience in identifying the end of the message, 
 61  there are no newlines allowed as part of the message.  They are shown above 
 62  for clarity. 
 63   
 64  The particular wire format of the message can be changed by replacing 
 65  msgstream.codec, with static methods for encode and decode.  See 
 66  `park.util.serial` for details. 
 67  """ 
 68  from __future__ import with_statement 
 69   
 70  __all__ = ['message_stream', 'RemoteStream', 'serve'] 
 71   
 72  import time 
 73  import thread 
 74  import logging 
 75  import socket 
 76  import SocketServer 
 77  import Queue 
 78  from park.util.codec import JSON as codec 
 79  import park.util.threads as threads 
 80   
 81  logger = logging.getLogger('message_stream') 
 82  """Message stream is a service --- it needs to log requests and errors""" 
# Design note: we cannot buffer the messages in the stream. The only
# logical buffer size in this case would be the entire stream history
# and that may be large. Furthermore, leading messages in the stream are
# not of value to the computation. The final result will be available
# for as long as the job is stored on the server, which is a lifetime
# much longer than the message queue.

class MessageStream:
    """
    Message streams.

    For each active job on the system there is a message stream
    containing the job header and a list of listening objects.
    Listeners should accept a put() message to process the next
    item in the stream.

    Every method holds the instance lock while touching the header and
    listener tables, so one stream may be shared between the threads
    that post messages and the threads that listen.
    """
    def __init__(self):
        self._lock = thread.allocate_lock()
        self._listeners = {}   # stream -> list of listening queues
        self._headers = {}     # stream -> current header message

    def header(self, stream, message):
        """
        Set or clear the stream header.

        The header is queued first for every listener that joins 'stream'
        afterwards.  A message of None clears the header.  Listeners that
        are already attached are not sent the new header.
        """
        with self._lock:
            if message is None:
                self._headers.pop(stream, None)
            else:
                self._headers[stream] = message

    def delete(self, stream):
        """
        Delete the message stream 'stream', dropping its header and
        detaching all of its listeners.  Unknown streams are ignored.
        """
        # Take the lock like every other mutator so a concurrent put() or
        # listen() never sees the two tables half-updated.
        with self._lock:
            self._headers.pop(stream, None)
            self._listeners.pop(stream, None)

    def put(self, stream, message):
        """
        Put a message on stream 'stream', transferring it to all listening
        queues as the tuple (stream, message).  A message posted to a
        stream with no listeners is discarded.
        """
        with self._lock:
            for queue in self._listeners.get(stream, []):
                queue.put((stream, message))

    def listen(self, stream, queue):
        """
        Listen to message stream 'stream', adding all new messages to queue.

        The stream header, if any, will be the first message queued.
        Registering a queue that is already listening to 'stream' has no
        effect (and does not re-send the header), so each message reaches
        a queue at most once.
        """
        with self._lock:
            queues = self._listeners.setdefault(stream, [])
            if queue in queues:
                return
            queues.append(queue)
            # Queue the header while still holding the lock, so that no
            # put() can deliver a message ahead of it.
            header = self._headers.get(stream, None)
            if header is not None:
                queue.put((stream, header))

    def hangup(self, stream, queue):
        """
        Stop listening to message stream 'stream' with queue.

        Hanging up a queue that is not listening is silently ignored.  The
        stream's listener list is discarded once it becomes empty.
        """
        with self._lock:
            queues = self._listeners.get(stream)
            if queues is None or queue not in queues:
                return
            queues.remove(queue)
            if not queues:
                del self._listeners[stream]

    def endqueue(self, queue):
        """
        Close queue, removing it from all message streams.

        Streams left with no listeners are discarded.
        """
        with self._lock:
            emptied = []
            for stream, queues in self._listeners.items():
                if queue in queues:
                    queues.remove(queue)
                    if not queues:
                        emptied.append(stream)
            # Delete after the loop; the dict must not change size mid-iteration.
            for stream in emptied:
                del self._listeners[stream]
177
# Singleton message stream
message_stream = MessageStream()


# ===== Remote message service =====
def sendcommand(file, action=None, stream=None, message=None):
    """
    Send a message over the wire.

    The command is a dict with keys 'action' and 'stream', plus 'message'
    when one is given.  It is encoded with the module codec and written to
    file as a single newline-terminated line.
    """
    obj = dict(action=action, stream=stream)
    # Compare against None rather than testing truthiness: falsy payloads
    # such as 0, '', [] or {} are real messages and must not be dropped.
    if message is not None:
        obj['message'] = message
    # Emit the command and its terminator in one write() call rather than
    # two, which narrows the window in which another thread writing to the
    # same file could slip output between them.
    file.write(codec.encode(obj) + '\n')
194
def recvcommand(file):
    """
    Read one command from the wire and return (action, stream, message).

    A missing 'message' field is returned as None.  End of file, or a
    socket error while reading, is reported as the pseudo-action
    ('endqueue', None, None), so a dropped connection looks like an
    explicit endqueue request.

    Raises socket.timeout if the read times out.
    """
    # Error handling adapted from Pyro.
    try:
        line = file.readline()
    except socket.timeout:
        # TimeoutError is not a builtin on Python 2; report the timeout as
        # socket.timeout so the error raised is the one intended.
        raise socket.timeout("connection timeout receiving")
    except socket.error:
        # Connection reset or similar: treat it as the end of the stream.
        line = ""
    if line == "":
        return 'endqueue', None, None
    obj = codec.decode(line)
    return obj['action'], obj['stream'], obj.get('message', None)
218
class ClientRequest(SocketServer.BaseRequestHandler):
    """
    Process connections to the message stream from clients.

    Each connection gets its own queue.  handle() reads commands from the
    client and applies them to the singleton message_stream.  _run_queue()
    forwards everything that arrives on the queue back to the client as
    'put' commands.
    """
    # Private marker that tells _run_queue() to stop.  A unique object is
    # used so that no queued (stream, message) item, including one whose
    # message is None, can be mistaken for it.
    _STOP = object()

    def setup(self):
        """
        Prepare file wrappers for the new connection and start forwarding
        queued messages to it.
        """
        # Convert socket into a normal file object; buffered reads,
        # unbuffered writes.
        self.rfile = self.request.makefile("rt", -1)
        self.wfile = self.request.makefile("wt", 0)

        # Note connection, and start queue
        host, port = self.client_address
        logger.info('%s:%s connected.', host, port)
        self.queue = Queue.Queue()
        self.queue_thread = self._run_queue()

    def finish(self):
        """
        Stop listening and forwarding messages.

        Detach the queue from every stream and always post the stop
        marker, even if the client has already gone away or handle()
        ended with an exception.
        """
        host, port = self.client_address
        logger.info('%s:%s disconnected.', host, port)
        # handle() only detaches on a clean 'endqueue'.  Detaching again here
        # is harmless and keeps an aborted connection from leaving a dead
        # queue registered.
        message_stream.endqueue(self.queue)
        try:
            sendcommand(self.wfile, action='close')
            time.sleep(0.1)  # give the forwarding thread time to drain
        except socket.error:
            logger.debug('%s:%s could not be sent close.', host, port)
        finally:
            self.queue.put(self._STOP)  # Stop the queue
            self.rfile.close()

    def handle(self):
        """
        Process requests to listen for or hang up on particular
        message streams, and to publish headers and messages on them.

        Runs until the client sends 'endqueue' or the connection closes.
        recvcommand reports a closed connection as 'endqueue'.
        """
        while True:
            action, stream, message = recvcommand(self.rfile)
            if action == 'endqueue':
                message_stream.endqueue(self.queue)
                break
            elif action == 'listen':
                message_stream.listen(stream, self.queue)
            elif action == 'hangup':
                message_stream.hangup(stream, self.queue)
            elif action == 'delete':
                message_stream.delete(stream)
            elif action == 'header':
                message_stream.header(stream, message)
            elif action == 'put':
                message_stream.put(stream, message)
            else:
                logger.warning('ignoring unknown action %r on stream %r',
                               action, stream)

    @threads.daemon
    def _run_queue(self):
        """
        Send messages to the listening socket when they arrive.

        The method is wrapped by threads.daemon, presumably so it runs in a
        background thread.  It loops until finish() posts the stop marker or
        the client connection fails, then closes the write side.
        """
        try:
            while True:
                item = self.queue.get()
                if item is self._STOP:
                    break
                stream, message = item
                sendcommand(self.wfile, action='put',
                            stream=stream, message=message)
        except socket.error:
            # Client went away; nothing more can be delivered.
            logger.debug('forwarding stopped: connection lost')
        finally:
            self.wfile.close()
284
class MessageStreamService(SocketServer.ThreadingTCPServer):
    """
    Threading TCP server for the message stream.  Connections are
    handled by the request handler class given to the constructor.
    allow_reuse_address sets SO_REUSEADDR, so the port can be rebound
    promptly after a restart.
    """
    allow_reuse_address = True

# The running MessageStreamService, assigned by serve(); None until then.
message_service = None

def serve(server=None):
    """
    Serve a message stream to remote monitors.

    server is a 'host:port' string.  The service is published in the
    module global message_service before serve_forever() is entered, so
    other code can reach it.  This call blocks while the server runs.
    """
    global message_service
    hostname, portstr = server.split(':')
    address = (hostname, int(portstr))
    message_service = MessageStreamService(address, ClientRequest)
    logger.info('starting on ' + server)
    message_service.serve_forever()
299
class RemoteStream(object):
    """
    Monitor remote message streams.

    This is a bridge service which forwards various remote streams to
    local listeners.  It keeps only one connection open to the remote
    stream server regardless of how many streams are listening.

    The interface is identical to the MessageStream interface, other
    than the constructor and destructor.  The remote stream has a
    close method that should be called explicitly.

    Two kinds of thread use the local listener table: the callers, and the
    reader thread started by __init__.  The table is therefore guarded by
    a lock.  Only the first local listen on a stream subscribes with the
    server, so a local queue that joins a stream that is already being
    listened to does not get the stream header replayed.
    """
    def __init__(self, server=None):
        """
        Connect to the message stream server at 'host:port' and start the
        background reader that dispatches incoming messages.
        """
        host, port = server.split(':')
        port = int(port)
        self.closed = True
        # The reader thread consults these, so create them before it starts.
        self._lock = thread.allocate_lock()
        self._listeners = {}
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.socket.connect((host, port))
        except socket.error:
            self.socket.close()
            raise
        self.rfile = self.socket.makefile('rt', -1)
        self.wfile = self.socket.makefile('wt', 0)
        self.closed = False
        self.queue_thread = self._process_queue()

    def close(self):
        """
        Close the remote streamer and detach all associated listeners.

        Calling it more than once is harmless.  The connection is closed
        even if the server can no longer be reached.
        """
        if self.closed:
            return
        self.closed = True
        with self._lock:
            self._listeners.clear()
        try:
            sendcommand(self.wfile, action='endqueue')
        except socket.error:
            logger.debug('could not send endqueue; server connection lost')
        finally:
            self.wfile.close()
            self.socket.close()

    def __del__(self):
        # getattr: __init__ may have failed before 'closed' was assigned.
        if not getattr(self, 'closed', True):
            print("remote message stream not properly closed")

    def header(self, stream, message):
        """
        Record a new header for a stream.
        """
        sendcommand(self.wfile, action='header', stream=stream, message=message)

    def delete(self, stream):
        """
        Delete a stream on the server and drop its local listeners.
        """
        sendcommand(self.wfile, action='delete', stream=stream)
        with self._lock:
            self._listeners.pop(stream, None)

    def put(self, stream, message):
        """
        Post a message on stream.
        """
        sendcommand(self.wfile, action='put', stream=stream, message=message)

    def listen(self, stream, queue):
        """
        Start listening on a stream.

        Only the first local listener on a stream registers with the
        server; later ones share that subscription.
        """
        with self._lock:
            queues = self._listeners.get(stream)
            first = queues is None
            if first:
                # Register locally *before* asking the server, so the header
                # the server sends straight back already has a destination.
                self._listeners[stream] = [queue]
            elif queue not in queues:
                queues.append(queue)
        # Socket I/O happens outside the lock.  The reader thread needs the
        # lock to dispatch, and blocking it behind a write could stall both
        # ends of the connection.
        if first:
            sendcommand(self.wfile, action='listen', stream=stream)

    def hangup(self, stream, queue):
        """
        Stop listening on stream.

        Requests to hang up on streams that are not open are silently
        ignored.  The server is told to hang up once the last local
        listener on the stream is gone.
        """
        with self._lock:
            queues = self._listeners.get(stream)
            if queues is None or queue not in queues:
                return
            queues.remove(queue)
            last = not queues
            if last:
                del self._listeners[stream]
        if last:
            sendcommand(self.wfile, action='hangup', stream=stream)

    def endqueue(self, queue):
        """
        Stop all listeners for queue.  Every stream left without local
        listeners is deregistered from the server.
        """
        with self._lock:
            emptied = []
            for stream, queues in self._listeners.items():
                if queue in queues:
                    queues.remove(queue)
                    if not queues:
                        emptied.append(stream)
            for stream in emptied:
                del self._listeners[stream]
        for stream in emptied:
            sendcommand(self.wfile, action='hangup', stream=stream)

    @threads.daemon
    def _process_queue(self):
        """
        Process messages as they arrive from the message server.

        Stops when the server sends 'close', or when recvcommand reports the
        connection gone by returning 'endqueue'.  Without the second check,
        a dropped connection would spin this loop forever on EOF.
        """
        try:
            while True:
                action, stream, message = recvcommand(self.rfile)
                if action in ('close', 'endqueue'):
                    break
                elif action == 'put':
                    # Take a snapshot under the lock and deliver outside it, so
                    # concurrent listen/hangup calls cannot alter the list mid-loop.
                    with self._lock:
                        queues = list(self._listeners.get(stream, ()))
                    for queue in queues:
                        queue.put((stream, message))
        finally:
            self.rfile.close()
406
407 # Design notes: 408 # 409 # Message streams could be used for signals, but care would be needed to 410 # make sure that the listener is set up when the job is queued, not when 411 # the job is started since otherwise some signals will be lost. Furthermore, 412 # the signal queue will need to be persistent to survive a server reboot. 413 # That said, having a single message passing scheme to handle both signals 414 # and monitor events does simplify the architecture of the server. Message 415 # stream ids will need to be tagged with both jobid and direction however. 416 # 417 # The third type of communication, synchronous messaging for querying job 418 # parameters in a running job, could be simulated by on the client side 419 # by queuing irrelevant messages until you get the response but that is 420 # awkward for the client. In order to support that kind of communication, 421 # we will need a direct rpc to the running job, and this direct communication 422 # can be used to insert signals onto the jobs queue with minimal overhead. 423 # 424 # Pyro already creates an event server. 425 # * Disadvantages: 426 # opaque wire protocol, semi-transparent xml-pickle 427 # message streams don't have headers 428 # requires nameserver (though this can be fixed) 429 # in flux; 3.8->4.0 is a stalled rewrite 430 # * Advantages: 431 # supports SSL and some authentication 432 # supports NT services 433 # more robust support for Windows 434 # community of developers and users 435 # Consider adding heartbeat to pyro and/or msgstream. 436 437 438 # ================================== Testing ============================== 439 -def _run_test(streamer, echo=False, rate=0.1):
440 import Queue 441 import time 442 443 class NamedQueue(Queue.Queue): 444 def __init__(self, name): 445 Queue.Queue.__init__(self) 446 self.name = name
447 def __str__(self): return self.name 448 def __repr__(self): return "Queue('%s')"%self.name 449 450 t0 = time.time() 451 print_lock = thread.allocate_lock() 452 def note(*seq): 453 if echo: 454 with print_lock: 455 for s in seq: 456 print s, 457 print 458 @threads.daemon 459 def process_queue(queue, sequence, test=True): 460 while True: 461 stream,message = queue.get() 462 id,k = message 463 queue.task_done() 464 t = time.time() - t0 465 if echo: 466 note("recv",queue,":",stream,"<-",(id,k),"time",t) 467 elif True: # Change this to False to generate output 468 assert (id,k,stream) in sequence,str((id,k,stream)) 469 assert abs(sequence[(id,k,stream)]-t) < 0.1 470 else: 471 s = 's'+queue.name[1:] 472 print "%s[%s] = %g"%(s,(id,k,stream), time.time()-t0) 473 @threads.threaded 474 def post(id, stream, deltas): 475 for k,t in enumerate(deltas): 476 time.sleep(t*rate) 477 note(" send ",id,":",stream,"<-",(id,k)) 478 streamer.put(stream,(id,k)) 479 time.sleep(0.1) # Give the queue a chance to process before exit() 480 def listen(stream,queue): 481 note("+++",queue,":",stream) 482 streamer.listen(stream,queue) 483 def hangup(stream,queue): 484 note("---",queue,":",stream) 485 if stream == None: 486 streamer.endqueue(queue) 487 else: 488 streamer.hangup(stream,queue) 489 490 q1 = NamedQueue('Q1') 491 q2 = NamedQueue('Q2') 492 q3 = NamedQueue('Q3') 493 494 # The following statements are generated by running process_queue with 495 # test=False rather than test=True. Make sure these are correct by hand 496 # before using them in the test. 
497 seqlines = """ 498 s3[('H2', 25, 'M2')] = 0.0726418 499 s3[('S1', 0, 'M1')] = 0.132086 500 s1[('S1', 0, 'M1')] = 0.132233 501 s3[('S2', 0, 'M1')] = 0.142248 502 s1[('S2', 0, 'M1')] = 0.142368 503 s3[('S3', 0, 'M2')] = 0.152345 504 s3[('S4', 0, 'M2')] = 0.162684 505 s1[('S1', 1, 'M1')] = 0.332362 506 s3[('S1', 1, 'M1')] = 0.332588 507 s1[('S2', 1, 'M1')] = 0.342376 508 s3[('S2', 1, 'M1')] = 0.342523 509 s3[('S3', 1, 'M2')] = 0.352517 510 s3[('S4', 1, 'M2')] = 0.36285 511 s1[('S1', 2, 'M1')] = 0.432998 512 s3[('S1', 2, 'M1')] = 0.433155 513 s1[('S2', 2, 'M1')] = 0.442616 514 s3[('S2', 2, 'M1')] = 0.44279 515 s3[('S3', 2, 'M2')] = 0.452711 516 s3[('S4', 2, 'M2')] = 0.463012 517 s2[('H2', 25, 'M2')] = 0.573093 518 s2[('S3', 3, 'M2')] = 0.652863 519 s2[('S4', 3, 'M2')] = 0.663167 520 s2[('S3', 4, 'M2')] = 0.953022 521 s2[('S4', 4, 'M2')] = 0.96333 522 s2[('S3', 5, 'M2')] = 1.15318 523 s2[('S4', 5, 'M2')] = 1.16345 524 """ 525 s1,s2,s3 = {},{},{} 526 for line in seqlines.split('\n'): 527 exec line in dict(s1=s1,s2=s2,s3=s3) 528 #print s1,s2,s3 529 530 # Switch between gen and test to generate seqlines 531 #for q in [q1,q2,q3]: process_queue(q,None,test=False) 532 for q,s in [(q1,s1),(q2,s2),(q3,s3)]: process_queue(q,s,test=True) 533 streamer.header('M2',('H2',25)) 534 t1 = post('S1','M1',[1,2,1,2,3,2]) 535 t2 = post('S2','M1',[1,2,1,2,3,2]) 536 t3 = post('S3','M2',[1,2,1,2,3,2]) 537 t4 = post('S4','M2',[1,2,1,2,3,2]) 538 listen('M1',q1) 539 listen('M1',q3) 540 listen('M2',q3) 541 time.sleep(5*rate) 542 hangup(None,q3) 543 hangup('M1',q1) 544 listen('M2',q2) 545 t1.join() 546 t2.join() 547 t3.join() 548 t4.join() 549
def _run_local(*args, **kw):
    """
    Run the self-test against the in-process message_stream singleton.

    All arguments are handed on to _run_test unchanged.
    """
    streamer = message_stream
    echo = kw.get('echo', False)
    if echo:
        print("==== Starting local demo ====")
    _run_test(streamer, *args, **kw)
554
def _run_remote(*args, **kw):
    """
    Run the self-test through a RemoteStream connected to a message
    server started in a background thread on localhost:51932.

    All arguments are handed on to _run_test.  The client stream and the
    server socket are closed even if the test raises.
    """
    import threading, time

    if kw.get('echo', False):
        print("==== Starting remote demo ====")
    # Start the remote service.  The variable is named server_thread so it
    # does not shadow the thread module imported at the top of the file.
    server = 'localhost:51932'
    server_thread = threading.Thread(target=serve, kwargs=dict(server=server))
    server_thread.setDaemon(True)
    server_thread.start()
    time.sleep(0.1)  # crude wait for serve() to bind and start listening
    streamer = RemoteStream(server)
    try:
        _run_test(streamer, *args, **kw)
    finally:
        streamer.close()
        message_service.server_close()
569
def demo():
    """
    Run the remote demo with debug logging and echoed traffic.

    To also exercise the in-process stream, call
    _run_local(echo=True, rate=.1) first, and pause a few seconds so its
    poster threads finish before the remote run starts.
    """
    logging.basicConfig(level=logging.DEBUG)
    _run_remote(echo=True, rate=.1)
576
def test():
    """Run the timed self-test on the local singleton stream, then over TCP."""
    for runner in (_run_local, _run_remote):
        runner()
if __name__ == "__main__":
    # Call test() here instead to run the assertion-checked self-test.
    demo()