* Job Queue:

- Replace some locks with concurrent
      - Change job ID to a long so it won't wrap
      - Remove some unused stats
      - Java 5 and debug cleanup
This commit is contained in:
zzz
2010-03-09 17:32:29 +00:00
parent 78a965dc90
commit 05f2a62cbb
4 changed files with 135 additions and 119 deletions

View File

@ -19,7 +19,7 @@ public interface Job {
*/ */
public String getName(); public String getName();
/** unique id */ /** unique id */
public int getJobId(); public long getJobId();
/** /**
* Timing criteria for the task * Timing criteria for the task
*/ */

View File

@ -15,8 +15,8 @@ import net.i2p.util.Log;
public abstract class JobImpl implements Job { public abstract class JobImpl implements Job {
private RouterContext _context; private RouterContext _context;
private JobTiming _timing; private JobTiming _timing;
private static int _idSrc = 0; private static long _idSrc = 0;
private int _id; private long _id;
private Exception _addedBy; private Exception _addedBy;
private long _madeReadyOn; private long _madeReadyOn;
@ -28,7 +28,7 @@ public abstract class JobImpl implements Job {
_madeReadyOn = 0; _madeReadyOn = 0;
} }
public int getJobId() { return _id; } public long getJobId() { return _id; }
public JobTiming getTiming() { return _timing; } public JobTiming getTiming() { return _timing; }
public final RouterContext getContext() { return _context; } public final RouterContext getContext() { return _context; }

View File

@ -12,10 +12,14 @@ import java.io.IOException;
import java.io.Writer; import java.io.Writer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.data.DataHelper; import net.i2p.data.DataHelper;
import net.i2p.router.networkdb.HandleDatabaseLookupMessageJob; import net.i2p.router.networkdb.HandleDatabaseLookupMessageJob;
@ -33,15 +37,15 @@ public class JobQueue {
private RouterContext _context; private RouterContext _context;
/** Integer (runnerId) to JobQueueRunner for created runners */ /** Integer (runnerId) to JobQueueRunner for created runners */
private final HashMap _queueRunners; private final Map<Integer, JobQueueRunner> _queueRunners;
/** a counter to identify a job runner */ /** a counter to identify a job runner */
private volatile static int _runnerId = 0; private volatile static int _runnerId = 0;
/** list of jobs that are ready to run ASAP */ /** list of jobs that are ready to run ASAP */
private ArrayList _readyJobs; private BlockingQueue<Job> _readyJobs;
/** list of jobs that are scheduled for running in the future */ /** list of jobs that are scheduled for running in the future */
private ArrayList _timedJobs; private List<Job> _timedJobs;
/** job name to JobStat for that job */ /** job name to JobStat for that job */
private final SortedMap _jobStats; private final Map<String, JobStats> _jobStats;
/** how many job queue runners can go concurrently */ /** how many job queue runners can go concurrently */
private int _maxRunners = 1; private int _maxRunners = 1;
private QueuePumper _pumper; private QueuePumper _pumper;
@ -52,9 +56,12 @@ public class JobQueue {
private final Object _jobLock; private final Object _jobLock;
/** how many when we go parallel */
private static final int RUNNERS = 4;
/** default max # job queue runners operating */ /** default max # job queue runners operating */
private final static int DEFAULT_MAX_RUNNERS = 1; private final static int DEFAULT_MAX_RUNNERS = 1;
/** router.config parameter to override the max runners */ /** router.config parameter to override the max runners @deprecated unimplemented */
private final static String PROP_MAX_RUNNERS = "router.maxJobRunners"; private final static String PROP_MAX_RUNNERS = "router.maxJobRunners";
/** how frequently should we check and update the max runners */ /** how frequently should we check and update the max runners */
@ -63,31 +70,37 @@ public class JobQueue {
/** if a job is this lagged, spit out a warning, but keep going */ /** if a job is this lagged, spit out a warning, but keep going */
private long _lagWarning = DEFAULT_LAG_WARNING; private long _lagWarning = DEFAULT_LAG_WARNING;
private final static long DEFAULT_LAG_WARNING = 5*1000; private final static long DEFAULT_LAG_WARNING = 5*1000;
/** @deprecated unimplemented */
private final static String PROP_LAG_WARNING = "router.jobLagWarning"; private final static String PROP_LAG_WARNING = "router.jobLagWarning";
/** if a job is this lagged, the router is hosed, so shut it down */ /** if a job is this lagged, the router is hosed, so spit out a warning (dont shut it down) */
private long _lagFatal = DEFAULT_LAG_FATAL; private long _lagFatal = DEFAULT_LAG_FATAL;
private final static long DEFAULT_LAG_FATAL = 30*1000; private final static long DEFAULT_LAG_FATAL = 30*1000;
/** @deprecated unimplemented */
private final static String PROP_LAG_FATAL = "router.jobLagFatal"; private final static String PROP_LAG_FATAL = "router.jobLagFatal";
/** if a job takes this long to run, spit out a warning, but keep going */ /** if a job takes this long to run, spit out a warning, but keep going */
private long _runWarning = DEFAULT_RUN_WARNING; private long _runWarning = DEFAULT_RUN_WARNING;
private final static long DEFAULT_RUN_WARNING = 5*1000; private final static long DEFAULT_RUN_WARNING = 5*1000;
/** @deprecated unimplemented */
private final static String PROP_RUN_WARNING = "router.jobRunWarning"; private final static String PROP_RUN_WARNING = "router.jobRunWarning";
/** if a job takes this long to run, the router is hosed, so shut it down */ /** if a job takes this long to run, the router is hosed, so spit out a warning (dont shut it down) */
private long _runFatal = DEFAULT_RUN_FATAL; private long _runFatal = DEFAULT_RUN_FATAL;
private final static long DEFAULT_RUN_FATAL = 30*1000; private final static long DEFAULT_RUN_FATAL = 30*1000;
/** @deprecated unimplemented */
private final static String PROP_RUN_FATAL = "router.jobRunFatal"; private final static String PROP_RUN_FATAL = "router.jobRunFatal";
/** don't enforce fatal limits until the router has been up for this long */ /** don't enforce fatal limits until the router has been up for this long */
private long _warmupTime = DEFAULT_WARMUP_TIME; private long _warmupTime = DEFAULT_WARMUP_TIME;
private final static long DEFAULT_WARMUP_TIME = 10*60*1000; private final static long DEFAULT_WARMUP_TIME = 10*60*1000;
private final static String PROP_WARMUM_TIME = "router.jobWarmupTime"; /** @deprecated unimplemented */
private final static String PROP_WARMUP_TIME = "router.jobWarmupTime";
/** max ready and waiting jobs before we start dropping 'em */ /** max ready and waiting jobs before we start dropping 'em */
private int _maxWaitingJobs = DEFAULT_MAX_WAITING_JOBS; private int _maxWaitingJobs = DEFAULT_MAX_WAITING_JOBS;
private final static int DEFAULT_MAX_WAITING_JOBS = 100; private final static int DEFAULT_MAX_WAITING_JOBS = 100;
/** @deprecated unimplemented */
private final static String PROP_MAX_WAITING_JOBS = "router.maxWaitingJobs"; private final static String PROP_MAX_WAITING_JOBS = "router.maxWaitingJobs";
/** /**
@ -109,16 +122,14 @@ public class JobQueue {
new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
_alive = true; _alive = true;
_readyJobs = new ArrayList(16); _readyJobs = new LinkedBlockingQueue();
_timedJobs = new ArrayList(64); _timedJobs = new ArrayList(64);
_jobLock = new Object(); _jobLock = new Object();
_queueRunners = new HashMap(); _queueRunners = new ConcurrentHashMap(RUNNERS);
_jobStats = Collections.synchronizedSortedMap(new TreeMap()); _jobStats = new ConcurrentHashMap();
_allowParallelOperation = false; _allowParallelOperation = false;
_pumper = new QueuePumper(); _pumper = new QueuePumper();
I2PThread pumperThread = new I2PThread(_pumper); I2PThread pumperThread = new I2PThread(_pumper, "Job Queue Pumper", true);
pumperThread.setDaemon(true);
pumperThread.setName("QueuePumper");
//pumperThread.setPriority(I2PThread.NORM_PRIORITY+1); //pumperThread.setPriority(I2PThread.NORM_PRIORITY+1);
pumperThread.start(); pumperThread.start();
} }
@ -128,7 +139,7 @@ public class JobQueue {
* *
*/ */
public void addJob(Job job) { public void addJob(Job job) {
if (job == null) return; if (job == null || !_alive) return;
if (job instanceof JobImpl) if (job instanceof JobImpl)
((JobImpl)job).addedToQueue(); ((JobImpl)job).addedToQueue();
@ -136,6 +147,7 @@ public class JobQueue {
long numReady = 0; long numReady = 0;
boolean alreadyExists = false; boolean alreadyExists = false;
boolean dropped = false; boolean dropped = false;
// getNext() is now outside the jobLock, is that ok?
synchronized (_jobLock) { synchronized (_jobLock) {
if (_readyJobs.contains(job)) if (_readyJobs.contains(job))
alreadyExists = true; alreadyExists = true;
@ -155,7 +167,7 @@ public class JobQueue {
job.getTiming().setStartAfter(_context.clock().now()); job.getTiming().setStartAfter(_context.clock().now());
if (job instanceof JobImpl) if (job instanceof JobImpl)
((JobImpl)job).madeReady(); ((JobImpl)job).madeReady();
_readyJobs.add(job); _readyJobs.offer(job);
} else { } else {
_timedJobs.add(job); _timedJobs.add(job);
} }
@ -167,12 +179,10 @@ public class JobQueue {
_context.statManager().addRateData("jobQueue.readyJobs", numReady, 0); _context.statManager().addRateData("jobQueue.readyJobs", numReady, 0);
if (dropped) { if (dropped) {
_context.statManager().addRateData("jobQueue.droppedJobs", 1, 1); _context.statManager().addRateData("jobQueue.droppedJobs", 1, 1);
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.ERROR))
_log.warn("Dropping job due to overload! # ready jobs: " _log.error("Dropping job due to overload! # ready jobs: "
+ numReady + ": job = " + job); + numReady + ": job = " + job);
} }
return;
} }
public void removeJob(Job job) { public void removeJob(Job job) {
@ -189,17 +199,15 @@ public class JobQueue {
} }
public int getReadyCount() { public int getReadyCount() {
synchronized (_jobLock) {
return _readyJobs.size(); return _readyJobs.size();
}
} }
public long getMaxLag() { public long getMaxLag() {
synchronized (_jobLock) { Job j = _readyJobs.peek();
if (_readyJobs.size() <= 0) return 0; if (j == null) return 0;
// first job is the one that has been waiting the longest // first job is the one that has been waiting the longest
long startAfter = ((Job)_readyJobs.get(0)).getTiming().getStartAfter(); long startAfter = j.getTiming().getStartAfter();
return _context.clock().now() - startAfter; return _context.clock().now() - startAfter;
}
} }
/** /**
@ -228,9 +236,10 @@ public class JobQueue {
public void allowParallelOperation() { public void allowParallelOperation() {
_allowParallelOperation = true; _allowParallelOperation = true;
runQueue(4); runQueue(RUNNERS);
} }
/** @deprecated do you really want to do this? */
public void restart() { public void restart() {
synchronized (_jobLock) { synchronized (_jobLock) {
_timedJobs.clear(); _timedJobs.clear();
@ -241,14 +250,21 @@ public class JobQueue {
void shutdown() { void shutdown() {
_alive = false; _alive = false;
synchronized (_jobLock) { _timedJobs.clear();
_jobLock.notifyAll(); _readyJobs.clear();
} // The JobQueueRunners are NOT daemons,
// so they must be stopped.
Job poison = new PoisonJob();
for (int i = 0; i < _queueRunners.size(); i++)
_readyJobs.offer(poison);
/********
if (_log.shouldLog(Log.WARN)) { if (_log.shouldLog(Log.WARN)) {
StringBuilder buf = new StringBuilder(1024); StringBuilder buf = new StringBuilder(1024);
buf.append("current jobs: \n"); buf.append("current jobs: \n");
for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); ) { for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); ) {
JobQueueRunner runner = (JobQueueRunner)iter.next(); JobQueueRunner runner = iter.next();
Job j = runner.getCurrentJob(); Job j = runner.getCurrentJob();
buf.append("Runner ").append(runner.getRunnerId()).append(": "); buf.append("Runner ").append(runner.getRunnerId()).append(": ");
@ -279,7 +295,9 @@ public class JobQueue {
buf.append(_timedJobs.get(i).toString()).append("\n\t"); buf.append(_timedJobs.get(i).toString()).append("\n\t");
_log.log(Log.WARN, buf.toString()); _log.log(Log.WARN, buf.toString());
} }
********/
} }
boolean isAlive() { return _alive; } boolean isAlive() { return _alive; }
/** /**
@ -287,9 +305,8 @@ public class JobQueue {
*/ */
public long getLastJobBegin() { public long getLastJobBegin() {
long when = -1; long when = -1;
// not synchronized, so might b0rk if the runners are changed for (JobQueueRunner runner : _queueRunners.values()) {
for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); ) { long cur = runner.getLastBegin();
long cur = ((JobQueueRunner)iter.next()).getLastBegin();
if (cur > when) if (cur > when)
cur = when; cur = when;
} }
@ -300,9 +317,8 @@ public class JobQueue {
*/ */
public long getLastJobEnd() { public long getLastJobEnd() {
long when = -1; long when = -1;
// not synchronized, so might b0rk if the runners are changed for (JobQueueRunner runner : _queueRunners.values()) {
for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); ) { long cur = runner.getLastEnd();
long cur = ((JobQueueRunner)iter.next()).getLastEnd();
if (cur > when) if (cur > when)
cur = when; cur = when;
} }
@ -315,9 +331,7 @@ public class JobQueue {
public Job getLastJob() { public Job getLastJob() {
Job j = null; Job j = null;
long when = -1; long when = -1;
// not synchronized, so might b0rk if the runners are changed for (JobQueueRunner cur : _queueRunners.values()) {
for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); ) {
JobQueueRunner cur = (JobQueueRunner)iter.next();
if (cur.getLastBegin() > when) { if (cur.getLastBegin() > when) {
j = cur.getCurrentJob(); j = cur.getCurrentJob();
when = cur.getLastBegin(); when = cur.getLastBegin();
@ -333,13 +347,10 @@ public class JobQueue {
Job getNext() { Job getNext() {
while (_alive) { while (_alive) {
try { try {
synchronized (_jobLock) { Job j = _readyJobs.take();
if (_readyJobs.size() > 0) { if (j.getJobId() == POISON_ID)
return (Job)_readyJobs.remove(0); break;
} else { return j;
_jobLock.wait();
}
}
} catch (InterruptedException ie) {} } catch (InterruptedException ie) {}
} }
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
@ -355,8 +366,7 @@ public class JobQueue {
* the current job. * the current job.
* *
*/ */
public void runQueue(int numThreads) { public synchronized void runQueue(int numThreads) {
synchronized (_queueRunners) {
// we're still starting up [serially] and we've got at least one runner, // we're still starting up [serially] and we've got at least one runner,
// so dont do anything // so dont do anything
if ( (_queueRunners.size() > 0) && (!_allowParallelOperation) ) return; if ( (_queueRunners.size() > 0) && (!_allowParallelOperation) ) return;
@ -377,8 +387,7 @@ public class JobQueue {
t.start(); t.start();
} }
} else if (_queueRunners.size() == numThreads) { } else if (_queueRunners.size() == numThreads) {
for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); ) { for (JobQueueRunner runner : _queueRunners.values()) {
JobQueueRunner runner = (JobQueueRunner)iter.next();
runner.startRunning(); runner.startRunning();
} }
} else { // numThreads < # runners, so shrink } else { // numThreads < # runners, so shrink
@ -387,7 +396,6 @@ public class JobQueue {
// runner.stopRunning(); // runner.stopRunning();
//} //}
} }
}
} }
void removeRunner(int id) { _queueRunners.remove(Integer.valueOf(id)); } void removeRunner(int id) { _queueRunners.remove(Integer.valueOf(id)); }
@ -407,11 +415,11 @@ public class JobQueue {
while (_alive) { while (_alive) {
long now = _context.clock().now(); long now = _context.clock().now();
long timeToWait = -1; long timeToWait = -1;
ArrayList toAdd = null; List<Job> toAdd = null;
try { try {
synchronized (_jobLock) { synchronized (_jobLock) {
for (int i = 0; i < _timedJobs.size(); i++) { for (int i = 0; i < _timedJobs.size(); i++) {
Job j = (Job)_timedJobs.get(i); Job j = _timedJobs.get(i);
// find jobs due to start before now // find jobs due to start before now
long timeLeft = j.getTiming().getStartAfter() - now; long timeLeft = j.getTiming().getStartAfter() - now;
if (timeLeft <= 0) { if (timeLeft <= 0) {
@ -437,7 +445,7 @@ public class JobQueue {
// extra alloc. (no, i'm not just being insane - i'm updating this based // extra alloc. (no, i'm not just being insane - i'm updating this based
// on some profiling data ;) // on some profiling data ;)
for (int i = 0; i < toAdd.size(); i++) for (int i = 0; i < toAdd.size(); i++)
_readyJobs.add(toAdd.get(i)); _readyJobs.offer(toAdd.get(i));
_jobLock.notifyAll(); _jobLock.notifyAll();
} else { } else {
if (timeToWait < 0) if (timeToWait < 0)
@ -476,17 +484,15 @@ public class JobQueue {
private void updateJobTimings(long delta) { private void updateJobTimings(long delta) {
synchronized (_jobLock) { synchronized (_jobLock) {
for (int i = 0; i < _timedJobs.size(); i++) { for (int i = 0; i < _timedJobs.size(); i++) {
Job j = (Job)_timedJobs.get(i); Job j = _timedJobs.get(i);
j.getTiming().offsetChanged(delta); j.getTiming().offsetChanged(delta);
} }
for (int i = 0; i < _readyJobs.size(); i++) { for (Job j : _readyJobs) {
Job j = (Job)_readyJobs.get(i);
j.getTiming().offsetChanged(delta); j.getTiming().offsetChanged(delta);
} }
} }
synchronized (_runnerLock) { synchronized (_runnerLock) {
for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); ) { for (JobQueueRunner runner : _queueRunners.values()) {
JobQueueRunner runner = (JobQueueRunner)iter.next();
Job job = runner.getCurrentJob(); Job job = runner.getCurrentJob();
if (job != null) if (job != null)
job.getTiming().offsetChanged(delta); job.getTiming().offsetChanged(delta);
@ -509,14 +515,14 @@ public class JobQueue {
if (lag < 0) lag = 0; if (lag < 0) lag = 0;
if (duration < 0) duration = 0; if (duration < 0) duration = 0;
JobStats stats = null; JobStats stats = _jobStats.get(key);
if (!_jobStats.containsKey(key)) { if (stats == null) {
_jobStats.put(key, new JobStats(key)); stats = new JobStats(key);
_jobStats.put(key, stats);
// yes, if two runners finish the same job at the same time, this could // yes, if two runners finish the same job at the same time, this could
// create an extra object. but, who cares, its pushed out of the map // create an extra object. but, who cares, its pushed out of the map
// immediately anyway. // immediately anyway.
} }
stats = (JobStats)_jobStats.get(key);
stats.jobRan(duration, lag); stats.jobRan(duration, lag);
String dieMsg = null; String dieMsg = null;
@ -555,26 +561,39 @@ public class JobQueue {
} }
/** job ID counter changed from int to long so it won't wrap negative */
private static final int POISON_ID = -99999;
private static class PoisonJob implements Job {
public String getName() { return null; }
public long getJobId() { return POISON_ID; }
public JobTiming getTiming() { return null; }
public void runJob() {}
public Exception getAddedBy() { return null; }
public void dropped() {}
}
//// ////
// the remainder are utility methods for dumping status info // the remainder are utility methods for dumping status info
//// ////
public void renderStatusHTML(Writer out) throws IOException { public void renderStatusHTML(Writer out) throws IOException {
ArrayList readyJobs = null; List<Job> readyJobs = null;
ArrayList timedJobs = null; List<Job> timedJobs = null;
ArrayList activeJobs = new ArrayList(1); List<Job> activeJobs = new ArrayList(RUNNERS);
ArrayList justFinishedJobs = new ArrayList(4); List<Job> justFinishedJobs = new ArrayList(RUNNERS);
//out.write("<!-- jobQueue rendering -->\n"); //out.write("<!-- jobQueue rendering -->\n");
out.flush(); out.flush();
int states[] = null; //int states[] = null;
int numRunners = 0; int numRunners = 0;
synchronized (_queueRunners) {
states = new int[_queueRunners.size()]; {
//states = new int[_queueRunners.size()];
int i = 0; int i = 0;
for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); i++) { for (Iterator<JobQueueRunner> iter = _queueRunners.values().iterator(); iter.hasNext(); i++) {
JobQueueRunner runner = (JobQueueRunner)iter.next(); JobQueueRunner runner = iter.next();
states[i] = runner.getState(); //states[i] = runner.getState();
Job job = runner.getCurrentJob(); Job job = runner.getCurrentJob();
if (job != null) { if (job != null) {
activeJobs.add(job); activeJobs.add(job);
@ -621,21 +640,21 @@ public class JobQueue {
buf.append("<hr><b>Active jobs: ").append(activeJobs.size()).append("</b><ol>\n"); buf.append("<hr><b>Active jobs: ").append(activeJobs.size()).append("</b><ol>\n");
for (int i = 0; i < activeJobs.size(); i++) { for (int i = 0; i < activeJobs.size(); i++) {
Job j = (Job)activeJobs.get(i); Job j = activeJobs.get(i);
buf.append("<li>[started ").append(DataHelper.formatDuration(now-j.getTiming().getStartAfter())).append(" ago]: "); buf.append("<li>[started ").append(DataHelper.formatDuration(now-j.getTiming().getStartAfter())).append(" ago]: ");
buf.append(j.toString()).append("</li>\n"); buf.append(j.toString()).append("</li>\n");
} }
buf.append("</ol>\n"); buf.append("</ol>\n");
buf.append("<hr><b>Just finished jobs: ").append(justFinishedJobs.size()).append("</b><ol>\n"); buf.append("<hr><b>Just finished jobs: ").append(justFinishedJobs.size()).append("</b><ol>\n");
for (int i = 0; i < justFinishedJobs.size(); i++) { for (int i = 0; i < justFinishedJobs.size(); i++) {
Job j = (Job)justFinishedJobs.get(i); Job j = justFinishedJobs.get(i);
buf.append("<li>[finished ").append(DataHelper.formatDuration(now-j.getTiming().getActualEnd())).append(" ago]: "); buf.append("<li>[finished ").append(DataHelper.formatDuration(now-j.getTiming().getActualEnd())).append(" ago]: ");
buf.append(j.toString()).append("</li>\n"); buf.append(j.toString()).append("</li>\n");
} }
buf.append("</ol>\n"); buf.append("</ol>\n");
buf.append("<hr><b>Ready/waiting jobs: ").append(readyJobs.size()).append("</b><ol>\n"); buf.append("<hr><b>Ready/waiting jobs: ").append(readyJobs.size()).append("</b><ol>\n");
for (int i = 0; i < readyJobs.size(); i++) { for (int i = 0; i < readyJobs.size(); i++) {
Job j = (Job)readyJobs.get(i); Job j = readyJobs.get(i);
buf.append("<li>[waiting "); buf.append("<li>[waiting ");
buf.append(DataHelper.formatDuration(now-j.getTiming().getStartAfter())); buf.append(DataHelper.formatDuration(now-j.getTiming().getStartAfter()));
buf.append("]: "); buf.append("]: ");
@ -645,13 +664,13 @@ public class JobQueue {
out.flush(); out.flush();
buf.append("<hr><b>Scheduled jobs: ").append(timedJobs.size()).append("</b><ol>\n"); buf.append("<hr><b>Scheduled jobs: ").append(timedJobs.size()).append("</b><ol>\n");
TreeMap ordered = new TreeMap(); TreeMap<Long, Job> ordered = new TreeMap();
for (int i = 0; i < timedJobs.size(); i++) { for (int i = 0; i < timedJobs.size(); i++) {
Job j = (Job)timedJobs.get(i); Job j = timedJobs.get(i);
ordered.put(new Long(j.getTiming().getStartAfter()), j); ordered.put(new Long(j.getTiming().getStartAfter()), j);
} }
for (Iterator iter = ordered.values().iterator(); iter.hasNext(); ) { for (Iterator<Job> iter = ordered.values().iterator(); iter.hasNext(); ) {
Job j = (Job)iter.next(); Job j = iter.next();
long time = j.getTiming().getStartAfter() - now; long time = j.getTiming().getStartAfter() - now;
buf.append("<li>").append(j.getName()).append(" in "); buf.append("<li>").append(j.getName()).append(" in ");
buf.append(DataHelper.formatDuration(time)).append("</li>\n"); buf.append(DataHelper.formatDuration(time)).append("</li>\n");
@ -685,13 +704,10 @@ public class JobQueue {
long maxPendingTime = -1; long maxPendingTime = -1;
long minPendingTime = -1; long minPendingTime = -1;
TreeMap tstats = null; TreeMap<String, JobStats> tstats = new TreeMap(_jobStats);
synchronized (_jobStats) {
tstats = new TreeMap(_jobStats);
}
for (Iterator iter = tstats.values().iterator(); iter.hasNext(); ) { for (Iterator<JobStats> iter = tstats.values().iterator(); iter.hasNext(); ) {
JobStats stats = (JobStats)iter.next(); JobStats stats = iter.next();
buf.append("<tr>"); buf.append("<tr>");
buf.append("<td><b>").append(stats.getName()).append("</b></td>"); buf.append("<td><b>").append(stats.getName()).append("</b></td>");
buf.append("<td align=\"right\">").append(stats.getRuns()).append("</td>"); buf.append("<td align=\"right\">").append(stats.getRuns()).append("</td>");

View File

@ -23,12 +23,12 @@ class JobQueueRunner implements Runnable {
_currentJob = null; _currentJob = null;
_lastJob = null; _lastJob = null;
_log = _context.logManager().getLog(JobQueueRunner.class); _log = _context.logManager().getLog(JobQueueRunner.class);
_context.statManager().createRateStat("jobQueue.jobRun", "How long jobs take", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("jobQueue.jobRun", "How long jobs take", "JobQueue", new long[] { 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("jobQueue.jobRunSlow", "How long jobs that take over a second take", "JobQueue", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("jobQueue.jobRunSlow", "How long jobs that take over a second take", "JobQueue", new long[] { 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("jobQueue.jobLag", "How long jobs have to wait before running", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("jobQueue.jobLag", "How long jobs have to wait before running", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("jobQueue.jobWait", "How long does a job sit on the job queue?", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("jobQueue.jobWait", "How long does a job sit on the job queue?", "JobQueue", new long[] { 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("jobQueue.jobRunnerInactive", "How long are runners inactive?", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); //_context.statManager().createRateStat("jobQueue.jobRunnerInactive", "How long are runners inactive?", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
_state = 1; //_state = 1;
} }
final int getState() { return _state; } final int getState() { return _state; }
@ -41,16 +41,16 @@ class JobQueueRunner implements Runnable {
public long getLastBegin() { return _lastBegin; } public long getLastBegin() { return _lastBegin; }
public long getLastEnd() { return _lastEnd; } public long getLastEnd() { return _lastEnd; }
public void run() { public void run() {
_state = 2; //_state = 2;
long lastActive = _context.clock().now(); long lastActive = _context.clock().now();
long jobNum = 0; long jobNum = 0;
while ( (_keepRunning) && (_context.jobQueue().isAlive()) ) { while ( (_keepRunning) && (_context.jobQueue().isAlive()) ) {
_state = 3; //_state = 3;
try { try {
Job job = _context.jobQueue().getNext(); Job job = _context.jobQueue().getNext();
_state = 4; //_state = 4;
if (job == null) { if (job == null) {
_state = 5; //_state = 5;
if (_context.router().isAlive()) if (_context.router().isAlive())
if (_log.shouldLog(Log.ERROR)) if (_log.shouldLog(Log.ERROR))
_log.error("getNext returned null - dead?"); _log.error("getNext returned null - dead?");
@ -60,14 +60,14 @@ class JobQueueRunner implements Runnable {
long enqueuedTime = 0; long enqueuedTime = 0;
if (job instanceof JobImpl) { if (job instanceof JobImpl) {
_state = 6; //_state = 6;
long when = ((JobImpl)job).getMadeReadyOn(); long when = ((JobImpl)job).getMadeReadyOn();
if (when <= 0) { if (when <= 0) {
_state = 7; //_state = 7;
_log.error("Job was not made ready?! " + job, _log.error("Job was not made ready?! " + job,
new Exception("Not made ready?!")); new Exception("Not made ready?!"));
} else { } else {
_state = 8; //_state = 8;
enqueuedTime = now - when; enqueuedTime = now - when;
} }
} }
@ -75,27 +75,27 @@ class JobQueueRunner implements Runnable {
long betweenJobs = now - lastActive; long betweenJobs = now - lastActive;
_currentJob = job; _currentJob = job;
_lastJob = null; _lastJob = null;
_state = 9; //_state = 9;
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Runner " + _id + " running job " + job.getJobId() + ": " + job.getName()); _log.debug("Runner " + _id + " running job " + job.getJobId() + ": " + job.getName());
long origStartAfter = job.getTiming().getStartAfter(); long origStartAfter = job.getTiming().getStartAfter();
long doStart = _context.clock().now(); long doStart = _context.clock().now();
_state = 10; //_state = 10;
job.getTiming().start(); job.getTiming().start();
runCurrentJob(); runCurrentJob();
job.getTiming().end(); job.getTiming().end();
_state = 11; //_state = 11;
long duration = job.getTiming().getActualEnd() - job.getTiming().getActualStart(); long duration = job.getTiming().getActualEnd() - job.getTiming().getActualStart();
long beforeUpdate = _context.clock().now(); long beforeUpdate = _context.clock().now();
_state = 12; //_state = 12;
_context.jobQueue().updateStats(job, doStart, origStartAfter, duration); _context.jobQueue().updateStats(job, doStart, origStartAfter, duration);
_state = 13; //_state = 13;
long diff = _context.clock().now() - beforeUpdate; long diff = _context.clock().now() - beforeUpdate;
long lag = doStart - origStartAfter; long lag = doStart - origStartAfter;
if (lag < 0) lag = 0; if (lag < 0) lag = 0;
_context.statManager().addRateData("jobQueue.jobRunnerInactive", betweenJobs, betweenJobs); //_context.statManager().addRateData("jobQueue.jobRunnerInactive", betweenJobs, betweenJobs);
_context.statManager().addRateData("jobQueue.jobRun", duration, duration); _context.statManager().addRateData("jobQueue.jobRun", duration, duration);
_context.statManager().addRateData("jobQueue.jobLag", lag, 0); _context.statManager().addRateData("jobQueue.jobLag", lag, 0);
_context.statManager().addRateData("jobQueue.jobWait", enqueuedTime, enqueuedTime); _context.statManager().addRateData("jobQueue.jobWait", enqueuedTime, enqueuedTime);
@ -107,7 +107,7 @@ class JobQueueRunner implements Runnable {
+ ") on job " + _currentJob); + ") on job " + _currentJob);
} }
_state = 14; //_state = 14;
if (diff > 100) { if (diff > 100) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
@ -121,7 +121,7 @@ class JobQueueRunner implements Runnable {
_currentJob = null; _currentJob = null;
_lastEnd = lastActive; _lastEnd = lastActive;
jobNum++; jobNum++;
_state = 15; //_state = 15;
//if ( (jobNum % 10) == 0) //if ( (jobNum % 10) == 0)
// System.gc(); // System.gc();
@ -130,22 +130,22 @@ class JobQueueRunner implements Runnable {
_log.log(Log.CRIT, "WTF, error running?", t); _log.log(Log.CRIT, "WTF, error running?", t);
} }
} }
_state = 16; //_state = 16;
if (_context.router().isAlive()) if (_context.router().isAlive())
if (_log.shouldLog(Log.CRIT)) if (_log.shouldLog(Log.CRIT))
_log.log(Log.CRIT, "Queue runner " + _id + " exiting"); _log.log(Log.CRIT, "Queue runner " + _id + " exiting");
_context.jobQueue().removeRunner(_id); _context.jobQueue().removeRunner(_id);
_state = 17; //_state = 17;
} }
private void runCurrentJob() { private void runCurrentJob() {
try { try {
_state = 18; //_state = 18;
_lastBegin = _context.clock().now(); _lastBegin = _context.clock().now();
_currentJob.runJob(); _currentJob.runJob();
_state = 19; //_state = 19;
} catch (OutOfMemoryError oom) { } catch (OutOfMemoryError oom) {
_state = 20; //_state = 20;
try { try {
if (_log.shouldLog(Log.CRIT)) if (_log.shouldLog(Log.CRIT))
_log.log(Log.CRIT, "Router ran out of memory, shutting down", oom); _log.log(Log.CRIT, "Router ran out of memory, shutting down", oom);
@ -157,7 +157,7 @@ class JobQueueRunner implements Runnable {
try { Thread.sleep(1000); } catch (InterruptedException ie) {} try { Thread.sleep(1000); } catch (InterruptedException ie) {}
System.exit(-1); System.exit(-1);
} catch (Throwable t) { } catch (Throwable t) {
_state = 21; //_state = 21;
if (_log.shouldLog(Log.CRIT)) if (_log.shouldLog(Log.CRIT))
_log.log(Log.CRIT, "Error processing job [" + _currentJob.getName() _log.log(Log.CRIT, "Error processing job [" + _currentJob.getName()
+ "] on thread " + _id + ": " + t.getMessage(), t); + "] on thread " + _id + ": " + t.getMessage(), t);