Merge branch '4.19' into 4.20

Daan Hoogland 2025-07-16 10:28:17 +02:00
commit 0d65c8c454
3 changed files with 48 additions and 18 deletions

UsageJobDao.java

@@ -37,4 +37,6 @@ public interface UsageJobDao extends GenericDao<UsageJobVO, Long> {
     UsageJobVO isOwner(String hostname, int pid);

     void updateJobSuccess(Long jobId, long startMillis, long endMillis, long execTime, boolean success);
+
+    void removeLastOpenJobsOwned(String hostname, int pid);
 }

UsageJobDaoImpl.java

@@ -22,6 +22,7 @@ import java.util.Date;
 import java.util.List;

+import org.apache.commons.collections.CollectionUtils;
 import org.springframework.stereotype.Component;

 import com.cloud.usage.UsageJobVO;
@@ -114,7 +115,7 @@ public class UsageJobDaoImpl extends GenericDaoBase<UsageJobVO, Long> implements
     public UsageJobVO isOwner(String hostname, int pid) {
         TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.USAGE_DB);
         try {
-            if ((hostname == null) || (pid <= 0)) {
+            if (hostname == null || pid <= 0) {
                 return null;
             }
@@ -174,7 +175,7 @@ public class UsageJobDaoImpl extends GenericDaoBase<UsageJobVO, Long> implements
         SearchCriteria<UsageJobVO> sc = createSearchCriteria();
         sc.addAnd("endMillis", SearchCriteria.Op.EQ, Long.valueOf(0));
         sc.addAnd("jobType", SearchCriteria.Op.EQ, Integer.valueOf(UsageJobVO.JOB_TYPE_SINGLE));
-        sc.addAnd("scheduled", SearchCriteria.Op.EQ, Integer.valueOf(0));
+        sc.addAnd("scheduled", SearchCriteria.Op.EQ, Integer.valueOf(UsageJobVO.JOB_NOT_SCHEDULED));
         List<UsageJobVO> jobs = search(sc, filter);
         if ((jobs == null) || jobs.isEmpty()) {
@@ -194,4 +195,36 @@ public class UsageJobDaoImpl extends GenericDaoBase<UsageJobVO, Long> implements
         }
         return jobs.get(0).getHeartbeat();
     }
+
+    private List<UsageJobVO> getLastOpenJobsOwned(String hostname, int pid) {
+        SearchCriteria<UsageJobVO> sc = createSearchCriteria();
+        sc.addAnd("endMillis", SearchCriteria.Op.EQ, Long.valueOf(0));
+        sc.addAnd("host", SearchCriteria.Op.EQ, hostname);
+        if (pid > 0) {
+            sc.addAnd("pid", SearchCriteria.Op.EQ, Integer.valueOf(pid));
+        }
+        return listBy(sc);
+    }
+
+    @Override
+    public void removeLastOpenJobsOwned(String hostname, int pid) {
+        if (hostname == null) {
+            return;
+        }
+        TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.USAGE_DB);
+        try {
+            List<UsageJobVO> jobs = getLastOpenJobsOwned(hostname, pid);
+            if (CollectionUtils.isNotEmpty(jobs)) {
+                logger.info("Found {} open jobs to remove", jobs.size());
+                for (UsageJobVO job : jobs) {
+                    logger.debug("Removing job - id: {}, pid: {}, job type: {}, scheduled: {}, heartbeat: {}",
+                            job.getId(), job.getPid(), job.getJobType(), job.getScheduled(), job.getHeartbeat());
+                    remove(job.getId());
+                }
+            }
+        } finally {
+            txn.close();
+        }
+    }
 }

UsageManagerImpl.java

@@ -324,6 +324,9 @@ public class UsageManagerImpl extends ManagerBase implements UsageManager, Runna
             logger.info("Starting Usage Manager");
         }

+        _usageJobDao.removeLastOpenJobsOwned(_hostname, 0);
+        Runtime.getRuntime().addShutdownHook(new AbandonJob());
+
         // use the configured exec time and aggregation duration for scheduling the job
         _scheduledFuture =
                 _executor.scheduleAtFixedRate(this, _jobExecTime.getTimeInMillis() - System.currentTimeMillis(), _aggregationDuration * 60 * 1000, TimeUnit.MILLISECONDS);
@@ -336,7 +339,6 @@ public class UsageManagerImpl extends ManagerBase implements UsageManager, Runna
             _sanity = _sanityExecutor.scheduleAtFixedRate(new SanityCheck(), 1, _sanityCheckInterval, TimeUnit.DAYS);
         }

-        Runtime.getRuntime().addShutdownHook(new AbandonJob());
         TransactionLegacy usageTxn = TransactionLegacy.open(TransactionLegacy.USAGE_DB);
         try {
             if (_heartbeatLock.lock(3)) { // 3 second timeout
@@ -2262,19 +2264,17 @@ public class UsageManagerImpl extends ManagerBase implements UsageManager, Runna
             // the aggregation range away from executing the next job
             long now = System.currentTimeMillis();
             long timeToJob = _jobExecTime.getTimeInMillis() - now;
-            long timeSinceJob = 0;
+            long timeSinceLastSuccessJob = 0;
             long aggregationDurationMillis = _aggregationDuration * 60L * 1000L;
             long lastSuccess = _usageJobDao.getLastJobSuccessDateMillis();
             if (lastSuccess > 0) {
-                timeSinceJob = now - lastSuccess;
+                timeSinceLastSuccessJob = now - lastSuccess;
             }

-            if ((timeSinceJob > 0) && (timeSinceJob > (aggregationDurationMillis - 100))) {
+            if ((timeSinceLastSuccessJob > 0) && (timeSinceLastSuccessJob > (aggregationDurationMillis - 100))) {
                 if (timeToJob > (aggregationDurationMillis / 2)) {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("it's been " + timeSinceJob + " ms since last usage job and " + timeToJob +
-                                " ms until next job, scheduling an immediate job to catch up (aggregation duration is " + _aggregationDuration + " minutes)");
-                    }
+                    logger.debug("it's been {} ms since last usage job and {} ms until next job, scheduling an immediate job to catch up (aggregation duration is {} minutes)"
+                            , timeSinceLastSuccessJob, timeToJob, _aggregationDuration);
                     scheduleParse();
                 }
             }
@@ -2359,17 +2359,12 @@ public class UsageManagerImpl extends ManagerBase implements UsageManager, Runna
                 }
             }
         }

     private class AbandonJob extends Thread {
         @Override
         public void run() {
-            logger.info("exitting Usage Manager");
-            deleteOpenjob();
-        }
-
-        private void deleteOpenjob() {
-            UsageJobVO job = _usageJobDao.isOwner(_hostname, _pid);
-            if (job != null) {
-                _usageJobDao.remove(job.getId());
-            }
+            logger.info("exiting Usage Manager");
+            _usageJobDao.removeLastOpenJobsOwned(_hostname, _pid);
         }
     }
 }
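
Taken together, the three files move open-job cleanup into the DAO: at startup the usage server clears any open job rows left on its host (pid 0), and the shutdown hook removes only the job owned by the exiting process (host plus pid). The stand-alone sketch below is illustrative only and not part of the commit; the class, in-memory list, and host/pid values are hypothetical, with removeLastOpenJobsOwned modelled on the filter used by the new DAO method.

import java.util.ArrayList;
import java.util.List;

public class UsageJobCleanupSketch {

    // Hypothetical stand-in for a usage job row; only the fields needed here.
    record Job(long id, String host, int pid, long endMillis) {}

    static final List<Job> JOBS = new ArrayList<>();

    // Mirrors the filter in the new DAO method: open jobs (endMillis == 0) on this
    // host, restricted to the given pid only when pid > 0.
    static void removeLastOpenJobsOwned(String hostname, int pid) {
        if (hostname == null) {
            return;
        }
        JOBS.removeIf(j -> j.endMillis() == 0
                && j.host().equals(hostname)
                && (pid <= 0 || j.pid() == pid));
    }

    public static void main(String[] args) {
        String hostname = "usage-host-1"; // hypothetical values for illustration
        int pid = 4242;

        JOBS.add(new Job(1, hostname, 999, 0)); // stale open job left by a crashed process

        // Startup path (UsageManagerImpl): clear anything this host left open,
        // then register the shutdown hook before the usage job is scheduled.
        removeLastOpenJobsOwned(hostname, 0);
        Runtime.getRuntime().addShutdownHook(
                new Thread(() -> removeLastOpenJobsOwned(hostname, pid)));

        JOBS.add(new Job(2, hostname, pid, 0)); // the open job this process now owns
        System.out.println("open jobs while running: " + JOBS.size()); // prints 1
        // On JVM exit the hook removes job 2, mirroring AbandonJob.run().
    }
}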