1 from __future__ import with_statement
2 import threads
3 import time
6 """
7 Manage a set of active jobs.
8 """
11
14
17 """
18 Keep track of a set of jobs.
19 The set of jobs known by the server.
20 Server-side view of the fitting service.
21
22 Note that this needs to move into a transaction-based persistent
23 store so that server reboot does not terminate long running jobs.
24
25 This will presumably map to a torque job queue
26 """
28 self.lock = thread.allocate_lock()
29 self.jobs = []
30 self.id = 1
31
33 """
34 Return the specified job.
35
36 Raises KeyError if the job cannot be found.
37 """
38 with self.lock:
39 job = self._find_jobid(jobid)
40 return job
41
43 """
44 Remove the specified job id.
45
46 Raises KeyError if the job cannot be found.
47 """
48 with self.lock:
49 job = self._find_jobid(jobid)
50 self.jobs.remove(job)
51
53 """
54 Return a list of all job (id,name) matching the given status.
55 """
56 with self.lock:
57 return [(job.id,job.name)
58 for job in self.jobs
59 if job.status==status]
60
62 """
63 Return a job given the jobid or None if the job is not found.
64 """
65
66 for job in jobs:
67 if job.id == jobid: break
68 else:
69 raise KeyError("%s not in job queue"%jobid)
70 return job
71
73 """
74 Generate a new job id.
75 """
76 self.id += 1
77 return "J"+str(self.id)
78
79 - def stop(self, jobid):
80 """
81 Stop a job.
82
83 Raises KeyError if the job is not running.
84
85 This is likely triggered by the client as requested by
86 the user, but may also be triggered by the job itself
87 if it notices that it is failing.
88 """
89 with self.lock:
90 job = self._find_jobid(jobid)
91 job.stop()
92
94 """
95 Add a job to the job queue.
96 """
97 self.job.id = self._next_jobid()
98 with self.lock:
99 self.jobs.append(job)
100 return job.id
101
102 _queue = None
108
111 """
112 Stateful function to return the next available job id.
113 """
114 lock = threads.lock()
115 t0 = time.gmtime()
116 seq = -1
117
118 @staticmethod
120 """
121 Generate job sequence number.
122
123 Guaranteed to be increasing. Human parseable. Thread-safe.
124
125 Format is <date>T<time>:<seq>. The sequence is necessary to
126 allow for multiple jobs per second to be queued.
127 """
128 with JobID.lock:
129 t = time.gmtime()
130
131 if t == JobID.t0 and JobID.seq == -1:
132 time.sleep(1)
133 t = time.gmtime()
134 if t != JobID.t0:
135 JobID.seq = 0
136 JobID.t0 = t
137 else:
138 JobID.seq += 1
139 stamp = time.strftime("%Y%m%dT%H%M%S", t)
140 id = stamp + ":" + str(seq)
141 return id
142
143 @staticmethod
145 """
146 Returns number of seconds since job was queued.
147 """
148 (stamp,seq) = id.split(':')
149 t = time.strptime("%Y%m%dT%H%M%S", stamp)
150 return time.time()-t
151