Package park :: Package util :: Module threads

Source Code for Module park.util.threads

  1  # This program is public domain 
  2  """ 
  3  @threaded decorator for functions to be run in a thread. 
  4  """ 
  5   
  6  from functools import wraps 
  7  import itertools 
  8  import threading 
9 10 -class AfterThread(threading.Thread):
11 """ 12 Thread class with additional 'after' capability which runs a function 13 after the thread is complete. This allows us to separate the notification 14 from the computation. 15 16 Unlike Thread.join, the wait() method returns the value of the computation. 17 """ 18 name = property(threading.Thread.getName, 19 threading.Thread.setName, 20 doc="Thread name")
21 - def __init__(self, *args, **kwargs):
22 self.__result = None 23 self.__after = kwargs.pop('after',None) 24 threading.Thread.__init__(self, *args, **kwargs)
25
26 - def after(self, notify=None):
27 """ 28 Calls notify after the thread is complete. Notify should 29 take a single argument which is the result of the function. 30 """ 31 self.__after = notify 32 # Run immediately if thread is already complete 33 if self._Thread__started and self._Thread__stopped: 34 self.__after(self.__result)
35
36 - def run(self):
37 """ 38 Run the thread followed by the after function if any. 39 """ 40 if self._Thread__target: 41 self.__result = self._Thread__target(*self._Thread__args, 42 **self._Thread__kwargs) 43 if self.__after is not None: 44 self.__after(self.__result)
45
46 - def wait(self, timeout=None):
47 """ 48 Wait for the thread to complete. 49 50 Returns the result of the computation. 51 52 Example:: 53 54 result = thread.wait() 55 56 If timeout is used, then wait() may return before the result is 57 available. In this case, wait() will return None. This can be 58 used as follows:: 59 60 while True: 61 result = thread.wait(timeout=0) 62 if result is not None: break 63 ... do something else while waiting ... 64 65 Timeout should not be used with functions that may return None. 66 This is due to the race condition in which the thread completes 67 between the timeout triggering in wait() and the main thread 68 calling thread.isAlive(). 69 """ 70 self.join(timeout) 71 return self.__result
72
73 -def threaded(fn):
74 """ 75 @threaded decorator for functions to be run in a thread. 76 77 Returns the running thread. 78 79 The returned thread supports the following methods:: 80 81 wait(timeout=False) 82 Waits for the function to complete. 83 Returns the result of the function if the thread is joined, 84 or None if timeout. Use thread.isAlive() to test for timeout. 85 after(notify) 86 Calls notify after the thread is complete. Notify should 87 take a single argument which is the result of the function. 88 isAlive() 89 Returns True if thread is still running. 90 name 91 Thread name property. By default the name is 'fn-#' where fn 92 is the function name and # is the number of times the thread 93 has been invoked. 94 95 For example:: 96 97 @threaded 98 def compute(self,input): 99 ... 100 def onComputeButton(self,evt): 101 thread = self.compute(self.input.GetValue()) 102 thread.after(lambda result: wx.Post(self.win,wx.EVT_PAINT)) 103 104 A threaded function can also be invoked directly in the current thread:: 105 106 result = self.compute.main(self.input.GetValue()) 107 108 All threads must complete before the program can exit. For queue 109 processing threads which wait are alive continuously waiting for 110 new input, use the @daemon decorator instead. 111 """ 112 instance = itertools.count(1) 113 @wraps(fn) 114 def wrapper(*args, **kw): 115 name = "%s-%d"%(fn.func_name,instance.next()) 116 thread = AfterThread(target=fn,args=args,kwargs=kw,name=name) 117 thread.start() 118 return thread
119 wrapper.main = fn 120 return wrapper 121
122 -def daemon(fn):
123 """ 124 @daemon decorator for functions to be run in a thread. 125 126 Returns the running thread. 127 128 Unlike threaded functions, daemon functions are not expected to complete. 129 """ 130 instance_counter = itertools.count(1) 131 @wraps(fn) 132 def wrapper(*args, **kw): 133 name = "%s-%d"%(fn.func_name,instance_counter.next()) 134 thread = threading.Thread(target=fn,args=args,kwargs=kw,name=name) 135 thread.setDaemon(True) 136 thread.start() 137 return thread
138 wrapper.main = fn 139 return wrapper 140
141 -def demo(join=False):
142 import time,thread 143 @threaded 144 def sleeper(t): 145 print "thread %d sleeping for %g seconds"%(thread.get_ident(), t) 146 time.sleep(t) 147 print "waking %d"%thread.get_ident()
148 def after(result): 149 print "after %d"%thread.get_ident() 150 print "=== Start threading test ===" 151 t1 = sleeper(5) 152 t2 = sleeper(3) 153 print "sleeping in main until t2 is complete" 154 time.sleep(4) 155 print "registering after fn after t2 is complete" 156 t1.after(after) 157 t2.after(after) 158 print "waiting in main until t1 is complete" 159 if join: 160 t1.join() 161 t2.join() 162 print "joined t1 and t2" 163
164 -def test():
165 import time,thread,atexit 166 class Box: pass 167 @threaded 168 def sleeper(t,box): 169 box.threadid = thread.get_ident() 170 time.sleep(t) 171 return box
172 def after(box): 173 box.afterid = thread.get_ident() 174 start = time.time() 175 b1,b2 = Box(),Box() 176 t1,t2 = sleeper(1,b1),sleeper(0.5,b2) 177 time.sleep(0.75) 178 assert b1.threadid != b2.threadid,"not called in different threads" 179 t1.after(after) 180 t2.after(after) 181 assert t1.name == "sleeper-1" 182 assert t2.name == "sleeper-2" 183 # Test wait for thread completion 184 t1.join(),t2.join() 185 assert time.time()-start < 1.1, "not running in parallel" 186 # Make sure after() was called 187 assert b1.afterid == b1.threadid # t1.after called in t1 188 assert b2.afterid == thread.get_ident() # t2.after called in main() 189 190 # Run function in the main thread 191 b3 = Box() 192 sleeper.main(0.2,b3) 193 assert b3.threadid == thread.get_ident() 194 195 #TODO: move test into a separate file 196 ## This test doesn't work because atexit.register needs to happen before 197 ## threading is imported. 198 ## Repeat the test, making sure all threads complete before main exits 199 #b3,b4 = Box(),Box() 200 #def checkafter(*args): 201 # for b in args: 202 # assert hasattr(b,'afterid') 203 #t1,t2 = sleeper(1,b3),sleeper(0.5,b4) 204 #time.sleep(0.75) 205 #t1.after(after,[b3]) 206 #t2.after(after,[b4]) 207 #atexit.register(checkafter,[b3,b4]) 208 209 if __name__ == "__main__": 210 #demo(join=True) 211 #demo(join=False) 212 test() 213