| Home | Trees | Indices | Help |
|
|---|
|
|
Asynchronous message streams.
When a process has multiple listeners, some of which connect and disconnect at different times, the incoming messages must be dispatched to all of them. A listener that joins a running stream needs an immediate status update for the computation; this is stored as the stream header. After that, the listener should receive every subsequent message in the stream, in order, with none dropped.
The message stream is multi-channel, with each channel indexed by a key. Within the service framework the key is likely to be the jobid associated with the message stream.
The contents of the message stream can be anything. When a new listener is registered, the stream header (if there is one) is immediately put on the listener's queue, then all subsequent messages are sent until the listener calls hangup().
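The dispatch behaviour described above can be sketched as follows. This is a minimal illustration, not the park implementation: the class name SketchStream and its method signatures are assumptions chosen to mirror the action names used on this page, and hangup is modelled here as a method on the dispatcher.

```python
import threading
from collections import defaultdict


class SketchStream:
    """Hypothetical multi-channel dispatcher (illustration only)."""

    def __init__(self):
        # RLock so a listener may call back into the stream from put().
        self._lock = threading.RLock()
        self._headers = {}                   # stream key -> latest header
        self._listeners = defaultdict(list)  # stream key -> listener objects

    def header(self, stream, message):
        """Store the status update that new listeners receive first."""
        with self._lock:
            self._headers[stream] = message

    def listen(self, stream, listener):
        """Register a listener; deliver the header first if one exists."""
        with self._lock:
            if stream in self._headers:
                listener.put(self._headers[stream])
            self._listeners[stream].append(listener)

    def put(self, stream, message):
        """Send a message to every listener currently on the stream."""
        with self._lock:
            # Iterate over a copy so a listener can hang up during delivery.
            for listener in list(self._listeners[stream]):
                listener.put(message)

    def hangup(self, stream, listener):
        """Stop sending messages to the given listener."""
        with self._lock:
            if listener in self._listeners[stream]:
                self._listeners[stream].remove(listener)
```

Holding one lock across both registration and delivery is what keeps a joining listener from missing a message between receiving the header and being added to the listener list.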
To attach to a message stream you need an object that provides a put() method. An asynchronous queue object is a good choice, since it lets you buffer the messages in one thread and pull them off as needed in another.
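As a rough illustration of the queue approach, the sketch below uses the standard library queue.Queue as the listener object. The producer function only stands in for the message stream, and the None sentinel used to end the loop is an assumption of this example, not part of the park API.

```python
import queue
import threading

# queue.Queue provides put(), so it can serve as the listener object.
listener = queue.Queue()


def producer(target):
    """Stand-in for the message stream: sends a header, then messages."""
    target.put("header: 0% complete")
    for i in range(3):
        target.put("step %d" % i)
    target.put(None)  # hypothetical end-of-stream sentinel


received = []
worker = threading.Thread(target=producer, args=(listener,))
worker.start()

# The consuming thread pulls messages off the queue as it needs them.
while True:
    msg = listener.get()
    if msg is None:
        break
    received.append(msg)

worker.join()
```

Because the queue buffers in between, the producing thread never waits on the consumer, and the consumer sees the messages in the order they were put.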
Messages are served using TCP transport on the port specified when starting the message server.
The wire format for publishing messages is JSON, which looks like:
{
    'action': 'header|delete|put|listen|hangup|close',
    'stream': 'streamname',
    'message': content
}
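A message with this envelope could be built and serialized as sketched below. The helper make_message is hypothetical, and whether every action requires a 'message' field is not stated here, so the sketch simply defaults it to None. Note that the standard json module emits double-quoted strings, unlike the single quotes used in the illustration above.

```python
import json

# Action names taken from the envelope shown above.
ACTIONS = {"header", "delete", "put", "listen", "hangup", "close"}


def make_message(action, stream, message=None):
    """Build the envelope dictionary (hypothetical helper)."""
    if action not in ACTIONS:
        raise ValueError("unknown action: %r" % (action,))
    return {"action": action, "stream": stream, "message": message}


# json.dumps without indent produces a single line of text.
line = json.dumps(make_message("put", "job42", {"progress": 0.5}))
```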
The content itself will be application dependent. Base types include:
string: 'string'
integer: digits
float: digits.digits
lists: [value, value, ...]
structs: {'key':value, 'key':value, ...}
Objects will be encoded as dictionaries:
{
    '.class': 'my.message',
    '.version': '0.0',
    '.state': {
        'key1': value1,
        'key2': value2,
        ...
    }
}
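One way such an object encoding might work is sketched below. The Point class, the REGISTRY lookup table, and the encode_object/decode_object helpers are all hypothetical; the actual mechanism lives in park.util.serial and may differ.

```python
import json


class Point:
    """Hypothetical application object used only for this illustration."""

    def __init__(self, x, y):
        self.x = x
        self.y = y


# Assumed mapping from '.class' names to Python classes.
REGISTRY = {"example.Point": Point}


def encode_object(obj, class_name, version="0.0"):
    """Wrap an object's attributes in the '.class'/'.version'/'.state' form."""
    return {".class": class_name, ".version": version,
            ".state": dict(vars(obj))}


def decode_object(record, registry=REGISTRY):
    """Rebuild an object from its dictionary form without calling __init__."""
    cls = registry[record[".class"]]
    obj = cls.__new__(cls)
    obj.__dict__.update(record[".state"])
    return obj


wire = json.dumps(encode_object(Point(1, 2.5), "example.Point"))
restored = decode_object(json.loads(wire))
```

Carrying a '.version' field alongside the state gives the receiver a way to detect and migrate objects written by an older sender.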
Subscribers to the message service will receive messages whose action is put or close.
Note that, to make the end of each message easy to identify, newlines are not allowed within a message. The examples above are split across lines only for clarity.
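If each message is terminated by a newline (an assumption; the text above only says that newlines may not appear inside a message), a receiver could split its input buffer as sketched here. The function split_messages is hypothetical.

```python
import json


def split_messages(buffer):
    """Split received bytes into decoded messages plus any partial tail.

    Assumes one JSON message per line; the final element after the last
    newline is an incomplete message and is returned for later use.
    """
    *lines, rest = buffer.split(b"\n")
    messages = [json.loads(line.decode("utf-8")) for line in lines if line]
    return messages, rest


# json.dumps escapes a newline inside a string as the two characters
# backslash and n, so the encoded text itself never spans several lines.
data = json.dumps({"action": "put", "stream": "s", "message": "a\nb"})
chunk = data.encode("utf-8") + b"\n" + b'{"action":'
messages, remainder = split_messages(chunk)
```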
The wire format of the messages can be changed by replacing msgstream.codec with an object that provides static encode and decode methods. See park.util.serial for details.
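A replacement codec might look like the sketch below. The class name CompactJSONCodec is hypothetical, and the exact argument and return types expected by msgstream (text versus bytes, for instance) are not documented here, so treat the signatures as assumptions and check park.util.serial before relying on them.

```python
import json


class CompactJSONCodec:
    """Hypothetical codec with the two static methods named in the text."""

    @staticmethod
    def encode(obj):
        # Compact separators keep each message short and on a single line.
        return json.dumps(obj, separators=(",", ":"))

    @staticmethod
    def decode(text):
        return json.loads(text)


# Installing it would presumably look like the following (not verified):
# msgstream.codec = CompactJSONCodec

encoded = CompactJSONCodec.encode({"action": "close", "stream": "job42"})
```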
| Classes | |
|---|---|
| RemoteStream | Monitor remote message streams. |
| Variables | |
|---|---|
| message_stream = MessageStream() | |
| Generated by Epydoc 3.0.1 on Mon Mar 16 15:03:12 2009 | http://epydoc.sourceforge.net |