Package park :: Package core :: Module serviceproxy

Source Code for Module park.core.serviceproxy

# This program is public domain
"""
Client-side view of the web service.
"""
class TimeoutError(Exception):
    """Raised when waiting on a job takes longer than the allowed timeout."""
6
def proxy(cls, **kw):
    """
    Build the service request describing the service class *cls*.

    cls - service class; it must define a ``version`` attribute
    kw - additional keyword arguments passed through to Request

    The service is identified by the dotted name ``<module>.<class>``.
    """
    # cls.__module__ is already the module name as a string, so the original
    # cls.__module__.__name__ raised AttributeError on every call.
    service = cls.__module__ + '.' + cls.__name__
    version = cls.version
    # NOTE(review): the request is built but neither returned nor submitted;
    # this function looks unfinished -- confirm what it is meant to produce.
    request = Request(service=service, version=version, **kw)
11 12
def queue_rpc(server):
    """
    Build the RPC proxy used for remote queuing operations.

    A call such as proxy.calc(x) on the returned object gets sent to
    the server, which runs server.calc(x).
    """
    transport = server.invoke
    return RpcProxy(transport)
class ServiceProxy(RpcProxy):
    """
    Proxy for remote service operations.

    Using this, proxy.calc(x) gets sent through the server to run
    service.calc(x) for the service which is running job *id*.
    """

    def __init__(self, server, id):
        """
        server - handle for the server that calls are forwarded through
        id - identifier of the job whose service receives the calls
        """
        self.server, self.id = server, id

    def _transport(self, method, args, kw):
        """
        Forward one call to the service through server.forward.

        method - the remote method, passed to server.forward after the job id
        args, kw - positional and keyword arguments for the remote call
        """
        # The original omitted self and used the bare names server and id;
        # id then resolved to the builtin function, not the job id.
        return self.server.forward(self.id, method, *args, **kw)
32
class Job(object):
    """
    Client side view of a job.

    Inherit from here when creating new service interfaces.
    """

    result = None
    """
    The output from the job.  This field is job specific.
    """

    # Note: the original also assigned ``status = None`` here, but the
    # status() method defined below replaces that attribute when the class
    # body finishes executing.  The description of the possible states now
    # lives in the status() docstring.

    server = None
    """The job queue which is managing the service"""
    service = None
    """The service proxy.  Use job.service.m1(p1,p2,...) to invoke
    method m1 with parameters p1, p2, etc.
    """
    request = None
    """Request associated with this job"""
    name = None
    """Name of the job"""

    def submit(self, server=None, request=None, tags=TAGS,
               when=None):
        """
        Submit the job to a server queue.

        server - handle for the server; park.local_server() if None
        request - content of the service request
        tags - strings to identify the job; default is your user name
        when - preconditions for starting the job

        Raises RuntimeError if the job has already been submitted.
        """
        if self.server is not None:
            raise RuntimeError("The job has already been submitted to a queue")

        if server is None:
            server = park.local_server()

        # Submit the job to the server and get back the job id.
        # TODO: use hash of request as jobid?
        self.id = server.submit(request=request,
                                tags=tags,
                                when=when)
        self.server = server
        self.request = request
        # NOTE(review): the original called service_rpc(), which is not
        # defined in the visible module; ServiceProxy takes the same
        # (server, id) arguments -- confirm this is the intended proxy.
        self.service = ServiceProxy(server, self.id)
        # Condition which is true when the job is completed
        self.iscompleted = condition.IsCompletedProxy(self.server, self.id)
        # Give the job a default name
        if self.name is None:
            self.name = "%s-%d" % (self.server, self.id)

    def abort(self, wait=False):
        """
        Tell the job to stop running.  Any output from the job will still be
        on the server for later pickup.

        wait - forwarded unchanged as the second argument of server.abort.
        The original passed an undefined name ``wait`` (NameError on every
        call).  Presumably it selects whether to block until the job has
        stopped -- TODO confirm against the server interface.
        """
        self.server.abort(self.id, wait)

    def cleanup(self):
        """
        Remove intermediate state about the job, such as the logging
        information and temporary files from the computation.

        The job is moved from the active list to the archived list.
        """
        self.server.cleanup(self.id)

    def delete(self):
        """
        Remove the job from the server and all associated data.
        """
        self.server.delete(self.id)

    def status(self):
        """
        Ask for the current job status.  Because the job is running
        asynchronously, the results may be out of date by the time they
        are received.

        The status is a string indicating the position of the job in the
        run cycle.  State will have one of the following values::

            pending

                Waiting for dependencies to be satisfied, such as file
                transfer, or the completion of other jobs.  The attribute
                preconditions contains the list of dependencies.

            queued

                Job is ready to run, but waiting for processing time.

            running

                Job is being processed.  Running jobs have header containing
                job specific information that the client can display when
                the user wishes to see progress from the work.

            completed

                Job has ended.  This may be because all the work was
                performed, because the user aborted, or because the job had
                too many errors.  The output is available in result.

            suspended

                Job is paused awaiting interaction from the user.


            cpu_time
            wall_time
        """
        return self.server.status(self.id)

    def isimproved(self, expression):
        """
        Condition for queuing job dependencies.

        When expression evaluates to True the condition is satisfied.

        The expression can contain the usual math and linear algebra
        functions from numpy operating on the attributes of the
        IsImproved message for the service.
        """
        return condition.IsImprovedProxy(self.server, self.id, expression)

    def wait(self, interval=0.1, timeout=0):
        """
        Wait for the job to complete and return self.result.

        interval is time interval between checks for completion.
        timeout is the maximum time to wait; 0 means wait forever.

        Raises TimeoutError if timeout is exceeded before completion.

        This is used in scripts to impose a synchronous interface
        to the fitting service.
        """
        # Imported here because the module has no top-level import of time.
        import time

        t0 = time.time()
        while not self.iscompleted.ready:
            time.sleep(interval)
            if timeout and time.time() - t0 > timeout:
                raise TimeoutError("job not complete")
        return self.result

    def listen(self, monitor):
        """
        Attach an additional monitor to the job.
        """
        # Forward the argument; the original passed self.monitor, an
        # attribute that is never set anywhere in this class.
        self.server.listen(self.id, monitor)

    def set(self, **kw):
        """
        Set attributes on the running service.
        """
        # ** forwards the name=value pairs; the original *kw passed only
        # the dictionary keys as positional arguments.
        self.service.send(message.Set(**kw))

    def loglevel(self, level):
        """
        Specify the logging level for the job.  The levels are::
            NOTSET, DEBUG, INFO, WARN, ERROR, CRITICAL
        These constants are defined in the logging package.
        The default is ERROR.
        """
        self.server.loglevel(self.id, level)
195