Package park :: Package core :: Module xmlrpc_service

Source Code for Module park.core.xmlrpc_service

  1  # This program is public domain 
from __future__ import with_statement

import logging
import threading
import time
import xmlrpclib

from park.util.ThreadingXMLRPC import ThreadingDocXMLRPCServer as BaseServer
from park.util.codec import CPickle as codec
  8  #import park.fitservice 
  9   
class Xmlrpc(BaseServer):
    """
    Communicate with the Park fitting service.

    There are two types of communication with the service.
    One is synchronous: queue job, enquire status, etc.
    The other is asynchronous: listen for activity on jobid.

    NOTE(review): the class derives from ``BaseServer`` while ``__init__``
    initialises ``park.Service``; confirm which base class is intended.
    """

    def __init__(self, url="http://localhost:8000"):
        """
        Connect to the park job service using the XMLRPC protocol.

        *url* is the address of the XML-RPC server.  A background
        thread that polls the server for messages is started before
        the constructor returns.
        """
        # Local import: the module itself never binds the name ``park``.
        import park
        park.Service.__init__(self)
        # Connection to the server.
        self.__service = xmlrpclib.ServerProxy(url)
        # jobid -> completion flag for the jobs wait() is blocked on.
        self.__done = {}
        # jobid -> list of callbacks listening to that job.
        self.__listeners = {}
        # Exclusive lock for our data structures.
        self.__lock = threading.Lock()
        # Polled by _listen(); the loop runs while this is true.
        self.__listening = True
        # The single listening thread, once started.
        self.__listener = None
        self._start_listening()

    def submit(self, request, monitor):
        """
        Encode *request*, queue it on the server and return the job id.

        *monitor* is accepted for interface compatibility but is not
        used by this implementation.
        """
        # NOTE(review): this invokes the remote method named ``queue``;
        # confirm the server registers a handler under that name.
        request_msg = codec.encode(request)
        return self.__service.queue(request_msg)

    # Message queue support
    def wait(self, jobid, interval=0.1):
        """
        Wait for the job to be complete.

        Polls every *interval* seconds until a 'complete' message for
        *jobid* has been dispatched.  Returns immediately if the server
        declines the ``listenfor`` request.
        """
        # Register before subscribing so that a 'complete' message
        # dispatched right after listenfor() is not dropped.
        with self.__lock:
            self.__done[jobid] = False
        ok = False
        try:
            ok = self.__service.listenfor(jobid, 'complete')
        finally:
            # Also runs when the call raises, so no stale entry is left.
            if not ok:
                with self.__lock:
                    self.__done.pop(jobid, None)
        if not ok:
            return
        while True:
            with self.__lock:
                if self.__done[jobid]:
                    del self.__done[jobid]
                    return
            # Sleep outside the lock so the listening thread can update.
            time.sleep(interval)

    def listen(self, jobid, callback):
        """
        Add callback for jobid to listeners.

        Registering the same callback twice for a job has no effect.
        """
        with self.__lock:
            callbacks = self.__listeners.setdefault(jobid, [])
            if callback not in callbacks:
                callbacks.append(callback)

    def hangup(self, jobid, callback):
        """
        Stop listening for callback.

        Unknown jobs and unregistered callbacks are ignored.
        """
        with self.__lock:
            callbacks = self.__listeners.get(jobid)
            if callbacks is None:
                return
            try:
                callbacks.remove(callback)
            except ValueError:
                # The callback was never registered for this job.
                return
            # Drop the entry once the last callback is gone.
            if not callbacks:
                del self.__listeners[jobid]

    def _dispatch(self, message):
        """
        Listening thread: process next server message.

        *message* is the decoded message; its ``jobid`` and ``type``
        attributes are read here.
        """
        jobid = message.jobid
        # Snapshot the callbacks so they run without the lock held; the
        # lock is not reentrant and a callback may call listen()/hangup().
        with self.__lock:
            callbacks = list(self.__listeners.get(jobid, []))
        for callback in callbacks:
            callback(message)

        if message.type == 'complete':
            with self.__lock:
                if jobid in self.__done:
                    self.__done[jobid] = True
                # If the job is done, stop listening for it.
                self.__listeners.pop(jobid, None)

    def _listen(self):
        """
        Listening thread: wait for next message from the server.
        """
        while self.__listening:
            message_str = self.__service.next_message()
            message = codec.decode(message_str)
            self._dispatch(message)

    def _start_listening(self):
        """
        Start the listening thread if it is not already running.
        """
        # We will have one listening thread for all messages.
        with self.__lock:
            if self.__listener is not None:
                return
            listener = threading.Thread(target=self._listen)
            # Daemon thread: it must not keep the interpreter alive.
            listener.setDaemon(True)
            self.__listener = listener
        listener.start()
111
class JobServices:
    """
    Handlers exposed to XML-RPC clients of the job server.

    Payloads are encoded and decoded with the class attribute ``codec``.

    NOTE(security): ``codec`` is ``park.util.codec.CPickle``; if it
    unpickles its input, decoding client-supplied payloads can execute
    arbitrary code.  Only expose this service to trusted clients.

    NOTE(review): the methods below use ``self.job_queue``,
    ``self._msgqueue`` and a module-level ``message_queue``, none of
    which is defined in this module; the two assignments that hint at
    them are still commented out below.
    """
    codec = codec
    #job_queue = JobQueue()
    #msg_queue = Queue()

    def put(self, job):
        """
        Add a job to the queue, returning the job id.

        *job* is the encoded job description.
        """
        job = self.codec.decode(job)
        return self.job_queue.put(job)

    # Xmlrpc.submit invokes this handler under the name ``queue``.
    queue = put

    def listenfor(self, key, messagetype):
        """
        Put messages for key on queue.  The key is usually jobid.

        *messagetype* is accepted but not used for filtering here.
        Returns True so the caller can tell the request was accepted.
        """
        message_queue.listen(key, self._msgqueue)
        return True

    def next_message(self, timeout=0):
        """
        Return the next queued message for the job, encoded.

        If timeout is zero this routine waits until a message is available.

        NOTE(review): *timeout* is currently not passed on to the queue.
        """
        return self.codec.encode(self._msgqueue.get())

    def send_message(self, key, message):
        """
        Post a message to the message queue.  All clients listening
        on message.jobid will receive this message.

        *message* is the encoded message.  Returns True.
        """
        message = self.codec.decode(message)
        message_queue.queue(key, message)
        # XML-RPC cannot marshal None unless allow_none is enabled.
        return True

    def log_error(self, coded_object):
        """
        Receive a log message from somewhere in the world.

        This message will be forwarded to whomever is listening
        for these types of messages, as well as logged on the
        named queue.

        *coded_object* is the encoded attribute dictionary of the log
        record.  Returns True.
        """
        obj = self.codec.decode(coded_object)
        record = logging.makeLogRecord(obj)
        logger = logging.getLogger(record.name)
        logger.handle(record)
        # XML-RPC cannot marshal None unless allow_none is enabled.
        return True

    def set_header(self, key, message):
        """
        Set the queue header for key.

        *message* is the encoded header.  Returns True.
        """
        message = self.codec.decode(message)
        message_queue[key] = message
        # XML-RPC cannot marshal None unless allow_none is enabled.
        return True

    def get_header(self, key):
        """
        Get the queue header for key, encoded.

        NOTE(review): what happens when there is no queue for key depends
        on ``message_queue``, which is not visible here.
        """
        return self.codec.encode(message_queue[key])
176
def serve(host="localhost", port=8000):
    """
    Run the PARK XML-RPC job server on (*host*, *port*).

    Registers the introspection functions and a ``JobServices`` instance,
    then serves requests until the process is interrupted; this call
    does not return.
    """
    server = BaseServer((host, port))

    # Documentation available from http://localhost:8000
    server.set_server_title('PARK server')
    server.set_server_name('PARK Optimization Services')
    server.set_server_documentation("""
    PARK provides a remote fitting service API over XML.

    External users may submit jobs to the queue and later
    receive results when they are complete.
    """)

    server.register_introspection_functions()
    server.register_instance(JobServices())
    server.serve_forever()
193