Committing review 13891.

This commit is contained in:
Alex Huang 2013-09-23 14:59:39 -07:00
parent df3ee9a92d
commit 3e813ce61d

View File

@ -20,6 +20,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;
import javax.inject.Inject; import javax.inject.Inject;
import javax.naming.ConfigurationException; import javax.naming.ConfigurationException;
@ -41,14 +42,14 @@ public class AsyncJobMonitor extends ManagerBase {
private final Map<Long, ActiveTaskRecord> _activeTasks = new HashMap<Long, ActiveTaskRecord>(); private final Map<Long, ActiveTaskRecord> _activeTasks = new HashMap<Long, ActiveTaskRecord>();
private final Timer _timer = new Timer(); private final Timer _timer = new Timer();
private volatile int _activePoolThreads = 0; private final AtomicInteger _activePoolThreads = new AtomicInteger();
private volatile int _activeInplaceThreads = 0; private final AtomicInteger _activeInplaceThreads = new AtomicInteger();
// configuration // configuration
private long _inactivityCheckIntervalMs = 60000; private long _inactivityCheckIntervalMs = 60000;
private long _inactivityWarningThresholdMs = 90000; private long _inactivityWarningThresholdMs = 90000;
public AsyncJobMonitor() { public AsyncJobMonitor() {
} }
@ -118,9 +119,9 @@ public class AsyncJobMonitor extends ManagerBase {
ActiveTaskRecord record = new ActiveTaskRecord(jobId, threadId, fromPoolThread); ActiveTaskRecord record = new ActiveTaskRecord(jobId, threadId, fromPoolThread);
_activeTasks.put(runNumber, record); _activeTasks.put(runNumber, record);
if(fromPoolThread) if(fromPoolThread)
_activePoolThreads++; _activePoolThreads.incrementAndGet();
else else
_activeInplaceThreads++; _activeInplaceThreads.incrementAndGet();
} }
} }
@ -132,23 +133,23 @@ public class AsyncJobMonitor extends ManagerBase {
s_logger.info("Remove job-" + record.getJobId() + " from job monitoring"); s_logger.info("Remove job-" + record.getJobId() + " from job monitoring");
if(record.isPoolThread()) if(record.isPoolThread())
_activePoolThreads--; _activePoolThreads.decrementAndGet();
else else
_activeInplaceThreads--; _activeInplaceThreads.decrementAndGet();
_activeTasks.remove(runNumber); _activeTasks.remove(runNumber);
} }
} }
} }
public int getActivePoolThreads() { public int getActivePoolThreads() {
return _activePoolThreads; return _activePoolThreads.get();
} }
public int getActiveInplaceThread() { public int getActiveInplaceThread() {
return _activeInplaceThreads; return _activeInplaceThreads.get();
} }
private static class ActiveTaskRecord { private static class ActiveTaskRecord {
long _jobId; long _jobId;
long _threadId; long _threadId;