mirror of https://github.com/apache/cloudstack.git, synced 2025-11-02 20:02:29 +01:00

some reformatting and a new way to get VOs into the system

commit 7c6932ef80 (parent 7f470e8d2a)
@@ -1,17 +1,23 @@
// Copyright 2012 Citrix Systems, Inc. Licensed under the
// Apache License, Version 2.0 (the "License"); you may not use this
// file except in compliance with the License. Citrix Systems, Inc.
// reserves all rights not expressly granted by the License.
// You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Automatically generated by addcopyright.py at 04/03/2012
package com.cloud.async;

/**
 * Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
 *
 * This software is licensed under the GNU General Public License v3 or later.
 *
 * It is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 */

package com.cloud.async;

import java.io.File;
import java.io.FileInputStream;
import java.lang.reflect.Type;
@@ -68,182 +74,182 @@ import com.cloud.utils.mgmt.JmxUtil;
import com.cloud.utils.net.MacAddress;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

@Local(value={AsyncJobManager.class})
public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListener {
    public static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class.getName());
    private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds

    private static final int MAX_ONETIME_SCHEDULE_SIZE = 50;
    private static final int HEARTBEAT_INTERVAL = 2000;
    private static final int GC_INTERVAL = 10000; // 10 seconds

    private String _name;

    private AsyncJobExecutorContext _context;
    private SyncQueueManager _queueMgr;
    private ClusterManager _clusterMgr;
    private AccountManager _accountMgr;
    private AccountDao _accountDao;
    private AsyncJobDao _jobDao;
    private long _jobExpireSeconds = 86400; // 1 day
    private long _jobCancelThresholdSeconds = 3600; // 1 hour
    private ApiDispatcher _dispatcher;

    private final ScheduledExecutorService _heartbeatScheduler =
        Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat"));
    private ExecutorService _executor;

    @Override
    public AsyncJobExecutorContext getExecutorContext() {
        return _context;
    }

    @Override
    public AsyncJobVO getAsyncJob(long jobId) {
        return _jobDao.findById(jobId);
    }

    @Override
    public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId) {
        return _jobDao.findInstancePendingAsyncJob(instanceType, instanceId);
    }

    @Override
    public List<AsyncJobVO> findInstancePendingAsyncJobs(AsyncJob.Type instanceType, Long accountId) {
        return _jobDao.findInstancePendingAsyncJobs(instanceType, accountId);
    }

    @Override
    public long submitAsyncJob(AsyncJobVO job) {
        return submitAsyncJob(job, false);
    }

    @Override @DB
    public long submitAsyncJob(AsyncJobVO job, boolean scheduleJobExecutionInContext) {
        Transaction txt = Transaction.currentTxn();
        try {
            txt.start();
            job.setInitMsid(getMsid());
            _jobDao.persist(job);
            txt.commit();

            // no sync source originally
            job.setSyncSource(null);
            scheduleExecution(job, scheduleJobExecutionInContext);
            if(s_logger.isDebugEnabled()) {
                s_logger.debug("submit async job-" + job.getId() + ", details: " + job.toString());
            }
            return job.getId();
        } catch(Exception e) {
            txt.rollback();
            String errMsg = "Unable to schedule async job for command " + job.getCmd() + ", unexpected exception.";
            s_logger.warn(errMsg, e);
            throw new CloudRuntimeException(errMsg);
        }
    }

    @Override @DB
    public void completeAsyncJob(long jobId, int jobStatus, int resultCode, Object resultObject) {
        if(s_logger.isDebugEnabled()) {
            s_logger.debug("Complete async job-" + jobId + ", jobStatus: " + jobStatus +
                ", resultCode: " + resultCode + ", result: " + resultObject);
        }

        Transaction txt = Transaction.currentTxn();
        try {
            txt.start();
            AsyncJobVO job = _jobDao.findById(jobId);
            if(job == null) {
                if(s_logger.isDebugEnabled()) {
                    s_logger.debug("job-" + jobId + " no longer exists, we just log completion info here. " + jobStatus +
                        ", resultCode: " + resultCode + ", result: " + resultObject);
                }

                txt.rollback();
                return;
            }

            job.setCompleteMsid(getMsid());
            job.setStatus(jobStatus);
            job.setResultCode(resultCode);

            // reset attached object
            job.setInstanceType(null);
            job.setInstanceId(null);

            if (resultObject != null) {
                job.setResult(ApiSerializerHelper.toSerializedStringOld(resultObject));
            }

            job.setLastUpdated(DateUtil.currentGMTTime());
            _jobDao.update(jobId, job);
            txt.commit();
        } catch(Exception e) {
            s_logger.error("Unexpected exception while completing async job-" + jobId, e);
            txt.rollback();
        }
    }

    @Override @DB
    public void updateAsyncJobStatus(long jobId, int processStatus, Object resultObject) {
        if(s_logger.isDebugEnabled()) {
            s_logger.debug("Update async-job progress, job-" + jobId + ", processStatus: " + processStatus +
                ", result: " + resultObject);
        }

        Transaction txt = Transaction.currentTxn();
        try {
            txt.start();
            AsyncJobVO job = _jobDao.findById(jobId);
            if(job == null) {
                if(s_logger.isDebugEnabled()) {
                    s_logger.debug("job-" + jobId + " no longer exists, we just log progress info here. progress status: " + processStatus);
                }

                txt.rollback();
                return;
            }

            job.setProcessStatus(processStatus);
            if(resultObject != null) {
                job.setResult(ApiSerializerHelper.toSerializedStringOld(resultObject));
            }
            job.setLastUpdated(DateUtil.currentGMTTime());
            _jobDao.update(jobId, job);
            txt.commit();
        } catch(Exception e) {
            s_logger.error("Unexpected exception while updating async job-" + jobId + " status: ", e);
            txt.rollback();
        }
    }

    @Override @DB
    public void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId) {
        if(s_logger.isDebugEnabled()) {
            s_logger.debug("Update async-job attachment, job-" + jobId + ", instanceType: " + instanceType +
                ", instanceId: " + instanceId);
        }

        Transaction txt = Transaction.currentTxn();
        try {
            txt.start();

            AsyncJobVO job = _jobDao.createForUpdate();
            //job.setInstanceType(instanceType);
            job.setInstanceId(instanceId);
            job.setLastUpdated(DateUtil.currentGMTTime());
            _jobDao.update(jobId, job);

            txt.commit();
        } catch(Exception e) {
            s_logger.error("Unexpected exception while updating async job-" + jobId + " attachment: ", e);
            txt.rollback();
        }
    }

    @Override
    public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId) {
        // This method is re-entrant. If an API developer wants to synchronized on an object, e.g. the router,
        // when executing business logic, they will call this method (actually a method in BaseAsyncCmd that calls this).
        // This method will get called every time their business logic executes. The first time it exectues for a job
@@ -309,60 +315,60 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListener
        return _jobDao.findById(cmd.getId());
    }

    @Override @DB
    public AsyncJobResult queryAsyncJobResult(long jobId) {
        if(s_logger.isTraceEnabled()) {
            s_logger.trace("Query async-job status, job-" + jobId);
        }

        Transaction txt = Transaction.currentTxn();
        AsyncJobResult jobResult = new AsyncJobResult(jobId);

        try {
            txt.start();
            AsyncJobVO job = _jobDao.findById(jobId);
            if(job != null) {
                jobResult.setCmdOriginator(job.getCmdOriginator());
                jobResult.setJobStatus(job.getStatus());
                jobResult.setProcessStatus(job.getProcessStatus());
                jobResult.setResult(job.getResult());
                jobResult.setResultCode(job.getResultCode());
                jobResult.setUuid(job.getUuid());

                if(job.getStatus() == AsyncJobResult.STATUS_SUCCEEDED ||
                    job.getStatus() == AsyncJobResult.STATUS_FAILED) {

                    if(s_logger.isDebugEnabled()) {
                        s_logger.debug("Async job-" + jobId + " completed");
                    }
                } else {
                    job.setLastPolled(DateUtil.currentGMTTime());
                    _jobDao.update(jobId, job);
                }
            } else {
                if(s_logger.isDebugEnabled()) {
                    s_logger.debug("Async job-" + jobId + " does not exist, invalid job id?");
                }

                jobResult.setJobStatus(AsyncJobResult.STATUS_FAILED);
                jobResult.setResult("job-" + jobId + " does not exist");
            }
            txt.commit();
        } catch(Exception e) {
            s_logger.error("Unexpected exception while querying async job-" + jobId + " status: ", e);

            jobResult.setJobStatus(AsyncJobResult.STATUS_FAILED);
            jobResult.setResult("Exception: " + e.toString());
            txt.rollback();
        }

        if(s_logger.isTraceEnabled()) {
            s_logger.trace("Job status: " + jobResult.toString());
        }

        return jobResult;
    }

    private void scheduleExecution(final AsyncJobVO job) {
        scheduleExecution(job, false);
    }
@@ -500,7 +506,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListener
            }
        };
    }

    private void executeQueueItem(SyncQueueItemVO item, boolean fromPreviousSession) {
        AsyncJobVO job = _jobDao.findById(item.getContentId());
        if (job != null) {
@@ -530,61 +536,61 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListener
            }
        }

    @Override
    public void releaseSyncSource(AsyncJobExecutor executor) {
        if(executor.getSyncSource() != null) {
            if(s_logger.isDebugEnabled()) {
                s_logger.debug("Release sync source for job-" + executor.getJob().getId() + " sync source: "
                    + executor.getSyncSource().getContentType() + "-"
                    + executor.getSyncSource().getContentId());
            }

            _queueMgr.purgeItem(executor.getSyncSource().getId());
            checkQueue(executor.getSyncSource().getQueueId());
        }
    }

    private void checkQueue(long queueId) {
        while(true) {
            try {
                SyncQueueItemVO item = _queueMgr.dequeueFromOne(queueId, getMsid());
                if(item != null) {
                    if(s_logger.isDebugEnabled()) {
                        s_logger.debug("Executing sync queue item: " + item.toString());
                    }

                    executeQueueItem(item, false);
                } else {
                    break;
                }
            } catch(Throwable e) {
                s_logger.error("Unexpected exception when kicking sync queue-" + queueId, e);
                break;
            }
        }
    }

    private Runnable getHeartbeatTask() {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    List<SyncQueueItemVO> l = _queueMgr.dequeueFromAny(getMsid(), MAX_ONETIME_SCHEDULE_SIZE);
                    if(l != null && l.size() > 0) {
                        for(SyncQueueItemVO item: l) {
                            if(s_logger.isDebugEnabled()) {
                                s_logger.debug("Execute sync-queue item: " + item.toString());
                            }
                            executeQueueItem(item, false);
                        }
                    }
                } catch(Throwable e) {
                    s_logger.error("Unexpected exception when trying to execute queue item, ", e);
                } finally {
                    StackMaid.current().exitCleanup();
                }
            }
        };
    }

    @DB
@@ -643,14 +649,14 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListener
            }
        };
    }

    private long getMsid() {
        if(_clusterMgr != null) {
            return _clusterMgr.getManagementNodeId();
        }

        return MacAddress.getMacAddress().toLong();
    }

    private void cleanupPendingJobs(List<SyncQueueItemVO> l) {
        if(l != null && l.size() > 0) {
@@ -670,13 +676,13 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListener
                _queueMgr.purgeItem(item.getId());
            }
        }
    }

    @Override
    public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
        _name = name;

        ComponentLocator locator = ComponentLocator.getCurrentLocator();

        ConfigurationDao configDao = locator.getDao(ConfigurationDao.class);
        if (configDao == null) {
@@ -694,25 +700,25 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListener
        _accountDao = locator.getDao(AccountDao.class);
        if (_accountDao == null) {
            throw new ConfigurationException("Unable to get " + AccountDao.class.getName());
        }
        _jobDao = locator.getDao(AsyncJobDao.class);
        if (_jobDao == null) {
            throw new ConfigurationException("Unable to get "
                    + AsyncJobDao.class.getName());
        }

        _context = locator.getManager(AsyncJobExecutorContext.class);
        if (_context == null) {
            throw new ConfigurationException("Unable to get "
                    + AsyncJobExecutorContext.class.getName());
        }

        _queueMgr = locator.getManager(SyncQueueManager.class);
        if(_queueMgr == null) {
            throw new ConfigurationException("Unable to get "
                    + SyncQueueManager.class.getName());
        }

        _clusterMgr = locator.getManager(ClusterManager.class);

        _accountMgr = locator.getManager(AccountManager.class);
@@ -735,7 +741,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListener
            throw new ConfigurationException("Unable to load db.properties to configure AsyncJobManagerImpl");
        }

        return true;
    }

    @Override
@@ -765,10 +771,10 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListener
    @Override
    public void onManagementNodeIsolated() {
    }

    @Override
    public boolean start() {
        try {
            List<SyncQueueItemVO> l = _queueMgr.getActiveQueueItems(getMsid(), false);
            cleanupPendingJobs(l);
            _queueMgr.resetQueueProcess(getMsid());
@@ -776,13 +782,13 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListener
        } catch(Throwable e) {
            s_logger.error("Unexpected exception " + e.getMessage(), e);
        }

        _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), HEARTBEAT_INTERVAL,
            HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
        _heartbeatScheduler.scheduleAtFixedRate(getGCTask(), GC_INTERVAL,
            GC_INTERVAL, TimeUnit.MILLISECONDS);

        return true;
    }

    private static ExceptionResponse getResetResultResponse(String errorMessage) {
@@ -794,17 +800,17 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListener

    private static String getSerializedErrorMessage(String errorMessage) {
        return ApiSerializerHelper.toSerializedStringOld(getResetResultResponse(errorMessage));
    }

    @Override
    public boolean stop() {
        _heartbeatScheduler.shutdown();
        _executor.shutdown();
        return true;
    }

    @Override
    public String getName() {
        return _name;
    }
}
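
For orientation only (not part of the commit), here is a minimal sketch of how a caller might drive the AsyncJobManager shown above: submit a job, then poll until it reaches a terminal state. Only submitAsyncJob(), queryAsyncJobResult() and the AsyncJobResult.STATUS_* constants appear in the diff; the getJobStatus() getter and the polling interval are assumptions made for illustration.

// Illustrative sketch, not from the commit; see assumptions above.
long runAndWait(AsyncJobManager jobMgr, AsyncJobVO job) throws InterruptedException {
    long jobId = jobMgr.submitAsyncJob(job);            // persists the job and schedules execution
    AsyncJobResult result = jobMgr.queryAsyncJobResult(jobId);
    while (result.getJobStatus() != AsyncJobResult.STATUS_SUCCEEDED
            && result.getJobStatus() != AsyncJobResult.STATUS_FAILED) {
        Thread.sleep(2000);                             // arbitrary polling interval (assumption)
        result = jobMgr.queryAsyncJobResult(jobId);     // each poll also refreshes the job's last-polled time
    }
    return jobId;
}
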
@@ -169,11 +169,11 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements GenericDao<T, ID>
        factory.setCallback(0, builder);
        return builder;
    }

    public final Map<String, Attribute> getAllAttributes() {
        return _allAttributes;
    }

    @SuppressWarnings("unchecked")
    protected GenericDaoBase() {
        Type t = getClass().getGenericSuperclass();
@@ -182,8 +182,8 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements GenericDao<T, ID>
        } else if (((Class<?>)t).getGenericSuperclass() instanceof ParameterizedType) {
            _entityBeanType = (Class<T>)((ParameterizedType)((Class<?>)t).getGenericSuperclass()).getActualTypeArguments()[0];
        } else {
            _entityBeanType = (Class<T>)((ParameterizedType)
                ( (Class<?>)((Class<?>)t).getGenericSuperclass()).getGenericSuperclass()).getActualTypeArguments()[0];
        }

        s_daoMaps.put(_entityBeanType, this);
@@ -298,7 +298,7 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements GenericDao<T, ID>

        return s_seqFetcher.getRandomNextSequence(clazz, tg);
    }

    @Override @DB(txn=false)
    public List<T> lockRows(final SearchCriteria<T> sc, final Filter filter, final boolean exclusive) {
        return search(sc, filter, exclusive, false);
@@ -338,8 +338,9 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements GenericDao<T, ID>
        return searchIncludingRemoved(sc, filter, lock, cache, false) ;
    }

    @Override
    public List<T> searchIncludingRemoved(SearchCriteria<T> sc, final Filter filter, final Boolean lock,
            final boolean cache, final boolean enable_query_cache) {
        String clause = sc != null ? sc.getWhereClause() : null;
        if (clause != null && clause.length() == 0) {
            clause = null;
@@ -368,7 +369,7 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements GenericDao<T, ID>
        }

        final String sql = str.toString();

        PreparedStatement pstmt = null;
        final List<T> result = new ArrayList<T>();
        try {
@@ -494,11 +495,11 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements GenericDao<T, ID>
            byte[] bytes = rs.getBytes(index);
            if(bytes != null) {
                try {
                    if(field.getAnnotation(Column.class).encryptable()){
                        field.set(entity, DBEncryptionUtil.decrypt(new String(bytes, "UTF-8")));
                    } else {
                        field.set(entity, new String(bytes, "UTF-8"));
                    }
                } catch (IllegalArgumentException e) {
                    assert(false);
                    throw new CloudRuntimeException("IllegalArgumentException when converting UTF-8 data");
@@ -749,7 +750,7 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements GenericDao<T, ID>
        sc.addAnd(_idAttributes.get(_table)[0], SearchCriteria.Op.EQ, id);
        Transaction txn = Transaction.currentTxn();
        txn.start();

        try {
            if (ub.getCollectionChanges() != null) {
                insertElementCollection(entity, _idAttributes.get(_table)[0], id, ub.getCollectionChanges());
@@ -757,11 +758,11 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements GenericDao<T, ID>
        } catch (SQLException e) {
            throw new CloudRuntimeException("Unable to persist element collection", e);
        }

        int rowsUpdated = update(ub, sc, null);

        txn.commit();

        return rowsUpdated;
    }

@@ -846,7 +847,7 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements GenericDao<T, ID>
        assert results.size() <= 1 : "Didn't the limiting worked?";
        return results.size() == 0 ? null : results.get(0);
    }

    @Override
    @DB(txn=false)
    public T findOneBy(final SearchCriteria<T> sc) {
@@ -855,7 +856,7 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements GenericDao<T, ID>
        }
        return findOneIncludingRemovedBy(sc);
    }

    @DB(txn=false)
    protected List<T> listBy(final SearchCriteria<T> sc, final Filter filter) {
        if (_removed != null) {
@@ -881,7 +882,7 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements GenericDao<T, ID>
    protected List<T> listIncludingRemovedBy(final SearchCriteria<T> sc, final Filter filter, final boolean enable_query_cache) {
        return searchIncludingRemoved(sc, filter, null, false, enable_query_cache);
    }

    @DB(txn=false)
    protected List<T> listIncludingRemovedBy(final SearchCriteria<T> sc, final Filter filter) {
        return searchIncludingRemoved(sc, filter, null, false);
@@ -1150,7 +1151,7 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements GenericDao<T, ID>

        return sql;
    }

    @DB(txn=false)
    protected StringBuilder createPartialSelectSql(SearchCriteria<?> sc, final boolean whereClause) {
        StringBuilder sql = new StringBuilder(_partialSelectSql.first());
@@ -1209,7 +1210,7 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements GenericDao<T, ID>
    public List<T> search(final SearchCriteria<T> sc, final Filter filter, final boolean enable_query_cache) {
        return search(sc, filter, null, false, enable_query_cache);
    }

    @Override @DB(txn=false)
    public boolean update(ID id, T entity) {
        assert Enhancer.isEnhanced(entity.getClass()) : "Entity is not generated by this dao";
@@ -1284,7 +1285,7 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements GenericDao<T, ID>
                }
            }
        }

        if (_ecAttributes != null && _ecAttributes.size() > 0) {
            HashMap<Attribute, Object> ecAttributes = new HashMap<Attribute, Object>();
            for (Attribute attr : _ecAttributes) {
@@ -1293,7 +1294,7 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements GenericDao<T, ID>
                ecAttributes.put(attr, ec);
            }
        }

            insertElementCollection(entity, _idAttributes.get(_table)[0], id, ecAttributes);
        }
        txn.commit();
@@ -1318,7 +1319,7 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements GenericDao<T, ID>
        for (Map.Entry<Attribute, Object> entry : ecAttributes.entrySet()) {
            Attribute attr = entry.getKey();
            Object obj = entry.getValue();

            EcInfo ec = (EcInfo)attr.attache;
            Enumeration en = null;
            if (ec.rawClass == null) {
@@ -1329,7 +1330,7 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements GenericDao<T, ID>
            PreparedStatement pstmt = txn.prepareAutoCloseStatement(ec.clearSql);
            prepareAttribute(1, pstmt, idAttribute, id);
            pstmt.executeUpdate();

            while (en.hasMoreElements()) {
                pstmt = txn.prepareAutoCloseStatement(ec.insertSql);
                if (ec.targetClass == Date.class) {
@@ -1389,11 +1390,11 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements GenericDao<T, ID>
            // to support generic localization, utilize MySql UTF-8 support
            if (length < str.length()) {
                try {
                    if (attr.is(Attribute.Flag.Encrypted)){
                        pstmt.setBytes(j, DBEncryptionUtil.encrypt(str.substring(0, column.length())).getBytes("UTF-8"));
                    } else {
                        pstmt.setBytes(j, str.substring(0, column.length()).getBytes("UTF-8"));
                    }
                } catch (UnsupportedEncodingException e) {
                    // no-way it can't support UTF-8 encoding
                    assert(false);
@@ -1401,11 +1402,11 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements GenericDao<T, ID>
                }
            } else {
                try {
                    if (attr.is(Attribute.Flag.Encrypted)){
                        pstmt.setBytes(j, DBEncryptionUtil.encrypt(str).getBytes("UTF-8"));
                    } else {
                        pstmt.setBytes(j, str.getBytes("UTF-8"));
                    }
                } catch (UnsupportedEncodingException e) {
                    // no-way it can't support UTF-8 encoding
                    assert(false);
@@ -1499,6 +1500,28 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements GenericDao<T, ID>
        return entity;
    }

    @DB(txn=false)
    protected T toVO(ResultSet result, boolean cache) throws SQLException {
        T entity;
        try {
            entity = _entityBeanType.newInstance();
        } catch (InstantiationException e1) {
            throw new CloudRuntimeException("Unable to instantiate entity", e1);
        } catch (IllegalAccessException e1) {
            throw new CloudRuntimeException("Illegal Access", e1);
        }
        toEntityBean(result, entity);
        if (cache && _cache != null) {
            try {
                _cache.put(new Element(_idField.get(entity), entity));
            } catch (final Exception e) {
                s_logger.debug("Can't put it in the cache", e);
            }
        }

        return entity;
    }

    @DB(txn=false)
    protected void toEntityBean(final ResultSet result, final T entity) throws SQLException {
        ResultSetMetaData meta = result.getMetaData();
@@ -1550,7 +1573,7 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements GenericDao<T, ID>
            } else {
                assert (false) : "You'll need to add more classeses";
            }

            if (ec.rawClass == null) {
                Object[] array = (Object[])Array.newInstance(ec.targetClass);
                lst.toArray(array);
@@ -1722,22 +1745,22 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements GenericDao<T, ID>
        SearchBuilder<T> builder = createSearchBuilder();
        return builder.create();
    }

    @Override @DB(txn=false)
    public <K> SearchCriteria2 createSearchCriteria2(Class<K> resultType) {
        final T entity = (T)_searchEnhancer.create();
        final Factory factory = (Factory)entity;
        SearchCriteria2 sc = new SearchCriteria2(entity, resultType, _allAttributes, this);
        factory.setCallback(0, sc);
        return sc;
    }

    @Override @DB(txn=false)
    public SearchCriteria2 createSearchCriteria2() {
        final T entity = (T)_searchEnhancer.create();
        final Factory factory = (Factory)entity;
        SearchCriteria2 sc = new SearchCriteria2(entity, (Class<T>)entity.getClass(), _allAttributes, this);
        SearchCriteria2 sc = new SearchCriteria2(entity, entity.getClass(), _allAttributes, this);
        factory.setCallback(0, sc);
        return sc;
    }
}
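
The "new way to get VOs into the system" from the commit message appears to be the toVO(ResultSet, boolean) helper added in the @@ -1499 hunk above: it instantiates the entity bean, fills it via toEntityBean(), and optionally drops it into the DAO cache. A hypothetical call site inside a DAO result loop might look like the sketch below; the JDBC plumbing around it is assumed, and only toVO() itself comes from the diff.

// Illustrative sketch, not from the commit; pstmt is assumed to be a prepared SELECT as in search() above.
ResultSet rs = pstmt.executeQuery();
List<T> vos = new ArrayList<T>();
while (rs.next()) {
    vos.add(toVO(rs, true));   // build the VO from the current row and cache it
}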