1
2 __all__ = ['PBS']
3
4
5 try:
6 from _torque import *
7 available = True
8 except Exception,exc:
9 available = False
10 torque_lib_error = str(exc)
11
12 -class PBS(object):
13 """
14 PBS batch queuing system.
15 """
16 @staticmethod
22
23 @staticmethod
25 """
26 Return the default server name.
27 """
28 return pbs_default()
29
31 """Exception in PBS control"""
32
33
34
52
61 __del__ = disconnect
62
63 - def submit(self, script=None, destination=None, **kw):
64 """
65 Submit a job to the queue.
66
67 Script is the command to run on the head node.
68 Destination is "[queue]" or None for the default queue.
69
70 Raises PBS.Error if there is a problem.
71 """
72 attrs = PBS._user_kws(kw)
73 job = pbs_submit(self.connect, attrs, script, destination, None)
74 if job is None: PBS._check_status(-1)
75 return job
76
77 - def alter(self, job=None, **kw):
78 """
79 Modify the parameters to the job.
80
81 Raises PBS.Error if there is a problem.
82 """
83 attrs = PBS._user_kws(kw)
84 status = pbs_alterjob(self.connect, job, attrs, None)
85 PBS._check_status(status)
86
87 - def run(self, job=None, location=None, sync=True):
88 """
89 Run the queued job.
90
91 Job is a name seq#.server.
92
93 Location is the name of a managed cluster, or None for default.
94
95 If sync is False, check the job and return immediately, otherwise
96 wait for the job to start.
97 """
98 if sync:
99 status = pbs_runjob(self.connect, job, location, None)
100 else:
101 status = pbs_asyrunjob(self.connect, job, location, None)
102 PBS._check_status(status)
103
110
111
112 @staticmethod
120
121 @staticmethod
123 """
124 Verify that the keywords are in the set of user keywords.
125 """
126 if any([k not in PBS.user_kw for k in kw.keys()]):
127 raise TypeError("expected keyword from PBS.user_kw")
128 return PBS._attr_list(kw)
129
130 @staticmethod
132 """
133 Convert an keyword set to an attribute list.
134 """
135 ptrs = [attrl(name=k, value=str(v), resource=None) for v in kw.items()]
136 if ptrs != []:
137 head = ptrs[0]
138 for p in ptrs[1:-1]:
139 head.next = p
140 p = head
141 ptrs[-1].next = None
142 else:
143 head = None
144 return head
145
146
147 user_kw = dict(
148 Execution_Time="job's execution time",
149 Account_Name="account string",
150 Checkpoint="checkpoint interval",
151 Error_Path="path name for the standard error of the job",
152 Group_List="list of group names under which the job may execute",
153 Hold_Types="hold types (must be 'u')",
154 Join_Path="True if standard error and standard output are merged",
155 Keep_Files="which output of the job is kept on the execution host",
156 Resource_List="value of a named resource",
157 Mail_Points="points at which the server will send mail about the job",
158 Mail_Users="list of users who would receive mail about the job",
159 Job_Name="job name",
160 Output_Path="path name for the standard output of the job",
161 Priority="priority of the job",
162 Rerunable="rerunable flag",
163 Shell_Path_List="path to the shell which will interprets the job script",
164 User_List="list of user names under which the job may execute",
165 Variable_List="list of environmental variables which are to be exported to the job",
166 depend="inter-job dependencies",
167 stagein="list of files to be staged-in before job execution",
168 stageout="list of files to be staged-out after job execution",
169 )
170
172 if not available:
173 print torque_lib_error
174 return
175
176 import time
177 srv = PBS()
178 job = srv.submit('echo.sh')
179 print "job",job
180 srv.run(job)
181 print "scheduled"
182 time.sleep(10)
183 print "disconnecting"
184 srv.disconnect()
185
186 if __name__ == "__main__": test()
187