Source Code for Module park.core.condition

# This program is public domain
"""
Workflow management

When a job is scheduled it can have associated conditions, such as the
completion of other jobs, fetching of data from a server, or availability
of certain resources.

Since it can be tedious to specify the service each time when setting
up a sophisticated workflow on a remote service, we allow the following
shorthand::

    with service:
        job1
        job2
        ...

This sets default_service to service for all commands that submit work
to a service.

Example
=======

The following example generates a complicated model, e.g., by running some
set of simulations to see what the expected starting point of a fit
should be.  The fit also needs some large file from the SNS that needs
to be reduced.  After all this is ready then the refinement can begin.

We also need to make sure that our locally defined model is available on
the cluster.

The code for setting up this computation would look something like the
following::

    import park
    from reflectometry.reduction import SnsReduce
    import mymodel

    # We are dealing with a couple of different cluster queues;
    # authentication happens when we connect to the service.
    # Park maintains a local authentication server to manage
    # authentication between sessions.  Think ssh-agent.
    sns = park.remote_service('https://sns.ornl.gov/service')
    compufans = park.remote_service('compufans')

    # Start reduction on the SNS cluster
    reduction = SnsReduce(infile='/snsdata/RefL/bigfile.nxs',
                          outfile='~/shared/bigfile.refl',
                          service=sns)

    # Rather than specifying the service for each request, set the
    # default service for a group of requests.
    with compufans:
        # Transfer my model to the cluster; I can do this because I have
        # local privileges.  The setup.py for mymodel will have to build
        # an egg and put it in a predictable place.
        upload = park.install_service(mymodel)

        # Transfer the reduced file from the SNS to the local cluster
        # when the reduction is complete.  Since this is a point-to-point
        # remote transfer, we need to grab our authentication token from
        # the sns and pass it to the cluster to do the fetching.  We don't
        # actually care about the destination filename.  Let the fetch
        # command choose it.
        sns_auth = park.credentials(service=sns)
        fetch = park.fetch_url(
            #url = 'https://sns.ornl.gov/service/pkienzle/bigfile.refl',
            url=reduction.url,
            auth=sns_auth,
            when=reduction.iscompleted)

        # Prepare the model on the local cluster.  This is a simulation which
        # does a lot of work to determine what kinds of models are feasible
        # before trying to do any refinement.  Again, we don't actually care
        # about the output filename, so let the service choose it.
        model = mymodel.prefit_simulation(when=upload.iscompleted)

        # Define the fit as a combination of the model and the data.  Assume
        # that the model defined in model.filename sets the fitting
        # parameters appropriately.
        assembly = mymodel.define_fit(model=model.filename,
                                      data=fetch.filename)

        # Run the fit on the local cluster.
        fit = park.Fit(assembly, when=model.iscompleted & fetch.iscompleted)

The effect of this workflow is a declarative logic much like make::

    reduction:
        SnsReduce
    fetch: reduction
        FetchURL
    upload:
        park.upload_package
    model: upload
        mymodel.sim
    assembly:
        mymodel.refine
    fit: model, assembly
        Fit

In fact the logic is more powerful because we have boolean operators
(and=&, or=| and not=~), allowing us, e.g., to run two simulations and
wait for either of them to complete.
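The "either of two simulations" case can be sketched with ordinary Python
operator overloading.  Expr and Flag below are hypothetical stand-ins for
conditions, kept deliberately simpler than the classes defined in this
module (no cancellation, no dependency tracking).

```python
class Expr(object):
    # Hypothetical stand-in for a condition; `test` is a zero-argument
    # callable that returns True once the condition is satisfied.
    def __init__(self, test):
        self.test = test

    def ready(self):
        return self.test()

    def __and__(self, other):
        return Expr(lambda: self.ready() and other.ready())

    def __or__(self, other):
        return Expr(lambda: self.ready() or other.ready())

    def __invert__(self):
        return Expr(lambda: not self.ready())

class Flag(Expr):
    # A leaf condition that is satisfied when `done` is set to True.
    def __init__(self, done=False):
        Expr.__init__(self, lambda: self.done)
        self.done = done

sim1, sim2, upload = Flag(), Flag(), Flag()
start_fit = (sim1 | sim2) & upload

before = start_fit.ready()   # nothing has finished yet
sim2.done = True
upload.done = True
after = start_fit.ready()    # one simulation and the upload are done
```

The expression is re-evaluated on every call to ready(), so it always
reflects the current state of the underlying flags.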

Note: fetch and model filenames need to be generated immediately
when the job is created for this logic to work; we may need to
rethink the connection between the output of one job and the input
of another since we don't want the script to block on submission
to the queue.  A solution is to make the filename property on the
fetch proxy do the blocking.
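A blocking filename property might look roughly like the following.
FetchProxy and set_filename are hypothetical names, and the timer stands
in for a reply from the service; this is only a sketch of the idea, not
park's fetch proxy.

```python
import threading

class FetchProxy(object):
    # Hypothetical proxy: creating it returns immediately, and reading
    # `filename` blocks until the service has reported the name.
    def __init__(self):
        self._known = threading.Event()
        self._filename = None

    def set_filename(self, filename):
        # Called when the (hypothetical) service replies.
        self._filename = filename
        self._known.set()

    @property
    def filename(self):
        self._known.wait()       # block until set_filename() has run
        return self._filename

fetch = FetchProxy()             # submission does not block
reply = threading.Timer(0.05, fetch.set_filename, args=["bigfile.refl"])
reply.start()                    # simulate a delayed reply from the service
name = fetch.filename            # blocks briefly until the reply arrives
reply.join()
```

With this arrangement the script only blocks at the point where the
filename is actually needed, not when the job is submitted.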

Note: if any of these pieces fail, then the dependent jobs will be
cancelled.  We need the various services to memoize so that when the
problem is fixed and the script is rerun there is a slight delay
in checking that the pieces are complete, but it does not redo the
work of running simulations or transferring large files.

Note: making sure that the version numbers of local and remote models
are the same will be part of job submission.  The version number
of the relevant packages should be part of the memoize key.  Version
numbers of dependencies need to be incorporated in this infrastructure.

Model versions for standard models can be installed implicitly on the
server.  This can be done using easy_install from the danse package
repository.  The server will import them with the appropriate requires
commands.  For models which are not part of the danse repository,
either because they are private to the user or because they are under
active development, we need a way to transfer the model from the
client machine to the server.  Again, we could rely on implicit
mechanisms, where the local machine is treated as a danse repository,
or we can use an explicit park.install_service as we have done above.

A note on security: if the user is not authorized to upload modules,
we will need a mechanism for querying the server for the available
service versions and using one of those instead.

Note: a condition on one service being tested on another service may
cause authentication complications.  The condition may need to ship
an authentication token to the fetching machine, or otherwise bounce
the authentication off the agent on the client.

Defining Conditions
===================

Conditions are complicated: they are set up by the client but
processed on the server.  In order for the condition to act
properly on the client side we need a proxy which doubles as a
condition definition for the service and which the client can use
to test the status of a running job.
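A proxy with this double role might be sketched as follows.
CompletedProxy, its definition() method and the status dictionary are all
hypothetical; the dictionary merely stands in for a status query against
a remote service.

```python
class CompletedProxy(object):
    # Hypothetical proxy for the condition "job <jobid> has completed".
    def __init__(self, jobid, get_status):
        self.jobid = jobid
        self._get_status = get_status   # callable: jobid -> status string

    def definition(self):
        # Plain data that could be shipped to the service.
        return {"condition": "completed", "jobid": self.jobid}

    @property
    def ready(self):
        # Client-side test of the running job.
        return self._get_status(self.jobid) == "completed"

# A dictionary stands in for the remote status query.
statuses = {17: "running"}
proxy = CompletedProxy(17, statuses.get)

spec = proxy.definition()       # what the service would receive
ready_before = proxy.ready      # job is still running
statuses[17] = "completed"
ready_after = proxy.ready       # job has finished
```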

TODO: extended use case to start multiple jobs at the same time, and kill
all the others once one of them is "good enough".

"""

# Only names that are defined in this module are exported.
__all__ = ['IsFetched', 'IsCompleted']

import itertools
try:
    import Queue            # Python 2
except ImportError:
    import queue as Queue   # Python 3

from park.core import message
from park.util.threads import threaded
import park.util.mathcontext


class Depends(object):
    """
    Manage conditional dependencies.

    Keep track of the set of jobs that depend on a particular condition
    so that when all the conditions are satisfied the job can be run, or
    when the conditions are cancelled the job can be cancelled.

    Depends should be treated as a singleton.
    """

    STREAM = "condition"
    """Name of the message stream on which condition updates are posted."""

    class Notify(message.Message):
        """
        Message posted on the condition stream when a condition is satisfied
        or cancelled.  All dependent jobs must be checked to see if they
        are ready to run.
        """
        def __init__(self, id=None, status='ready'):
            self.id, self.status = id, status

    _unique_id = itertools.count()

    def __init__(self):
        self.jobs = {}
        self.dependencies = {}
        # FIXME: message_stream is not defined or imported in this module.
        message_stream.listen(self.STREAM, self)

    @staticmethod
    def notify(atom):
        """
        Note that a precondition has been met.

        This sends a Depends.Notify message to the job queue containing
        the atom id and the completion status of 'ready' or 'cancelled'.
        """
        status = 'cancelled' if atom.cancelled else 'ready'
        message_stream.put(Depends.STREAM, Depends.Notify(id=atom.id,
                                                          status=status))

    def put(self, msg):
        """
        Process a completed condition.

        msg is a Depends.Notify message containing the id of the atom
        that has been completed.
        """
        atomid, status = msg.id, msg.status
        self.resolve(atomid)

    def register(self, condition, ready, cancel):
        """
        Record the dependency of the job on the various conditions.

        condition is a conditional expression
        ready is a function to call when the condition is ready
        cancel is a function to call when the condition is cancelled
        """
        jobid = next(self._unique_id)
        self.jobs[jobid] = (condition, ready, cancel)

        # Note the job in the jobset for all atoms
        for atom in condition.depends:
            self.dependencies.setdefault(atom.id, set()).add(jobid)

    def resolve(self, atomid):
        """
        Resolve dependencies given that the status of atomid has changed.

        Any jobs that are now ready are run.  Any whose dependencies
        are cancelled are cancelled.
        """
        jobset = self.dependencies.pop(atomid, set())
        for jobid in jobset:
            # A job that waits on several atoms may already have been run
            # or cancelled when an earlier atom was resolved.
            if jobid not in self.jobs:
                continue
            condition, ready, cancel = self.jobs[jobid]

            if condition.cancelled:
                del self.jobs[jobid]
                cancel()

            elif condition.ready:
                del self.jobs[jobid]
                ready()


class Condition(object):
    """
    Condition that must be satisfied before the job can be run.
    This will include URLs which must be fetched and job ids that must be
    complete before the job can be run.  All preconditions must have an
    attribute 'ready' which is True if the precondition is complete.
    """
    class Cancelled(Exception):
        """
        Exception raised by condition.ready if condition cannot be satisfied.
        """

    _ready = False
    _cancelled = False

    def _getready(self):
        if self._cancelled:
            raise Condition.Cancelled
        return self._ready

    def _getdepends(self):
        return set()

    def _getcancelled(self):
        return self._cancelled

    # The lambdas defer the lookup so that subclasses can override _get*.
    ready = property(fget=lambda self: self._getready(),
                     doc="True if condition is satisfied")
    depends = property(fget=lambda self: self._getdepends(),
                       doc="Set of atoms the condition depends on")
    cancelled = property(fget=lambda self: self._getcancelled(),
                         doc="True if condition was cancelled")

    def __and__(self, condition):
        return And(self, condition)

    def __or__(self, condition):
        return Or(self, condition)

    def __invert__(self):
        return Not(self)

    def __call__(self):
        return self.ready

    def __str__(self): return "Condition"

class And(Condition):
    """
    Wait for condition 1 and condition 2

    ready returns True if both conditions are satisfied.
    cancelled returns True if either condition is cancelled
    depends lists the dependencies of both conditions

    ready raises Condition.Cancelled if either condition is cancelled
    """
    def __init__(self, left, right):
        self.left, self.right = left, right

    def _getready(self):
        return self.left.ready and self.right.ready

    def _getcancelled(self):
        return self.left.cancelled or self.right.cancelled

    def _getdepends(self):
        return self.left.depends | self.right.depends

    def __str__(self): return "(%s and %s)"%(self.left, self.right)

class Or(Condition):
    """
    Wait for condition 1 or condition 2

    ready returns True if either condition is satisfied.
    cancelled returns True if both conditions are cancelled
    depends lists the dependencies of both conditions

    ready raises Condition.Cancelled if both conditions are cancelled
    """
    def __init__(self, left, right):
        self.left, self.right = left, right

    def _getready(self):
        # Want short-circuit __or__ so that the right condition is not
        # computed if the left condition is satisfied.  A cancelled left
        # condition raises instead of returning False, so don't use the
        # following:
        #     return self.left.ready or self.right.ready
        try:
            if self.left.ready: return True
        except Condition.Cancelled:
            # Don't care if the left condition is cancelled in an or;
            # check the right condition.
            pass
        # If left and right are cancelled, then propagate cancelled
        return self.right.ready

    def _getcancelled(self):
        return self.left.cancelled and self.right.cancelled

    def _getdepends(self):
        return self.left.depends | self.right.depends

    def __str__(self): return "(%s or %s)"%(self.left, self.right)

class Not(Condition):
    """
    Wait until condition 1 is not satisfied.

    ready returns True if condition is not satisfied
    """
    def __init__(self, condition):
        self.condition = condition

    def _getready(self):
        return not self.condition.ready

    def _getcancelled(self):
        return self.condition.cancelled

    def _getdepends(self):
        return self.condition.depends

    def __str__(self): return "not "+str(self.condition)

class Atom(Condition):
    """
    The work to perform to satisfy a condition.

    An atom is a full condition, with a ready state, a cancelled state
    and a set of dependencies.  The dependency is the atom itself.

    Each atom has a start method which causes the work to be performed.
    When the work is complete, the self.notify function should be called to
    trigger any dependent work.

    Generally the same notify method will be used for all atoms.  The
    default is to call park.core.condition.Depends.notify, which uses
    the message_stream to notify the running queue that the precondition
    has been met.
    """
    __unique_id = itertools.count()
    # Wrap in staticmethod so that self.send(self) passes exactly one
    # argument instead of binding the instance a second time.
    send = staticmethod(Depends.notify)

    def __init__(self):
        """
        Generate a new atom with a unique id.
        """
        Condition.__init__(self)
        self.id = next(self.__unique_id)

    def _getdepends(self):
        """
        The condition is dependent only on self.
        """
        return set([self])

    def notify(self):
        self.send(self)

    def start(self):
        """
        Run the condition.

        Do whatever processing is required to satisfy the condition then
        call self.notify().  Unless the processing is trivial the work
        should be done in a separate thread.  Use the @threaded decorator
        from park.util.threads to mark the start method as threaded so
        that the caller doesn't have to know if the work is trivial.
        """
        raise NotImplementedError

class IsMessage(Atom):
    """
    Wait on a message in the message stream.
    """
    def __init__(self, key):
        Atom.__init__(self)
        self.key = key

    @threaded
    def start(self):
        """
        Listen on the message queue for all messages associated with the key.
        Filter each message until we find one that satisfies the condition.
        Posts a Condition message saying that the condition can continue.
        """
        # Register with job
        queue = Queue.Queue()
        # FIXME: msgstream is not defined or imported in this module.
        msgstream.listen(self.key, queue)
        while True:
            msg = queue.get()
            queue.task_done()
            # Dispatch on the message class, e.g. Completed -> OnCompleted.
            fn_name = "On"+msg.__class__.__name__
            fn = getattr(self, fn_name, self.OnMessage)
            # User defined message filter: make sure to intercept errors
            # and cancel the job if they occur.
            try:
                if fn(msg): break
            except:
                self._cancelled = True
                self.notify()
                raise

        # Trigger the job queue that a condition has been satisfied
        self._ready = True
        self.notify()

    def OnMessage(self, msg):
        """
        Message filter for unhandled messages.

        By default they are ignored, but subclasses can override.
        """
        return False

    def OnLastMessage(self, msg):
        """
        Message filter for Aborted, Error and Completed.

        If message stream ends before the condition is satisfied, then
        the condition is cancelled.
        """
        self._cancelled = True
        return True
    OnAborted = OnLastMessage
    OnError = OnLastMessage
    OnCompleted = OnLastMessage

    def __str__(self): return 'IsMessage'

class IsCompleted(IsMessage):
    """
    Wait for job to complete
    """
    def OnCompleted(self, msg):
        return True

    def __str__(self): return 'IsCompleted'

class IsImproved(IsMessage):
    """
    Wait for job result which matches expression.

    Follows the message stream for the job waiting for an 'Improved' message.
    For each message, evaluate an expression, signalling ready when that
    expression is True.  The expression is evaluated by
    park.util.mathcontext with the attributes of the message available
    as variables.
    """
    def __init__(self, key, expression="True"):
        # IsMessage requires the key of the message stream to follow.
        IsMessage.__init__(self, key)
        self.expression = expression

    def OnImproved(self, msg):
        success = park.util.mathcontext.eval(self.expression, msg.__dict__)
        return success

    def __str__(self): return 'IsImproved'

class IsFetched(Atom):
    """
    Wait for download to complete.
    """
    def __init__(self, url):
        Atom.__init__(self)
        self.url = url

    @threaded
    def start(self):
        # queue the download request and wait for it to complete
        # let the fetcher decide how it wants to balance resources
        raise NotImplementedError

    def __str__(self): return 'IsFetched(%s)'%self.url

class IsCompletedRequest(Condition):
    def __init__(self, jobid):
        self.jobid = jobid


# TODO: reduce the number of threads
# message filters can all live in the same thread rather than creating
# one thread per condition.  This is a performance concern rather than
# a functionality concern so delay action for now.

# ==================== Test code ===============================
def _check(expr, ready, depends):
    #print "expr",expr
    if ready is Exception:
        try: expr.ready
        except Condition.Cancelled: pass
        else: raise AssertionError(str(expr)+".ready does not raise Cancelled")
        if expr.cancelled != True:
            raise AssertionError(str(expr)+".cancelled expected True")
    else:
        try:
            if expr.ready != ready:
                raise AssertionError(str(expr)+".ready expected "+str(ready))
        except Condition.Cancelled:
            raise AssertionError(str(expr)+".ready raised Cancelled")
        if expr.cancelled != False:
            raise AssertionError(str(expr)+".cancelled expected False")
    assert expr.depends == depends, str(expr)+".depends=[%s], not [%s]"%(
        ",".join(str(s) for s in expr.depends),
        ",".join(str(s) for s in depends))


def test():
    true = Atom(); true._ready = True
    false = Atom(); false._ready = False
    cancelled = Atom(); cancelled._cancelled = True
    _check((true|false)&true, True, set([true,false]))
    _check((cancelled | true), True, set([true,cancelled]))
    _check((cancelled | false), False, set([false,cancelled]))
    _check((cancelled | cancelled), Exception, set([cancelled]))
    _check((cancelled & true), Exception, set([true,cancelled]))
    _check((cancelled & false), Exception, set([false,cancelled]))
    _check((cancelled & cancelled), Exception, set([cancelled]))
    _check(~true, False, set([true]))
    _check(~false, True, set([false]))
    _check(~cancelled, Exception, set([cancelled]))
    ##Forced error to make sure our error checking works
    #_check((cancelled & cancelled), True, set([true]))

if __name__ == "__main__": test()