Package reflectometry :: Package reduction :: Module nxstest

Source Code for Module reflectometry.reduction.nxstest

  1  #!/usr/bin/env python 
  2   
  3  # This program is public domain 
  4   
  5  # Author: Paul Kienzle 
  6   
  7  """ 
  8  NeXus tests converted to python. 
  9   
 10  python nxstest.py [-v -x] [hdf4] [hdf5] [xml] 
 11      Run tests 
 12   
 13      -v for verbose; shows structure and leaves files in place 
 14      -x for testing external linkage 
 15      hdf4 hdf5 xml 
 16          for running specific backend tests.  You can specify multiple 
 17          backends in a single run. Default is hdf5 
 18   
 19  python nxstest.py -d file 
 20      Show structure of a single file 
 21  """ 
 22   
 23  import nxs,os,numpy,sys 
 24   
def memfootprint():
    """
    Print a rough memory footprint of the running interpreter.

    Two lines are written to stdout: the number of objects currently
    tracked by the garbage collector and the number of distinct classes
    found among those objects.  Nothing is returned.
    """
    import gc

    # Take a single snapshot and derive both counts from it so the two
    # numbers describe the same set of objects.
    objs = gc.get_objects()
    # Objects that do not expose __class__ are skipped rather than counted.
    classes = set(c.__class__ for c in objs if hasattr(c, '__class__'))
    # print("\n".join(c.__name__ for c in classes))
    print("#objects= %d" % len(objs))
    print("#classes= %d" % len(classes))
32
def leak_test1(n=1000, mode='w5'):
    """
    Reopen and close one NeXus file many times to expose memory leaks.

    n is the number of open/close cycles and mode is the nxs.open mode
    used to create the scratch file "leak_test1.nxs".  Every 100th cycle
    the loop count and the memfootprint() report are printed.  The scratch
    file is deleted once the loop has finished.
    """
    # To have the collector report leaks, enable this before the loop:
    #   import gc
    #   gc.enable()
    #   gc.set_debug(gc.DEBUG_LEAK)
    filename = "leak_test1.nxs"
    try:
        os.unlink(filename)
    except OSError:
        # Nothing to remove: no scratch file left over from an earlier run.
        pass
    handle = nxs.open(filename, mode)
    handle.close()
    print("File should exist now")
    for i in range(n):
        if i % 100 == 0:
            print("loop count %d" % i)
            memfootprint()
        handle.open()
        handle.close()
        # gc.collect()
    os.unlink(filename)
51
def show_structure(filename):
    """
    Print the structure of the NeXus file *filename*.

    The file is opened with the default nxs.open mode, dumped with its
    show() method and closed again.
    """
    nxfile = nxs.open(filename)
    try:
        nxfile.show()
    finally:
        # Release the handle even when show() raises, so that later code
        # can reopen or delete the file.
        nxfile.close()
55 56
def populate(filename, mode):
    """
    Write the reference NeXus file that check() later reads back.

    filename is the file to create and mode is the nxs.open mode string.
    The file receives an "entry" group holding character data, one dataset
    per numeric type, an NXdata group with a link, a compressed dataset and
    an extensible dataset, an NXsample group, and a "link" group holding
    links back to the sample group and to r8_data.  Returns filename.
    """
    # One sample array per numeric type, keyed by the dataset prefix.
    samples = {
        'i1': numpy.arange(4, dtype='uint8'),
        'i2': numpy.arange(4, dtype='int16')*1000,
        'i4': numpy.arange(4, dtype='int32')*1000000,
        'i8': numpy.arange(4, dtype='int64')*1000000000000,
        'r4': numpy.arange(20, dtype='float32').reshape((5, 4)),
    }
    r8 = numpy.arange(20, dtype='float64').reshape((5, 4))
    # Row k of the compressed test array holds the value k in every column.
    row_values = numpy.arange(100, dtype='int32')[:, None]
    comp_array = numpy.ones((100, 20), dtype='int32') * row_values

    nxfile = nxs.open(filename, mode)
    nxfile.setnumberformat('float32', '%9.3g')
    nxfile.makegroup("entry", "NXentry")
    nxfile.opengroup("entry", "NXentry")
    nxfile.putattr("hugo", "namenlos")
    nxfile.putattr("cucumber", "passion")
    #nxfile.putattr("embedded_null","embedded\000null")

    # Character data
    nxfile.makedata("ch_data", 'char', [10])
    nxfile.opendata("ch_data")
    nxfile.putdata("NeXus data")
    nxfile.closedata()

    # Numeric data written whole, one dataset per type
    for prefix in ('i1', 'i2', 'i4', 'i8', 'r4'):
        values = samples[prefix]
        dataset = prefix + '_data'
        nxfile.makedata(dataset, values.dtype, values.shape)
        nxfile.opendata(dataset)
        nxfile.putdata(values)
        nxfile.closedata()

    # r8_data is written as two slabs (last row first) and carries
    # one attribute of each supported scalar type.
    nxfile.makedata('r8_data', 'float64', [5, 4])
    nxfile.opendata('r8_data')
    nxfile.putslab(r8[4, :], [4, 0], [1, 4])
    nxfile.putslab(r8[0:4, :], [0, 0], [4, 4])
    nxfile.putattr("ch_attribute", "NeXus")
    nxfile.putattr("i4_attribute", 42, dtype='int32')
    nxfile.putattr("r4_attribute", 3.14159265, dtype='float32')
    ## Oops... NAPI doesn't support array attributes
    #nxfile.putattr("i4_array",[3,2],dtype='int32')
    #nxfile.putattr("r4_array",[3.14159265,2.718281828],dtype='float32')
    dataID = nxfile.getdataID()
    nxfile.closedata()

    # NXdata group
    nxfile.makegroup("data", "NXdata")
    nxfile.opengroup("data", "NXdata")

    # .. linking
    nxfile.makelink(dataID)

    # .. compressed data
    nxfile.compmakedata("comp_data", 'int32', [100, 20], 'lzw', [20, 20])
    nxfile.opendata('comp_data')
    nxfile.putdata(comp_array)
    nxfile.closedata()
    nxfile.flush()

    # .. extensible data, appended one element at a time
    nxfile.makedata('flush_data', 'int32', [nxs.UNLIMITED])
    for step in range(7):
        nxfile.opendata('flush_data')
        nxfile.putslab(step, [step], [1])
        nxfile.flush()
    # Oops... why isn't there a closedata()?
    nxfile.closegroup()

    # NXsample group
    nxfile.makegroup('sample', 'NXsample')
    nxfile.opengroup('sample', 'NXsample')
    nxfile.makedata('ch_data', 'char', [12])
    nxfile.opendata('ch_data')
    nxfile.putdata('NeXus sample')
    nxfile.closedata()
    sampleID = nxfile.getgroupID()
    nxfile.closegroup()
    nxfile.closegroup()

    # Named links
    nxfile.makegroup('link', 'NXentry')
    nxfile.opengroup('link', 'NXentry')
    nxfile.makelink(sampleID)
    nxfile.makenamedlink('renLinkGroup', sampleID)
    nxfile.makenamedlink('renLinkData', dataID)
    nxfile.closegroup()

    nxfile.close()
    return filename
# Number of failures recorded through fail() since the counter was last reset.
failures = 0


def fail(msg):
    """
    Report a failed check and increment the module-level failure counter.

    msg is printed to stdout after a "FAIL:" prefix.
    """
    global failures
    # Wrap msg in a tuple so a tuple-valued message is printed, not
    # misread as a list of format arguments.
    print("FAIL: %s" % (msg,))
    failures += 1
154
def dicteq(a, b):
    """
    Compare two dictionaries, printing how they differ.

    Returns True when a and b hold the same keys with equal values.  At the
    first difference found, a one-line description is printed and False is
    returned.
    """
    # items() works on both Python 2 and Python 3 dictionaries.
    for k, v in a.items():
        if k not in b:
            print("%s not in %s" % (k, b))
            return False
        if v != b[k]:
            print("%s not equal %s" % (v, b[k]))
            return False
    # Values were compared above; only keys missing from a remain to check.
    for k in b:
        if k not in a:
            print("%s not in %s" % (k, a))
            return False
    return True
171
def check(filename):
    """
    Read back the file written by populate() and verify its contents.

    The module-level failure counter is reset to zero, every mismatch is
    reported through fail(), and the file is closed before returning.
    Returns True when no failure was recorded, False otherwise.
    """
    global failures
    failures = 0
    nxfile = nxs.open(filename, 'rw')
    try:
        if filename != nxfile.inquirefile():
            fail("Files don't match")

        # Root attributes
        attrs = nxfile.getattrinfo()
        if attrs != 4:
            fail("Expected 4 root attributes but got %d" % attrs)
        for _ in range(attrs):
            name, _dims, _dtype = nxfile.getnextattr()
            if name not in ['file_time', 'HDF_version', 'HDF5_Version',
                            'XML_version', 'NeXus_version', 'file_name']:
                fail("attribute %s unexpected" % (name,))

        nxfile.opengroup('entry', 'NXentry')

        # Group attributes
        expect = dict(hugo='namenlos', cucumber='passion')
        #expect['embedded_null'] = "embedded\000null"
        got = dict((k, v) for k, v in nxfile.attrs())
        if not dicteq(got, expect):
            fail("/entry attributes are %s" % (got,))

        # Check that the numbers are written correctly
        for name, dtype, shape, scale in [
                ('i1', 'int8', (4,), 1),
                ('i2', 'int16', (4,), 1000),
                ('i4', 'int32', (4,), 1000000),
                ('i8', 'int64', (4,), 1000000000000),
                ('r4', 'float32', (5, 4), 1),
                ('r8', 'float64', (5, 4), 1),
                ]:
            n = numpy.prod(shape)
            expected = numpy.arange(n, dtype=dtype).reshape(shape)*scale
            nxfile.opendata(name+'_data')
            got = nxfile.getdata()
            nxfile.closedata()
            if not (got == expected).all():
                fail("%s retrieved %s" % (dtype, got))

        # Check attribute types
        nxfile.opendata('r8_data')
        got = nxfile.getattr("ch_attribute", 5, 'char')
        if not got == "NeXus":
            fail("ch_attribute retrieved %s" % (got,))
        got = nxfile.getattr("i4_attribute", 1, 'int32')
        if not got == numpy.int32(42):
            fail("i4_attribute retrieved %s" % (got,))
        got = nxfile.getattr("r4_attribute", 1, 'float32')
        if not got == numpy.float32(3.14159265):
            fail("r4_attribute retrieved %s" % (got,))
        ## Oops... NAPI doesn't support array attributes
        #expect = numpy.array([3,2],dtype='int32')
        #got = nxfile.getattr("i4_array",2,'int32')
        #if not (got==expect).all(): fail('i4_array retrieved %s'%(got))
        #expect = numpy.array([3.14159265,2.718281828],dtype='float32')
        #got = nxfile.getattr("r4_array",2,dtype='float32')
        #if not (got==expect).all(): fail("r4_array retrieved %s"%(got))
        nxfile.closedata()

        # Check reading from compressed datasets
        comp_array = numpy.ones((100, 20), dtype='int32')
        for i in range(100):
            comp_array[i, :] *= i
        expected = comp_array
        nxfile.opengroup('data', 'NXdata')
        nxfile.opendata('comp_data')
        got = nxfile.getdata()
        nxfile.closedata()   #/entry/data/comp_data
        nxfile.closegroup()  #/entry/data
        if not (got == expected).all():
            fail("compressed data differs")
            print(got)
        nxfile.closegroup()  #/entry

        # Check read slab (e.g., from extensible)

        # Check links
        nxfile.opengroup('entry', 'NXentry')
        nxfile.opengroup('sample', 'NXsample')
        # The returned ID is not used below; the call only has to succeed.
        nxfile.getgroupID()
        nxfile.closegroup()                 #/entry/sample
        nxfile.opengroup('data', 'NXdata')  #/entry/data
        nxfile.opendata('r8_data')          #/entry/data/r8_data
        dataid = nxfile.getdataID()
        nxfile.closedata()                  #/entry/data/r8_data
        nxfile.closegroup()                 #/entry/data
        nxfile.opendata('r8_data')
        data2id = nxfile.getdataID()
        nxfile.closedata()
        nxfile.closegroup()                 #/entry
        if not nxfile.sameID(dataid, data2id):
            fail("/entry/data/r8_data not linked to /entry/r8_data")

        # Check openpath and getslab
        expected = comp_array[4:(4+5), 4:(4+3)]
        nxfile.openpath('/entry/data/comp_data')
        got = nxfile.getslab([4, 4], [5, 3])
        if not (got == expected).all():
            fail("retrieved compressed slabs differ")
            print(got)
        # Open the same path a second time and read the slab again.
        nxfile.openpath('/entry/data/comp_data')
        got = nxfile.getslab([4, 4], [5, 3])
        if not (got == expected).all():
            fail("after reopen: retrieved compressed slabs differ")
            print(got)
        nxfile.openpath('../r8_data')
        for k, v in nxfile.attrs():
            if k == 'target' and v != '/entry/r8_data':
                fail("relative openpath was not successful")
    finally:
        # Close the handle so the caller can delete or reopen the file.
        nxfile.close()

    return failures == 0
280
def populate_external(filename, mode):
    """
    Create a NeXus file whose first two entries are external links.

    filename is the file to create; mode is one of 'w5', 'w4' or 'wx' and
    also selects the extension of the link targets.  entry1 and entry2 are
    linked to nxfile://data/dmc01<ext> and nxfile://data/dmc02<ext>;
    entry3 is created without a link.
    """
    suffix = {'w5': '.h5', 'w4': '.hdf', 'wx': '.xml'}[mode]
    nxfile = nxs.open(filename, mode)
    for group, stem in (('entry1', 'dmc01'), ('entry2', 'dmc02')):
        nxfile.makegroup(group, 'NXentry')
        nxfile.linkexternal(group, 'NXentry', 'nxfile://data/' + stem + suffix)
    # entry3 is an ordinary group with no external link.
    nxfile.makegroup('entry3', 'NXentry')
    nxfile.close()
290
def check_external(filename, mode):
    """
    Verify the external links in a file written by populate_external().

    filename is the file to inspect; mode is one of 'w5', 'w4' or 'wx' and
    selects the extension expected on the external file names.  Every
    mismatch is reported through fail().  The file is closed before
    returning, even when a check raises.

    Returns the number of failures recorded during this call.
    """
    ext = dict(w5='.h5', w4='.hdf', wx='.xml')[mode]
    # The module counter is not reset here; report the difference instead.
    already_failed = failures
    nxfile = nxs.open(filename, 'rw')
    try:
        nxfile.openpath('/entry1/start_time')
        # The value is not inspected; the read only has to succeed.
        nxfile.getdata()

        got = nxfile.inquirefile()
        expected = 'nxfile://data/dmc01'+ext
        if expected != got:
            fail("first external file returned %s" % (got,))

        nxfile.openpath('/entry2/sample/sample_name')
        nxfile.getdata()

        got = nxfile.inquirefile()
        expected = 'nxfile://data/dmc02'+ext
        if expected != got:
            fail("second external file returned %s" % (got,))

        nxfile.openpath('/')
        remote = nxfile.isexternalgroup('entry1', 'NXentry')
        if remote is None:
            fail("failed to identify /entry1 as external")
        remote = nxfile.isexternalgroup('entry3', 'NXentry')
        if remote is not None:
            fail('incorrectly identified /entry3 as external')
    finally:
        nxfile.close()
    return failures - already_failed
318
def test_external(mode, quiet=True):
    """
    Run the external-link test for one backend.

    mode is one of 'w5', 'w4' or 'wx'.  A scratch file 'nxext<ext>' is
    written with populate_external() and verified with check_external().
    When quiet is false the file structure is printed and the file is left
    in place; otherwise the scratch file is removed.

    Returns the number of failures recorded by fail() during the test.
    """
    ext = dict(w5='.h5', w4='.hdf', wx='.xml')[mode]
    filename = 'nxext'+ext
    # Measure failures through the module-level counter kept by fail().
    already_failed = failures
    populate_external(filename, mode)
    if not quiet:
        show_structure(filename)
    check_external(filename, mode)
    if quiet:
        # Same cleanup rule as test_mode: keep files only in verbose runs.
        os.remove(filename)
    return failures - already_failed
327
def test_mode(mode, quiet=True, external=False):
    """
    Run the write/read-back test for one backend.

    mode is one of 'w5', 'w4' or 'wx'.  A scratch file 'NXtest<ext>' is
    written with populate() and verified with check().  When quiet is false
    the file structure is printed and the file is left in place; otherwise
    it is removed.  When external is true the external-link test is run as
    well.

    Returns the number of failures recorded by fail() during the run.
    """
    ext = dict(w5='.h5', w4='.hdf', wx='.xml')[mode]
    filename = 'NXtest'+ext
    populate(filename, mode=mode)
    if not quiet and 'NX_LOAD_PATH' in os.environ:
        show_structure('dmc01'+ext)
    if not quiet:
        show_structure(filename)
    # check() resets the module-level counter and returns a success flag,
    # not a count, so its return value is not used as the tally.
    check(filename)
    if external:
        test_external(mode, quiet)
    if quiet:
        os.remove(filename)
    # The counter now holds every failure reported since check() reset it.
    return failures
341
def test():
    """
    Run the backend tests selected on the command line.

    '-v' in sys.argv turns off quiet mode and '-x' turns on the external
    link test.  Each of 'hdf4', 'xml' and 'hdf5' present in sys.argv runs
    the matching backend, in that order; with none given, hdf5 is run.
    """
    argv = sys.argv
    quiet = '-v' not in argv
    external = '-x' in argv
    backends = (('hdf4', 'w4'), ('xml', 'wx'), ('hdf5', 'w5'))
    selected = [mode for flag, mode in backends if flag in argv]
    if not selected:
        selected = ['w5']
    for mode in selected:
        test_mode(mode, quiet, external)
if __name__ == "__main__":
    # "-d file" dumps the structure of one file; anything else runs tests.
    cli_args = sys.argv[1:]
    if len(cli_args) == 2 and cli_args[0] == '-d':
        show_structure(cli_args[1])
    else:
        test()
        #leak_test1(n=10000)