Package park :: Package core :: Module message

Source Code for Module park.core.message

  1  # This program is public domain 
  2  """ 
  3  Standard Park messages. 
  4   
  5  Communication amongst clients, workers and services within park is 
  6  handled by passing messages between them.  These messages can signal 
  7  actions to be taken or results from actions. 
  8   
  9  The standard messages are grouped below. 
 10   
 11  :group service: Abort, Pause, Resume, Control 
 12  :group monitor: Started, Joined, Progress, Improved, Completed, Aborted 
 13  :group job queue: Cleanup, Purge 
 14  :group other: Message, Error, Log, Checkpoint 
 15  """ 
 16  __all__ = ['Message'] 
 17   
 18  import sys 
 19  import logging 
 20  import traceback 
 21   
 22   
23 -class Message(object):
24 """ 25 Message type 26 """
27 - def __init__(self, **kw): self.__dict__ = kw
28 - def __str__(self): return self.__class__.__name__
29 30 # Signals for jobs
31 -class Abort(Message):
32 """ 33 Signal that the computation should be aborted. 34 35 The wait time is the amount of time to give the computation a chance 36 to return an improved result. The default is None for immediate abort. 37 """
38 - def __init__(self, wait=None):
39 self.wait = wait
40
41 -class Pause(Message):
42 """ 43 Signal that the computation should pause at the next convenient 44 stopping point. 45 """
46
47 -class Resume(Message):
48 """ 49 Signal that the computation should resume. 50 """
51
52 -class Set(Message):
53 """ 54 Set attributes on the running job. 55 """
56 - def __init__(self, **kw):
57 self.attrs = kw
58
59 -class Invoke(Message):
60 """ 61 Invoke a method on the running job. 62 """
63 - def __init__(self, method, *args, **kw):
64 self.method = method 65 self.args = args 66 self.kw = kw
67 - def __str__(self):
68 return "Invoke "+self.method
69 70 # Signals for the job queue
71 -class Cleanup(Message):
72 """ 73 Signal that intermediate files associated with a job should be 74 removed from the service. 75 """
76
77 -class Purge(Message):
78 """ 79 Signal that all information associated with a job should be 80 removed from the service 81 """
82 83 # Monitor from jobs to the job queue
84 -class Checkpoint(Message):
85 """ 86 Checkpoint information required to restore the job to the 87 given state after server crash. 88 """
89 - def __init__(self, state):
90 self.state = state
91 92 # Monitors
93 -class Started(Message):
94 """ 95 Started. 96 97 Sent when the job has started processing. 98 99 Accepts an arbitrary list of keyword arguments. 100 101 Clients joining a stream will receive either a Started message 102 or a Joined message depending on whether the stream is already 103 flowing. 104 """
105
106 -class Joined(Message):
107 """ 108 Joined. 109 110 Sent when the listener is attached to a running job. This is 111 a combination of Progress and Improved. 112 """
113 - def __init__(self, progress, improved):
114 self.progress = progress 115 """Most recent progress message""" 116 self.improved = improved 117 """Most recent improved message"""
118
119 -class Progress(Message):
120 """ 121 Progress: k units of n. 122 123 Sent when a certain amount of progress has happened. 124 125 Use the job controller to specify the reporting 126 frequency (time and/or percentage). 127 """
128 - def __init__(self, complete=None, total=None, units=None):
129 self.total = total 130 """Total work to complete""" 131 self.complete = complete 132 """Amount of work complete""" 133 self.units = units 134 """Units of work, or None"""
135 - def __str__(self):
136 if self.units is not None: 137 return "Progress: %s %s of %s"%(self.complete, self.units, self.total) 138 else: 139 return "Progress: %s of %s"%(self.complete, self.total)
140
141 -class Improved(Message):
142 """ 143 Improved: partial result. 144 145 Use the job controller to specify the improvement frequency 146 (time and/or percentage). 147 148 The format of the results message is service dependent. 149 """
150
151 -class Completed(Message):
152 """ 153 Completed: final result. 154 155 The format of the results message is service dependent. 156 """
157
158 -class Error(Message):
159 """ 160 Error: trace 161 162 Job has terminated because of errors. 163 """
164 - def __init__(self, trace=None):
165 if trace == None: trace = sys.exc_info() 166 self.trace = trace 167 """The stack trace returned from exc_info()"""
168 - def __str__(self):
169 #print "traceback",traceback.format_exception(*self.trace) 170 try: 171 return "".join(traceback.format_exception(*self.trace)) 172 except TypeError: 173 return "Error: "+str(self.trace)
174
175 -class Aborted(Message):
176 """ 177 Aborted: partial result 178 179 Job terminated before completion, possibly in response to an Abort 180 signal. 181 """
182
183 -class Paused(Message):
184 """ 185 Paused 186 187 Job is suspended in response to a Pause signal. 188 """
189
190 -class Resumed(Message):
191 """ 192 Resumed 193 194 Job is running in response to a Resume signal. 195 """
196
197 -class Log(Message):
198 """ 199 Log module.function: log record 200 """ 201 formatter = logging.Formatter("Log %(module)s.%(funcName)s: %(message)s")
202 - def __init__(self, record):
203 self.record = record 204 """The partial result completed; this is job specific"""
205 - def __str__(self):
206 return self.formatter.format(self.record)
207