Source Code for Module park.core.datastore

# This program is public domain


"""
Data storage services.

Warning: Not yet used

Data store to manage local data, both job input and job output.

This includes URL prefetch, results storage and purging.

Design Notes
============


Desired features::

    scalability - read and write a block at a time
    efficiency - check if file already fetched
    robustness - checksum to make sure fetched file is correct
    security - allow access to secure urls
    usability - avoid reentering passwords on accessible resources

What happens when file at the url changes?  What happens when urllib
drops the connection before the transfer is complete?
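
The scalability and robustness goals above can be sketched as a block-wise
copy that accumulates a checksum and compares the byte count with the size
the server announced.  This is only an illustration and not part of the
module: the name copy_with_checksum, its parameters and the choice of
SHA-256 are assumptions, and it is written for Python 3.

```python
import hashlib
import io

def copy_with_checksum(source, target, expected_size=None, blocksize=8192):
    # Hypothetical helper: copy one block at a time so that memory use
    # does not grow with the size of the file.
    digest = hashlib.sha256()
    copied = 0
    while True:
        block = source.read(blocksize)
        if not block:
            break
        target.write(block)
        digest.update(block)
        copied += len(block)
    # A short transfer suggests the connection dropped before completion.
    if expected_size is not None and copied != expected_size:
        raise IOError('expected %d bytes but received %d'
                      % (expected_size, copied))
    return digest.hexdigest(), copied
```

Storing the returned checksum next to the cached file would also give one
way to notice that the file behind a url has changed since it was fetched.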


File caching services
---------------------

Want something lightweight and robust.  The filesystem already exists and
acts as a database.  We just need a string key tied to a file name.

Note that unix does not handle large directories well, and this could kill
performance unless a multilevel structure is created.  Currently we are
ignoring the problem and putting things in one big directory.  Breaking
the hash into e.g., 4+4+12 would give a 3-level directory structure.

Ultimately the caller doesn't care, but if we change it, then we may
need to provide a migration path for the server.
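
A multilevel layout could look roughly like the sketch below.  It is not
what FileCache does today.  The function name multilevel_path and the
widths parameter are assumptions, and because an MD5 hex digest is 32
characters long the sketch uses two 4-character directory levels and keeps
the remainder of the digest as the file name.  Written for Python 3.

```python
import hashlib
import os

def multilevel_path(location, key, widths=(4, 4)):
    # Hypothetical layout: leading digest characters become nested
    # directories and the rest of the digest becomes the file name.
    digest = hashlib.md5(key.encode('utf-8')).hexdigest()
    parts = []
    start = 0
    for width in widths:
        parts.append(digest[start:start + width])
        start += width
    parts.append(digest[start:])
    return os.path.join(location, *parts)
```

Since the caller only ever sees the returned path, a change like this stays
hidden behind the store method, apart from migrating files already cached.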

The SNS datasets can involve many large files which are not generally
reused from job to job.  It would be nice to have cache aging based on
last access time.  This can probably be done after the fact by walking
the cache directory.  It may not work on all operating systems if
access time is not properly supported.

Administrator needs to be able to get a snapshot of the cache sorted
by size of cached object, last access time, etc.
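
Both the aging pass and the administrator snapshot can be built on one
directory listing.  The sketch below assumes the current flat cache
directory.  The names cache_snapshot and stale_entries are hypothetical,
and the result is only as good as the access times the filesystem records.
Written for Python 3.

```python
import os

def cache_snapshot(location, sort_by='atime'):
    # Collect name, size and last access time for every regular file.
    entries = []
    for name in os.listdir(location):
        path = os.path.join(location, name)
        if os.path.isfile(path):
            info = os.stat(path)
            entries.append({'name': name, 'size': info.st_size,
                            'atime': info.st_atime})
    # sort_by is 'name', 'size' or 'atime'; smallest or oldest comes first.
    entries.sort(key=lambda entry: entry[sort_by])
    return entries

def stale_entries(location, cutoff):
    # Names of files last accessed before cutoff (seconds since the epoch).
    return [entry['name'] for entry in cache_snapshot(location)
            if entry['atime'] < cutoff]
```

A purge would pass each stale key to the remove method rather than
deleting files directly, so that the .key and .dat files stay paired.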

For performance, keys accessed in this session are stored in a dictionary.
Consider making this a weak reference so that long running servers do
not show memory leaks.  Better yet, kill the server periodically since it
should be robust enough to recover.  Many problems of long running
services then vanish, and the recovery code gets tested regularly.


Staging
-------

The use cases we need to cover::

    1. cluster with no local storage on nodes
    2. cluster with local storage on nodes but no internet connection
    3. cluster with local storage on nodes and an internet connection
    4. independent nodes on the same subnet as server

All these use cases are satisfied by staging the prefetch URL
on a local server where the nodes can access it when necessary.
The work mapper itself should strive to keep computations on
nodes which already have the problem loaded to reduce network
bandwidth and file load times, but this is a separate issue.

There are two additional use cases we will not cover::

    0. files too large for a single node
    5. completely independent nodes

Case 0 requires that the individual work units be calculated in
parallel.  This will certainly require staging of the files on
a local server.

Case 5 is a situation such as BOINC where a number of independent
computers from across the internet combine to work on a problem.
In these situations we will not want to demand large amounts of
network bandwidth and local storage, so this case is only suitable
for small files.  This means there will be minimal penalty for staging
the file on a local server.

Signalling
----------

File transfer must occur asynchronously.  The compute nodes
should not block as files are being staged.  This leads to the
question of how the asynchronous behaviour should be implemented.
We look at several possibilities::

    1. Local data server on separate node of the cluster
    2. Local data server on main server running as separate process
    3. Local data server on main server running as separate thread
    4. Local data server on main server with async I/O

I'm not sure 4 is possible in Python.  3 should be the default to
reduce the number of processes and difficulty managing them on the
server.  However, because we need to support 1 on some architectures,
we will get 2 for free.  In any case, 1-4 can be supported by adding
a server command saying that a precondition is met, so the server
does not need a busy loop to check a status flag in addition to the
usual listening for client connections.
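
Option 3 with a "precondition met" notification might be sketched as below.
The class BackgroundFetch and its fetch and on_ready callables are
hypothetical stand-ins: fetch represents the file transfer and on_ready
represents the server command.  Neither is part of the park API.  Written
for Python 3.

```python
import threading

class BackgroundFetch(object):
    # Hypothetical helper: run a transfer in a worker thread and notify
    # the caller when it finishes, so nobody has to poll a status flag.
    def __init__(self, fetch, on_ready):
        self._fetch = fetch
        self._on_ready = on_ready
        self.done = threading.Event()
        self.error = None

    def start(self):
        worker = threading.Thread(target=self._run)
        worker.daemon = True
        worker.start()
        return worker

    def _run(self):
        try:
            self._fetch()
        except Exception as exc:
            # Keep the failure so the notified party can inspect it.
            self.error = exc
        try:
            self._on_ready(self)
        finally:
            self.done.set()
```

The caller of start returns immediately.  Code that does need to wait can
block on the done event instead of spinning in a busy loop.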

Credentials
-----------

When fetching data from services such as the SNS portal the user
should not have to repeatedly enter a password.  Credentials should
be cached.  The mechanism for this is unclear.
"""

from __future__ import with_statement

import hashlib
import os
import thread
import traceback

class FileCache(object):
    """
    Create a data cache in a particular directory on the filesystem.

    Each cached item consists of two parts, a .key file containing the
    item description, and a .dat file containing the item data.

    The store method returns a complete path to the .dat file.

    TODO: monitor access times so that we can identify old files for purging
    """
    def __init__(self, location=None):
        self.location = location
        """Directory name containing the cache"""

        if not os.path.exists(location):
            os.makedirs(location)
        self._keys = {}
        self._lock = thread.allocate_lock()

    def hash(self, key):
        """
        Returns cache file and key file names for the given key.
        """
        # Use a local name other than 'hash' so the builtin is not shadowed
        digest = hashlib.md5(key).hexdigest()
        # The cache directory is an attribute of the instance, not a global
        filename = os.path.join(self.location, digest+'.dat')
        record = os.path.join(self.location, digest+'.key')
        return filename, record

    def remove(self, key):
        """
        Remove file associated with key.
        """
        with self._lock:
            filename, record = self.hash(key)
            if key in self._keys: del self._keys[key]
            if os.path.exists(record): os.remove(record)
            if os.path.exists(filename): os.remove(filename)

    def store(self, key):
        """
        Find the data file corresponding to the particular key.

        Raises RuntimeError if there is a key collision.
        """
        # The lock attribute created in __init__ is _lock
        with self._lock:
            # Check if key is active
            if key in self._keys:
                return self._keys[key]

            # Convert key into hash string
            filename, record = self.hash(key)

            # Create a key file
            if os.path.exists(record):
                # Key exists, check for collision
                file = open(record,'r')
                oldkey = file.read()
                file.close()
                if oldkey != key:
                    raise RuntimeError('Cache collision for '+key)
            else:
                # New key, make a file for it
                file = open(record,'w')
                file.write(key)
                file.close()

            # Remember the key only after the collision check has passed
            self._keys[key] = filename
            return filename

#TODO: break this into its own file
import urllib  # needed by FetchService.run

import park.core.service as service

def config(key,missing): return missing
URL_CACHE_PATH = config('park.datastore.urlcache','/tmp/urlcache')
"""Standard location for the local repository"""
urlcache = FileCache(URL_CACHE_PATH)
"""The local cache for URLs"""

class FetchRequest(service.Request):
    def __init__(self, url=None):
        self.url = url
    def queue(self):
        # FileCache defines no path() method; store() returns the .dat path
        self.localfile = urlcache.store(self.url)

class FetchService(service.Service):
    service='park.services.fetch'
    version='1.0'
    def prepare(self, request):
        self.url = request.url
        self.localfile = request.localfile
    def run(self):
        try:
            urllib.urlretrieve(self.url,self.localfile,self._reporthook)
        except:
            # The request is not in scope here; use the url saved by prepare()
            urlcache.remove(self.url)
            raise
        return None
    def _reporthook(self, n, blocksize, filesize):
        if filesize>0:
            # Report the total in the same units (kb) as the running count
            self.total = int((filesize+999)/1000)
        else:
            self.total = None
        self.count = int(n*blocksize/1000)
        self.units = 'kb'
        self.ready()
    def progress(self):
        return self.count,self.total,self.units