1
2 """
3 If an onSignal handler is found for message class Q then this is
4 called immediately. Otherwise, the messages are placed into the
5 ready queue to be processed by the onReady handler, or the
6 onReadyMessage handler if no specific onReady handler is available.
7
8 The onSignal handler can delay processing until the computation reaches
9 a ready state by queuing the signal on self.queue. When the ready queue
10 is processed, this will invoke the onReady handler associated with the
11 signal.
12 """
13
14 __all__ = ['request','proxy','Request','Proxy','Service']
15
16 import Queue
17 import thread
18 import time
19 import logging
22 """
23 Generate a service proxy for a request to a service.
24
25 Individual services may have specialized request blocks, but most of
26 them use a simple request format that does not require a special class.
27
28 Returns a generic request, with service name and version set from the
29 associated service class. The additional keyword arguments form the
30 text of the request.
31 """
32 service = cls.__module__.__name__+'.'+cls.__name__
33 version = cls.version
34 request = Request(service=service, version=version, **kw)
35
38 """
39 Park service request abstract base class.
40
41 Requests have two important attributes: the service name that
42 handles the request and the data that the request needs to process.
43
44 There are a number of complications to having client-proxies to
45 server-side services. In particular, the software on the client
46 and the server may be out of sync. The version number is required
47 to synchronize the available versions.
48
49 Requests are long lived and so need to be properly curated. The
50 service version number is important for understanding a request
51 that has been saved to a file and reviewed later. See
52 `park.util.serial` for a more complete discussion.
53
54 Ideally requests will be simple objects with a attributes formed
55 from primitive python types. This will make it easier for non-python
56 software to talk to python services. Attributes with leading
57 underscores are ignored when transferring a request to or from
58 a server, or saving and loading from a file.
59 """
60 service = None
61 """Name of the service which handles the request"""
62 version = None
63 """Version number of the service which handles the request"""
64
67
69
70 if self.service is None:
71 raise AttributeError('Request does not define the service')
72 if self.version is None:
73 raise AttributeError('Request does not define the version')
74
75
76 state = dict((k,v)
77 for k,v in self.__dict__.items()
78 if not k.startswith('_'))
79
80
81
82 state['version'] = self.version
83 state['service'] = self.service
84
86
87
88
89
90 self.__dict__ = state
91
93 """
94 Queue the request. Any information that can be derived at the
95 time the job is queued must be filled in at this point. This
96 may serve to modify the request, for example by filling in the
97 remote filename for the fetching service. This supplies the
98 filename that will be used when the request is satisfied, and
99 which may be required for dependent jobs running on the service.
100
101 The modified request will be sent back to the client who
102 submitted the job.
103
104 Raises NotImplementedError if there is no work to be done at
105 queue time.
106 """
107 raise NotImplementedError
108
112 """
113 Park service abstract base class.
114
115 The service class manages the lifecycle of a computation, including
116 `prepare` the request, `run` the computation and `cleanup`
117 after the work is done. Periodically during run, the service will
118 need to call the `ServiceHandler.ready` method. This checks for
119 messages from the `park.Client`
120 """
121 service=None
122 """Name of the service"""
123 version=None
124 """Version number of the service"""
125
126
127 @classmethod
135
137 """
138 Prepare to run the job. All the required resources specified in
139 preconditions should already be in place. This method is called
140 once the job is ready to run. The specified work is the request
141 to be performed.
142 """
143
144 - def run(self, handler):
145 """
146 Run the job. When the job is complete result will be set to
147 the results of the run.
148
149 This must be implemented in the subclass.
150
151 The run job should look something like the following:
152
153 best = 0
154 for c in cycles:
155 handler.ready()
156 results = handler.map(self.task, c.tasklist)
157 best =
158 if result > best:
159 best = result
160 handler.improved(best)
161 return best
162
163 The handler.ready() call will raise a park.core.service.AbortService
164 exception if the controller sends an abort message. The run
165 function need not do anything in response, except possibly signal
166 handler.improved(value)
167 """
168 raise NotImplementedError
169
171 """
172 Cleanup after the job is complete or after it has failed.
173
174 The default is to do nothing.
175 """
176
178 """
179 Returns the tuple (k,n,'units') where progress is k cycles of
180 work complete out of a possible n cycles of total work.
181
182 Return None if progress is unknown.
183 """
184 return None
185
187 """
188 Return the state of the job so that it can be restarted from the
189 current point. The usual process is to call prepare(work) followed
190 by run(). On recovery, this is changed to restore(state) followed
191 by run().
192
193 Note that checkpoint will only be called when the job is in a ready
194 state, as signalled by a call to ready() from within run(). Even
195 then, the checkpoint will only be called after a certain amount of
196 time has elapsed to reduce overhead.
197
198 Return None if the service is not in a checkpoint state at this time.
199 """
200
202 """
203 Recover from system failure. On reboot of the computational service,
204 or on process migration the state of the last checkpoint will be
205 restored to the service.
206 """
207
209 return self.__class__.__name__+"-"+str(id(self))
210