Package park :: Package util :: Module torque

Source Code for Module park.util.torque

  1   
  2  __all__ = ['PBS'] 
  3   
  4  # Delay error until trying to establish a connection. 
  5  try: 
  6      from _torque import * 
  7      available = True 
  8  except Exception,exc: 
  9      available = False 
 10      torque_lib_error = str(exc) 
class PBS(object):
    """
    PBS batch queuing system.

    Each instance holds one connection handle (``self.connect``) obtained
    from ``pbs_connect``; the methods forward to the corresponding
    ``pbs_*`` calls and turn negative status codes into ``PBS.Error``.
    """

    @staticmethod
    def max_connections():
        """
        Returns the maximum number of allowed connections.
        """
        return pbs_query_max_connections()

    @staticmethod
    def default():
        """
        Return the default server name.
        """
        return pbs_default()

    class Error(Exception):
        """Exception in PBS control"""
    # Note: can override the exception to use a standard queue
    # exception by setting PBS.Error = Queue.Error outside.

    def __init__(self, server=None):
        """
        Connect to the server host:port.

        Host defaults to the environment variable PBS_DEFAULT.
        If this is not defined, use PBS_DIR/default_destn.

        Port defaults to 15001.

        Raises PBS.Error if there is an error.
        """
        # Set the handle first so disconnect()/__del__ see a valid
        # "not connected" state even if the code below raises.
        self.connect = 0
        if not available:
            raise PBS.Error(torque_lib_error)
        self.server = server
        self.connect = pbs_connect(server)
        PBS._check_status(self.connect)

    def disconnect(self):
        """
        Disconnect from the server.

        Does nothing when there is no open connection, including on an
        instance whose __init__ never ran.

        Raises PBS.Error if the disconnect call reports a failure.
        """
        # getattr: this method is also used as __del__, which can be
        # invoked on an object that never got its ``connect`` attribute.
        if getattr(self, "connect", 0) > 0:
            status = pbs_disconnect(self.connect)
            # Mark the handle as closed before checking the status so a
            # failed disconnect is not attempted a second time.
            self.connect = -1
            PBS._check_status(status)
    __del__ = disconnect

    def submit(self, script=None, destination=None, **kw):
        """
        Submit a job to the queue.

        Script is the command to run on the head node.
        Destination is "[queue]" or None for the default queue.
        Extra keywords must be names from PBS.user_kw.

        Returns the job identifier produced by pbs_submit.

        Raises PBS.Error if there is a problem, and TypeError for a
        keyword that is not in PBS.user_kw.
        """
        attrs = PBS._user_kws(kw)
        job = pbs_submit(self.connect, attrs, script, destination, None)
        if job is None:
            PBS._check_status(-1)  # Fake an error code
        return job

    def alter(self, job=None, **kw):
        """
        Modify the parameters to the job.

        Extra keywords must be names from PBS.user_kw.

        Raises PBS.Error if there is a problem, and TypeError for a
        keyword that is not in PBS.user_kw.
        """
        attrs = PBS._user_kws(kw)
        status = pbs_alterjob(self.connect, job, attrs, None)
        PBS._check_status(status)

    def run(self, job=None, location=None, sync=True):
        """
        Run the queued job.

        Job is a name seq#.server.

        Location is the name of a managed cluster, or None for default.

        If sync is False, check the job and return immediately, otherwise
        wait for the job to start.

        Raises PBS.Error if there is a problem.
        """
        if sync:
            status = pbs_runjob(self.connect, job, location, None)
        else:
            status = pbs_asyrunjob(self.connect, job, location, None)
        PBS._check_status(status)

    def remove(self, job=None):
        """
        Remove a job from the queue.

        Raises PBS.Error if there is a problem.
        """
        status = pbs_deljob(self.connect, job, None)
        PBS._check_status(status)

    @staticmethod
    def _check_status(status):
        """
        Check the status return and convert to an exception.

        A negative status raises PBS.Error carrying the text that
        pbs_strerror gives for the current pbs_errno value.
        """
        if status < 0:
            err = pbs_strerror(pbs_errno())
            raise PBS.Error(err)

    @staticmethod
    def _user_kws(kw):
        """
        Verify that the keywords are in the set of user keywords.

        Returns the attribute list built by _attr_list for kw.

        Raises TypeError if any key of kw is not a key of PBS.user_kw.
        """
        if any(k not in PBS.user_kw for k in kw):
            raise TypeError("expected keyword from PBS.user_kw")
        return PBS._attr_list(kw)

    @staticmethod
    def _attr_list(kw):
        """
        Convert a keyword set to an attribute list.

        One attrl record is created per keyword, with the value passed
        through str().  The records are chained through their ``next``
        attribute and the first record is returned; an empty kw gives
        None.
        """
        ptrs = [attrl(name=k, value=str(v), resource=None)
                for k, v in kw.items()]
        if not ptrs:
            return None
        # Link every record to its successor, then terminate the chain.
        for prev, cur in zip(ptrs, ptrs[1:]):
            prev.next = cur
        ptrs[-1].next = None
        return ptrs[0]

    # Allowed keywords for submit, alterjob
    user_kw = dict(
        Execution_Time="job's execution time",
        Account_Name="account string",
        Checkpoint="checkpoint interval",
        Error_Path="path name for the standard error of the job",
        Group_List="list of group names under which the job may execute",
        Hold_Types="hold types (must be 'u')",
        Join_Path="True if standard error and standard output are merged",
        Keep_Files="which output of the job is kept on the execution host",
        Resource_List="value of a named resource",
        Mail_Points="points at which the server will send mail about the job",
        Mail_Users="list of users who would receive mail about the job",
        Job_Name="job name",
        Output_Path="path name for the standard output of the job",
        Priority="priority of the job",
        Rerunable="rerunable flag",
        Shell_Path_List="path to the shell which will interprets the job script",
        User_List="list of user names under which the job may execute",
        Variable_List="list of environmental variables which are to be exported to the job",
        depend="inter-job dependencies",
        stagein="list of files to be staged-in before job execution",
        stageout="list of files to be staged-out after job execution",
        )
170
def test():
    """
    Manual smoke test against the default server.

    Prints the library load error and returns when the torque bindings
    are unavailable.  Otherwise connects, submits 'echo.sh', runs the
    job, waits ten seconds and disconnects.
    """
    if not available:
        print(torque_lib_error)
        return

    import time
    srv = PBS()
    try:
        job = srv.submit('echo.sh')
        # Single-argument print(...) gives the same output on Python 2
        # and Python 3.
        print("job %s" % (job,))
        srv.run(job)
        print("scheduled")
        time.sleep(10)
        print("disconnecting")
    finally:
        # Release the connection even if submit/run raised.
        srv.disconnect()
# Run the smoke test when the module is executed as a script.
if __name__ == "__main__":
    test()