Source Code for Module park.core.server

"""
Define the service architecture.

The PARK fitting service manages groups of jobs from a number of users
at the same time.  The jobs can be long running, with clients able
to connect and disconnect from multiple locations.

The goal is not a simple batch queueing system.  Traditional queueing
systems allocate a group of processors to a particular job for a
fixed period of time.  Instead we want dynamic resource allocation,
where individual work units are meted out according to criteria
such as the relative priority of the jobs and the length of time they
have been in the queue.  Long-running low-priority jobs, such as those
performing an exhaustive search of the parameter space, should get
some resources even when higher-priority jobs are still running.  If
nothing else, individual users may contribute their desktop resources
to the pool, at least for the purpose of completing their own jobs
more quickly.

We still need to be able to tie into legacy systems with batch queues
such as Torque.  This is supported by starting a private parallel
kernel system on the allocated nodes which runs for the duration
of the job.

The top level of the queueing system is the job manager (`park.job.JobManager`).
This service keeps a list of jobs in a persistent store, and manages the
interaction with the remote client.  Individual jobs (`park.job.Job`)
have properties such as owner, priority and completion status.

In general jobs do not have a user id.  Instead they are tagged with
information such as the user's name, the service, the time the job
was started, service-specific tags, and arbitrary strings assigned
by the user such as job name.  Some servers may require user
authentication, for example to track the user through TeraGrid
requests, but applications should not rely on this.

Notification is handled through a message service.  Users can also
request notification of job completion by email (daily digest by
default) by adding email details to the job submission request.

All interaction between the client, the job manager and the individual
jobs is handled through a service manager (`park.service.ServiceManager`).
This can be running locally for single-system fits or remotely.

Resource allocation is based on credits toward job priority.
Each unit of work for higher-priority jobs counts more toward usage,
so an individual user can choose between getting a small amount of
work done quickly or a larger amount of work done when the system
has more slack.  The default priority is free.

We need a message stream for the job queue.  That way, as jobs are created,
destroyed, or otherwise change status, the application can reflect these
changes in the GUI.  The stream header will be the entire list of active jobs.
We will provide the client-side data structure to keep a consistent view
of the list, but the GUI client will still need notification that changes
are occurring.  The same technique can be used to maintain an AJAX view
of the job table.
"""
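The job-queue message stream described in the module docstring could be consumed on the client by a small mirror of the job table. The following is a minimal sketch only: the class name `JobListView`, the message dictionaries, and their `type` values (`snapshot`, `created`, `status`, `destroyed`) are assumptions made for illustration and are not part of `park.util.msgstream`.

```python
class JobListView(object):
    """Client-side mirror of the active job list (hypothetical helper)."""

    def __init__(self, on_change=None):
        self.jobs = {}              # job id -> dict of job properties
        self.on_change = on_change  # optional callback, e.g. a GUI refresh

    def apply(self, message):
        """Update the mirror from one stream message, then notify."""
        kind = message["type"]
        if kind == "snapshot":
            # Stream header: the entire list of active jobs.
            self.jobs = dict((job["id"], dict(job)) for job in message["jobs"])
        elif kind == "created":
            job = message["job"]
            self.jobs[job["id"]] = dict(job)
        elif kind == "status":
            # Ignore status changes for jobs we do not know about.
            if message["id"] in self.jobs:
                self.jobs[message["id"]]["status"] = message["status"]
        elif kind == "destroyed":
            self.jobs.pop(message["id"], None)
        else:
            raise ValueError("unknown message type: %r" % (kind,))
        if self.on_change is not None:
            self.on_change(kind, message)
```

Keeping the callback separate from the data structure reflects the docstring's point that the view can stay consistent on its own while the GUI still needs to be told that something changed.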

from __future__ import with_statement
import os
import socket
import time  # used by ServerProxy.wait

from park.util.msgstream import message_stream

def usertag():
    """
    Convenience tag to help users identify jobs.

    First try the following sources in order::

        PARKTAG - the normal user tag
        PARKMAIL - the user portion of the notification address
        LOGNAME - the Unix user name, unless it is 'root'
        USERNAME - the Windows user name unless it is 'Administrator'

    If all else fails, use the host name which is connecting

    """
    # Check if it is explicitly set
    if 'PARKTAG' in os.environ:
        return os.environ['PARKTAG']
    # Grab it from the notification address
    if 'PARKMAIL' in os.environ:
        return os.environ['PARKMAIL'].split('@')[0]
    # Try identifying a user id
    if 'LOGNAME' in os.environ:
        if os.environ['LOGNAME'] != 'root':
            return os.environ['LOGNAME']
    if 'USERNAME' in os.environ:
        if os.environ['USERNAME'] != 'Administrator':
            return os.environ['USERNAME']
    # Otherwise use the machine name
    return socket.gethostname()

TAGS = [usertag()]

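The lookup order used by `usertag` can be exercised without touching the real environment if the mapping is passed in explicitly. This is a sketch under that assumption: `pick_user_tag` and its `environ` and `hostname` parameters are hypothetical names, not part of the module.

```python
def pick_user_tag(environ, hostname):
    """Same fallback order as usertag(), with explicit inputs (hypothetical)."""
    # Explicit tag wins.
    if 'PARKTAG' in environ:
        return environ['PARKTAG']
    # Next, the user portion of the notification address.
    if 'PARKMAIL' in environ:
        return environ['PARKMAIL'].split('@')[0]
    # Then the login name, skipping the generic administrative accounts.
    for name, reserved in (('LOGNAME', 'root'), ('USERNAME', 'Administrator')):
        value = environ.get(name)
        if value is not None and value != reserved:
            return value
    # Otherwise fall back to the machine name.
    return hostname
```

Calling `pick_user_tag(os.environ, socket.gethostname())` would then follow the same order as the function above.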
class Server(object):
    """
    Define the backend interface to the job manager and user database.
    """

    def connect(self, user, token):
        """
        Connect to the service and establish user credentials.
        """
        # authenticate() and start_queue() are neither defined nor imported
        # in this module.
        self.user = authenticate(user)
        self.queue = start_queue()

    def submit(self, request=None, when=None):
        pass  # the body of this method is missing from the source listing

    def set_tags(self, tags):
        """
        Set the base tags associated with new jobs.

        Jobs are searchable by tags.  Some tags come
        """
        self.tags = tags

    def set_email(self, email):
        """
        Set the user email.
        """
        self.email = email
        self.user.email = email

    def stop(self, jobid):
        """
        Stop a job.

        Raises KeyError if the job is not running.

        This is likely triggered by the client as requested by
        the user, but may also be triggered by the job itself
        if it notices that it is failing.
        """
        self.queue.stop(jobid)

    def start(self, job, inputs=None):
        """
        Add a job to the job queue, returning jobid.  Inputs is the
        set of inputs to run through the job, or None if the job is
        to run once between prepare and cleanup.
        """
        jobid = self.queue.start(job, inputs)
        return jobid

    def jobs(self):
        """
        Return the jobs currently in the queue.
        """
        return self.queue.jobs()

    def __enter__(self):
        """
        Set the default service for subsequent requests.

        Example::

            with park.local_service:
                M1 = Assembly((model,data))
                fit = Fit(M1)
                err = Uncertainty(M1, fit.result, after=fit)
                result = err.wait()
        """
        service_stack.append(self)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # The context manager protocol calls __exit__ (not __leave__) when
        # the 'with' block ends; restore the previous default service.
        service_stack.pop()
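The way a `with` block temporarily changes the default service can be shown in isolation. The sketch below is illustrative only: `StackedService` and `current_service` are hypothetical stand-ins for `Server` and `default_service`, reduced to the stack handling.

```python
class StackedService(object):
    """Stand-in for Server that only manages the service stack (hypothetical)."""

    def __init__(self, name):
        self.name = name

    def __enter__(self):
        # Become the default service for the duration of the 'with' block.
        service_stack.append(self)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Restore the previous default, even if the block raised.
        service_stack.pop()
        return False  # do not suppress exceptions


local = StackedService("local")
service_stack = [local]


def current_service():
    """Return the innermost active service."""
    return service_stack[-1]


remote = StackedService("remote")
with remote:
    inside = current_service().name   # the remote service is the default here
outside = current_service().name      # back to the local service
```

Because `__exit__` always pops, nested `with` blocks unwind in the reverse order they were entered, and the stack never drops below the initial local service.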

class ServerProxy(Server):
    """
    Client side view of a service.

    Inherit from here when creating new service types.
    """
    def jobs(self, filter=None):
        """
        Return a list of the jobs on the service which the user can access.

        This list will include all jobs owned by the user on the remote
        service that have not yet been archived or deleted.

        Use a filter to select jobs of a particular type.
        """
        jobs = self.server.jobs(filter)
        # Look up each job individually, not the whole list.
        return [self._lookup(job) for job in jobs]

    def submit(self, request=None, monitor=None, when=None):
        """
        Add a new request to the job queue.  Messages from the job are
        sent to monitor.

        Returns the job id for the request, which is unique to the queue.
        """

    def fetch(self, id):
        """
        Grab a copy of the named job from the server.

        This should include enough information to reconstruct the job
        request.
        """

    def status(self, id):
        """
        """

    def abort(self, id):
        """
        Abort a job.
        """

    def control(self, id, message):
        """
        Send a control message to the job.

        Message should be pickleable.  The structure of the message is
        specific to the type of job.  Messages will be queued in the
        job control queue on the service until the job is ready to process
        them.  If the job has already been terminated then the message will
        be ignored.

        Control is asynchronous.  Rather than trying to set up, e.g.,
        job.population() which returns the current population from
        a simulation, instead set up job.get_population() which sends
        a control message to retrieve the population and set up an
        on_population() method which is called when the population
        is received on the job message stream.
        """

    def wait(self, jobid=None, timeout=None):
        """
        Wait for the job to complete.

        This is used in scripts to impose a synchronous interface
        to the fitting service.
        """
        # Polling interval in seconds.  The listing refers to an undefined
        # name here, so this value is an assumption.
        interval = 1
        while self.queue[jobid].status != 'complete':
            time.sleep(interval)
        return self.job.handler.result

    def _lookup(self, job):
        """
        Find the job given the job id, building a job proxy if it is
        not there already.
        """
        jobid = job['id']
        if jobid not in self._joblist:
            proxy = build_proxy(job['proxy'])
            self._joblist[jobid] = proxy
        return self._joblist[jobid]
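The `wait` method above accepts a `timeout` argument but does not use it. One possible way to honor it is a polling loop with a deadline. This is a sketch rather than the module's implementation: `wait_for`, its `get_status` callable, and the `interval` default are hypothetical names and values.

```python
import time


def wait_for(get_status, timeout=None, interval=0.01):
    """Poll get_status() until it returns 'complete' (hypothetical helper).

    Raises TimeoutError if timeout (in seconds) elapses first.
    A timeout of None waits indefinitely.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while get_status() != 'complete':
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError("job did not complete within %s seconds" % timeout)
        time.sleep(interval)
```

Using a monotonic clock for the deadline avoids surprises if the system clock is adjusted while a long-running job is being waited on.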

# Default service stack required to support 'with service' call.
local_service = Server()
service_stack = [local_service]

def default_service():
    """
    Return the current default service if no service is specified.

    This is usually local_service, but it can be a remote service
    if the function is called in the context of "with remote:"
    """
    return service_stack[-1]