"""
Workflow management

When a job is scheduled it can have associated conditions, such as the
completion of other jobs, the fetching of data from a server, or the
availability of certain resources.

Since it is tedious to specify the service for every command when
setting up a sophisticated workflow on a remote service, we allow the
following shorthand::

    with service:
        job1
        job2
        ...

This sets default_service to service for all commands within the block
that submit work to a service.
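
One way to implement the shorthand is an ordinary context manager that
pushes the service onto a per-thread stack on entry and pops it on exit.
The sketch below is an illustration under assumptions, not park's
implementation: ``Service``, ``default_service`` and ``submit`` are
hypothetical names.

```python
import threading

# Hypothetical per-thread stack of active default services.
_state = threading.local()


def _stack():
    if not hasattr(_state, 'services'):
        _state.services = []
    return _state.services


def default_service():
    # Innermost active 'with service:' block, or None outside any block.
    stack = _stack()
    return stack[-1] if stack else None


class Service(object):
    # Hypothetical stand-in for a remote service handle.
    def __init__(self, name):
        self.name = name

    def __enter__(self):
        _stack().append(self)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        _stack().pop()
        return False  # do not suppress exceptions raised in the block


def submit(job, service=None):
    # An explicit service wins; otherwise use the enclosing block's default.
    service = service or default_service()
    if service is None:
        raise ValueError('no service given and no default service set')
    return service.name, job


compufans = Service('compufans')
with compufans:
    placed = submit('job1')   # ('compufans', 'job1')
```

Using a stack rather than a single variable lets blocks nest, and
thread-local storage keeps concurrent scripts from seeing each other's
defaults.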

Example
=======

The following example generates a complicated model, e.g., by running a
set of simulations to determine a good starting point for the fit. The
fit also needs a large data file from the SNS, which must first be
reduced. Once all of this is ready, the refinement can begin.

We also need to make sure that our locally defined model is available on
the cluster.

The code for setting up this computation would look something like the
following::

    import park
    from reflectometry.reduction import SnsReduce
    import mymodel

    # We are dealing with a couple of different cluster queues;
    # authentication happens when we connect to the service.
    # Park maintains a local authentication server to manage
    # authentication between sessions. Think ssh-agent.
    sns = park.remote_service('https://sns.ornl.gov/service')
    compufans = park.remote_service('compufans')

    # Start reduction on the SNS cluster
    reduction = SnsReduce(infile='/snsdata/RefL/bigfile.nxs',
                          outfile='~/shared/bigfile.refl',
                          service=sns)

    # Rather than specifying the service for each request, set the
    # default service for a group of requests.
    with compufans:
        # Transfer my model to the cluster; I can do this because I have
        # local privileges. The setup.py for mymodel will have to build
        # an egg and put it in a predictable place.
        upload = park.install_service(mymodel)

        # Transfer the reduced file from the SNS to the local cluster
        # when the reduction is complete. Since this is a point-to-point
        # remote transfer, we need to grab our authentication token from
        # the SNS and pass it to the cluster to do the fetching. We don't
        # actually care about the destination filename, so let the fetch
        # command choose it.
        sns_auth = park.credentials(service=sns)
        fetch = park.fetch_url(
            #url='https://sns.ornl.gov/service/pkienzle/bigfile.refl',
            url=reduction.url,
            auth=sns_auth,
            when=reduction.iscompleted)

        # Prepare the model on the local cluster. This is a simulation which
        # does a lot of work to determine what kinds of models are feasible
        # before trying to do any refinement. Again, we don't actually care
        # about the output filename, so let the service choose it.
        model = mymodel.prefit_simulation(when=upload.iscompleted)

        # Define the fit as a combination of the model and the data. Assume
        # that the model defined in model.filename sets the fitting
        # parameters appropriately.
        assembly = mymodel.define_fit(model=model.filename,
                                      data=fetch.filename)

        # Run the fit on the local cluster.
        fit = park.Fit(assembly, when=model.iscompleted & fetch.iscompleted)

The effect of this workflow is a declarative logic much like make::

    reduction:
        SnsReduce
    fetch: reduction
        FetchURL
    upload:
        park.upload_package
    model: upload
        mymodel.sim
    assembly:
        mymodel.refine
    fit: model, fetch, assembly
        Fit
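
Since the dependencies form a directed acyclic graph, any topological
order of the targets is a valid execution order, just as make computes
one. The sketch below uses the standard library ``graphlib`` module
(Python 3.9+) to group the targets into stages whose members could run
concurrently. The edges are taken from the ``when=`` conditions in the
script, with ``assembly`` treated as an input to ``fit``; the ``stages``
helper is hypothetical.

```python
from graphlib import TopologicalSorter

# Each target maps to the targets it must wait for.
workflow = {
    'reduction': [],
    'fetch': ['reduction'],
    'upload': [],
    'model': ['upload'],
    'assembly': [],
    'fit': ['model', 'fetch', 'assembly'],
}


def stages(graph):
    # Repeatedly take every target whose prerequisites are all done.
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    result = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        result.append(ready)
        sorter.done(*ready)
    return result


plan = stages(workflow)
# [['assembly', 'reduction', 'upload'], ['fetch', 'model'], ['fit']]
```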

In fact the logic is more powerful than make because conditions can be
combined with boolean operators (and ``&``, or ``|``, not ``~``),
allowing us, for example, to run two simulations and wait for either of
them to complete.
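
Waiting for either of two jobs follows the same pattern as
``concurrent.futures.wait`` with ``return_when=FIRST_COMPLETED`` in the
standard library (``ALL_COMPLETED`` plays the role of ``&``). The sketch
below runs two local threads, with a hypothetical ``simulate`` function
standing in for remote simulation jobs.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def simulate(name, seconds):
    # Hypothetical stand-in for a simulation job of a given duration.
    time.sleep(seconds)
    return name


with ThreadPoolExecutor(max_workers=2) as pool:
    fast = pool.submit(simulate, 'fast', 0.05)
    slow = pool.submit(simulate, 'slow', 0.5)
    # Analogue of waiting on fast.iscompleted | slow.iscompleted.
    done, pending = wait([fast, slow], return_when=FIRST_COMPLETED)
    first = done.pop().result()   # 'fast'
```

Note that leaving the ``with`` block still waits for the slower job to
finish; a plain thread cannot be killed from outside.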

Note: the fetch and model filenames need to be generated immediately
when the job is created for this logic to work. We may need to
rethink the connection between the output of one job and the input
of another, since we don't want the script to block on submission
to the queue. One solution is to make the filename property on the
fetch proxy do the blocking, so that the script waits only at the
moment it actually reads the filename.
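
A minimal sketch of that idea, assuming a hypothetical ``JobProxy``
class: submission returns at once, and only reading ``filename`` blocks,
on a ``threading.Event`` that is set when the service reports the name
it chose. A ``threading.Timer`` stands in for the service's reply.

```python
import threading


class JobProxy(object):
    # Hypothetical client-side handle for a submitted job.
    def __init__(self, timeout=60.0):
        self._filename = None
        self._named = threading.Event()
        self._timeout = timeout

    def set_filename(self, name):
        # Called when the service reports the output filename.
        self._filename = name
        self._named.set()

    @property
    def filename(self):
        # Block only at the moment the value is actually needed.
        if not self._named.wait(self._timeout):
            raise RuntimeError('service did not report a filename')
        return self._filename


fetch = JobProxy()
threading.Timer(0.1, fetch.set_filename, args=['bigfile.refl']).start()
name = fetch.filename   # blocks about 0.1 s, then returns 'bigfile.refl'
```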

Note: if any of these pieces fail, then the dependent jobs will be
cancelled. We need the various services to memoize their results so
that when the problem is fixed and the script is rerun, pieces that
already completed cost only a brief check; the services do not redo
the work of running simulations or transferring large files.

Note: checking that the version numbers of the local and remote models
match will be part of job submission. The version numbers of the
relevant packages should be part of the memoize key, and the version
numbers of their dependencies need to be incorporated into this
infrastructure as well.

Standard models can be installed implicitly on the server using
easy_install from the danse package repository; the server will then
import them with the appropriate requires commands. For models which
are not part of the danse repository, either because they are private
to the user or because they are under active development, we need a
way to transfer the model from the client machine to the server.
Again, we could rely on implicit mechanisms, where the local machine is
treated as a danse repository, or we can use an explicit
park.install_service as we have done above.

A note on security: if the user is not authorized to upload modules,
we will need a mechanism for querying the server for the available
service versions and using one of those instead.

Note: a condition on a job running on one service but tested on another
service may cause authentication complications. The condition may need
to ship an authentication token to the fetching machine, or otherwise
bounce the authentication off the agent on the client.

Defining Conditions
===================

Conditions are complicated: they are set up by the client but
processed on the server. For the condition to act properly on the
client side, we need a proxy which doubles as a condition definition
for the service and which the client can use to test the status of a
running job.
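
A minimal sketch of such a proxy, under stated assumptions: the
``JobStatus`` class, the dictionary returned by ``definition()``, the
status strings, and the ``query`` callable standing in for a status
request to the service are all hypothetical. The same object can be
serialized and shipped to the service as a condition definition, or
polled by the client through ``ready`` and ``cancelled``, the attributes
used by the condition classes in this module.

```python
class JobStatus(object):
    # Hypothetical proxy for a job running on a remote service.
    def __init__(self, service, jobid, query):
        self.service = service
        self.jobid = jobid
        self._query = query  # callable(service, jobid) -> status string

    def definition(self):
        # Plain data the service can use to rebuild the condition.
        return {'condition': 'iscompleted',
                'service': self.service, 'job': self.jobid}

    @property
    def ready(self):
        return self._query(self.service, self.jobid) == 'completed'

    @property
    def cancelled(self):
        return self._query(self.service, self.jobid) in ('aborted', 'error')


statuses = {('compufans', 7): 'running'}
job = JobStatus('compufans', 7, lambda service, jobid: statuses[service, jobid])
```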

TODO: extend the use case to start multiple jobs at the same time and
kill all the others once one of them is "good enough".
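
Python threads cannot be killed from outside, so a local sketch of this
use case has to rely on cooperative cancellation: every job polls a
shared ``threading.Event`` and returns as soon as it is set. The
``refine`` function, its convergence rates and the ``good_enough``
threshold below are hypothetical, and remote jobs would need a
service-side cancel request instead of a shared flag.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def refine(rate, stop, good_enough, max_steps=500):
    # Hypothetical fit: chi-squared shrinks by 'rate' on every step.
    # It polls the shared stop flag so that it can be cancelled cleanly.
    chisq = 100.0
    for _ in range(max_steps):
        if stop.is_set():
            return rate, chisq, 'stopped'
        if chisq <= good_enough:
            return rate, chisq, 'converged'
        chisq *= rate
        time.sleep(0.001)
    return rate, chisq, 'gave up'


def first_good_enough(rates, good_enough):
    stop = threading.Event()
    results = []
    with ThreadPoolExecutor(max_workers=len(rates)) as pool:
        futures = [pool.submit(refine, r, stop, good_enough) for r in rates]
        for future in as_completed(futures):
            results.append(future.result())
            if results[-1][2] == 'converged':
                stop.set()  # one job is good enough: stop all the others
    return results


outcome = dict((rate, state)
               for rate, _, state in first_good_enough([0.5, 0.99, 0.999], 1.0))
```

With these rates the 0.5 job converges after a few steps, long before
the others could, so the other two report 'stopped'.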

"""

__all__ = ['IsFetched', 'IsComplete', 'IsCancelled']

import itertools
try:
    import Queue                    # Python 2
except ImportError:
    import queue as Queue           # Python 3

from park.core import message
from park.util.threads import threaded
import park.util.mathcontext


class Depends(object):
    """
    Manage conditional dependencies.

    Keep track of the set of jobs that depend on a particular condition
    so that when all the conditions are satisfied the job can be run, or
    when a condition it needs is cancelled the job can be cancelled.

    Depends should be treated as a singleton.
    """

    STREAM = "condition"
    """Name of the message stream on which condition updates are posted."""

    class Notify(message.Message):
        """
        Message posted on the condition stream when a condition is satisfied
        or cancelled. All dependent jobs must be checked to see if they
        are ready to run.
        """

    _unique_id = itertools.count()

    def __init__(self):
        self.jobs = {}          # jobid -> (condition, ready, cancel)
        self.dependencies = {}  # atom id -> set of jobids waiting on it

    @staticmethod
    def notify(atom):
        """
        Note that a precondition has been met.

        This sends a Depends.Notify message to the job queue containing
        the atom id and the completion status, either 'ready' or 'cancelled'.
        """
        status = 'cancelled' if atom.cancelled else 'ready'
        # message_stream: the framework's message stream (not defined here).
        message_stream.put(Depends.STREAM,
                           Depends.Notify(id=atom.id, status=status))

    def put(self, msg):
        """
        Process a completed condition.

        msg is a Depends.Notify message containing the id and status of
        the atom that has been completed.
        """
        self.resolve(msg.id)

    def register(self, condition, ready, cancel):
        """
        Record the dependency of the job on the various conditions.

        condition is a conditional expression
        ready is a function to call when the condition is ready
        cancel is a function to call when the condition is cancelled
        """
        jobid = next(self._unique_id)
        self.jobs[jobid] = (condition, ready, cancel)

        # Index the job under every atom in the expression so that a
        # change in any one of them triggers a re-check of the job.
        for atom in condition.depends:
            self.dependencies.setdefault(atom.id, set()).add(jobid)

    def resolve(self, atomid):
        """
        Resolve dependencies given that the status of atomid has changed.

        Any jobs that are now ready are run. Any whose dependencies
        are cancelled are cancelled.
        """
        jobset = self.dependencies.pop(atomid, set())
        for jobid in jobset:
            # Skip jobs already run or cancelled through another atom.
            if jobid not in self.jobs:
                continue
            condition, ready, cancel = self.jobs[jobid]

            if condition.cancelled:
                del self.jobs[jobid]
                cancel()

            elif condition.ready:
                del self.jobs[jobid]
                ready()


class Condition(object):
    """
    Condition that must be satisfied before the job can be run.
    This will include URLs which must be fetched and job ids that must be
    complete before the job can be run. All preconditions must have an
    attribute 'ready' which is True if the precondition is complete.
    """
    class Cancelled(Exception):
        """
        Exception raised by condition.ready if condition cannot be satisfied.
        """

    _ready = False
    _cancelled = False

    def _getready(self):
        if self._cancelled:
            raise Condition.Cancelled
        return self._ready

    def _getdepends(self):
        raise NotImplementedError

    def _getcancelled(self):
        return self._cancelled

    ready = property(fget=lambda self: self._getready(),
                     doc="True if condition is satisfied")
    depends = property(fget=lambda self: self._getdepends(),
                       doc="Set of atoms the condition depends on")
    cancelled = property(fget=lambda self: self._getcancelled(),
                         doc="True if condition was cancelled")

    # Boolean operators for building compound conditions.
    def __and__(self, other):
        return And(self, other)

    def __or__(self, other):
        return Or(self, other)

    def __invert__(self):
        return Not(self)

    def __str__(self):
        return "Condition"


class And(Condition):
    """
    Wait for condition 1 and condition 2

    ready returns True if both conditions are satisfied.
    cancelled returns True if either condition is cancelled
    depends lists the dependencies of both conditions

    ready raises Condition.Cancelled if either condition is cancelled
    """
    def __init__(self, left, right):
        self.left, self.right = left, right

    def _getready(self):
        # Check cancellation first so that a cancelled right-hand side is
        # reported even when the left-hand side is not yet ready.
        if self.cancelled:
            raise Condition.Cancelled
        return self.left.ready and self.right.ready

    def _getcancelled(self):
        return self.left.cancelled or self.right.cancelled

    def _getdepends(self):
        return self.left.depends | self.right.depends

    def __str__(self):
        return "(%s and %s)" % (self.left, self.right)


class Or(Condition):
    """
    Wait for condition 1 or condition 2

    ready returns True if either condition is satisfied.
    cancelled returns True if both conditions are cancelled
    depends lists the dependencies of both conditions

    ready raises Condition.Cancelled if both conditions are cancelled
    """
    def __init__(self, left, right):
        self.left, self.right = left, right

    def _getready(self):
        if self.cancelled:
            raise Condition.Cancelled
        # A cancelled branch counts as not ready; the expression only
        # fails when both branches are cancelled (checked above).
        for condition in (self.left, self.right):
            try:
                if condition.ready:
                    return True
            except Condition.Cancelled:
                pass
        return False

    def _getcancelled(self):
        return self.left.cancelled and self.right.cancelled

    def _getdepends(self):
        return self.left.depends | self.right.depends

    def __str__(self):
        return "(%s or %s)" % (self.left, self.right)
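

class Not(Condition):
    """
    Wait until condition 1 is not satisfied.

    ready returns True if condition is not satisfied
    cancelled returns True if condition is cancelled
    depends lists the dependencies of the condition

    ready raises Condition.Cancelled if condition is cancelled
    """
    def __init__(self, condition):
        self.condition = condition

    def _getready(self):
        # Condition.Cancelled raised by the inner condition propagates.
        return not self.condition.ready

    def _getcancelled(self):
        return self.condition.cancelled

    def _getdepends(self):
        return self.condition.depends

    def __str__(self):
        return "(not %s)" % self.condition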


class Atom(Condition):
    """
    The work to perform to satisfy a condition.

    An atom is a full condition, with a ready state, a cancelled state
    and a set of dependencies. The dependency is the atom itself.

    Each atom has a start method which causes the work to be performed.
    When the work is complete, the self.notify function should be called to
    trigger any dependent work.

    Generally the same notify method will be used for all atoms. The
    default is to call park.util.condition.Depends.notify, which uses
    the message_stream to notify the running queue that the atom is
    ready or cancelled.
    """
    __unique_id = itertools.count()
    send = Depends.notify

    def __init__(self):
        """
        Generate a new atom with a unique id.
        """
        Condition.__init__(self)
        self.id = next(self.__unique_id)

    def _getdepends(self):
        """
        The condition is dependent only on self.
        """
        return set([self])

    def notify(self):
        """
        Announce the change in status through the send function.
        """
        self.send()

    def start(self):
        """
        Run the condition.

        Do whatever processing is required to satisfy the condition, then
        call self.notify(). Unless the processing is trivial the work
        should be done in a separate thread. Use the @threaded decorator
        from park.util.threads to mark the start method as threaded so
        that the caller doesn't have to know if the work is trivial.
        """
        raise NotImplementedError


class IsMessage(Atom):
    """
    Wait on a message in the message stream.
    """
    def __init__(self, key):
        Atom.__init__(self)
        self.key = key

    @threaded
    def start(self):
        """
        Listen on the message queue for all messages associated with the key.
        Filter each message until we find one that satisfies the condition,
        then post a Depends.Notify message saying that the condition can
        continue.
        """
        # msgstream: the framework's message stream (not defined here).
        queue = Queue.Queue()
        msgstream.listen(self.key, queue)
        while True:
            msg = queue.get()
            queue.task_done()
            # Dispatch to On<MessageClass> (e.g., OnCompleted) if defined,
            # otherwise to the generic OnMessage filter.
            fn_name = "On" + msg.__class__.__name__
            fn = getattr(self, fn_name, self.OnMessage)
            try:
                if fn(msg):
                    break
            except Exception:
                # A failing filter cancels the condition.
                self._cancelled = True
                self.notify()
                raise

        # If a filter marked the condition cancelled, ready will raise
        # Condition.Cancelled even though _ready is set here.
        self._ready = True
        self.notify()

    def OnMessage(self, msg):
        """
        Message filter for unhandled messages.

        By default they are ignored, but subclasses can override.
        """
        return False

    def OnLastMessage(self, msg):
        """
        Message filter for Aborted, Error and Completed.

        If the message stream ends before the condition is satisfied, then
        the condition is cancelled.
        """
        self._cancelled = True
        return True
    OnAborted = OnLastMessage
    OnError = OnLastMessage
    OnCompleted = OnLastMessage

    def __str__(self):
        return 'IsMessage'
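

class IsCompleted(IsMessage):
    """
    Wait for job to complete
    """
    def OnCompleted(self, msg):
        # Completion satisfies this condition rather than cancelling it.
        return True

    def __str__(self):
        return 'IsCompleted'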


class IsImproved(IsMessage):
    """
    Wait for job result which matches expression.

    Follows the message stream for the job waiting for an 'Improved' message.
    For each message, evaluate an expression, signalling ready when that
    expression is True. The result is assumed to be a dictionary of values
    and variables, which the expression can refer to by name.
    """

    def __str__(self):
        return 'IsImproved'


class IsFetched(Atom):
    """
    Wait for download to complete.
    """
    @threaded
    def start(self):
        raise NotImplementedError


def _check(expr, ready, depends):
    """
    Check the ready, cancelled and depends attributes of expr.

    If ready is Exception, expr.ready must raise Condition.Cancelled
    and expr.cancelled must be True.
    """
    if ready is Exception:
        try:
            expr.ready
        except Condition.Cancelled:
            pass
        else:
            raise AssertionError(str(expr) + ".ready does not raise Cancelled")
        if expr.cancelled != True:
            raise AssertionError(str(expr) + ".cancelled expected True")
    else:
        try:
            if expr.ready != ready:
                raise AssertionError(str(expr) + ".ready expected " + str(ready))
        except Condition.Cancelled:
            raise AssertionError(str(expr) + ".ready raised Cancelled")
        if expr.cancelled != False:
            raise AssertionError(str(expr) + ".cancelled expected False")
    assert expr.depends == depends, str(expr) + ".depends=[%s], not [%s]" % (
        ",".join(str(s) for s in expr.depends),
        ",".join(str(s) for s in depends))


def test():
    # Atoms with fixed states for exercising the boolean operators.
    true = Atom()
    true._ready = True
    false = Atom()
    false._ready = False
    cancelled = Atom()
    cancelled._cancelled = True
    _check((true | false) & true, True, set([true, false]))
    _check((cancelled | true), True, set([true, cancelled]))
    _check((cancelled | false), False, set([false, cancelled]))
    _check((cancelled | cancelled), Exception, set([cancelled]))
    _check((cancelled & true), Exception, set([true, cancelled]))
    _check((cancelled & false), Exception, set([false, cancelled]))
    _check((cancelled & cancelled), Exception, set([cancelled]))
    _check(~true, False, set([true]))
    _check(~false, True, set([false]))
    _check(~cancelled, Exception, set([cancelled]))


if __name__ == "__main__":
    test()