"""
Define the service architecture.

The PARK fitting service manages groups of jobs from a number of users
at the same time. The jobs can be long running, with clients able
to connect and disconnect from multiple locations.

The goal is not a simple batch queuing system. Traditional queuing
systems allocate a group of processors to a particular job for a
fixed period of time. Instead we want dynamic resource allocation,
where individual work units are meted out according to criteria
such as the relative priority of the jobs and the length of time they
have been in the queue. Long-running, low-priority jobs, such as those
performing an exhaustive search of the parameter space, should get
some resources even when higher priority jobs are still running. If
nothing else, individual users may contribute their desktop resources
to the pool, at least for the purposes of completing their own jobs
more quickly.
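
As a rough sketch of this kind of allocation (not the PARK scheduler
itself), the next work unit could go to the job with the highest score,
where the score grows with both priority and time spent waiting. The
function name `next_job`, the job fields and the `aging` weight are all
assumptions made for illustration.

```python
def next_job(jobs, now, aging=0.01):
    # jobs is a list of dicts with hypothetical fields
    # 'id', 'priority' and 'queued_at' (seconds).
    def score(job):
        # Waiting time raises the score, so a low priority job
        # is eventually served instead of starving.
        return job['priority'] + aging * (now - job['queued_at'])
    return max(jobs, key=score)['id']

jobs = [
    {'id': 'fit', 'priority': 5.0, 'queued_at': 0.0},
    {'id': 'search', 'priority': 1.0, 'queued_at': 0.0},
]
first = next_job(jobs, now=100.0)

# A newly queued high priority job can lose to a job that has waited longer.
jobs[0]['queued_at'] = 990.0
later = next_job(jobs, now=1000.0)
```

With equal waiting times the higher priority job is chosen; once the
low priority job has waited long enough, it is chosen instead.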

We still need to be able to tie into legacy systems with batch queues
such as torque. This is supported by starting a private parallel
kernel system on the allocated nodes, which runs for the duration
of the job.

The top level of the queuing system is the job manager (`park.job.JobManager`).
This service keeps a list of jobs in a persistent store, and manages the
interaction with the remote client. Individual jobs (`park.job.Job`)
have properties such as owner, priority and completion status.

In general jobs do not have a user id. Instead they are tagged with
information such as the user's name, the service, the time the job
was started, service-specific tags, and arbitrary strings assigned
by the user such as the job name. Some servers may require user
authentication, for example to track the user through TeraGrid
requests, but applications should not rely on this.

Notification is handled through a message service. Users can also
request notification of job completion by email (daily digest by
default) by adding email details to the job submission request.

All interaction between the client, the job manager and the individual
jobs is handled through a service manager (`park.service.ServiceManager`).
This can run locally, for single-system fits, or remotely.

Resource allocation is based on credits toward job priority.
Each unit of work for higher priority jobs counts more toward usage,
so an individual user can choose between getting a small amount of
work done quickly or a larger amount of work done when the system
has more slack. The default priority is free.
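
One way to picture the credit accounting is the sketch below. It
assumes that the cost of a work unit is simply proportional to the job
priority; the `UsageLedger` class and its `charge` method are
hypothetical and are not part of the PARK API.

```python
class UsageLedger(object):
    # Hypothetical bookkeeping class, for illustration only.
    def __init__(self):
        self.usage = {}

    def charge(self, user, units, priority):
        # Assumed rule: cost is proportional to priority, so the same
        # credits buy fewer work units at higher priority.
        cost = units * priority
        self.usage[user] = self.usage.get(user, 0) + cost
        return cost

ledger = UsageLedger()
urgent = ledger.charge('alice', units=10, priority=3)
relaxed = ledger.charge('bob', units=30, priority=1)
```

Under this assumed rule both users are charged the same amount: one
bought ten units at high priority, the other thirty units at low
priority.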

We need a message stream for the job queue. That way, as jobs are created,
destroyed, or otherwise change status, the application can reflect these
changes in the GUI. The stream header will be the entire list of active jobs.
We will provide the client-side data structure to keep a consistent view
of the list, but the GUI client will still need notification that changes
are occurring. The same technique can be used to maintain an AJAX view
of the job table.
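
A minimal sketch of such a client-side structure follows. The message
layout (dicts with a 'type' key), the `JobListView` class and the
`on_change` callback are assumptions for illustration; the sketch does
not use the actual `park.util.msgstream` interface.

```python
class JobListView(object):
    # Hypothetical client-side mirror of the active job list.
    def __init__(self, on_change=None):
        self.jobs = {}
        self.on_change = on_change

    def handle(self, message):
        kind = message['type']
        if kind == 'header':
            # The header carries the entire list of active jobs.
            self.jobs = dict((job['id'], job) for job in message['jobs'])
        elif kind in ('created', 'changed'):
            self.jobs[message['job']['id']] = message['job']
        elif kind == 'destroyed':
            self.jobs.pop(message['id'], None)
        # Tell the GUI that something changed so it can redraw.
        if self.on_change is not None:
            self.on_change(kind)

changes = []
view = JobListView(on_change=changes.append)
view.handle({'type': 'header', 'jobs': [{'id': 1, 'status': 'running'}]})
view.handle({'type': 'created', 'job': {'id': 2, 'status': 'queued'}})
view.handle({'type': 'changed', 'job': {'id': 1, 'status': 'complete'}})
view.handle({'type': 'destroyed', 'id': 2})
```

After these four messages the view holds only job 1, marked complete,
and the callback has been invoked once per message.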
"""

from __future__ import with_statement
import os
import socket
import time

from park.util.msgstream import message_stream


def usertag():
    """
    Convenience tag to help users identify jobs.

    Try the following sources in order::

        PARKTAG  - the normal user tag
        PARKMAIL - the user portion of the notification address
        LOGNAME  - the Unix user name, unless it is 'root'
        USERNAME - the Windows user name, unless it is 'Administrator'

    If all else fails, use the name of the connecting host.
    """
    if 'PARKTAG' in os.environ:
        return os.environ['PARKTAG']

    if 'PARKMAIL' in os.environ:
        return os.environ['PARKMAIL'].split('@')[0]

    if 'LOGNAME' in os.environ:
        if os.environ['LOGNAME'] != 'root':
            return os.environ['LOGNAME']

    if 'USERNAME' in os.environ:
        if os.environ['USERNAME'] != 'Administrator':
            return os.environ['USERNAME']

    return socket.gethostname()

TAGS = [usertag()]


    """
    Define the backend interface to the job manager and user database.
    """

        """
        Connect to the service and establish user credentials.
        """
        self.user = authenticate(user)
        self.queue = start_queue()

        """
        Set the user email.
        """
        self.email = email
        self.user.email = email

    def stop(self, jobid):
        """
        Stop a job.

        Raises KeyError if the job is not running.

        This is likely triggered by the client as requested by
        the user, but may also be triggered by the job itself
        if it notices that it is failing.
        """
        self.queue.stop(jobid)

    def start(self, job, inputs=None):
        """
        Add a job to the job queue, returning jobid. Inputs is the
        set of inputs to run through the job, or None if the job is
        to run once between prepare and cleanup.
        """
        jobid = self.queue.start(job, inputs)
        return jobid

        """
        Set the default service for subsequent requests.

        Example::

            with park.local_service:
                M1 = Assembly((model,data))
                fit = Fit(M1)
                err = Uncertainty(M1, fit.result, after=fit)
                result = err.wait()
        """
        service_stack.append(self)
        return self


    """
    Client side view of a service.

    Inherit from here when creating new service types.
    """
    def jobs(self, filter=None):
        """
        Return a list of the jobs on the service which the user can access.

        This list will include all jobs owned by the user on the remote
        service that have not yet been archived or deleted.

        Use a filter to select jobs of a particular type.
        """
        jobs = self.server.jobs(filter)
        # Build (or reuse) one proxy per job record returned by the server.
        return [self._lookup(job) for job in jobs]

    def submit(self, request=None, monitor=None, when=None):
        """
        Add a new request to the job queue. Messages from the job are
        sent to monitor.

        Returns the job id for the request, which is unique to the queue.
        """

        """
        Grab a copy of the named job from the server.

        This should include enough information to reconstruct the job
        request.
        """

        """
        Abort a job.
        """

        """
        Send a control message to the job.

        The message should be picklable. The structure of the message is
        specific to the type of job. Messages will be queued in the
        job control queue on the service until the job is ready to process
        them. If the job has already been terminated then the message will
        be ignored.

        Control is asynchronous. Rather than trying to set up, e.g.,
        job.population() which returns the current population from
        a simulation, instead set up job.get_population() which sends
        a control message to retrieve the population, and set up an
        on_population() method which is called when the population
        is received on the job message stream.
        """

    def wait(self, jobid=None, timeout=None):
        """
        Wait for the job to complete.

        This is used in scripts to impose a synchronous interface
        to the fitting service.
        """
        interval = 1  # polling period in seconds (assumed value)
        while self.queue[jobid].status != 'complete':
            time.sleep(interval)
        return self.job.handler.result

    def _lookup(self, job):
        """
        Find the job given the job id, building a job proxy if it is
        not there already.
        """
        jobid = job['id']
        if jobid not in self._joblist:
            proxy = build_proxy(job['proxy'])
            self._joblist[jobid] = proxy
        return self._joblist[jobid]


local_service = Server()
service_stack = [local_service]

    """
    Return the current default service if no service is specified.

    This is usually local_service, but it can be a remote service
    if the function is called in the context of "with remote:"
    """
    return service_stack[-1]