"""
Data storage services.

Warning: Not yet used

Data store to manage local data, both job input and job output.

This includes URL prefetch, results storage and purging.

Design Notes
============

Desired features::

    scalability - read and write a block at a time
    efficiency - check if file already fetched
    robustness - checksum to make sure fetched file is correct
    security - allow access to secure URLs
    usability - avoid reentering passwords on accessible resources
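
The scalability and robustness items can be met in a single pass. Read the source a block at a time so memory use stays bounded, and feed each block to a digest that can afterwards be compared with a known checksum. The sketch below accepts any file-like source with a `read()` method. It uses MD5 only because the cache already hashes keys with it. `copy_with_checksum` and its `blocksize` default are hypothetical and not part of this module.

```python
import hashlib
import io


def copy_with_checksum(src, dst, blocksize=8192):
    # Hypothetical helper: stream src to dst one block at a time and
    # return the MD5 hex digest of everything that was copied.
    digest = hashlib.md5()
    while True:
        block = src.read(blocksize)
        if not block:  # an empty read marks the end of the stream
            break
        digest.update(block)
        dst.write(block)
    return digest.hexdigest()
```

If the data provider publishes a checksum, a mismatch after the copy would signal that the file should be discarded and fetched again.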

What happens when the file at the URL changes? What happens when urllib
drops the connection before the transfer is complete?
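
For the dropped-connection question, one common approach is to download into a temporary file in the cache directory. The file is renamed into place only after the transfer completes, so a truncated transfer never appears under its final name. This module does not currently do this. Python's `urlretrieve` already raises `ContentTooShortError` when fewer bytes arrive than the `Content-Length` header announced. The sketch below instead checks the size explicitly against an optional `expected_size`. The helper `fetch_atomically` is hypothetical, and any file-like `src` stands in for the network response. It assumes Python 3.3+ for `os.replace`.

```python
import os
import tempfile


def fetch_atomically(src, final_path, expected_size=None, blocksize=8192):
    # Hypothetical helper: stream src into a temporary file in the same
    # directory as final_path, then rename it into place only when the
    # transfer is complete. A partial download never gets the final name.
    directory = os.path.dirname(os.path.abspath(final_path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix='.part')
    try:
        size = 0
        with os.fdopen(fd, 'wb') as tmp:
            while True:
                block = src.read(blocksize)
                if not block:
                    break
                tmp.write(block)
                size += len(block)
        if expected_size is not None and size != expected_size:
            raise IOError('truncated transfer: got %d of %d bytes'
                          % (size, expected_size))
        # os.replace is atomic on POSIX when both paths are on the same
        # filesystem; creating the temporary file in the same directory
        # makes that the normal case.
        os.replace(tmp_path, final_path)
    except BaseException:
        # Remove the partial file, then let the caller see the error.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return final_path
```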


File caching services
---------------------

We want something lightweight and robust. The filesystem already exists
and acts as a database; we just need a string key tied to a file name.
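
A string key can be tied to a file name by hashing it. The hex digest becomes the file name, and the key is stored in a sidecar file so that a later lookup can detect two keys that hash to the same name. This mirrors the .key/.dat layout of the `FileCache` class below. It is a standalone sketch that assumes text keys, and the function `key_to_path` is hypothetical.

```python
import hashlib
import os


def key_to_path(cache_dir, key):
    # Hypothetical helper: map a string key to its data file in cache_dir,
    # recording the key in a sidecar .key file to detect collisions.
    digest = hashlib.md5(key.encode('utf-8')).hexdigest()
    data_path = os.path.join(cache_dir, digest + '.dat')
    key_path = os.path.join(cache_dir, digest + '.key')
    if os.path.exists(key_path):
        # Some key already maps to this digest; make sure it is this one.
        with open(key_path, encoding='utf-8') as fh:
            if fh.read() != key:
                raise RuntimeError('Cache collision for ' + key)
    else:
        # First use of this key: record it beside the data file.
        with open(key_path, 'w', encoding='utf-8') as fh:
            fh.write(key)
    return data_path
```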

Note that Unix filesystems do not handle very large directories well,
and this could kill performance unless a multilevel structure is created.
Currently we are ignoring the problem and putting everything in one big
directory. Breaking the hash into e.g., 4+4+12 would give a 3-level
directory structure.
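
The multilevel layout only changes how the digest is turned into a path. In the sketch below, the first two 4-digit groups become directory names and the remainder becomes the file name. It assumes the same MD5 hex digest used for the flat layout, and `nested_path` is a hypothetical helper. With a 32-digit MD5 digest the remainder is 24 digits, so the exact split is a tuning choice.

```python
import hashlib
import os


def nested_path(root, key, levels=(4, 4)):
    # Hypothetical helper: split the hex digest of key into directory
    # levels plus a leaf name. With levels=(4, 4) an MD5 digest becomes
    # root/xxxx/yyyy/<remaining 24 hex digits>.
    digest = hashlib.md5(key.encode('utf-8')).hexdigest()
    parts = []
    start = 0
    for width in levels:
        parts.append(digest[start:start + width])
        start += width
    parts.append(digest[start:])
    return os.path.join(root, *parts)
```

The caller would still need to create the intermediate directories, for instance with `os.makedirs(os.path.dirname(path))`, before writing the file.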

Ultimately the caller doesn't care, but if we change it, then we may
need to provide a migration path for the server.

The SNS datasets can involve many large files which are not generally
reused from job to job. It would be nice to have cache aging based on
last access time. This can probably be done after the fact by walking
the cache directory. It may not work on all operating systems if
access time is not properly supported.
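
Aging by last access time could be done with a periodic walk over the cache directory, as suggested above. The sketch below uses a hypothetical `stale_files` helper with a threshold in days. It only reports candidates. The decision to delete them, and to remove the matching .key files, is left to the caller.

```python
import os
import time


def stale_files(root, max_age_days):
    # Hypothetical helper: walk the cache and return the paths whose last
    # access time is older than max_age_days. Access times may not be
    # updated on filesystems mounted with noatime, so treat the result
    # as a hint rather than a guarantee.
    cutoff = time.time() - max_age_days * 86400
    stale = []
    for dirpath, _, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            if os.stat(path).st_atime < cutoff:
                stale.append(path)
    return sorted(stale)
```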

The administrator needs to be able to get a snapshot of the cache sorted
by size of the cached object, last access time, etc.
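
An administrator snapshot can come from the same kind of directory walk. It collects the size and access time of each file, then sorts on the requested field. The helper name `cache_snapshot` and its `sort_by` values are hypothetical.

```python
import os


def cache_snapshot(root, sort_by='size'):
    # Hypothetical helper: return (path, size_in_bytes, last_access) tuples
    # for every file under root, largest (or most recently accessed) first.
    entries = []
    for dirpath, _, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            st = os.stat(path)
            entries.append((path, st.st_size, st.st_atime))
    index = {'size': 1, 'atime': 2}[sort_by]
    entries.sort(key=lambda entry: entry[index], reverse=True)
    return entries
```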

For performance, keys accessed in this session are stored in a dictionary.
Consider making this a weak reference so that long-running servers do
not show memory leaks. Better yet, kill the server periodically: it
should be robust enough to recover, many problems of long-running
services vanish, and the recovery code gets tested regularly.
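
One caveat on the weak-reference idea: the values in this dictionary are file name strings, and CPython cannot create weak references to `str` objects, so a `weakref.WeakValueDictionary` would reject them. A bounded least-recently-used dictionary is a simple substitute that caps memory growth. The sketch below uses that technique in place of weak references. The class name `BoundedKeyCache` and the `maxsize` default are hypothetical.

```python
from collections import OrderedDict


class BoundedKeyCache(object):
    # Hypothetical replacement for the per-session key dictionary:
    # remember at most maxsize key -> filename entries and evict the
    # least recently used entry when full.

    def __init__(self, maxsize=10000):
        self.maxsize = maxsize
        self._entries = OrderedDict()

    def get(self, key):
        # Return the cached filename (or None) and mark it recently used.
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)
        return self._entries[key]

    def put(self, key, filename):
        self._entries[key] = filename
        self._entries.move_to_end(key)
        if len(self._entries) > self.maxsize:
            self._entries.popitem(last=False)  # drop the oldest entry

    def __len__(self):
        return len(self._entries)
```

A miss costs little, because the lookup can fall back to reading the .key record on disk, as `FileCache` already does for keys not seen in this session.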


Staging
-------

The use cases we need to cover::

    1. cluster with no local storage on nodes
    2. cluster with local storage on nodes but no internet connection
    3. cluster with local storage on nodes and an internet connection
    4. independent nodes on the same subnet as the server

All these use cases are satisfied by staging the prefetch URL
on a local server where the nodes can access it when necessary.
The work mapper itself should strive to keep computations on
nodes which already have the problem loaded to reduce network
bandwidth and file load times, but this is a separate issue.

There are two additional use cases we will not cover::

    0. files too large for a single node
    5. completely independent nodes

Case 0 requires that the individual work units be calculated in
parallel. This will certainly require staging of the files on
a local server.

Case 5 is a situation such as BOINC, where a number of independent
computers from across the internet combine to work on a problem.
In these situations we will not want to demand large amounts of
network bandwidth and local storage, so they are only suitable for
small files. This means there will be minimal penalty for staging
the file on a local server.

Signalling
----------

File transfer must occur asynchronously. The compute nodes
should not block as files are being staged. This raises the
question of how the asynchronous behaviour should be implemented.
We look at several possibilities::

    1. Local data server on separate node of the cluster
    2. Local data server on main server running as separate process
    3. Local data server on main server running as separate thread
    4. Local data server on main server with async I/O

I'm not sure 4 is possible in Python. 3 should be the default to
reduce the number of processes and the difficulty of managing them on
the server. However, because we need to support 1 on some architectures,
we will get 2 for free. In any case, 1-4 can all be supported by adding
a server command saying that a precondition is met, so that the server
does not need a busy loop to check a status flag in addition to the
usual listening for client connections.
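
Option 3, combined with a "precondition met" message, can be sketched with worker threads and a queue. A staging thread puts a notice on the queue when its file is ready, and the main loop blocks on the queue instead of polling a status flag. This minimal sketch uses Python's standard `threading` and `queue` modules. `stage_file`, `serve` and the `('ready', url)` message format are hypothetical. For options 1 and 2 the notice would arrive as a network command rather than a queue item.

```python
import queue
import threading


def stage_file(url, notices):
    # Hypothetical staging worker: the actual transfer would happen here.
    # When it finishes, it reports that the precondition is met.
    notices.put(('ready', url))


def serve(urls):
    notices = queue.Queue()
    workers = [threading.Thread(target=stage_file, args=(url, notices))
               for url in urls]
    for worker in workers:
        worker.start()
    ready = []
    # Block on the queue rather than busy-waiting on a status flag.
    while len(ready) < len(urls):
        kind, url = notices.get()
        if kind == 'ready':
            ready.append(url)
    for worker in workers:
        worker.join()
    return sorted(ready)
```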

Credentials
-----------

When fetching data from services such as the SNS portal, the user
should not have to repeatedly enter a password. Credentials should
be cached. The mechanism for this is unclear.
"""

from __future__ import with_statement

import hashlib
import os
import thread
import traceback
import urllib

class FileCache(object):
    """
    Create a data cache in a particular directory on the filesystem.

    Each cached item consists of two parts, a .key file containing the
    item description, and a .dat file containing the item data.

    The store method returns a complete path to the .dat file.

    TODO: monitor access times so that we can identify old files for purging
    """
    def __init__(self, location):
        self.location = location
        """Directory name containing the cache"""

        # Create the cache directory on first use.
        if not os.path.exists(location):
            os.makedirs(location)
        # Keys resolved during this session, and a lock guarding them.
        self._keys = {}
        self._lock = thread.allocate_lock()

    def hash(self, key):
        """
        Returns cache file and key file names for the given key.
        """
        digest = hashlib.md5(key).hexdigest()
        filename = os.path.join(self.location, digest+'.dat')
        record = os.path.join(self.location, digest+'.key')
        return filename, record

    # ...

        """
        Find the data file corresponding to the particular key.

        Raises RuntimeError if there is a key collision.
        """
        with self._lock:
            # Fast path: key already resolved during this session.
            if key in self._keys:
                return self._keys[key]

            filename, record = self.hash(key)

            if os.path.exists(record):
                # Another key may hash to the same name; verify it is ours.
                with open(record, 'r') as fh:
                    oldkey = fh.read()
                if oldkey != key:
                    raise RuntimeError('Cache collision for '+key)
            else:
                # First time this key is seen: record it next to the data.
                with open(record, 'w') as fh:
                    fh.write(key)

            # Remember the key only once it is known not to collide.
            self._keys[key] = filename
            return filename


import park.core.service as service

def config(key, missing):
    # Stand-in until a configuration system exists: always returns
    # the supplied default.
    return missing

URL_CACHE_PATH = config('park.datastore.urlcache','/tmp/urlcache')
"""Standard location for the local repository"""
urlcache = FileCache(URL_CACHE_PATH)
"""The local cache for URLs"""

# ...
    service='park.services.fetch'
    version='1.0'
    # ...
        try:
            urllib.urlretrieve(self.url,self.localfile,self._reporthook)
        except:
            # Drop the cache entry for the failed download, then re-raise.
            urlcache.remove(request.url)
            raise
        return None

    def _reporthook(self, n, blocksize, filesize):
        # Called by urlretrieve with the number of blocks transferred so
        # far, the block size, and the total size (may be -1 if unknown).
        # Both count and total are reported in kilobytes.
        if filesize>0:
            self.total = (filesize+999)//1000
        else:
            self.total = None
        self.count = n*blocksize//1000
        self.units = 'kb'
        self.ready()

    # ...
        return self.count,self.total,self.units
