migrate the queue pumper thread to scheduled activity (instead of waking up every 500ms, check the job timings to see when we should next wake up; we also wake up whenever a new timed job is added or the clock skew changes)

pull out some of the old unused vars/flags
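For reference, a minimal sketch of the scheduled-wait pattern this change introduces. The class and types here (ScheduledPumperSketch, TimedJob) are simplified stand-ins, not the real JobQueue code: the actual commit uses the router clock, hands ready jobs to _readyJobs via awaken(), and also wakes on clock-skew updates.

import java.util.ArrayList;
import java.util.List;

/**
 * Sketch only: instead of polling every 500ms, wait until the next timed
 * job is due, and wake early whenever a new timed job is added (via
 * notifyAll on the shared list).
 */
class ScheduledPumperSketch implements Runnable {
    /** hypothetical stand-in for a Job: just a start time and a task */
    static final class TimedJob {
        final long startAfter;
        final Runnable task;
        TimedJob(long startAfter, Runnable task) {
            this.startAfter = startAfter;
            this.task = task;
        }
    }

    private final List<TimedJob> _timedJobs = new ArrayList<TimedJob>();
    private volatile boolean _alive = true;

    /** queue a job and wake the pumper so it can recompute its wait time */
    void addTimedJob(TimedJob job) {
        synchronized (_timedJobs) {
            _timedJobs.add(job);
            _timedJobs.notifyAll();
        }
    }

    void shutdown() {
        _alive = false;
        synchronized (_timedJobs) { _timedJobs.notifyAll(); }
    }

    public void run() {
        while (_alive) {
            long now = System.currentTimeMillis();
            List<TimedJob> ready = new ArrayList<TimedJob>();
            long timeToWait = 0; // 0 == nothing pending, so wait until notified
            synchronized (_timedJobs) {
                for (int i = 0; i < _timedJobs.size(); i++) {
                    TimedJob j = _timedJobs.get(i);
                    long timeLeft = j.startAfter - now;
                    if (timeLeft <= 0) {
                        ready.add(j);
                        _timedJobs.remove(i);
                        i--; // so the index stays consistent
                    } else if ((timeToWait <= 0) || (timeLeft < timeToWait)) {
                        timeToWait = timeLeft; // remember the soonest deadline
                    }
                }
                if (ready.isEmpty()) {
                    try {
                        _timedJobs.wait(timeToWait); // wait(0) blocks until notified
                    } catch (InterruptedException ie) {}
                }
            }
            // the real code moves these onto the ready queue and calls awaken();
            // here we just run them inline to keep the sketch self-contained
            for (int i = 0; i < ready.size(); i++)
                ready.get(i).task.run();
        }
    }
}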
jrandom
2004-07-07 16:16:36 +00:00
committed by zzz
parent 0948686989
commit fe3eac07f4

@@ -44,7 +44,7 @@ public class JobQueue {
/** job name to JobStat for that job */
private SortedMap _jobStats;
/** how many job queue runners can go concurrently */
private int _maxRunners;
private int _maxRunners = 1;
private QueuePumper _pumper;
/** will we allow the # job runners to grow beyond 1? */
private boolean _allowParallelOperation;
@@ -170,6 +170,7 @@ public class JobQueue {
} else {
synchronized (_timedJobs) {
_timedJobs.add(job);
_timedJobs.notifyAll();
}
}
} else {
@@ -300,48 +301,6 @@
return null;
}
/**
* Move newly ready timed jobs to the ready queue. Returns the
* number of ready jobs after the check is completed
*
*/
private int checkJobTimings() {
boolean newJobsReady = false;
long now = _context.clock().now();
ArrayList toAdd = null;
synchronized (_timedJobs) {
for (int i = 0; i < _timedJobs.size(); i++) {
Job j = (Job)_timedJobs.get(i);
// find jobs due to start before now
if (j.getTiming().getStartAfter() <= now) {
if (j instanceof JobImpl)
((JobImpl)j).madeReady();
if (toAdd == null) toAdd = new ArrayList(4);
toAdd.add(j);
_timedJobs.remove(i);
i--; // so the index stays consistent
}
}
}
int ready = 0;
synchronized (_readyJobs) {
if (toAdd != null) {
// rather than addAll, which allocs a byte array rv before adding,
// we iterate, since toAdd is usually going to only be 1 or 2 entries
// and since readyJobs will often have the space, we can avoid the
// extra alloc. (no, i'm not just being insane - i'm updating this based
// on some profiling data ;)
for (int i = 0; i < toAdd.size(); i++)
_readyJobs.add(toAdd.get(i));
}
ready = _readyJobs.size();
}
return ready;
}
/**
* Start up the queue with the specified number of concurrent processors.
* If this method has already been called, it will adjust the number of
@@ -384,7 +343,6 @@
void removeRunner(int id) { _queueRunners.remove(new Integer(id)); }
/**
* Notify a sufficient number of waiting runners, and if necessary, increase
* the number of runners (up to maxRunners)
@@ -406,30 +364,56 @@
*
*/
private final class QueuePumper implements Runnable, Clock.ClockUpdateListener {
private long _lastLimitUpdated;
public QueuePumper() {
_lastLimitUpdated = 0;
_context.clock().addUpdateListener(this);
}
public void run() {
try {
while (_alive) {
// periodically update our max runners limit
long now = _context.clock().now();
if (now > _lastLimitUpdated + MAX_LIMIT_UPDATE_DELAY) {
if (_log.shouldLog(Log.INFO))
_log.info("Updating the limits");
updateMaxLimit();
updateTimingLimits();
_lastLimitUpdated = now;
long timeToWait = 0;
ArrayList toAdd = null;
synchronized (_timedJobs) {
for (int i = 0; i < _timedJobs.size(); i++) {
Job j = (Job)_timedJobs.get(i);
// find jobs due to start before now
long timeLeft = j.getTiming().getStartAfter() - now;
if (timeLeft <= 0) {
if (j instanceof JobImpl)
((JobImpl)j).madeReady();
if (toAdd == null) toAdd = new ArrayList(4);
toAdd.add(j);
_timedJobs.remove(i);
i--; // so the index stays consistent
} else {
if ( (timeToWait <= 0) || (timeLeft < timeToWait) )
timeToWait = timeLeft;
}
}
if (toAdd == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Waiting " + timeToWait + " before rechecking the timed queue");
try {
_timedJobs.wait(timeToWait);
} catch (InterruptedException ie) {}
}
}
// turn timed jobs into ready jobs
int numMadeReady = checkJobTimings();
awaken(numMadeReady);
try { Thread.sleep(500); } catch (InterruptedException ie) {}
if (toAdd != null) {
synchronized (_readyJobs) {
// rather than addAll, which allocs a byte array rv before adding,
// we iterate, since toAdd is usually going to only be 1 or 2 entries
// and since readyJobs will often have the space, we can avoid the
// extra alloc. (no, i'm not just being insane - i'm updating this based
// on some profiling data ;)
for (int i = 0; i < toAdd.size(); i++)
_readyJobs.add(toAdd.get(i));
}
awaken(toAdd.size());
}
}
} catch (Throwable t) {
_context.clock().removeUpdateListener(this);
@@ -439,9 +423,10 @@ public class JobQueue {
}
public void offsetChanged(long delta) {
if (_lastLimitUpdated > 0)
_lastLimitUpdated += delta;
updateJobTimings(delta);
synchronized (_timedJobs) {
_timedJobs.notifyAll();
}
}
}
@@ -530,130 +515,6 @@
}
}
////
// update config params
////
/**
* Update the max number of job queue runners
*
*/
private void updateMaxLimit() {
if (_context.router() == null) {
_maxRunners = DEFAULT_MAX_RUNNERS;
return;
}
String str = _context.router().getConfigSetting(PROP_MAX_RUNNERS);
if (str != null) {
try {
_maxRunners = Integer.parseInt(str);
return;
} catch (NumberFormatException nfe) {
_log.error("Invalid maximum job runners [" + str + "]");
}
}
if (_log.shouldLog(Log.INFO))
_log.info("Defaulting the maximum job runners to " + DEFAULT_MAX_RUNNERS);
_maxRunners = DEFAULT_MAX_RUNNERS;
}
/**
* Update the job lag and run threshold for warnings and fatalities, as well
* as the warmup time before which fatalities will be ignored
*
*/
private void updateTimingLimits() {
if (_context.router() == null) {
_lagWarning = DEFAULT_LAG_WARNING;
_lagFatal = DEFAULT_LAG_FATAL;
_runWarning = DEFAULT_RUN_WARNING;
_runFatal = DEFAULT_RUN_FATAL;
_warmupTime = DEFAULT_WARMUP_TIME;
_maxWaitingJobs = DEFAULT_MAX_WAITING_JOBS;
return;
}
String str = _context.router().getConfigSetting(PROP_LAG_WARNING);
if (str != null) {
try {
_lagWarning = Integer.parseInt(str);
} catch (NumberFormatException nfe) {
_log.error("Invalid job lag warning [" + str + "]");
_lagWarning = DEFAULT_LAG_WARNING;
}
} else {
_lagWarning = DEFAULT_LAG_WARNING;
}
if (_log.shouldLog(Log.INFO))
_log.info("Setting the warning job lag time to " + _lagWarning + "ms");
str = _context.router().getConfigSetting(PROP_LAG_FATAL);
if (str != null) {
try {
_lagFatal = Integer.parseInt(str);
} catch (NumberFormatException nfe) {
_log.error("Invalid job lag fatal [" + str + "]");
_lagFatal = DEFAULT_LAG_FATAL;
}
} else {
_lagFatal = DEFAULT_LAG_FATAL;
}
if (_log.shouldLog(Log.INFO))
_log.info("Setting the fatal job lag time to " + _lagFatal + "ms");
str = _context.router().getConfigSetting(PROP_RUN_WARNING);
if (str != null) {
try {
_runWarning = Integer.parseInt(str);
} catch (NumberFormatException nfe) {
_log.error("Invalid job run warning [" + str + "]");
_runWarning = DEFAULT_RUN_WARNING;
}
} else {
_runWarning = DEFAULT_RUN_WARNING;
}
if (_log.shouldLog(Log.INFO))
_log.info("Setting the warning job run time to " + _runWarning + "ms");
str = _context.router().getConfigSetting(PROP_RUN_FATAL);
if (str != null) {
try {
_runFatal = Integer.parseInt(str);
} catch (NumberFormatException nfe) {
_log.error("Invalid job run fatal [" + str + "]");
_runFatal = DEFAULT_RUN_FATAL;
}
} else {
_runFatal = DEFAULT_RUN_FATAL;
}
if (_log.shouldLog(Log.INFO))
_log.info("Setting the fatal job run time to " + _runFatal + "ms");
str = _context.router().getConfigSetting(PROP_WARMUM_TIME);
if (str != null) {
try {
_warmupTime = Integer.parseInt(str);
} catch (NumberFormatException nfe) {
_log.error("Invalid warmup time [" + str + "]");
_warmupTime = DEFAULT_WARMUP_TIME;
}
} else {
_warmupTime = DEFAULT_WARMUP_TIME;
}
str = _context.router().getConfigSetting(PROP_MAX_WAITING_JOBS);
if (str != null) {
try {
_maxWaitingJobs = Integer.parseInt(str);
} catch (NumberFormatException nfe) {
_log.error("Invalid max waiting jobs [" + str + "]");
_maxWaitingJobs = DEFAULT_MAX_WAITING_JOBS;
}
} else {
_maxWaitingJobs = DEFAULT_MAX_WAITING_JOBS;
}
if (_log.shouldLog(Log.INFO))
_log.info("Setting the max waiting jobs to " + _maxWaitingJobs);
}
////
// the remainder are utility methods for dumping status info