mirror of
https://github.com/apache/cloudstack.git
synced 2025-10-26 08:42:29 +01:00
CLOUDSTACK-5358: Bring back concurrency control in sync-queue management
This commit is contained in:
parent
d5dc6aab61
commit
441be43b8c
@ -47,6 +47,7 @@ import org.apache.cloudstack.context.CallContext;
|
||||
import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService;
|
||||
import org.apache.cloudstack.engine.orchestration.service.VolumeOrchestrationService;
|
||||
import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
|
||||
import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStoreInfo;
|
||||
import org.apache.cloudstack.engine.subsystem.api.storage.StoragePoolAllocator;
|
||||
import org.apache.cloudstack.framework.config.ConfigDepot;
|
||||
import org.apache.cloudstack.framework.config.ConfigKey;
|
||||
@ -769,7 +770,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
Outcome<VirtualMachine> outcome = startVmThroughJobQueue(vmUuid, params, planToDeploy);
|
||||
|
||||
try {
|
||||
outcome.get();
|
||||
VirtualMachine vm = outcome.get();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException("Operation is interrupted", e);
|
||||
} catch (java.util.concurrent.ExecutionException e) {
|
||||
@ -1317,7 +1318,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
Outcome<VirtualMachine> outcome = stopVmThroughJobQueue(vmUuid, cleanUpEvenIfUnableToStop);
|
||||
|
||||
try {
|
||||
outcome.get();
|
||||
VirtualMachine vm = outcome.get();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException("Operation is interrupted", e);
|
||||
} catch (java.util.concurrent.ExecutionException e) {
|
||||
@ -1626,7 +1627,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
Outcome<VirtualMachine> outcome = migrateVmStorageThroughJobQueue(vmUuid, destPool);
|
||||
|
||||
try {
|
||||
outcome.get();
|
||||
VirtualMachine vm = outcome.get();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException("Operation is interrupted", e);
|
||||
} catch (java.util.concurrent.ExecutionException e) {
|
||||
@ -1718,7 +1719,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
Outcome<VirtualMachine> outcome = migrateVmThroughJobQueue(vmUuid, srcHostId, dest);
|
||||
|
||||
try {
|
||||
outcome.get();
|
||||
VirtualMachine vm = outcome.get();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException("Operation is interrupted", e);
|
||||
} catch (java.util.concurrent.ExecutionException e) {
|
||||
@ -2001,7 +2002,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
Outcome<VirtualMachine> outcome = migrateVmWithStorageThroughJobQueue(vmUuid, srcHostId, destHostId, volumeToPool);
|
||||
|
||||
try {
|
||||
outcome.get();
|
||||
VirtualMachine vm = outcome.get();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException("Operation is interrupted", e);
|
||||
} catch (java.util.concurrent.ExecutionException e) {
|
||||
@ -2014,7 +2015,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
throw (ResourceUnavailableException)jobException;
|
||||
else if (jobException instanceof ConcurrentOperationException)
|
||||
throw (ConcurrentOperationException)jobException;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -2296,7 +2297,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
Outcome<VirtualMachine> outcome = rebootVmThroughJobQueue(vmUuid, params);
|
||||
|
||||
try {
|
||||
outcome.get();
|
||||
VirtualMachine vm = outcome.get();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException("Operation is interrupted", e);
|
||||
} catch (java.util.concurrent.ExecutionException e) {
|
||||
@ -2994,10 +2995,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
return;
|
||||
}
|
||||
|
||||
if (s_logger.isDebugEnabled())
|
||||
if(s_logger.isDebugEnabled())
|
||||
s_logger.debug("Received startup command from hypervisor host. host id: " + agent.getId());
|
||||
|
||||
if (VmJobEnabled.value()) {
|
||||
if(VmJobEnabled.value()) {
|
||||
_syncMgr.resetHostSyncState(agent.getId());
|
||||
}
|
||||
|
||||
@ -3589,7 +3590,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
Outcome<VirtualMachine> outcome = migrateVmForScaleThroughJobQueue(vmUuid, srcHostId, dest, oldSvcOfferingId);
|
||||
|
||||
try {
|
||||
outcome.get();
|
||||
VirtualMachine vm = outcome.get();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException("Operation is interrupted", e);
|
||||
} catch (java.util.concurrent.ExecutionException e) {
|
||||
@ -3793,7 +3794,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
}
|
||||
|
||||
public boolean unplugNic(Network network, NicTO nic, VirtualMachineTO vm, ReservationContext context, DeployDestination dest) throws ConcurrentOperationException,
|
||||
ResourceUnavailableException {
|
||||
ResourceUnavailableException {
|
||||
|
||||
boolean result = true;
|
||||
VMInstanceVO router = _vmDao.findById(vm.getId());
|
||||
@ -3828,7 +3829,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
@Override
|
||||
public VMInstanceVO reConfigureVm(String vmUuid, ServiceOffering oldServiceOffering,
|
||||
boolean reconfiguringOnExistingHost)
|
||||
throws ResourceUnavailableException, InsufficientServerCapacityException, ConcurrentOperationException {
|
||||
throws ResourceUnavailableException, InsufficientServerCapacityException, ConcurrentOperationException {
|
||||
|
||||
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
|
||||
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
|
||||
@ -3974,8 +3975,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
handlePowerOffReportWithNoPendingJobsOnVM(vm);
|
||||
break;
|
||||
|
||||
// PowerUnknown shouldn't be reported, it is a derived
|
||||
// VM power state from host state (host un-reachable)
|
||||
// PowerUnknown shouldn't be reported, it is a derived
|
||||
// VM power state from host state (host un-reachable)
|
||||
case PowerUnknown:
|
||||
default:
|
||||
assert (false);
|
||||
@ -4009,7 +4010,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
// we need to alert admin or user about this risky state transition
|
||||
_alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
|
||||
VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName()
|
||||
+ ") state is sync-ed (Starting -> Running) from out-of-context transition. VM network environment may need to be reset");
|
||||
+ ") state is sync-ed (Starting -> Running) from out-of-context transition. VM network environment may need to be reset");
|
||||
break;
|
||||
|
||||
case Running:
|
||||
@ -4031,7 +4032,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
}
|
||||
_alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
|
||||
VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState()
|
||||
+ " -> Running) from out-of-context transition. VM network environment may need to be reset");
|
||||
+ " -> Running) from out-of-context transition. VM network environment may need to be reset");
|
||||
break;
|
||||
|
||||
case Destroyed:
|
||||
@ -4074,7 +4075,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
}
|
||||
_alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
|
||||
VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState()
|
||||
+ " -> Stopped) from out-of-context transition.");
|
||||
+ " -> Stopped) from out-of-context transition.");
|
||||
// TODO: we need to forcely release all resource allocation
|
||||
break;
|
||||
|
||||
@ -4101,7 +4102,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
// however, if VM is missing from the host report (it may happen in out of band changes
|
||||
// or from designed behave of XS/KVM), the VM may not get a chance to run the state-sync logic
|
||||
//
|
||||
// Therefor, we will scan thoses VMs on UP host based on last update timestamp, if the host is UP
|
||||
// Therefore, we will scan thoses VMs on UP host based on last update timestamp, if the host is UP
|
||||
// and a VM stalls for status update, we will consider them to be powered off
|
||||
// (which is relatively safe to do so)
|
||||
|
||||
@ -4134,7 +4135,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
// We now only alert administrator about this situation
|
||||
_alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
|
||||
VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") is stuck in " + vm.getState()
|
||||
+ " state and its host is unreachable for too long");
|
||||
+ " state and its host is unreachable for too long");
|
||||
}
|
||||
}
|
||||
|
||||
@ -4332,7 +4333,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
|
||||
}
|
||||
|
||||
return new Object[] {workJob, workJob.getId()};
|
||||
return new Object[] {workJob, new Long(workJob.getId())};
|
||||
}
|
||||
});
|
||||
|
||||
@ -4383,7 +4384,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
|
||||
}
|
||||
|
||||
return new Object[] {workJob, workJob.getId()};
|
||||
return new Object[] {workJob, new Long(workJob.getId())};
|
||||
}
|
||||
});
|
||||
|
||||
@ -4436,7 +4437,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
|
||||
}
|
||||
|
||||
return new Object[] {workJob, workJob.getId()};
|
||||
return new Object[] {workJob, new Long(workJob.getId())};
|
||||
}
|
||||
});
|
||||
|
||||
@ -4458,6 +4459,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
@Override
|
||||
public Object[] doInTransaction(TransactionStatus status) {
|
||||
|
||||
_vmDao.lockRow(vm.getId(), true);
|
||||
|
||||
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
|
||||
VirtualMachine.Type.Instance, vm.getId(),
|
||||
VmWorkMigrate.class.getName());
|
||||
@ -4485,7 +4488,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
|
||||
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
|
||||
}
|
||||
return new Object[] {workJob, workJob.getId()};
|
||||
return new Object[] {workJob, new Long(workJob.getId())};
|
||||
}
|
||||
});
|
||||
|
||||
@ -4540,7 +4543,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
|
||||
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
|
||||
}
|
||||
return new Object[] {workJob, workJob.getId()};
|
||||
return new Object[] {workJob, new Long(workJob.getId())};
|
||||
}
|
||||
});
|
||||
|
||||
@ -4564,6 +4567,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
@Override
|
||||
public Object[] doInTransaction(TransactionStatus status) {
|
||||
|
||||
_vmDao.lockRow(vm.getId(), true);
|
||||
|
||||
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
|
||||
VirtualMachine.Type.Instance, vm.getId(),
|
||||
VmWorkMigrateForScale.class.getName());
|
||||
@ -4593,7 +4598,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
|
||||
}
|
||||
|
||||
return new Object[] {workJob, workJob.getId()};
|
||||
return new Object[] {workJob, new Long(workJob.getId())};
|
||||
}
|
||||
});
|
||||
|
||||
@ -4616,6 +4621,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
@Override
|
||||
public Object[] doInTransaction(TransactionStatus status) {
|
||||
|
||||
_vmDao.lockRow(vm.getId(), true);
|
||||
|
||||
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
|
||||
VirtualMachine.Type.Instance, vm.getId(),
|
||||
VmWorkStorageMigration.class.getName());
|
||||
@ -4639,13 +4646,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
|
||||
// save work context info (there are some duplications)
|
||||
VmWorkStorageMigration workInfo = new VmWorkStorageMigration(user.getId(), account.getId(), vm.getId(),
|
||||
VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, destPool);
|
||||
VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, destPool.getId());
|
||||
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
|
||||
|
||||
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
|
||||
}
|
||||
|
||||
return new Object[] {workJob, workJob.getId()};
|
||||
return new Object[] {workJob, new Long(workJob.getId())};
|
||||
}
|
||||
});
|
||||
|
||||
@ -4666,6 +4673,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
@Override
|
||||
public Object[] doInTransaction(TransactionStatus status) {
|
||||
|
||||
_vmDao.lockRow(vm.getId(), true);
|
||||
|
||||
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
|
||||
VirtualMachine.Type.Instance, vm.getId(),
|
||||
VmWorkAddVmToNetwork.class.getName());
|
||||
@ -4694,7 +4703,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
|
||||
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
|
||||
}
|
||||
return new Object[] {workJob, workJob.getId()};
|
||||
return new Object[] {workJob, new Long(workJob.getId())};
|
||||
}
|
||||
});
|
||||
|
||||
@ -4715,6 +4724,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
@Override
|
||||
public Object[] doInTransaction(TransactionStatus status) {
|
||||
|
||||
_vmDao.lockRow(vm.getId(), true);
|
||||
|
||||
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
|
||||
VirtualMachine.Type.Instance, vm.getId(),
|
||||
VmWorkRemoveNicFromVm.class.getName());
|
||||
@ -4743,7 +4754,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
|
||||
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
|
||||
}
|
||||
return new Object[] {workJob, workJob.getId()};
|
||||
return new Object[] {workJob, new Long(workJob.getId())};
|
||||
}
|
||||
});
|
||||
|
||||
@ -4764,6 +4775,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
@Override
|
||||
public Object[] doInTransaction(TransactionStatus status) {
|
||||
|
||||
_vmDao.lockRow(vm.getId(), true);
|
||||
|
||||
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
|
||||
VirtualMachine.Type.Instance, vm.getId(),
|
||||
VmWorkRemoveVmFromNetwork.class.getName());
|
||||
@ -4792,7 +4805,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
|
||||
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
|
||||
}
|
||||
return new Object[] {workJob, workJob.getId()};
|
||||
return new Object[] {workJob, new Long(workJob.getId())};
|
||||
}
|
||||
});
|
||||
|
||||
@ -4815,6 +4828,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
@Override
|
||||
public Object[] doInTransaction(TransactionStatus status) {
|
||||
|
||||
_vmDao.lockRow(vm.getId(), true);
|
||||
|
||||
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
|
||||
VirtualMachine.Type.Instance, vm.getId(),
|
||||
VmWorkReconfigure.class.getName());
|
||||
@ -4843,7 +4858,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
|
||||
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
|
||||
}
|
||||
return new Object[] {workJob, workJob.getId()};
|
||||
return new Object[] {workJob, new Long(workJob.getId())};
|
||||
}
|
||||
});
|
||||
|
||||
@ -4980,7 +4995,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
|
||||
s_logger.info("Unable to find vm " + work.getVmId());
|
||||
}
|
||||
assert (vm != null);
|
||||
orchestrateStorageMigration(vm.getUuid(), work.getDestStoragePool());
|
||||
StoragePool pool = (PrimaryDataStoreInfo)dataStoreMgr.getPrimaryDataStore(work.getDestStoragePoolId());
|
||||
orchestrateStorageMigration(vm.getUuid(), pool);
|
||||
|
||||
return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
|
||||
}
|
||||
|
||||
|
||||
@ -16,20 +16,18 @@
|
||||
// under the License.
|
||||
package com.cloud.vm;
|
||||
|
||||
import com.cloud.storage.StoragePool;
|
||||
|
||||
public class VmWorkStorageMigration extends VmWork {
|
||||
private static final long serialVersionUID = -8677979691741157474L;
|
||||
|
||||
StoragePool destPool;
|
||||
Long destPoolId;
|
||||
|
||||
public VmWorkStorageMigration(long userId, long accountId, long vmId, String handlerName, StoragePool destPool) {
|
||||
public VmWorkStorageMigration(long userId, long accountId, long vmId, String handlerName, Long destPoolId) {
|
||||
super(userId, accountId, vmId, handlerName);
|
||||
|
||||
this.destPool = destPool;
|
||||
this.destPoolId = destPoolId;
|
||||
}
|
||||
|
||||
public StoragePool getDestStoragePool() {
|
||||
return destPool;
|
||||
public Long getDestStoragePoolId() {
|
||||
return destPoolId;
|
||||
}
|
||||
}
|
||||
|
||||
@ -24,6 +24,7 @@ import com.cloud.utils.db.GenericDao;
|
||||
|
||||
public interface SyncQueueItemDao extends GenericDao<SyncQueueItemVO, Long> {
|
||||
public SyncQueueItemVO getNextQueueItem(long queueId);
|
||||
public int getActiveQueueItemCount(long queueId);
|
||||
|
||||
public List<SyncQueueItemVO> getNextQueueItems(int maxItems);
|
||||
|
||||
|
||||
@ -36,6 +36,7 @@ import com.cloud.utils.db.GenericDaoBase;
|
||||
import com.cloud.utils.db.GenericSearchBuilder;
|
||||
import com.cloud.utils.db.SearchBuilder;
|
||||
import com.cloud.utils.db.SearchCriteria;
|
||||
import com.cloud.utils.db.SearchCriteria.Func;
|
||||
import com.cloud.utils.db.SearchCriteria.Op;
|
||||
import com.cloud.utils.db.TransactionLegacy;
|
||||
|
||||
@ -43,6 +44,7 @@ import com.cloud.utils.db.TransactionLegacy;
|
||||
public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long> implements SyncQueueItemDao {
|
||||
private static final Logger s_logger = Logger.getLogger(SyncQueueItemDaoImpl.class);
|
||||
final GenericSearchBuilder<SyncQueueItemVO, Long> queueIdSearch;
|
||||
final GenericSearchBuilder<SyncQueueItemVO, Integer> queueActiveItemSearch;
|
||||
|
||||
public SyncQueueItemDaoImpl() {
|
||||
super();
|
||||
@ -51,6 +53,12 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long>
|
||||
queueIdSearch.and("contentType", queueIdSearch.entity().getContentType(), Op.EQ);
|
||||
queueIdSearch.selectFields(queueIdSearch.entity().getId());
|
||||
queueIdSearch.done();
|
||||
|
||||
queueActiveItemSearch = createSearchBuilder(Integer.class);
|
||||
queueActiveItemSearch.and("queueId", queueActiveItemSearch.entity().getQueueId(), Op.EQ);
|
||||
queueActiveItemSearch.and("processNumber", queueActiveItemSearch.entity().getLastProcessNumber(), Op.NNULL);
|
||||
queueActiveItemSearch.select(null, Func.COUNT, queueActiveItemSearch.entity().getId());
|
||||
queueActiveItemSearch.done();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -66,19 +74,31 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long>
|
||||
|
||||
Filter filter = new Filter(SyncQueueItemVO.class, "created", true, 0L, 1L);
|
||||
List<SyncQueueItemVO> l = listBy(sc, filter);
|
||||
if (l != null && l.size() > 0)
|
||||
if(l != null && l.size() > 0)
|
||||
return l.get(0);
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getActiveQueueItemCount(long queueId) {
|
||||
SearchCriteria<Integer> sc = queueActiveItemSearch.create();
|
||||
sc.setParameters("queueId", queueId);
|
||||
|
||||
List<Integer> count = customSearch(sc, null);
|
||||
return count.get(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<SyncQueueItemVO> getNextQueueItems(int maxItems) {
|
||||
List<SyncQueueItemVO> l = new ArrayList<SyncQueueItemVO>();
|
||||
|
||||
String sql =
|
||||
"SELECT i.id, i.queue_id, i.content_type, i.content_id, i.created " + " FROM sync_queue AS q JOIN sync_queue_item AS i ON q.id = i.queue_id "
|
||||
+ " WHERE i.queue_proc_number IS NULL " + " GROUP BY q.id " + " ORDER BY i.id " + " LIMIT 0, ?";
|
||||
String sql = "SELECT i.id, i.queue_id, i.content_type, i.content_id, i.created " +
|
||||
" FROM sync_queue AS q JOIN sync_queue_item AS i ON q.id = i.queue_id " +
|
||||
" WHERE i.queue_proc_number IS NULL " +
|
||||
" GROUP BY q.id " +
|
||||
" ORDER BY i.id " +
|
||||
" LIMIT 0, ?";
|
||||
|
||||
TransactionLegacy txn = TransactionLegacy.currentTxn();
|
||||
PreparedStatement pstmt = null;
|
||||
@ -86,7 +106,7 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long>
|
||||
pstmt = txn.prepareAutoCloseStatement(sql);
|
||||
pstmt.setInt(1, maxItems);
|
||||
ResultSet rs = pstmt.executeQuery();
|
||||
while (rs.next()) {
|
||||
while(rs.next()) {
|
||||
SyncQueueItemVO item = new SyncQueueItemVO();
|
||||
item.setId(rs.getLong(1));
|
||||
item.setQueueId(rs.getLong(2));
|
||||
@ -106,7 +126,8 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long>
|
||||
@Override
|
||||
public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive) {
|
||||
SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
|
||||
sb.and("lastProcessMsid", sb.entity().getLastProcessMsid(), SearchCriteria.Op.EQ);
|
||||
sb.and("lastProcessMsid", sb.entity().getLastProcessMsid(),
|
||||
SearchCriteria.Op.EQ);
|
||||
sb.done();
|
||||
|
||||
SearchCriteria<SyncQueueItemVO> sc = sb.create();
|
||||
@ -134,7 +155,7 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long>
|
||||
SearchCriteria<SyncQueueItemVO> sc = sbItem.create();
|
||||
sc.setParameters("lastProcessTime2", new Date(cutTime.getTime() - thresholdMs));
|
||||
|
||||
if (exclusive)
|
||||
if(exclusive)
|
||||
return lockRows(sc, null, true);
|
||||
return listBy(sc, null);
|
||||
}
|
||||
|
||||
@ -24,7 +24,6 @@ import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
@ -365,23 +364,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
|
||||
}
|
||||
|
||||
SyncQueueVO queue = null;
|
||||
|
||||
// to deal with temporary DB exceptions like DB deadlock/Lock-wait time out cased rollbacks
|
||||
// we retry five times until we throw an exception
|
||||
Random random = new Random();
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
queue = _queueMgr.queue(syncObjType, syncObjId, SyncQueueItem.AsyncJobContentType, job.getId(), queueSizeLimit);
|
||||
if (queue != null) {
|
||||
break;
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(1000 + random.nextInt(5000));
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
|
||||
queue = _queueMgr.queue(syncObjType, syncObjId, SyncQueueItem.AsyncJobContentType, job.getId(), queueSizeLimit);
|
||||
if (queue == null)
|
||||
throw new CloudRuntimeException("Unable to insert queue item into database, DB is full?");
|
||||
}
|
||||
|
||||
@ -242,18 +242,15 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage
|
||||
}
|
||||
|
||||
private boolean queueReadyToProcess(SyncQueueVO queueVO) {
|
||||
return true;
|
||||
int nActiveItems = _syncQueueItemDao.getActiveQueueItemCount(queueVO.getId());
|
||||
if (nActiveItems < queueVO.getQueueSizeLimit())
|
||||
return true;
|
||||
|
||||
//
|
||||
// TODO
|
||||
//
|
||||
// Need to disable concurrency disable at queue level due to the need to support
|
||||
// job wake-up dispatching task
|
||||
//
|
||||
// Concurrency control is better done at higher level and leave the job scheduling/serializing simpler
|
||||
//
|
||||
|
||||
// return queueVO.getQueueSize() < queueVO.getQueueSizeLimit();
|
||||
if (s_logger.isDebugEnabled())
|
||||
s_logger.debug("Queue (queue id, sync type, sync id) - (" + queueVO.getId()
|
||||
+ "," + queueVO.getSyncObjType() + ", " + queueVO.getSyncObjId()
|
||||
+ ") is reaching concurrency limit " + queueVO.getQueueSizeLimit());
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user