Package park :: Package core :: Module service

Source Code for Module park.core.service

  1  # This program is public domain 
  2  """ 
  3  If an onSignal handler is found for message class Q then this is 
  4  called immediately.  Otherwise, the messages are placed into the 
  5  ready queue to be processed by the onReady handler, or the 
  6  onReadyMessage handler if no specific onReady handler is available. 
  7   
  8  The onSignal handler can delay processing until the computation reaches 
  9  a ready state by queuing the signal on self.queue.  When the ready queue 
 10  is processed, this will invoke the onReady handler associated with the 
 11  signal. 
 12  """ 
 13   
 14  __all__ = ['request','proxy','Request','Proxy','Service'] 
 15   
 16  import Queue 
 17  import thread 
 18  import time 
 19  import logging 
20 21 -def request(cls, **kw):
22 """ 23 Generate a service proxy for a request to a service. 24 25 Individual services may have specialized request blocks, but most of 26 them use a simple request format that does not require a special class. 27 28 Returns a generic request, with service name and version set from the 29 associated service class. The additional keyword arguments form the 30 text of the request. 31 """ 32 service = cls.__module__.__name__+'.'+cls.__name__ 33 version = cls.version 34 request = Request(service=service, version=version, **kw)
35
36 # ======================================================================= 37 -class Request(object):
38 """ 39 Park service request abstract base class. 40 41 Requests have two important attributes: the service name that 42 handles the request and the data that the request needs to process. 43 44 There are a number of complications to having client-proxies to 45 server-side services. In particular, the software on the client 46 and the server may be out of sync. The version number is required 47 to synchronize the available versions. 48 49 Requests are long lived and so need to be properly curated. The 50 service version number is important for understanding a request 51 that has been saved to a file and reviewed later. See 52 `park.util.serial` for a more complete discussion. 53 54 Ideally requests will be simple objects with a attributes formed 55 from primitive python types. This will make it easier for non-python 56 software to talk to python services. Attributes with leading 57 underscores are ignored when transferring a request to or from 58 a server, or saving and loading from a file. 59 """ 60 service = None 61 """Name of the service which handles the request""" 62 version = None 63 """Version number of the service which handles the request""" 64
65 - def __init__(self, **kw):
66 self.__dict__ = kw
67
68 - def __getstate__(self):
69 # Make sure service and version are defined 70 if self.service is None: 71 raise AttributeError('Request does not define the service') 72 if self.version is None: 73 raise AttributeError('Request does not define the version') 74 75 # Save all instance data in the request except those starting with _ 76 state = dict((k,v) 77 for k,v in self.__dict__.items() 78 if not k.startswith('_')) 79 80 # Force service and version to be in state even if they are only 81 # defined at the class level. 82 state['version'] = self.version 83 state['service'] = self.service
84
85 - def __setstate__(self, state):
86 # Restore the instance data. Note that this will include service 87 # and version even if they are identical to the class definition 88 # of service and version because there is no advantage to doing 89 # otherwise except for saving a trivial amount of memory. 90 self.__dict__ = state
91
92 - def queue(self):
93 """ 94 Queue the request. Any information that can be derived at the 95 time the job is queued must be filled in at this point. This 96 may serve to modify the request, for example by filling in the 97 remote filename for the fetching service. This supplies the 98 filename that will be used when the request is satisfied, and 99 which may be required for dependent jobs running on the service. 100 101 The modified request will be sent back to the client who 102 submitted the job. 103 104 Raises NotImplementedError if there is no work to be done at 105 queue time. 106 """ 107 raise NotImplementedError
108
109 110 # ======================================================================= 111 -class Service(object):
112 """ 113 Park service abstract base class. 114 115 The service class manages the lifecycle of a computation, including 116 `prepare` the request, `run` the computation and `cleanup` 117 after the work is done. Periodically during run, the service will 118 need to call the `ServiceHandler.ready` method. This checks for 119 messages from the `park.Client` 120 """ 121 service=None 122 """Name of the service""" 123 version=None 124 """Version number of the service""" 125 126 # Lifecycle management 127 @classmethod
128 - def check_version(cls, request):
129 """ 130 Perform version check on the request. 131 """ 132 if cls.version != request.version: 133 raise ValueError("%s version %s does not match request %s" 134 %(cls.service,cls.version,request.version))
135
136 - def prepare(self, request):
137 """ 138 Prepare to run the job. All the required resources specified in 139 preconditions should already be in place. This method is called 140 once the job is ready to run. The specified work is the request 141 to be performed. 142 """
143
144 - def run(self, handler):
145 """ 146 Run the job. When the job is complete result will be set to 147 the results of the run. 148 149 This must be implemented in the subclass. 150 151 The run job should look something like the following: 152 153 best = 0 154 for c in cycles: 155 handler.ready() 156 results = handler.map(self.task, c.tasklist) 157 best = 158 if result > best: 159 best = result 160 handler.improved(best) 161 return best 162 163 The handler.ready() call will raise a park.core.service.AbortService 164 exception if the controller sends an abort message. The run 165 function need not do anything in response, except possibly signal 166 handler.improved(value) 167 """ 168 raise NotImplementedError
169
170 - def cleanup(self):
171 """ 172 Cleanup after the job is complete or after it has failed. 173 174 The default is to do nothing. 175 """
176
177 - def progress(self):
178 """ 179 Returns the tuple (k,n,'units') where progress is k cycles of 180 work complete out of a possible n cycles of total work. 181 182 Return None if progress is unknown. 183 """ 184 return None
185
186 - def checkpoint(self):
187 """ 188 Return the state of the job so that it can be restarted from the 189 current point. The usual process is to call prepare(work) followed 190 by run(). On recovery, this is changed to restore(state) followed 191 by run(). 192 193 Note that checkpoint will only be called when the job is in a ready 194 state, as signalled by a call to ready() from within run(). Even 195 then, the checkpoint will only be called after a certain amount of 196 time has elapsed to reduce overhead. 197 198 Return None if the service is not in a checkpoint state at this time. 199 """
200
201 - def restore(self, state):
202 """ 203 Recover from system failure. On reboot of the computational service, 204 or on process migration the state of the last checkpoint will be 205 restored to the service. 206 """
207
208 - def __str__(self):
209 return self.__class__.__name__+"-"+str(id(self))
210