Package park :: Package util :: Module rwlock

Source Code for Module park.util.rwlock

  1  from __future__ import with_statement 
  2  import thread 
  3  import time 
4 5 -def yield_thread():
6 #print "yielding" 7 time.sleep(0.000001)
8
9 # Read-Many, Write-One lock 10 # 11 # readers: number of active readers 12 # writers: number of active writers 13 # pending_readers: number of readers waiting for writes to complete 14 # pending_writers: number of writers waiting for reads and writes to complete 15 # _update_lock is for managing readers,writers,pending 16 # _wait_for_read is a lock for making sure only one read is processed at a time 17 # _read_write_lock is the real lock 18 -class _RLock:
19 """Read lock"""
20 - def __init__(self, owner):
21 self._wait_for_read = thread.allocate_lock() 22 self.owner = owner
23 - def __enter__(self):
24 # Keep track of the number of readers 25 with self.owner._update_lock: 26 self.owner.pending_readers += 1 27 # Allow one reader through at a time. The first reader will 28 # grab the read-write lock and leave. Subsequent readers will 29 # add to the number of active readers. 30 # 31 # If there are writes pending, the next reader should not start 32 # immediately, but should instead block on the read-write lock until 33 # the write is complete. The write won't even start until all 34 # active readers complete. When a reader completes, it decrements 35 # the number of active readers, releasing the read-write lock when 36 # no readers remain. 37 with self._wait_for_read: 38 # Try the simple case of adding to an existing lock. 39 with self.owner._update_lock: 40 if self.owner.readers > 0 and self.owner.pending_writers == 0: 41 self.owner.readers += 1 42 self.owner.pending_readers -= 1 43 return 44 45 # Block until all pending writers have had a chance at the 46 # read-write lock. 47 while True: 48 self.owner._read_write_lock.acquire() 49 with self.owner._update_lock: 50 # Note: don't need to check self.owner.writers since 51 # there can only be one writer at a time. 52 if self.owner.pending_writers == 0: 53 self.owner.readers += 1 54 self.owner.pending_readers -= 1 55 break 56 self.owner._read_write_lock.release() 57 yield_thread()
58
59 - def __exit__(self,*args):
60 with self.owner._update_lock: 61 if self.owner.readers <= 0: 62 raise RuntimeError("Not locked for reading") 63 self.owner.readers -= 1 64 if self.owner.readers == 0: 65 self.owner._read_write_lock.release()
66 acquire = __enter__ 67 release = __exit__
68
69 -class _WLock:
70 "Write lock"
71 - def __init__(self, owner):
72 self.owner = owner
73 - def __enter__(self):
74 with self.owner._update_lock: 75 self.owner.pending_writers += 1 76 self.owner._read_write_lock.acquire() 77 with self.owner._update_lock: 78 self.owner.pending_writers -= 1 79 self.owner.writers += 1
80 - def __exit__(self,*args):
81 with self.owner._update_lock: 82 if self.owner.writers < 1: 83 raise RuntimeError("Not locked for writing") 84 self.owner._read_write_lock.release() 85 self.owner.writers -= 1
86 acquire = __enter__ 87 release = __exit__
88
89 90 -class RWLock:
91
92 - def __init__(self):
93 self._update_lock = thread.allocate_lock() 94 self._read_write_lock = thread.allocate_lock() 95 96 self.pending_readers = 0 97 self.pending_writers = 0 98 self.readers = 0 99 """Number of threads that are actively reading""" 100 self.writers = 0 101 """Number of threads that are waiting to write""" 102 self.read = _RLock(self) 103 """Read lock""" 104 self.write = _WLock(self) 105 """Write lock"""
106
107 - def __str__(self):
108 return "lock r:%d w:%d pr:%d pw:%d"%(self.readers,self.writers, 109 self.pending_readers, 110 self.pending_writers)
111
112 -def demo(rate=0.1):
113 import time 114 import traceback 115 from park.util.threads import threaded 116 117 start = time.time() 118 @threaded 119 def read(lock,n): 120 try: 121 #print "read",n,lock 122 with lock.read: 123 print "read start",n,"at %.1f"%(time.time()-start),lock 124 time.sleep(n*rate) 125 print "read stop",n,"at %.1f"%(time.time()-start),lock 126 except: 127 traceback.print_exc()
128 @threaded 129 def write(lock,n): 130 try: 131 #print "write",n,lock 132 with lock.write: 133 print "write start",n,"at %.1f"%(time.time()-start),lock 134 time.sleep(n*rate) 135 print "write stop",n,"at %.1f"%(time.time()-start),lock 136 except: 137 traceback.print_exc() 138 139 lock = RWLock() 140 # Let the reads start 141 read(lock,3) 142 read(lock,2) 143 read(lock,5) 144 # Start a write 145 time.sleep(0.5*rate) 146 write(lock,2) 147 time.sleep(0.5*rate) 148 # Before the write is complete, start more reads and writes 149 read(lock,4) 150 t = read(lock,6) 151 write(lock,3) 152 t.join() 153
154 -def test():
155 # Redirect demo output to a string 156 import sys 157 class Capture: 158 str = "" 159 def write(self, s): self.str += s
160 buffer = Capture() 161 sys.stdout = buffer 162 demo() 163 sys.stdout = None 164 165 expect = """\ 166 read start 3 at 0.0 lock r:1 w:0 pr:0 pw:0 167 read start 2 at 0.0 lock r:2 w:0 pr:0 pw:0 168 read start 5 at 0.0 lock r:3 w:0 pr:0 pw:0 169 read stop 2 at 0.2 lock r:3 w:0 pr:2 pw:2 170 read stop 3 at 0.3 lock r:2 w:0 pr:2 pw:2 171 read stop 5 at 0.5 lock r:1 w:0 pr:2 pw:2 172 write start 2 at 0.5 lock r:0 w:1 pr:2 pw:1 173 write stop 2 at 0.7 lock r:0 w:1 pr:2 pw:1 174 write start 3 at 0.7 lock r:0 w:1 pr:2 pw:0 175 write stop 3 at 1.0 lock r:0 w:1 pr:2 pw:0 176 read start 4 at 1.0 lock r:1 w:0 pr:1 pw:0 177 read start 6 at 1.0 lock r:2 w:0 pr:0 pw:0 178 read stop 4 at 1.4 lock r:2 w:0 pr:0 pw:0 179 read stop 6 at 1.6 lock r:1 w:0 pr:0 pw:0 180 """ 181 for g,e in zip(buffer.str.strip().split('\n'), 182 expect.strip().split('\n')): 183 assert e == g, "expected:\n%s\nbut got:\n%s\n"%(e,g) 184 185 if __name__ == "__main__": 186 test() 187