1
from __future__ import with_statement

import logging
import thread
import time
import xmlrpclib

from park.util.ThreadingXMLRPC import ThreadingDocXMLRPCServer as BaseServer
from park.util.codec import CPickle as codec
8
9
11 """
12 Communicate with the Park fitting service.
13
14 There are two types of communication with the service.
15 One is synchronous: queue job, enquire status, etc.
16 The other is asynchronous: listen for activity on jobid
17 """
def __init__(self, url="http://localhost:8000"):
    """
    Connect to the park job service using the XMLRPC protocol.

    *url* is the address handed to ``xmlrpclib.ServerProxy``.  Once the
    bookkeeping structures exist the listening thread is started.
    """
    park.Service.__init__(self)
    # Connection to the server.
    self.__service = xmlrpclib.ServerProxy(url)
    # Jobs we are waiting for, keyed by job id.
    self.__done = dict()
    # Callbacks for the jobs we are listening to, keyed by job id.
    self.__listeners = dict()
    # Exclusive lock for the data structures above.
    self.__lock = thread.allocate_lock()
    self._start_listening()
32
33 - def submit(self, request, monitor):
37
38
def wait(self, jobid, interval=0.1):
    """
    Wait for the job to be complete.

    Asks the server to report 'complete' messages for *jobid*, then polls
    the completion flag every *interval* seconds until the dispatcher sets
    it.  Returns None immediately if the server refuses the request.
    """
    # Register the job before asking the server to notify us: a 'complete'
    # message dispatched before the entry exists would be dropped and the
    # polling loop below would never terminate.
    with self.__lock:
        self.__done[jobid] = False
    ok = self.__service.listenfor(jobid, 'complete')
    if not ok:
        with self.__lock:
            self.__done.pop(jobid, None)
        return
    while True:
        with self.__lock:
            if self.__done[jobid]:
                del self.__done[jobid]
                return
        # Sleep outside the lock so the dispatcher can update the flag.
        time.sleep(interval)
53
def listen(self, jobid, callback):
    """
    Add callback for jobid to listeners.

    Registering the same callback twice for one job has no effect.
    """
    # The lock attribute is __lock; the original referenced _lock here.
    with self.__lock:
        callbacks = self.__listeners.setdefault(jobid, [])
        if callback not in callbacks:
            callbacks.append(callback)
63
def hangup(self, jobid, callback):
    """
    Stop listening for callback

    Removes *callback* from the listeners of *jobid*.  An unknown job or
    a callback that was never registered is ignored.  The job entry is
    dropped once its last callback is gone.
    """
    with self.__lock:
        callbacks = self.__listeners.get(jobid)
        if callbacks is None:
            return
        try:
            callbacks.remove(callback)
        except ValueError:
            # Callback was not registered for this job.
            pass
        # Drop the entry only when nothing is left; the original test was
        # inverted and removed the job while callbacks were still present.
        if not callbacks:
            del self.__listeners[jobid]
76
78 """
79 Listening thread: process next server message
80 """
81
82 with self.__lock:
83 callbacks = copy(self.__listeners.get(message.jobid,[]))
84 for f in callbacks: f(message)
85
86 if message.type == 'complete':
87 with self.__lock:
88 if jobid in self.__done:
89 self.__done[jobid] = True
90
91 try:
92 del self.__listeners[jobid]
93 except ValueError:
94 pass
95
def _listen(self):
    """
    Listening thread: wait for next message from the server.

    Loops while the listening flag is set: fetches the next encoded
    message from the service, decodes it and hands it to ``_dispatch``.
    """
    while self.__listening:
        message_str = self.__service.next_message()
        # NOTE(review): codec is imported as CPickle; decoding data that
        # comes from the server presumably unpickles it -- confirm the
        # server is trusted.
        message = codec.decode(message_str)
        self._dispatch(message)
104
106 """
107 Start the listening thread if it is not already running.
108 """
109
110 thread.start_new_thread(self._listen,())
111
113 """
114 """
115 codec = codec
116
117
118
def put(self, job):
    """
    Add a job to the queue, returning the job id.

    *job* arrives encoded; it is decoded with ``self.codec`` before being
    placed on ``self.job_queue``.
    """
    # NOTE(review): the module imports its codec as CPickle; decoding
    # client-supplied data presumably unpickles it -- confirm that only
    # trusted clients can reach this service.
    decoded = self.codec.decode(job)
    # Named jobid rather than id so the builtin is not shadowed.
    jobid = self.job_queue.put(decoded)
    return jobid
126
128 """
129 Put messages for key on queue. The key is usually jobid.
130 """
131 message_queue.listen(key,self._msgqueue)
132
134 """
135 Return the next queued message for the job.
136
137 If timeout is zero this routine waits until a message is available.
138 """
139 message = self.codec.encode(self._msgqueue.get())
140 return message
141
143 """
144 Post a message to the message queue. All clients listening
145 on message.jobid will receive this message.
146 """
147 message = self._codec.decode(message)
148 message_queue.queue(key, message)
149
151 """
152 Receive a log message from somewhere in the world.
153
154 This message will be forwarded to whomever is listening
155 for these types of messages, as well as logged on the
156 named queue.
157 """
158 obj = self._codec.decode(coded_object)
159 record = logging.makeLogRecord(obj)
160 logger = logging.getLogger(record.name)
161 logger.handle(record)
162
169
171 """
172 Get the queue header for key. Returns none if there is no
173 queue for key.
174 """
175 self._codec.encode(message_queue[key])
176
def serve(host="localhost",port=8000):
    """
    Run the PARK job service on (host, port).

    Creates the server, sets its title, name and documentation text,
    registers the introspection functions and a JobService instance, and
    then calls ``serve_forever()``.
    """
    address = (host, port)
    description = """
PARK provides a remote fitting service API over XML.

External users may submit jobs to the queue and later
receive results when they are complete.
"""

    server = Server(address)
    server.set_server_title('PARK server')
    server.set_server_name('PARK Optimization Services')
    server.set_server_documentation(description)

    server.register_introspection_functions()
    service = JobService()
    server.register_instance(service)
    server.serve_forever()
193