Workflow management
When a job is scheduled, it can have associated conditions, such as the completion of other jobs, the fetching of data from a server, or the availability of certain resources.
Since it can be tedious to specify the service for every job when setting up a sophisticated workflow on a remote service, we allow the following shorthand:
```python
with service:
    job1
    job2
    ...
```
This sets default_service to service for every command within the block that submits work to a service.
The following example generates a complicated model, e.g., by running a set of simulations to determine a good starting point for a fit. The fit also needs a large data file from the SNS, which must first be reduced. Once all of this is ready, the refinement can begin.
We also need to make sure that our locally defined model is available on the cluster.
The code for setting up this computation would look something like the following:
```python
import park
from reflectometry.reduction import SnsReduce
import mymodel

# We are dealing with a couple of different cluster queues;
# authentication happens when we connect to the service.
# Park maintains a local authentication server to manage
# authentication between sessions. Think ssh-agent.
sns = park.remote_service('https://sns.ornl.gov/service')
compufans = park.remote_service('compufans')

# Start reduction on the SNS cluster
reduction = SnsReduce(infile='/snsdata/RefL/bigfile.nxs',
                      outfile='~/shared/bigfile.refl',
                      service=sns)

# Rather than specifying the service for each request, set the
# default service for a group of requests.
with compufans:
    # Transfer my model to the cluster; I can do this because I have
    # local privileges. The setup.py for mymodel will have to build
    # an egg and put it in a predictable place.
    upload = park.install_service(mymodel)

    # Transfer the reduced file from the SNS to the local cluster
    # when the reduction is complete. Since this is a point-to-point
    # remote transfer, we need to grab our authentication token from
    # the SNS and pass it to the cluster to do the fetching. We don't
    # actually care about the destination filename. Let the fetch
    # command choose it.
    sns_auth = park.credentials(service=sns)
    fetch = park.fetch_url(
        # url='https://sns.ornl.gov/service/pkienzle/bigfile.refl',
        url=reduction.url,
        auth=sns_auth,
        when=reduction.iscompleted)

    # Prepare the model on the local cluster. This is a simulation which
    # does a lot of work to determine what kinds of models are feasible
    # before trying to do any refinement. Again, we don't actually care
    # about the output filename, so let the service choose it.
    model = mymodel.prefit_simulation(when=upload.iscompleted)

    # Define the fit as a combination of the model and the data. Assume
    # that the model defined in model.filename sets the fitting
    # parameters appropriately.
    assembly = mymodel.define_fit(model=model.filename,
                                  data=fetch.filename)

    # Run the fit on the local cluster.
    fit = park.Fit(assembly, when=model.iscompleted & fetch.iscompleted)
```
The effect of this workflow is declarative logic much like a makefile:
```
reduction:
    SnsReduce
fetch: reduction
    FetchURL
upload:
    park.upload_package
model: upload
    mymodel.sim
assembly:
    mymodel.refine
fit: model, assembly
    Fit
```
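Because these dependencies form a directed acyclic graph, the order in which jobs become runnable can be worked out with a topological sort. The sketch below uses Python's standard `graphlib` module (Python 3.9+) on the dependencies from the make-style listing. It only illustrates the ordering. It makes no claim about how park itself schedules jobs, which is driven by conditions evaluated on the server.

```python
from graphlib import TopologicalSorter

# Dependencies from the make-style listing: job -> jobs it waits for.
DEPENDENCIES = {
    "reduction": set(),
    "fetch": {"reduction"},
    "upload": set(),
    "model": {"upload"},
    "assembly": set(),
    "fit": {"model", "assembly"},
}


def run_order(deps):
    """Return batches of jobs; jobs within one batch could run concurrently."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # jobs whose prerequisites are done
        batches.append(ready)
        sorter.done(*ready)                 # pretend they finished
    return batches
```

For this graph, `run_order(DEPENDENCIES)` returns `[['assembly', 'reduction', 'upload'], ['fetch', 'model'], ['fit']]`. The first batch has no prerequisites, and `fit` becomes runnable only after `model` finishes.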
In fact, the logic is more powerful than make's because conditions support boolean operators (& for and, | for or, and - for not). This lets us, e.g., run two simulations and wait for either of them to complete.
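One way to support `&`, `|` and `-` on conditions is Python operator overloading (`__and__`, `__or__`, `__neg__`). Python does not allow the `and`, `or` and `not` keywords to be overloaded, so operators have to stand in for them. The hypothetical `Condition` class below wraps a zero-argument test and builds a readable label. The `completed` helper fakes job status with a plain dictionary.

```python
class Condition:
    """A deferred boolean test that can be combined with &, | and unary -."""

    def __init__(self, test, label):
        self._test = test    # zero-argument callable returning bool
        self.label = label   # human-readable description

    def __call__(self):
        return self._test()

    def __and__(self, other):
        return Condition(lambda: self() and other(),
                         f"({self.label} & {other.label})")

    def __or__(self, other):
        return Condition(lambda: self() or other(),
                         f"({self.label} | {other.label})")

    def __neg__(self):
        return Condition(lambda: not self(), f"-{self.label}")


def completed(status, job):
    """Hypothetical condition: true once status[job] == 'completed'."""
    return Condition(lambda: status.get(job) == "completed",
                     f"{job}.iscompleted")
```

With `status = {"sim1": "running", "sim2": "running"}`, the condition `completed(status, "sim1") | completed(status, "sim2")` evaluates to false until either entry is set to `"completed"`. Each combined condition is re-evaluated every time it is called.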
Note: the fetch and model filenames need to be generated as soon as the job is created for this logic to work. We may need to rethink how the output of one job connects to the input of another, since we don't want the script to block on submission to the queue. One solution is to make the filename property on the fetch proxy do the blocking.
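A blocking `filename` property can be sketched with a `threading.Event`. Submission returns immediately, and only code that actually reads `filename` waits for the service's reply. `FetchProxy`, `set_filename` and the 30-second default timeout are hypothetical choices for illustration.

```python
import threading


class FetchProxy:
    """Client-side handle whose output filename is reported later.

    Hypothetical sketch: submission returns at once, and only reading
    `filename` blocks, until the service reports the name.
    """

    timeout = 30.0  # seconds to wait for the name (assumed default)

    def __init__(self):
        self._ready = threading.Event()
        self._filename = None

    def set_filename(self, name):
        """Called by whichever thread receives the service's reply."""
        self._filename = name
        self._ready.set()

    @property
    def filename(self):
        # Event.wait returns False if the timeout expires first.
        if not self._ready.wait(self.timeout):
            raise TimeoutError("service did not report an output filename")
        return self._filename
```

In this design, `define_fit(model=model.filename, data=fetch.filename)` would pause only at that line, not at submission time. A timeout turns a lost reply into an error rather than a hang.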
Note: if any of these pieces fails, the dependent jobs will be cancelled. We need the various services to memoize their results. Then, when the problem is fixed and the script is rerun, there is only a slight delay while checking that the earlier pieces are complete, and the work of running simulations or transferring large files is not redone.
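Cancelling the dependent jobs amounts to walking the dependency graph forward from the failed job. The following is a minimal sketch using breadth-first search over the make-style dependencies. `dependents_to_cancel` is a hypothetical helper, and a real service would also have to notify the remote queues.

```python
from collections import deque

# Dependencies from the make-style listing: job -> jobs it waits for.
DEPENDENCIES = {
    "reduction": set(),
    "fetch": {"reduction"},
    "upload": set(),
    "model": {"upload"},
    "assembly": set(),
    "fit": {"model", "assembly"},
}


def dependents_to_cancel(deps, failed):
    """Return every job that directly or indirectly waits on `failed`."""
    # Invert the graph so we can follow edges from a job to its waiters.
    waiters = {}
    for job, prereqs in deps.items():
        for prereq in prereqs:
            waiters.setdefault(prereq, set()).add(job)

    cancelled = set()
    queue = deque([failed])
    while queue:
        for job in waiters.get(queue.popleft(), ()):
            if job not in cancelled:
                cancelled.add(job)
                queue.append(job)
    return cancelled
```

A failed `upload` cancels `model` and, through it, `fit`. A failed `reduction` cancels only `fetch`.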
Note: checking that the local and remote model versions match will be part of job submission. The version numbers of the relevant packages should be part of the memoization key, and the version numbers of their dependencies need to be incorporated into this infrastructure as well.
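A memoization key can be built by hashing a canonical encoding of the job name, its arguments and the versions of the packages involved. Any version change then produces a new key and forces a rerun. This sketch uses `hashlib` and `json` from the standard library. `memo_key`, `run_memoized` and the in-memory cache are illustrative assumptions; a real service would persist its results. The version strings might come from `importlib.metadata.version`, for example.

```python
import hashlib
import json

_cache = {}  # in-memory stand-in for a persistent result store


def memo_key(job_name, args, versions):
    """Stable key from job name, JSON-serializable args and package versions.

    `versions` maps package name -> version string for the job's package
    and its dependencies, so upgrading any of them changes the key.
    """
    payload = {"job": job_name, "args": args, "versions": versions}
    # sort_keys makes the encoding independent of dict insertion order.
    blob = json.dumps(payload, sort_keys=True).encode("utf-8")
    return hashlib.sha256(blob).hexdigest()


def run_memoized(job_name, args, versions, compute):
    """Run compute(**args) only if this exact job/version combo is new."""
    key = memo_key(job_name, args, versions)
    if key not in _cache:
        _cache[key] = compute(**args)
    return _cache[key]
```

Rerunning the same job with the same arguments and versions returns the cached result immediately. Bumping any version string produces a different key, so the job runs again.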
Versions of standard models can be installed implicitly on the server using easy_install from the danse package repository, and the server will import them with the appropriate requires commands. Models that are not part of the danse repository may be private to the user or under active development. For these, we need a way to transfer the model from the client machine to the server. Again, we could rely on implicit mechanisms, where the local machine is treated as a danse repository, or we can use an explicit park.install_service as we have done above.
A note on security: if the user is not authorized to upload modules, we will need a mechanism for querying the server for the available service versions and selecting one of those instead.
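The fallback can be sketched as a small selection policy. It uses the local version if the server already has it, uploads it if allowed, and otherwise picks from what the server offers. Choosing the newest available version is an assumption here, not a stated requirement. `choose_version` is hypothetical, and its version comparison handles only purely numeric dotted strings.

```python
def choose_version(local_version, available, can_upload):
    """Return (version_to_run, needs_upload) for a model on the server.

    local_version: version installed on the client, e.g. "1.2.0"
    available: versions the server reports it already has
    can_upload: whether the user may install modules remotely
    """
    if local_version in available:
        return local_version, False      # already installed remotely
    if can_upload:
        return local_version, True       # ship the local package first
    if not available:
        raise LookupError("no usable model version on the server")

    # Assumed policy: fall back to the newest version the server offers.
    def numeric(v):
        return tuple(int(part) for part in v.split("."))

    return max(available, key=numeric), False
```

Comparing versions as integer tuples ranks "1.10.0" above "1.9.2", which a plain string comparison would get wrong.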
Note: a condition on a job running on one service that is tested from another service may cause authentication complications. The condition may need to ship an authentication token to the fetching machine, or otherwise bounce the authentication off the agent on the client.
Conditions are complicated: they are set up by the client but evaluated on the server. For a condition to also work on the client side, we need a proxy that doubles as a condition definition for the service and as a handle the client can use to test the status of a running job.
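The dual role of the proxy can be sketched as one object with two faces. It offers a serializable specification that travels to the service with the dependent job, and a truth test the client evaluates by asking for the job's status. `JobProxy`, `CompletedCondition`, `to_spec` and the `status_lookup` callable are all hypothetical.

```python
class JobProxy:
    """Client-side handle for a remote job (hypothetical sketch)."""

    def __init__(self, job_id, status_lookup):
        self.job_id = job_id
        self._status_lookup = status_lookup  # callable: job_id -> status str

    @property
    def iscompleted(self):
        return CompletedCondition(self)


class CompletedCondition:
    """Condition that is both a server-side spec and a client-side test."""

    def __init__(self, proxy):
        self.proxy = proxy

    def to_spec(self):
        """Plain-data form sent to the service with the dependent job."""
        return {"type": "completed", "job": self.proxy.job_id}

    def __bool__(self):
        """Client-side check: query the job's current status."""
        return self.proxy._status_lookup(self.proxy.job_id) == "completed"
```

On the client, `if reduction.iscompleted:` triggers a status query. When the same object is passed as `when=`, the submission code can send `to_spec()` instead, so the server never needs the client's objects.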
TODO: extend the use case to start multiple jobs at the same time and kill the others once one of them is "good enough".
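The "first good-enough result wins" pattern can be sketched locally with `concurrent.futures`. This is a stand-in built on assumptions: it uses threads rather than remote jobs. Python cannot kill a running thread, so each job receives a `threading.Event` that it is expected to check, returning early once the event is set. `Future.cancel` only prevents jobs that have not yet started. `race` is a hypothetical name.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed


def race(jobs, good_enough):
    """Run jobs concurrently; return the first result that is good enough.

    Each job is a callable taking a threading.Event and should return
    early once the event is set. Returns None if no result qualifies.
    """
    if not jobs:
        return None
    stop = threading.Event()
    with ThreadPoolExecutor(max_workers=len(jobs)) as pool:
        futures = [pool.submit(job, stop) for job in jobs]
        for fut in as_completed(futures):
            result = fut.result()
            if good_enough(result):
                stop.set()              # ask the running jobs to quit
                for other in futures:
                    other.cancel()      # drops jobs that have not started
                return result           # pool shutdown waits for the rest
    return None
```

On a real cluster, the `stop.set()` step would correspond to asking each service to kill its remaining jobs.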
| Classes | |
|---|---|
| IsFetched | Wait for download to complete. |
Generated by Epydoc 3.0.1 on Mon Mar 16 15:03:12 2009 (http://epydoc.sourceforge.net)