1
2 """
3 Asynchronous message streams.
4
5 When you have multiple listeners to a process, some of which can connect and
6 disconnect at different times, you need to dispatch the incoming messages to
7 all the listeners. When a listener joins a running stream, they need to get
8 an immediate status update for the computation. This is stored as the
9 stream header. Then, without dropping any messages, the listeners should
10 receive all subsequent messages in the stream in order.
11
12 The message stream is multi-channelled, and indexed by a key. Within
13 the service framework the key is likely to be the jobid associated with
14 the message stream.
15
16 The contents of the message stream can be anything. When a new listener
17 is registered, the header is immediately put on the queue (if there is one),
18 then all subsequent messages are sent until the listener calls hangup().
19
20 To attach to a message stream you need an object which accepts put().
21 An asynchronous queue object is a good choice since it allows you to
22 buffer the messages in one thread and pull them off as needed in another.
23
24 Wire protocol
25 =============
26
27 Messages are served using TCP transport on the port specified when starting
28 the message server.
29
30 The wire format for publishing messages is JSON, which looks like::
31
32 {
33 'action':'header|delete|put|listen|hangup|close',
34 'stream':'streamname',
35 'message': content
36 }
37
38 The content itself will be application dependent. Base types include::
39
40 string: 'string'
41 integer: digits
42 float: digits.digits
43 lists: [value, value, ...]
44 structs: {'key':value, 'key':value, ...}
45
46 Objects will be encoded as dictionaries::
47
48 {
49 '.class': 'my.message',
50 '.version': '0.0',
51 '.state': {
52 'key1': value1,
53 'key2': value2,
54 ...
55 }
56 }
57
58 Subscribers to the message service will receive actions with put or close.
59
60 Note that for processing convenience in identifying the end of the message,
61 there are no newlines allowed as part of the message. They are shown above
62 for clarity.
63
64 The particular wire format of the message can be changed by replacing
65 msgstream.codec with an object providing static encode and decode
66 methods. See `park.util.serial` for details.
67 """
68 from __future__ import with_statement
69
70 __all__ = ['message_stream', 'RemoteStream', 'serve']
71
72 import time
73 import thread
74 import logging
75 import socket
76 import SocketServer
77 import Queue
78 from park.util.codec import JSON as codec
79 import park.util.threads as threads
80
81 logger = logging.getLogger('message_stream')
82 """Message stream is a service --- it needs to log requests and errors"""
92 """
93 Message streams.
94
95 For each active job on the system there is a message stream
96 containing the job header and a list of listening objects.
97 Listeners should accept a put() message to process the next
98 item in the stream.
99 """
101 self._lock = thread.allocate_lock()
102 self._listeners = {}
103 self._headers = {}
104
106 """
107 Set or clear the stream header.
108 """
109 with self._lock:
110 if message is None:
111 self._headers.pop(stream, None)
112 else:
113 self._headers[stream] = message
114
116 """
117 Delete the message stream 'stream'.
118 """
119 self._headers.pop(stream,None)
120 self._listeners.pop(stream,None)
121
def put(self, stream, message):
    """
    Put a message on stream 'stream', transferring it to all listening queues.

    Every queue currently registered for the stream receives the pair
    (stream, message).  Nothing happens if the stream has no listeners.
    """
    packet = (stream, message)
    # Deliver while holding the lock so the listener list cannot change
    # underneath the loop.
    with self._lock:
        for listener in self._listeners.get(stream, ()):
            listener.put(packet)
131
def listen(self, stream, queue):
    """
    Listen to message stream 'stream', adding all new messages to queue.

    The stream header will be the first message queued.  If the stream
    has no header, nothing is queued until the next put().
    """
    with self._lock:
        # Register the queue and deliver the header under the same lock,
        # so no put() can place a message on the queue ahead of the header.
        self._listeners.setdefault(stream, []).append(queue)
        header = self._headers.get(stream)
        if header is not None:
            queue.put((stream, header))
147
def hangup(self, stream, queue):
    """
    Stop listening to message stream 'stream' with queue.

    Unknown streams, and queues which are not listening to the stream,
    are silently ignored.  The stream's listener entry is dropped once
    its last queue has been removed.
    """
    with self._lock:
        attached = self._listeners.get(stream)
        if attached is None or queue not in attached:
            return
        attached.remove(queue)
        if not attached:
            del self._listeners[stream]
162
164 """
165 Close queue, removing it from all message streams.
166 """
167 with self._lock:
168 purge = []
169 for stream,queuelist in self._listeners.iteritems():
170 try:
171 queuelist.remove(queue)
172 if queuelist == []: purge.append(stream)
173 except ValueError:
174 pass
175 for stream in purge:
176 del self._listeners[stream]
177
184 """
185 Send a message over the wire.
186 """
187 if message:
188 obj = dict(action=action, stream=stream, message=message)
189 else:
190 obj = dict(action=action, stream=stream)
191 s = codec.encode(obj)
192 file.write(s)
193 file.write('\n')
194
196 """
197 Read a message from the wire, return action, stream and message.
198 If message is not available, return None.
199 """
200
201 try:
202 s = file.readline()
203 except socket.timeout:
204 raise TimeoutError("connection timeout receiving")
205 except socket.error,x:
206
207
208
209 s = ""
210
211
212
213 if s == "":
214 return 'endqueue',None,None
215
216 obj = codec.decode(s)
217 return obj['action'],obj['stream'],obj.get('message',None)
218
220 """
221 Process connections to the message stream from clients.
222 """
224 """
225 Received a new message stream request.
226 """
227
228
229 self.rfile = self.request.makefile("rt", -1)
230 self.wfile = self.request.makefile("wt", 0)
231
232
233 host,port = self.client_address
234 logger.info(host+':'+str(port)+' connected.')
235 self.queue = Queue.Queue()
236 self.queue_thread = self._run_queue()
237
239 """
240 Stop listening and forwarding messages.
241
242 Make sure queue thread is terminated.
243 """
244 host,port = self.client_address
245 logger.info(host+':'+str(port)+' disconnected.')
246 sendcommand(self.wfile, action='close')
247 time.sleep(0.1)
248 self.queue.put((None,None))
249 self.rfile.close()
250
251
272
273 @threads.daemon
275 """
276 Send messages to the listening socket when they arrive.
277 """
278 while True:
279 stream,message = self.queue.get()
280 if message == None: break
281 sendcommand(self.wfile, action='put',
282 stream=stream, message=message)
283 self.wfile.close()
284
287
288 message_service = None
299
301 """
302 Monitor remote message streams.
303
304 This is a bridge service which forwards remote message streams
305 to local listeners. It keeps only one connection open to the
306 remote stream server regardless of how many local queues are
307 listening.
308
309 The interface is identical to the MessageStream interface, other
310 than the constructor and destructor. The remote stream has a
311 close method that should be called explicitly.
312 """
314 host,port = server.split(':')
315 port = int(port)
316
317 self.closed = True
318 self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
319 self.socket.connect((host,port))
320 self.closed = False
321 self.rfile = self.socket.makefile('rt',-1)
322 self.wfile = self.socket.makefile('wt',0)
323 self.queue_thread = self._process_queue()
324 self._listeners = {}
326 """
327 Close the remote streamer and detach all associated listeners.
328 """
329 if not self.closed:
330 sendcommand(self.wfile,action='endqueue')
331 self.wfile.close()
332 self.socket.close()
333
334 self.closed = True
336 if not self.closed:
337 print "remote message stream not properly closed"
339 """
340 Record a new header for a stream.
341 """
342 sendcommand(self.wfile,action='header',stream=stream,message=message)
344 """
345 Delete a stream.
346 """
347 sendcommand(self.wfile,action='delete',stream=stream)
348 self._listeners.pop(stream,None)
def put(self, stream, message):
    """
    Post a message on stream.

    The message is written to the server connection as a 'put' action.
    """
    request = dict(action='put', stream=stream, message=message)
    sendcommand(self.wfile, **request)
def listen(self, stream, queue):
    """
    Start listening on a stream.

    A 'listen' action is sent to the server only for the first queue
    attached to a stream; later queues share that subscription.
    Attaching a queue which is already listening has no effect.
    """
    attached = self._listeners.get(stream)
    if attached is None:
        # First local listener: subscribe on the server, then record it.
        sendcommand(self.wfile, action='listen', stream=stream)
        self._listeners[stream] = [queue]
    elif queue not in attached:
        attached.append(queue)
def hangup(self, stream, queue):
    """
    Stop listening on stream.

    Does nothing if queue is not attached to the stream.  A 'hangup'
    action is sent to the server once the last local queue has detached.
    """
    attached = self._listeners.get(stream, ())
    if queue not in attached:
        return
    attached.remove(queue)
    if not attached:
        # No local listeners remain: drop the server-side subscription.
        sendcommand(self.wfile, action='hangup', stream=stream)
        del self._listeners[stream]
378 """
379 Stop all listeners for queue.
380 """
381 purge = []
382 for stream,queuelist in self._listeners.iteritems():
383 try:
384 queuelist.remove(queue)
385 if queuelist == []: purge.append(stream)
386 except ValueError:
387 pass
388 for stream in purge:
389
390 sendcommand(self.wfile,action='hangup',stream=stream)
391 del self._listeners[stream]
392 @threads.daemon
394 """
395 Process messages as they arrive from the message server.
396 """
397 while True:
398 action,stream,message = recvcommand(self.rfile)
399 if action == 'close':
400 break
401 elif action == 'put':
402
403 for queue in self._listeners.get(stream,[]):
404 queue.put((stream,message))
405 self.rfile.close()
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439 -def _run_test(streamer, echo=False, rate=0.1):
440 import Queue
441 import time
442
443 class NamedQueue(Queue.Queue):
444 def __init__(self, name):
445 Queue.Queue.__init__(self)
446 self.name = name
447 def __str__(self): return self.name
448 def __repr__(self): return "Queue('%s')"%self.name
449
450 t0 = time.time()
451 print_lock = thread.allocate_lock()
452 def note(*seq):
453 if echo:
454 with print_lock:
455 for s in seq:
456 print s,
457 print
458 @threads.daemon
459 def process_queue(queue, sequence, test=True):
460 while True:
461 stream,message = queue.get()
462 id,k = message
463 queue.task_done()
464 t = time.time() - t0
465 if echo:
466 note("recv",queue,":",stream,"<-",(id,k),"time",t)
467 elif True:
468 assert (id,k,stream) in sequence,str((id,k,stream))
469 assert abs(sequence[(id,k,stream)]-t) < 0.1
470 else:
471 s = 's'+queue.name[1:]
472 print "%s[%s] = %g"%(s,(id,k,stream), time.time()-t0)
473 @threads.threaded
474 def post(id, stream, deltas):
475 for k,t in enumerate(deltas):
476 time.sleep(t*rate)
477 note(" send ",id,":",stream,"<-",(id,k))
478 streamer.put(stream,(id,k))
479 time.sleep(0.1)
480 def listen(stream,queue):
481 note("+++",queue,":",stream)
482 streamer.listen(stream,queue)
483 def hangup(stream,queue):
484 note("---",queue,":",stream)
485 if stream == None:
486 streamer.endqueue(queue)
487 else:
488 streamer.hangup(stream,queue)
489
490 q1 = NamedQueue('Q1')
491 q2 = NamedQueue('Q2')
492 q3 = NamedQueue('Q3')
493
494
495
496
497 seqlines = """
498 s3[('H2', 25, 'M2')] = 0.0726418
499 s3[('S1', 0, 'M1')] = 0.132086
500 s1[('S1', 0, 'M1')] = 0.132233
501 s3[('S2', 0, 'M1')] = 0.142248
502 s1[('S2', 0, 'M1')] = 0.142368
503 s3[('S3', 0, 'M2')] = 0.152345
504 s3[('S4', 0, 'M2')] = 0.162684
505 s1[('S1', 1, 'M1')] = 0.332362
506 s3[('S1', 1, 'M1')] = 0.332588
507 s1[('S2', 1, 'M1')] = 0.342376
508 s3[('S2', 1, 'M1')] = 0.342523
509 s3[('S3', 1, 'M2')] = 0.352517
510 s3[('S4', 1, 'M2')] = 0.36285
511 s1[('S1', 2, 'M1')] = 0.432998
512 s3[('S1', 2, 'M1')] = 0.433155
513 s1[('S2', 2, 'M1')] = 0.442616
514 s3[('S2', 2, 'M1')] = 0.44279
515 s3[('S3', 2, 'M2')] = 0.452711
516 s3[('S4', 2, 'M2')] = 0.463012
517 s2[('H2', 25, 'M2')] = 0.573093
518 s2[('S3', 3, 'M2')] = 0.652863
519 s2[('S4', 3, 'M2')] = 0.663167
520 s2[('S3', 4, 'M2')] = 0.953022
521 s2[('S4', 4, 'M2')] = 0.96333
522 s2[('S3', 5, 'M2')] = 1.15318
523 s2[('S4', 5, 'M2')] = 1.16345
524 """
525 s1,s2,s3 = {},{},{}
526 for line in seqlines.split('\n'):
527 exec line in dict(s1=s1,s2=s2,s3=s3)
528
529
530
531
532 for q,s in [(q1,s1),(q2,s2),(q3,s3)]: process_queue(q,s,test=True)
533 streamer.header('M2',('H2',25))
534 t1 = post('S1','M1',[1,2,1,2,3,2])
535 t2 = post('S2','M1',[1,2,1,2,3,2])
536 t3 = post('S3','M2',[1,2,1,2,3,2])
537 t4 = post('S4','M2',[1,2,1,2,3,2])
538 listen('M1',q1)
539 listen('M1',q3)
540 listen('M2',q3)
541 time.sleep(5*rate)
542 hangup(None,q3)
543 hangup('M1',q1)
544 listen('M2',q2)
545 t1.join()
546 t2.join()
547 t3.join()
548 t4.join()
549
552 if kw.get('echo',False): print "==== Starting local demo ===="
553 _run_test(message_stream, *args, **kw)
554
556 import threading, time
557
558 if kw.get('echo',False): print "==== Starting remote demo ===="
559
560 server = 'localhost:51932'
561 thread = threading.Thread(target=serve,kwargs=dict(server=server))
562 thread.setDaemon(True)
563 thread.start()
564 time.sleep(0.1)
565 streamer = RemoteStream(server)
566 _run_test(streamer, *args, **kw)
567 streamer.close()
568 message_service.server_close()
569
571 import time
572 logging.basicConfig(level=logging.DEBUG)
573
574
575 _run_remote(echo=True,rate=.1)
576
578 _run_local()
579 _run_remote()
580
581
582 if __name__ == "__main__":
583 demo()
584
585