Package park :: Package core :: Module jobqueue

Source Code for Module park.core.jobqueue

  1  from __future__ import with_statement 
  2  import threads 
  3  import time 
class JobQueue(object):
    """
    Manage a set of active jobs.
    """

    def __init__(self):
        # Without this initialisation, jobs() raised AttributeError on a
        # freshly constructed queue because nothing ever set the attribute.
        self.active = []

    def jobs(self):
        """
        Return the collection of active jobs held by this queue.
        """
        return self.active

    def submit(self, request):
        """
        Allocate a job id for *request* and return it.

        NOTE(review): only the id is generated here; the request is not
        yet recorded in ``self.active``. Confirm the intended bookkeeping
        before relying on jobs() to reflect submissions.
        """
        jobid = JobID.next()
        return jobid
14
class JobManager(object):
    """
    Keep track of a set of jobs.

    The set of jobs known by the server.
    Server-side view of the fitting service.

    Note that this needs to move into a transaction-based persistent
    store so that server reboot does not terminate long running jobs.

    This will presumably map to a torque job queue.
    """

    def __init__(self):
        # The original called thread.allocate_lock(), but no ``thread``
        # module is imported; ``threads`` is the module this file imports.
        self.lock = threads.lock()
        self.jobs = []
        self.id = 1

    def __getitem__(self, jobid):
        """
        Return the specified job.

        Raises KeyError if the job cannot be found.
        """
        with self.lock:
            return self._find_jobid(jobid)

    def __delitem__(self, jobid):
        """
        Remove the specified job id.

        Raises KeyError if the job cannot be found.
        """
        with self.lock:
            job = self._find_jobid(jobid)
            self.jobs.remove(job)

    def find_jobs(self, status):
        """
        Return a list of all job (id,name) matching the given status.
        """
        with self.lock:
            return [(job.id, job.name)
                    for job in self.jobs
                    if job.status == status]

    def _find_jobid(self, jobid):
        """
        Return the job with the given jobid.

        Raises KeyError if the job is not found.
        """
        # Internal function...assume self is already locked
        for job in self.jobs:
            if job.id == jobid:
                return job
        raise KeyError("%s not in job queue" % jobid)

    def _next_jobid(self):
        """
        Generate a new job id.
        """
        # Internal function...assume self is already locked.
        # The counter is bumped before use, so the first id is "J2".
        self.id += 1
        return "J" + str(self.id)

    def stop(self, jobid):
        """
        Stop a job.

        Raises KeyError if the job is not running.

        This is likely triggered by the client as requested by
        the user, but may also be triggered by the job itself
        if it notices that it is failing.
        """
        with self.lock:
            job = self._find_jobid(jobid)
            job.stop()

    def start(self, job):
        """
        Add a job to the job queue.

        Assigns a fresh id to ``job.id`` and returns that id.
        """
        # Allocate the id under the lock so two concurrent callers can
        # never be handed the same counter value.
        with self.lock:
            job.id = self._next_jobid()
            self.jobs.append(job)
        return job.id
# Module-wide job manager, created lazily by start_queue().
_queue = None


def start_queue():
    """
    Return the shared JobManager, creating it on the first call.

    Later calls return the same instance.
    """
    global _queue
    # Identity test: None is a singleton, so ``is`` is the correct check.
    if _queue is None:
        _queue = JobManager()
    return _queue
108
class JobID(object):
    """
    Stateful function to return the next available job id.
    """
    lock = threads.lock()
    t0 = time.gmtime()   # second in which the last id was issued
    seq = -1             # -1 means no id has been issued by this process

    @staticmethod
    def next():
        """
        Generate job sequence number.

        Guaranteed to be increasing.  Human parseable.  Thread-safe.

        Format is <date>T<time>:<seq>.  The sequence is necessary to
        allow for multiple jobs per second to be queued.
        """
        with JobID.lock:
            now = time.gmtime()
            # Protect against quick restart overwriting sequence num
            if now == JobID.t0 and JobID.seq == -1:
                time.sleep(1)
                now = time.gmtime()
            if now != JobID.t0:
                # New second: restart the per-second sequence.
                JobID.seq = 0
                JobID.t0 = now
            else:
                JobID.seq += 1
            stamp = time.strftime("%Y%m%dT%H%M%S", now)
            # Read the counter from the class; a bare ``seq`` is undefined
            # inside a static method.
            return stamp + ":" + str(JobID.seq)

    @staticmethod
    def age(id):
        """
        Returns number of seconds since job was queued.

        *id* is a job id in the <date>T<time>:<seq> format produced
        by next().
        """
        # Local import: calendar is not among the file's top-level imports.
        import calendar

        stamp, _seq = id.split(':')
        # strptime takes the string first, then the format.
        queued = time.strptime(stamp, "%Y%m%dT%H%M%S")
        # The stamp was produced from gmtime(), so convert it back as UTC.
        return time.time() - calendar.timegm(queued)
151