Package reflectometry :: Package reduction :: Module registry

Source Code for Module reflectometry.reduction.registry

  1  # This program is public domain 
  2  """ 
  3  File extension registry. 
  4   
  5  This provides routines for opening files based on extension, 
  6  and registers the built-in file extensions. 
  7  """ 
  8   
  9  import os.path 
 10   
11 -class ExtensionRegistry(object):
12 """ 13 Associate a file loader with an extension. 14 15 Note that there may be multiple loaders for the same extension. 16 17 Example 18 ======= 19 20 registry = ExtensionRegistry() 21 22 # Add an association by setting an element 23 registry['.zip'] = unzip 24 25 # Multiple extensions for one loader 26 registry['.tgz'] = untar 27 registry['.tar.gz'] = untar 28 29 # Generic extensions to use after trying more specific extensions; 30 # these will be checked after the more specific extensions fail. 31 registry['.gz'] = gunzip 32 33 # Multiple loaders for one extension 34 registry['.cx'] = cx1 35 registry['.cx'] = cx2 36 registry['.cx'] = cx3 37 38 # Show registered extensions 39 print registry.extensions() 40 41 # Can also register a format name for explicit control from caller 42 registry['cx3'] = cx3 43 print registry.formats() 44 45 # Retrieve loaders for a file name 46 registry.lookup('hello.cx') -> [cx3,cx2,cx1] 47 48 # Run loader on a filename 49 registry.load('hello.cx') -> 50 try: 51 return cx3('hello.cx') 52 except: 53 try: 54 return cx2('hello.cx') 55 except: 56 return cx1('hello.cx') 57 58 # Load in a specific format ignoring extension 59 registry.load('hello.cx',format='cx3') -> 60 return cx3('hello.cx') 61 """
62 - def __init__(self):
63 self.loaders = {}
64 - def __setitem__(self, ext, loader):
65 if ext not in self.loaders: 66 self.loaders[ext] = [] 67 self.loaders[ext].insert(0,loader)
68 - def __getitem__(self, ext):
69 return self.loaders[ext]
70 - def __contains__(self, ext):
71 return ext in self.loaders
72 - def formats(self):
73 """ 74 Return a sorted list of the registered formats. 75 """ 76 names = [a for a in self.loaders.keys() if not a.startswith('.')] 77 names.sort() 78 return names
79 - def extensions(self):
80 """ 81 Return a sorted list of registered extensions. 82 """ 83 exts = [a for a in self.loaders.keys() if a.startswith('.')] 84 exts.sort() 85 return exts
86 - def lookup(self, path):
87 """ 88 Return the loader associated with the file type of path. 89 90 Raises ValueError if file type is not known. 91 """ 92 # Find matching extensions 93 extlist = [ext for ext in self.extensions() if path.endswith(ext)] 94 # Sort matching extensions by decreasing order of length 95 extlist.sort(lambda a,b: len(a)<len(b)) 96 # Combine loaders for matching extensions into one big list 97 loaders = [] 98 for L in [self.loaders[ext] for ext in extlist]: 99 loaders.extend(L) 100 # Remove duplicates if they exist 101 if len(loaders) != len(set(loaders)): 102 result = [] 103 for L in loaders: 104 if L not in result: result.append(L) 105 loaders = L 106 # Raise an error if there are no matching extensions 107 if len(loaders) == 0: 108 raise ValueError, "Unknown file type for "+path 109 # All done 110 return loaders
111 - def load(self, path, format=None):
112 """ 113 Call the loader for the file type of path. 114 115 Raises ValueError if no loader is available. 116 Raises KeyError if format is not available. 117 May raise a loader-defined exception if loader fails. 118 """ 119 if format is None: 120 loaders = self.lookup(path) 121 else: 122 loaders = self.loaders[format] 123 for fn in loaders: 124 try: 125 return fn(path) 126 except: 127 pass # give other loaders a chance to succeed 128 # If we get here it is because all loaders failed 129 raise # reraises last exception
130
131 -def test():
132 reg = ExtensionRegistry() 133 class CxError(Exception): pass 134 def cx(file): return 'cx' 135 def new_cx(file): return 'new_cx' 136 def fail_cx(file): raise CxError 137 def cat(file): return 'cat' 138 def gunzip(file): return 'gunzip' 139 reg['.cx'] = cx 140 reg['.cx1'] = cx 141 reg['.cx'] = new_cx 142 reg['.gz'] = gunzip 143 reg['.cx.gz'] = new_cx 144 reg['.cx1.gz'] = fail_cx 145 reg['.cx1'] = fail_cx 146 reg['.cx2'] = fail_cx 147 reg['new_cx'] = new_cx 148 149 # Two loaders associated with .cx 150 assert reg.lookup('hello.cx') == [new_cx,cx] 151 # Make sure the last loader applies first 152 assert reg.load('hello.cx') == 'new_cx' 153 # Make sure the next loader applies if the first fails 154 assert reg.load('hello.cx1') == 'cx' 155 # Make sure the format override works 156 assert reg.load('hello.cx1',format='.cx.gz') == 'new_cx' 157 # Make sure the format override works 158 assert reg.load('hello.cx1',format='new_cx') == 'new_cx' 159 # Make sure the case of all loaders failing is correct 160 try: reg.load('hello.cx2') 161 except CxError: pass # correct failure 162 else: raise AssertError,"Incorrect error on load failure" 163 # Make sure the case of no loaders fails correctly 164 try: reg.load('hello.missing') 165 except ValueError,msg: 166 assert str(msg)=="Unknown file type for hello.missing",'Message: <%s>'%(msg) 167 else: raise AssertError,"No error raised for missing extension" 168 assert reg.formats() == ['new_cx'] 169 assert reg.extensions() == ['.cx','.cx.gz','.cx1','.cx1.gz','.cx2','.gz'] 170 # make sure that it supports multiple '.' in filename 171 assert reg.load('hello.extra.cx1') == 'cx' 172 assert reg.load('hello.gz') == 'gunzip' 173 assert reg.load('hello.cx1.gz') == 'gunzip' # Since .cx1.gz fails
174 175 if __name__ == "__main__": test() 176