diff --git a/server/src/com/cloud/agent/manager/AgentManagerImpl.java b/server/src/com/cloud/agent/manager/AgentManagerImpl.java
index 08a9bbe4677..337605ed02c 100755
--- a/server/src/com/cloud/agent/manager/AgentManagerImpl.java
+++ b/server/src/com/cloud/agent/manager/AgentManagerImpl.java
@@ -85,6 +85,7 @@ import com.cloud.api.commands.UpdateHostCmd;
import com.cloud.capacity.Capacity;
import com.cloud.capacity.CapacityVO;
import com.cloud.capacity.dao.CapacityDao;
+import com.cloud.cluster.StackMaid;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.dc.ClusterDetailsDao;
import com.cloud.dc.ClusterDetailsVO;
@@ -127,7 +128,6 @@ import com.cloud.host.dao.HostTagsDao;
import com.cloud.hypervisor.Hypervisor;
import com.cloud.hypervisor.Hypervisor.HypervisorType;
import com.cloud.hypervisor.kvm.resource.KvmDummyResourceBase;
-import com.cloud.maid.StackMaid;
import com.cloud.maint.UpgradeManager;
import com.cloud.network.IPAddressVO;
import com.cloud.network.NetworkManager;
diff --git a/server/src/com/cloud/api/ApiServer.java b/server/src/com/cloud/api/ApiServer.java
index dc8878cb55b..a085461dc9e 100755
--- a/server/src/com/cloud/api/ApiServer.java
+++ b/server/src/com/cloud/api/ApiServer.java
@@ -84,13 +84,13 @@ import com.cloud.api.response.ListResponse;
import com.cloud.async.AsyncJob;
import com.cloud.async.AsyncJobManager;
import com.cloud.async.AsyncJobVO;
+import com.cloud.cluster.StackMaid;
import com.cloud.configuration.ConfigurationVO;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.domain.Domain;
import com.cloud.domain.DomainVO;
import com.cloud.event.EventUtils;
import com.cloud.exception.CloudAuthenticationException;
-import com.cloud.maid.StackMaid;
import com.cloud.server.ManagementServer;
import com.cloud.user.Account;
import com.cloud.user.AccountService;
diff --git a/server/src/com/cloud/api/ApiServlet.java b/server/src/com/cloud/api/ApiServlet.java
index 1332dfe4a19..b853eb4c944 100755
--- a/server/src/com/cloud/api/ApiServlet.java
+++ b/server/src/com/cloud/api/ApiServlet.java
@@ -32,8 +32,8 @@ import javax.servlet.http.HttpSession;
import org.apache.log4j.Logger;
+import com.cloud.cluster.StackMaid;
import com.cloud.exception.CloudAuthenticationException;
-import com.cloud.maid.StackMaid;
import com.cloud.server.ManagementServer;
import com.cloud.user.Account;
import com.cloud.user.AccountService;
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index 9de420b87a9..ad11df4e9a0 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -46,11 +46,11 @@ import com.cloud.async.dao.AsyncJobDao;
import com.cloud.cluster.ClusterManager;
import com.cloud.cluster.ClusterManagerListener;
import com.cloud.cluster.ManagementServerHostVO;
+import com.cloud.cluster.StackMaid;
import com.cloud.configuration.Config;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.exception.InvalidParameterValueException;
import com.cloud.exception.PermissionDeniedException;
-import com.cloud.maid.StackMaid;
import com.cloud.user.Account;
import com.cloud.user.UserContext;
import com.cloud.user.dao.AccountDao;
diff --git a/server/src/com/cloud/cluster/CleanupMaid.java b/server/src/com/cloud/cluster/CleanupMaid.java
new file mode 100644
index 00000000000..6bcca4b2d21
--- /dev/null
+++ b/server/src/com/cloud/cluster/CleanupMaid.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
+ *
+ * This software is licensed under the GNU General Public License v3 or later.
+ *
+ * It is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or any later version.
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ */
+package com.cloud.cluster;
+
+/**
+ * CleanupMaid is implemented by tasks that needs to perform cleanup.
+ *
+ * It can contain the actual information about the current state of the
+ * task. The state is serialized and stored. When cleanup is required
+ * CleanupMaid is instantiated from the stored data and cleanup() is called.
+ *
+ */
+public interface CleanupMaid {
+ /**
+ * cleanup according the state that was stored.
+ *
+ * @return 0 indicates cleanup was successful. Negative number
+ * indicates the cleanup was unsuccessful but don't retry. Positive number
+ * indicates the cleanup was unsuccessful and retry in this many seconds.
+ */
+ int cleanup();
+
+
+ /**
+ * If cleanup is unsuccessful and not to be retried, the cleanup procedure
+ * returned here is recorded.
+ * @return
+ */
+ String getCleanupProcedure();
+}
diff --git a/server/src/com/cloud/cluster/ClusterManagerImpl.java b/server/src/com/cloud/cluster/ClusterManagerImpl.java
index 3c475b318af..39108de8f97 100644
--- a/server/src/com/cloud/cluster/ClusterManagerImpl.java
+++ b/server/src/com/cloud/cluster/ClusterManagerImpl.java
@@ -51,7 +51,6 @@ import com.cloud.utils.events.SubscriptionMgr;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.exception.ExceptionUtil;
import com.cloud.utils.mgmt.JmxUtil;
-import com.cloud.utils.net.MacAddress;
import com.cloud.utils.net.NetUtils;
import com.google.gson.Gson;
@@ -90,7 +89,7 @@ public class ClusterManagerImpl implements ClusterManager {
// _msid is the unique persistent identifier that peer name is based upon
//
private Long _mshostId = null;
- protected long _msid = MacAddress.getMacAddress().toLong();
+ protected long _msid = ManagementServerNode.getManagementServerId();
protected long _runId = System.currentTimeMillis();
private boolean _peerScanInited = false;
diff --git a/server/src/com/cloud/cluster/ManagementServerNode.java b/server/src/com/cloud/cluster/ManagementServerNode.java
new file mode 100644
index 00000000000..5f201ef4025
--- /dev/null
+++ b/server/src/com/cloud/cluster/ManagementServerNode.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
+ *
+ * This software is licensed under the GNU General Public License v3 or later.
+ *
+ * It is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or any later version.
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ */
+package com.cloud.cluster;
+
+import com.cloud.utils.component.SystemIntegrityChecker;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.utils.net.MacAddress;
+
+public class ManagementServerNode implements SystemIntegrityChecker {
+ private static final long s_nodeId = MacAddress.getMacAddress().toLong();
+
+ @Override
+ public void check() {
+ if (s_nodeId <= 0) {
+ throw new CloudRuntimeException("Unable to get the management server node id");
+ }
+ }
+
+ public static long getManagementServerId() {
+ return s_nodeId;
+ }
+}
diff --git a/server/src/com/cloud/maid/StackMaid.java b/server/src/com/cloud/cluster/StackMaid.java
similarity index 92%
rename from server/src/com/cloud/maid/StackMaid.java
rename to server/src/com/cloud/cluster/StackMaid.java
index feddbf28c27..3f97b3282c0 100644
--- a/server/src/com/cloud/maid/StackMaid.java
+++ b/server/src/com/cloud/cluster/StackMaid.java
@@ -1,12 +1,12 @@
-package com.cloud.maid;
+package com.cloud.cluster;
import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;
-import com.cloud.maid.dao.StackMaidDao;
-import com.cloud.maid.dao.StackMaidDaoImpl;
+import com.cloud.cluster.dao.StackMaidDao;
+import com.cloud.cluster.dao.StackMaidDaoImpl;
import com.cloud.serializer.SerializerHelper;
import com.cloud.utils.CleanupDelegate;
import com.cloud.utils.db.Transaction;
@@ -99,7 +99,7 @@ public class StackMaid {
public void exitCleanup(long currentMsid) {
if(currentSeq > 0) {
- StackMaidVO maid = null;
+ TaskVO maid = null;
while((maid = maidDao.popCleanupDelegate(currentMsid)) != null) {
doCleanup(maid);
}
@@ -109,7 +109,7 @@ public class StackMaid {
context.clear();
}
- public static boolean doCleanup(StackMaidVO maid) {
+ public static boolean doCleanup(TaskVO maid) {
if(maid.getDelegate() != null) {
try {
Class> clz = Class.forName(maid.getDelegate());
diff --git a/server/src/com/cloud/cluster/TaskManager.java b/server/src/com/cloud/cluster/TaskManager.java
new file mode 100644
index 00000000000..9f17fbfb4b4
--- /dev/null
+++ b/server/src/com/cloud/cluster/TaskManager.java
@@ -0,0 +1,42 @@
+package com.cloud.cluster;
+
+import com.cloud.utils.component.Manager;
+
+/**
+ * TaskManager helps business logic deal with clustering failover.
+ * Say you're writing code that introduces an inconsistent state over
+ * a long period of time. What happens if the server died in the middle
+ * of your operation? Who will come back to cleanup this state? TaskManager
+ * will help with doing this. You can create a task and update the state
+ * with different content during your process. If the server dies, TaskManager
+ * will automatically cleanup the tasks if there is a clustered server
+ * running elsewhere. If there are no clustered servers, then TaskManager will
+ * cleanup when the dead server resumes.
+ *
+ */
+public interface TaskManager extends Manager {
+ /**
+ * Adds a task with the context as to what the task is and the class
+ * responsible for cleaning up.
+ *
+ * @param context context information to be stored.
+ * @param cleaner clazz responsible for cleanup if the process was interrupted.
+ * @return task id.
+ */
+ long addTask(CleanupMaid context);
+
+ /**
+ * update the task with new context
+ * @param taskId
+ * @param updatedContext new updated context.
+ */
+ void updateTask(long taskId, CleanupMaid updatedContext);
+
+
+ /**
+ * removes the task as it is completed.
+ *
+ * @param taskId
+ */
+ void taskCompleted(long taskId);
+}
diff --git a/server/src/com/cloud/cluster/TaskManagerImpl.java b/server/src/com/cloud/cluster/TaskManagerImpl.java
new file mode 100644
index 00000000000..9241670b37b
--- /dev/null
+++ b/server/src/com/cloud/cluster/TaskManagerImpl.java
@@ -0,0 +1,230 @@
+package com.cloud.cluster;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.ejb.Local;
+import javax.naming.ConfigurationException;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.cluster.dao.StackMaidDao;
+import com.cloud.configuration.Config;
+import com.cloud.configuration.dao.ConfigurationDao;
+import com.cloud.serializer.SerializerHelper;
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.NumbersUtil;
+import com.cloud.utils.component.ComponentLocator;
+import com.cloud.utils.component.Inject;
+import com.cloud.utils.concurrency.NamedThreadFactory;
+import com.cloud.utils.db.DB;
+import com.cloud.utils.db.GlobalLock;
+
+@Local(value=TaskManager.class)
+public class TaskManagerImpl implements TaskManager, ClusterManagerListener {
+ private static final Logger s_logger = Logger.getLogger(TaskManagerImpl.class);
+
+ private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds
+ private static final int GC_INTERVAL = 10000; // 10 seconds
+ private int _cleanupRetryInterval;
+
+ private String _name;
+
+ @Inject
+ private StackMaidDao _maidDao;
+
+ @Inject
+ private ClusterManager _clusterMgr;
+
+ long _msId;
+
+ private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("TaskMgr-Heartbeat"));
+
+ private final ScheduledExecutorService _cleanupScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Task-Cleanup"));
+
+ protected TaskManagerImpl() {
+ }
+
+ @Override
+ public boolean configure(String name, Map xmlParams) throws ConfigurationException {
+ _name = name;
+
+ if (s_logger.isInfoEnabled()) {
+ s_logger.info("Start configuring StackMaidManager : " + name);
+ }
+
+ StackMaid.init(ManagementServerNode.getManagementServerId());
+ _msId = ManagementServerNode.getManagementServerId();
+
+ _clusterMgr.registerListener(this);
+
+ ComponentLocator locator = ComponentLocator.getCurrentLocator();
+ ConfigurationDao configDao = locator.getDao(ConfigurationDao.class);
+
+ Map params = configDao.getConfiguration(xmlParams);
+
+ _cleanupRetryInterval = NumbersUtil.parseInt(params.get(Config.TaskCleanupRetryInterval.key()), 600);
+ _maidDao.takeover(_msId, _msId);
+ return true;
+ }
+
+ private void cleanupLeftovers(List l) {
+ for (TaskVO maid : l) {
+ if (StackMaid.doCleanup(maid)) {
+ _maidDao.expunge(maid.getId());
+ }
+ }
+ }
+
+ @DB
+ private Runnable getGCTask() {
+ return new Runnable() {
+ @Override
+ public void run() {
+ GlobalLock scanLock = GlobalLock.getInternLock("StackMaidManagerGC");
+ try {
+ if (scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
+ try {
+ reallyRun();
+ } finally {
+ scanLock.unlock();
+ }
+ }
+ } finally {
+ scanLock.releaseRef();
+ }
+ }
+
+ public void reallyRun() {
+ try {
+ Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - 7200000);
+ List l = _maidDao.listLeftoversByCutTime(cutTime);
+ cleanupLeftovers(l);
+ } catch (Throwable e) {
+ s_logger.error("Unexpected exception when trying to execute queue item, ", e);
+ }
+ }
+ };
+ }
+
+ @Override
+ public boolean start() {
+// try {
+// List l = _maidDao.listLeftoversByMsid(_clusterMgr.getManagementNodeId());
+// cleanupLeftovers(l);
+// } catch (Throwable e) {
+// s_logger.error("Unexpected exception " + e.getMessage(), e);
+// }
+//
+// _heartbeatScheduler.scheduleAtFixedRate(getGCTask(), GC_INTERVAL, GC_INTERVAL, TimeUnit.MILLISECONDS);
+ _cleanupScheduler.schedule(new CleanupTask(), _cleanupRetryInterval > 0 ? _cleanupRetryInterval : 600, TimeUnit.SECONDS);
+ return true;
+ }
+
+ @Override
+ public boolean stop() {
+ return true;
+ }
+
+ @Override
+ public String getName() {
+ return _name;
+ }
+
+ @Override
+ public void onManagementNodeJoined(List nodeList, long selfNodeId) {
+ // Nothing to do
+ }
+
+ @Override
+ public void onManagementNodeLeft(List nodeList, long selfNodeId) {
+ for (ManagementServerHostVO node : nodeList) {
+ if (_maidDao.takeover(node.getMsid(), selfNodeId)) {
+ s_logger.info("Taking over from " + node.getMsid());
+ _cleanupScheduler.execute(new CleanupTask());
+ }
+ }
+ }
+
+ @Override
+ public long addTask(CleanupMaid context) {
+ return _maidDao.pushCleanupDelegate(_msId, 0, context.getClass().getName(), context);
+ }
+
+ @Override
+ public void updateTask(long taskId, CleanupMaid updatedContext) {
+ TaskVO task = _maidDao.createForUpdate();
+ task.setDelegate(updatedContext.getClass().getName());
+ task.setContext(SerializerHelper.toSerializedStringOld(updatedContext));
+ _maidDao.update(taskId, task);
+ }
+
+ @Override
+ public void taskCompleted(long taskId) {
+ _maidDao.remove(taskId);
+ }
+
+ protected boolean cleanup(TaskVO task) {
+ s_logger.info("Cleaning up " + task);
+ CleanupMaid delegate = (CleanupMaid)SerializerHelper.fromSerializedString(task.getContext());
+ assert delegate.getClass().getName().equals(task.getDelegate()) : "Deserializer says " + delegate.getClass().getName() + " but it's suppose to be " + task.getDelegate();
+
+ int result = delegate.cleanup();
+ if (result <= 0) {
+ if (result == 0) {
+ s_logger.info("Successfully cleaned up " + task.getId());
+ } else {
+ s_logger.warn("Unsuccessful in cleaning up " + task + ". Procedure to cleanup manaully: " + delegate.getCleanupProcedure());
+ }
+ taskCompleted(task.getId());
+ return true;
+ } else {
+ s_logger.error("Unable to cleanup " + task.getId());
+ return false;
+ }
+ }
+
+ class CleanupTask implements Runnable {
+
+ public CleanupTask() {
+ }
+
+ @Override
+ public void run() {
+ try {
+ List tasks = _maidDao.listCleanupTasks(_msId);
+
+ List retries = new ArrayList();
+
+ for (TaskVO task : tasks) {
+ try {
+ if (!cleanup(task)) {
+ retries.add(task);
+ }
+ } catch (Exception e) {
+ s_logger.warn("Unable to clean up " + task, e);
+
+ }
+ }
+
+ if (retries.size() > 0) {
+ if (_cleanupRetryInterval > 0) {
+ _cleanupScheduler.schedule(this, _cleanupRetryInterval, TimeUnit.SECONDS);
+ } else {
+ for (TaskVO task : retries) {
+ s_logger.warn("Cleanup procedure for " + task + ": " + ((CleanupMaid)SerializerHelper.fromSerializedString(task.getContext())).getCleanupProcedure());
+ }
+ }
+ }
+
+ } catch (Exception e) {
+ s_logger.error("Unable to cleanup all of the tasks for " + _msId, e);
+ }
+ }
+ }
+}
diff --git a/server/src/com/cloud/maid/StackMaidVO.java b/server/src/com/cloud/cluster/TaskVO.java
similarity index 79%
rename from server/src/com/cloud/maid/StackMaidVO.java
rename to server/src/com/cloud/cluster/TaskVO.java
index 5b8c145c8a1..1c870a2a6cc 100644
--- a/server/src/com/cloud/maid/StackMaidVO.java
+++ b/server/src/com/cloud/cluster/TaskVO.java
@@ -1,4 +1,4 @@
-package com.cloud.maid;
+package com.cloud.cluster;
import java.util.Date;
@@ -13,12 +13,12 @@ import com.cloud.utils.db.GenericDao;
@Entity
@Table(name="stack_maid")
-public class StackMaidVO {
+public class TaskVO {
@Id
@GeneratedValue(strategy=GenerationType.IDENTITY)
@Column(name="id")
- private Long id = null;
+ private long id;
@Column(name="msid")
private long msid;
@@ -38,17 +38,13 @@ public class StackMaidVO {
@Column(name=GenericDao.CREATED_COLUMN)
private Date created;
- public StackMaidVO() {
+ public TaskVO() {
}
- public Long getId() {
+ public long getId() {
return id;
}
- public void setId(Long id) {
- this.id = id;
- }
-
public long getMsid() {
return msid;
}
@@ -96,4 +92,9 @@ public class StackMaidVO {
public void setCreated(Date created) {
this.created = created;
}
+
+ @Override
+ public String toString() {
+ return new StringBuilder("Task[").append(id).append("-").append(context).append("-").append(delegate).append("]").toString();
+ }
}
diff --git a/server/src/com/cloud/cluster/dao/StackMaidDao.java b/server/src/com/cloud/cluster/dao/StackMaidDao.java
new file mode 100644
index 00000000000..e79d7e23d72
--- /dev/null
+++ b/server/src/com/cloud/cluster/dao/StackMaidDao.java
@@ -0,0 +1,30 @@
+package com.cloud.cluster.dao;
+
+import java.util.Date;
+import java.util.List;
+
+import com.cloud.cluster.TaskVO;
+import com.cloud.utils.db.GenericDao;
+
+public interface StackMaidDao extends GenericDao {
+ public long pushCleanupDelegate(long msid, int seq, String delegateClzName, Object context);
+ public TaskVO popCleanupDelegate(long msid);
+ public void clearStack(long msid);
+
+ public List listLeftoversByMsid(long msid);
+ public List listLeftoversByCutTime(Date cutTime);
+
+ /**
+ * Take over the task items of another management server and clean them up.
+ * This method changes the management server id of all of the tasks to
+ * this management server and mark the thread id as 0. It then returns
+ * all of the tasks that needs to be reverted to be processed.
+ *
+ * @param takeOverMsid management server id to take over.
+ * @param selfId the management server id of this node.
+ * @return list of tasks to take over.
+ */
+ boolean takeover(long takeOverMsid, long selfId);
+
+ List listCleanupTasks(long selfId);
+}
diff --git a/server/src/com/cloud/maid/dao/StackMaidDaoImpl.java b/server/src/com/cloud/cluster/dao/StackMaidDaoImpl.java
similarity index 50%
rename from server/src/com/cloud/maid/dao/StackMaidDaoImpl.java
rename to server/src/com/cloud/cluster/dao/StackMaidDaoImpl.java
index 0a4c3f2d73c..c430c1621a1 100644
--- a/server/src/com/cloud/maid/dao/StackMaidDaoImpl.java
+++ b/server/src/com/cloud/cluster/dao/StackMaidDaoImpl.java
@@ -1,4 +1,4 @@
-package com.cloud.maid.dao;
+package com.cloud.cluster.dao;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -12,7 +12,7 @@ import javax.ejb.Local;
import org.apache.log4j.Logger;
-import com.cloud.maid.StackMaidVO;
+import com.cloud.cluster.TaskVO;
import com.cloud.serializer.SerializerHelper;
import com.cloud.utils.DateUtil;
import com.cloud.utils.db.DB;
@@ -20,14 +20,16 @@ import com.cloud.utils.db.Filter;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.Transaction;
-@Local(value = { StackMaidDao.class })
-public class StackMaidDaoImpl extends GenericDaoBase implements StackMaidDao {
- private static final Logger s_logger = Logger.getLogger(StackMaidDaoImpl.class.getName());
+@Local(value = { StackMaidDao.class }) @DB(txn=false)
+public class StackMaidDaoImpl extends GenericDaoBase implements StackMaidDao {
+ private static final Logger s_logger = Logger.getLogger(StackMaidDaoImpl.class);
- private SearchBuilder popSearch;
- private SearchBuilder clearSearch;
+ private SearchBuilder popSearch;
+ private SearchBuilder clearSearch;
+ private final SearchBuilder AllFieldsSearch;
public StackMaidDaoImpl() {
popSearch = createSearchBuilder();
@@ -36,64 +38,76 @@ public class StackMaidDaoImpl extends GenericDaoBase implemen
clearSearch = createSearchBuilder();
clearSearch.and("msid", clearSearch.entity().getMsid(), SearchCriteria.Op.EQ);
+
+ AllFieldsSearch = createSearchBuilder();
+ AllFieldsSearch.and("msid", AllFieldsSearch.entity().getMsid(), Op.EQ);
+ AllFieldsSearch.and("thread", AllFieldsSearch.entity().getThreadId(), Op.EQ);
+ AllFieldsSearch.done();
+ }
+
+ @Override
+ public boolean takeover(long takeOverMsid, long selfId) {
+ TaskVO task = createForUpdate();
+ task.setMsid(selfId);
+ task.setThreadId(0);
+
+ SearchCriteria sc = AllFieldsSearch.create();
+ sc.setParameters("msid", takeOverMsid);
+ return update(task, sc) > 0;
+
+ }
+
+ @Override
+ public List listCleanupTasks(long msId) {
+ SearchCriteria sc = AllFieldsSearch.create();
+ sc.setParameters("msid", msId);
+ sc.setParameters("thread", 0);
+
+ return this.search(sc, null);
}
- @DB
- public void pushCleanupDelegate(long msid, int seq, String delegateClzName, Object context) {
+ @Override
+ public long pushCleanupDelegate(long msid, int seq, String delegateClzName, Object context) {
+ TaskVO delegateItem = new TaskVO();
+ delegateItem.setMsid(msid);
+ delegateItem.setThreadId(Thread.currentThread().getId());
+ delegateItem.setSeq(seq);
+ delegateItem.setDelegate(delegateClzName);
+ delegateItem.setContext(SerializerHelper.toSerializedStringOld(context));
+ delegateItem.setCreated(DateUtil.currentGMTTime());
+
+ super.persist(delegateItem);
+ return delegateItem.getId();
+ }
+
+ @Override
+ public TaskVO popCleanupDelegate(long msid) {
+ SearchCriteria sc = popSearch.create();
+ sc.setParameters("msid", msid);
+ sc.setParameters("threadId", Thread.currentThread().getId());
- Transaction txn = Transaction.open(Transaction.CLOUD_DB);
- try {
- StackMaidVO delegateItem = new StackMaidVO();
- delegateItem.setMsid(msid);
- delegateItem.setThreadId(Thread.currentThread().getId());
- delegateItem.setSeq(seq);
- delegateItem.setDelegate(delegateClzName);
- delegateItem.setContext(SerializerHelper.toSerializedStringOld(context));
- delegateItem.setCreated(DateUtil.currentGMTTime());
-
- super.persist(delegateItem);
- } finally {
- txn.close();
- }
- }
-
- @DB
- public StackMaidVO popCleanupDelegate(long msid) {
- Transaction txn = Transaction.open(Transaction.CLOUD_DB);
- try {
- SearchCriteria sc = popSearch.create();
- sc.setParameters("msid", msid);
- sc.setParameters("threadId", Thread.currentThread().getId());
-
- Filter filter = new Filter(StackMaidVO.class, "seq", false, 0L, (long)1);
- List l = listIncludingRemovedBy(sc, filter);
- if(l != null && l.size() > 0) {
- expunge(l.get(0).getId());
- return l.get(0);
- }
- } finally {
- txn.close();
- }
+ Filter filter = new Filter(TaskVO.class, "seq", false, 0L, (long)1);
+ List l = listIncludingRemovedBy(sc, filter);
+ if(l != null && l.size() > 0) {
+ expunge(l.get(0).getId());
+ return l.get(0);
+ }
return null;
}
-
- @DB
+
+ @Override
public void clearStack(long msid) {
- Transaction txn = Transaction.open(Transaction.CLOUD_DB);
- try {
- SearchCriteria sc = clearSearch.create();
- sc.setParameters("msid", msid);
-
- expunge(sc);
- } finally {
- txn.close();
- }
+ SearchCriteria sc = clearSearch.create();
+ sc.setParameters("msid", msid);
+
+ expunge(sc);
}
+ @Override
@DB
- public List listLeftoversByMsid(long msid) {
- List l = new ArrayList();
+ public List listLeftoversByMsid(long msid) {
+ List l = new ArrayList();
String sql = "select * from stack_maid where msid=? order by msid asc, thread_id asc, seq desc";
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
@@ -116,10 +130,11 @@ public class StackMaidDaoImpl extends GenericDaoBase implemen
return l;
}
+ @Override
@DB
- public List listLeftoversByCutTime(Date cutTime) {
+ public List listLeftoversByCutTime(Date cutTime) {
- List l = new ArrayList();
+ List l = new ArrayList();
String sql = "select * from stack_maid where created < ? order by msid asc, thread_id asc, seq desc";
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
diff --git a/server/src/com/cloud/configuration/Config.java b/server/src/com/cloud/configuration/Config.java
index 8462a9781eb..0eaaf000369 100755
--- a/server/src/com/cloud/configuration/Config.java
+++ b/server/src/com/cloud/configuration/Config.java
@@ -224,7 +224,9 @@ public enum Config {
VmOpCleanupWait("Advanced", ManagementServer.class, Long.class, "vm.op.cleanup.wait", "3600", "Time (in seconds) to wait before cleanuping up any vm work items", "Seconds"),
VmOpCancelInterval("Advanced", ManagementServer.class, Long.class, "vm.op.cancel.interval", "3600", "Time (in seconds) to wait before cancelling a operation", "Seconds"),
- DefaultPageSize("Advanced", ManagementServer.class, Long.class, "default.page.size", "500", "Default page size for API list* commands", null);
+ DefaultPageSize("Advanced", ManagementServer.class, Long.class, "default.page.size", "500", "Default page size for API list* commands", null),
+
+ TaskCleanupRetryInterval("Advanced", ManagementServer.class, Integer.class, "task.cleanup.retry.interval", "600", "Time (in seconds) to wait before retrying cleanup of tasks if the cleanup failed previously. 0 means to never retry.", "Seconds");
private final String _category;
diff --git a/server/src/com/cloud/configuration/DefaultComponentLibrary.java b/server/src/com/cloud/configuration/DefaultComponentLibrary.java
index b36bd1688dc..ac3dbfa8719 100644
--- a/server/src/com/cloud/configuration/DefaultComponentLibrary.java
+++ b/server/src/com/cloud/configuration/DefaultComponentLibrary.java
@@ -38,7 +38,10 @@ import com.cloud.capacity.dao.CapacityDaoImpl;
import com.cloud.certificate.dao.CertificateDaoImpl;
import com.cloud.cluster.ClusterManagerImpl;
import com.cloud.cluster.DummyClusterManagerImpl;
+import com.cloud.cluster.ManagementServerNode;
+import com.cloud.cluster.TaskManagerImpl;
import com.cloud.cluster.dao.ManagementServerHostDaoImpl;
+import com.cloud.cluster.dao.StackMaidDaoImpl;
import com.cloud.configuration.dao.ConfigurationDaoImpl;
import com.cloud.configuration.dao.ResourceCountDaoImpl;
import com.cloud.configuration.dao.ResourceLimitDaoImpl;
@@ -63,8 +66,6 @@ import com.cloud.host.dao.DetailsDaoImpl;
import com.cloud.host.dao.HostDaoImpl;
import com.cloud.host.dao.HostTagsDaoImpl;
import com.cloud.hypervisor.HypervisorGuruManagerImpl;
-import com.cloud.maid.StackMaidManagerImpl;
-import com.cloud.maid.dao.StackMaidDaoImpl;
import com.cloud.maint.UpgradeManagerImpl;
import com.cloud.maint.dao.AgentUpgradeDaoImpl;
import com.cloud.network.NetworkManagerImpl;
@@ -156,6 +157,7 @@ public class DefaultComponentLibrary extends ComponentLibraryBase implements Com
@Override
public List getSystemIntegrityCheckers() {
ArrayList checkers = new ArrayList();
+ checkers.add(new ManagementServerNode());
checkers.add(new DatabaseUpgradeChecker());
return checkers;
}
@@ -271,7 +273,7 @@ public class DefaultComponentLibrary extends ComponentLibraryBase implements Com
}
protected void populateManagers() {
- addManager("StackMaidManager", StackMaidManagerImpl.class);
+ addManager("StackMaidManager", TaskManagerImpl.class);
addManager("agent manager", AgentManagerImpl.class);
addManager("account manager", AccountManagerImpl.class);
addManager("configuration manager", ConfigurationManagerImpl.class);
diff --git a/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java b/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java
index cec7696dab3..49d3d5bf9d3 100644
--- a/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java
+++ b/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java
@@ -53,6 +53,7 @@ import com.cloud.api.commands.DestroyConsoleProxyCmd;
import com.cloud.certificate.CertificateVO;
import com.cloud.certificate.dao.CertificateDao;
import com.cloud.cluster.ClusterManager;
+import com.cloud.cluster.StackMaid;
import com.cloud.configuration.Config;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.dc.DataCenter;
@@ -80,7 +81,6 @@ import com.cloud.info.ConsoleProxyStatus;
import com.cloud.info.RunningHostCountInfo;
import com.cloud.info.RunningHostInfoAgregator;
import com.cloud.info.RunningHostInfoAgregator.ZoneHostInfo;
-import com.cloud.maid.StackMaid;
import com.cloud.network.NetworkManager;
import com.cloud.network.NetworkVO;
import com.cloud.network.Networks.TrafficType;
diff --git a/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java b/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java
index c456fcfc9c4..1f253f308bb 100644
--- a/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java
+++ b/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java
@@ -37,6 +37,7 @@ import com.cloud.agent.AgentManager;
import com.cloud.alert.AlertManager;
import com.cloud.cluster.ClusterManagerListener;
import com.cloud.cluster.ManagementServerHostVO;
+import com.cloud.cluster.StackMaid;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.dc.ClusterDetailsDao;
import com.cloud.dc.DataCenterVO;
@@ -55,7 +56,6 @@ import com.cloud.host.Host;
import com.cloud.host.HostVO;
import com.cloud.host.Status;
import com.cloud.host.dao.HostDao;
-import com.cloud.maid.StackMaid;
import com.cloud.server.ManagementServer;
import com.cloud.storage.StorageManager;
import com.cloud.storage.dao.GuestOSCategoryDao;
diff --git a/server/src/com/cloud/maid/StackMaidManager.java b/server/src/com/cloud/maid/StackMaidManager.java
deleted file mode 100644
index ca466a29935..00000000000
--- a/server/src/com/cloud/maid/StackMaidManager.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package com.cloud.maid;
-
-import com.cloud.utils.component.Manager;
-
-public interface StackMaidManager extends Manager {
-}
diff --git a/server/src/com/cloud/maid/StackMaidManagerImpl.java b/server/src/com/cloud/maid/StackMaidManagerImpl.java
deleted file mode 100644
index 8300f14352a..00000000000
--- a/server/src/com/cloud/maid/StackMaidManagerImpl.java
+++ /dev/null
@@ -1,108 +0,0 @@
-package com.cloud.maid;
-
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import javax.ejb.Local;
-import javax.naming.ConfigurationException;
-
-import org.apache.log4j.Logger;
-
-import com.cloud.cluster.ClusterManager;
-import com.cloud.maid.dao.StackMaidDao;
-import com.cloud.utils.DateUtil;
-import com.cloud.utils.component.Inject;
-import com.cloud.utils.concurrency.NamedThreadFactory;
-import com.cloud.utils.db.DB;
-import com.cloud.utils.db.GlobalLock;
-
-@Local(value = { StackMaidManager.class })
-public class StackMaidManagerImpl implements StackMaidManager {
- private static final Logger s_logger = Logger.getLogger(StackMaidManagerImpl.class);
-
- private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds
- private static final int GC_INTERVAL = 10000; // 10 seconds
-
- private String _name;
-
- @Inject
- private StackMaidDao _maidDao;
-
- @Inject
- private ClusterManager _clusterMgr;
-
- private final ScheduledExecutorService _heartbeatScheduler =
- Executors.newScheduledThreadPool(1, new NamedThreadFactory("StackMaidMgr-Heartbeat"));
-
- @Override
- public boolean configure(String name, Map params) throws ConfigurationException {
- _name = name;
-
- if (s_logger.isInfoEnabled())
- s_logger.info("Start configuring StackMaidManager : " + name);
-
- StackMaid.init(_clusterMgr.getManagementNodeId());
- return true;
- }
-
- private void cleanupLeftovers(List l) {
- for(StackMaidVO maid : l) {
- if(StackMaid.doCleanup(maid))
- _maidDao.expunge(maid.getId());
- }
- }
-
- @DB
- private Runnable getGCTask() {
- return new Runnable() {
- public void run() {
- GlobalLock scanLock = GlobalLock.getInternLock("StackMaidManagerGC");
- try {
- if(scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
- try {
- reallyRun();
- } finally {
- scanLock.unlock();
- }
- }
- } finally {
- scanLock.releaseRef();
- }
- }
-
- public void reallyRun() {
- try {
- Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - 7200000);
- List l = _maidDao.listLeftoversByCutTime(cutTime);
- cleanupLeftovers(l);
- } catch(Throwable e) {
- s_logger.error("Unexpected exception when trying to execute queue item, ", e);
- }
- }
- };
- }
-
- public boolean start() {
- try {
- List l = _maidDao.listLeftoversByMsid(_clusterMgr.getManagementNodeId());
- cleanupLeftovers(l);
- } catch(Throwable e) {
- s_logger.error("Unexpected exception " + e.getMessage(), e);
- }
-
- _heartbeatScheduler.scheduleAtFixedRate(getGCTask(), GC_INTERVAL, GC_INTERVAL, TimeUnit.MILLISECONDS);
- return true;
- }
-
- public boolean stop() {
- return true;
- }
-
- public String getName() {
- return _name;
- }
-}
diff --git a/server/src/com/cloud/maid/dao/StackMaidDao.java b/server/src/com/cloud/maid/dao/StackMaidDao.java
deleted file mode 100644
index c03e0cca138..00000000000
--- a/server/src/com/cloud/maid/dao/StackMaidDao.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package com.cloud.maid.dao;
-
-import java.util.Date;
-import java.util.List;
-
-import com.cloud.maid.StackMaidVO;
-import com.cloud.utils.db.GenericDao;
-
-public interface StackMaidDao extends GenericDao {
- public void pushCleanupDelegate(long msid, int seq, String delegateClzName, Object context);
- public StackMaidVO popCleanupDelegate(long msid);
- public void clearStack(long msid);
-
- public List listLeftoversByMsid(long msid);
- public List listLeftoversByCutTime(Date cutTime);
-}
diff --git a/server/src/com/cloud/vm/SystemVmLoadScanner.java b/server/src/com/cloud/vm/SystemVmLoadScanner.java
index 26742bdb43c..1039a9a3a0a 100644
--- a/server/src/com/cloud/vm/SystemVmLoadScanner.java
+++ b/server/src/com/cloud/vm/SystemVmLoadScanner.java
@@ -6,7 +6,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
-import com.cloud.maid.StackMaid;
+import com.cloud.cluster.StackMaid;
import com.cloud.utils.Pair;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.GlobalLock;
diff --git a/server/src/com/cloud/vm/VirtualMachineManagerImpl.java b/server/src/com/cloud/vm/VirtualMachineManagerImpl.java
index e8e6045b483..2032eeb4a68 100755
--- a/server/src/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/server/src/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -60,6 +60,7 @@ import com.cloud.agent.manager.Commands;
import com.cloud.alert.AlertManager;
import com.cloud.capacity.CapacityManager;
import com.cloud.cluster.ClusterManager;
+import com.cloud.cluster.StackMaid;
import com.cloud.configuration.Config;
import com.cloud.configuration.ConfigurationManager;
import com.cloud.configuration.ResourceCount.ResourceType;
@@ -95,7 +96,6 @@ import com.cloud.host.dao.HostDao;
import com.cloud.hypervisor.Hypervisor.HypervisorType;
import com.cloud.hypervisor.HypervisorGuru;
import com.cloud.hypervisor.HypervisorGuruManager;
-import com.cloud.maid.StackMaid;
import com.cloud.network.NetworkManager;
import com.cloud.network.NetworkVO;
import com.cloud.org.Cluster;
diff --git a/server/test/com/cloud/async/TestAsync.java b/server/test/com/cloud/async/TestAsync.java
index d9a5e4fd72d..921b382f34c 100644
--- a/server/test/com/cloud/async/TestAsync.java
+++ b/server/test/com/cloud/async/TestAsync.java
@@ -26,10 +26,10 @@ import org.apache.log4j.Logger;
import junit.framework.Assert;
import com.cloud.async.AsyncJobVO;
-import com.cloud.maid.StackMaid;
-import com.cloud.maid.StackMaidVO;
-import com.cloud.maid.dao.StackMaidDao;
-import com.cloud.maid.dao.StackMaidDaoImpl;
+import com.cloud.cluster.StackMaid;
+import com.cloud.cluster.TaskVO;
+import com.cloud.cluster.dao.StackMaidDao;
+import com.cloud.cluster.dao.StackMaidDaoImpl;
import com.cloud.serializer.Param;
import com.cloud.utils.ActionDelegate;
import com.cloud.utils.Pair;
@@ -216,7 +216,7 @@ public class TestAsync extends Log4jEnabledTestCase {
dao.pushCleanupDelegate(1L, 1, "delegate2", new Long(100));
dao.pushCleanupDelegate(1L, 2, "delegate3", null);
- StackMaidVO item = dao.popCleanupDelegate(1L);
+ TaskVO item = dao.popCleanupDelegate(1L);
Assert.assertTrue(item.getDelegate().equals("delegate3"));
Assert.assertTrue(item.getContext() == null);
@@ -283,8 +283,8 @@ public class TestAsync extends Log4jEnabledTestCase {
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
StackMaidDao dao = new StackMaidDaoImpl();
- List l = dao.listLeftoversByMsid(1L);
- for(StackMaidVO maid : l) {
+ List l = dao.listLeftoversByMsid(1L);
+ for(TaskVO maid : l) {
s_logger.info("" + maid.getThreadId() + " " + maid.getDelegate() + " " + maid.getContext());
}
diff --git a/server/test/com/cloud/cluster/TaskManagerTest.java b/server/test/com/cloud/cluster/TaskManagerTest.java
new file mode 100644
index 00000000000..f21c013b022
--- /dev/null
+++ b/server/test/com/cloud/cluster/TaskManagerTest.java
@@ -0,0 +1,375 @@
+/**
+ * Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
+ *
+ * This software is licensed under the GNU General Public License v3 or later.
+ *
+ * It is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or any later version.
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ */
+package com.cloud.cluster;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.ejb.Local;
+import javax.naming.ConfigurationException;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+
+import com.cloud.agent.Listener;
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.Command;
+import com.cloud.cluster.dao.StackMaidDao;
+import com.cloud.cluster.dao.StackMaidDaoImpl;
+import com.cloud.configuration.Config;
+import com.cloud.configuration.DefaultInterceptorLibrary;
+import com.cloud.configuration.dao.ConfigurationDaoImpl;
+import com.cloud.exception.AgentUnavailableException;
+import com.cloud.exception.OperationTimedoutException;
+import com.cloud.host.Status.Event;
+import com.cloud.serializer.SerializerHelper;
+import com.cloud.utils.component.ComponentLocator;
+import com.cloud.utils.component.MockComponentLocator;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.exception.CloudRuntimeException;
+
+public class TaskManagerTest extends TestCase {
+ private final static Logger s_logger = Logger.getLogger(TaskManagerTest.class);
+
+ @Override
+ @Before
+ public void setUp() {
+ MockComponentLocator locator = new MockComponentLocator("management-server");
+ locator.addDao("StackMaidDao", StackMaidDaoImpl.class);
+ locator.addDao("ConfigurationDao", ConfigurationDaoImpl.class);
+ locator.addManager("ClusterManager", MockClusterManager.class);
+ locator.makeActive(new DefaultInterceptorLibrary());
+ MockMaid.map.clear();
+ s_logger.info("Cleaning up the database");
+ Connection conn = Transaction.getStandaloneConnection();
+ try {
+ conn.setAutoCommit(true);
+ PreparedStatement stmt = conn.prepareStatement("DELETE FROM stack_maid");
+ stmt.executeUpdate();
+ stmt.close();
+ conn.close();
+ } catch (SQLException e) {
+ throw new CloudRuntimeException("Unable to setup database", e);
+ }
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ public void testCompleteCase() throws Exception {
+ ComponentLocator locator = ComponentLocator.getCurrentLocator();
+
+ TaskManagerImpl taskMgr = ComponentLocator.inject(TaskManagerImpl.class);
+ assertTrue(taskMgr.configure("TaskManager", new HashMap()));
+ assertTrue(taskMgr.start());
+
+ MockMaid delegate = new MockMaid();
+ delegate.setValue("first");
+ long taskId = taskMgr.addTask(delegate);
+
+ StackMaidDao maidDao = locator.getDao(StackMaidDao.class);
+ TaskVO task = maidDao.findById(taskId);
+
+ assertEquals(task.getDelegate(), MockMaid.class.getName());
+ MockMaid retrieved = (MockMaid)SerializerHelper.fromSerializedString(task.getContext());
+ assertEquals(retrieved.getValue(), delegate.getValue());
+
+ delegate.setValue("second");
+ taskMgr.updateTask(taskId, delegate);
+
+ task = maidDao.findById(taskId);
+ assertEquals(task.getDelegate(), MockMaid.class.getName());
+ retrieved = (MockMaid)SerializerHelper.fromSerializedString(task.getContext());
+ assertEquals(retrieved.getValue(), delegate.getValue());
+
+ taskMgr.taskCompleted(taskId);
+ assertNull(maidDao.findById(taskId));
+ }
+
+ public void testSimulatedReboot() throws Exception {
+ ComponentLocator locator = ComponentLocator.getCurrentLocator();
+
+ TaskManagerImpl taskMgr = ComponentLocator.inject(TaskManagerImpl.class);
+ assertTrue(taskMgr.configure("TaskManager", new HashMap()));
+ assertTrue(taskMgr.start());
+
+ MockMaid maid = new MockMaid();
+ maid.setValue("first");
+ long taskId = taskMgr.addTask(maid);
+
+ StackMaidDao maidDao = locator.getDao(StackMaidDao.class);
+ TaskVO task = maidDao.findById(taskId);
+
+ assertEquals(task.getDelegate(), MockMaid.class.getName());
+ MockMaid retrieved = (MockMaid)SerializerHelper.fromSerializedString(task.getContext());
+ assertEquals(retrieved.getValue(), maid.getValue());
+
+ taskMgr.stop();
+
+ assertNotNull(MockMaid.map.get(maid.getSeq()));
+
+ taskMgr = ComponentLocator.inject(TaskManagerImpl.class);
+ HashMap params = new HashMap();
+ params.put(Config.TaskCleanupRetryInterval.key(), "1");
+ taskMgr.configure("TaskManager", params);
+ taskMgr.start();
+
+ int i = 0;
+ while (MockMaid.map.get(maid.getSeq()) != null && i++ < 5) {
+ Thread.sleep(1000);
+ }
+
+ assertNull(MockMaid.map.get(maid.getSeq()));
+ }
+
+ public void testTakeover() throws Exception {
+ ComponentLocator locator = ComponentLocator.getCurrentLocator();
+
+ TaskManagerImpl taskMgr = ComponentLocator.inject(TaskManagerImpl.class);
+ assertTrue(taskMgr.configure("TaskManager", new HashMap()));
+ assertTrue(taskMgr.start());
+
+ MockMaid delegate = new MockMaid();
+ delegate.setValue("first");
+ long taskId = taskMgr.addTask(delegate);
+
+ StackMaidDao maidDao = locator.getDao(StackMaidDao.class);
+ TaskVO task = maidDao.findById(taskId);
+
+ assertEquals(task.getDelegate(), MockMaid.class.getName());
+ MockMaid retrieved = (MockMaid)SerializerHelper.fromSerializedString(task.getContext());
+ assertEquals(retrieved.getValue(), delegate.getValue());
+
+ Connection conn = Transaction.getStandaloneConnection();
+ try {
+ conn.setAutoCommit(true);
+ PreparedStatement stmt = conn.prepareStatement("update stack_maid set msid=? where msid=?");
+ stmt.setLong(1, 1234);
+ stmt.setLong(2, ManagementServerNode.getManagementServerId());
+ stmt.executeUpdate();
+ stmt.close();
+ } finally {
+ conn.close();
+ }
+
+ MockClusterManager clusterMgr = (MockClusterManager)locator.getManager(ClusterManager.class);
+ clusterMgr.triggerTakeover(1234);
+
+ int i = 0;
+ while (MockMaid.map.get(delegate.getSeq()) != null && i++ < 500) {
+ Thread.sleep(1000);
+ }
+
+ assertNull(MockMaid.map.get(delegate.getSeq()));
+ }
+
+ public static class MockMaid implements CleanupMaid {
+ private static int s_seq = 1;
+ public static Map map = new ConcurrentHashMap();
+
+ int seq;
+ boolean canBeCleanup;
+ String value;
+
+ protected MockMaid() {
+ canBeCleanup = true;
+ seq = s_seq++;
+ map.put(seq, this);
+ }
+
+ public int getSeq() {
+ return seq;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setCanBeCleanup(boolean canBeCleanup) {
+ this.canBeCleanup = canBeCleanup;
+ }
+
+ @Override
+ public int cleanup() {
+ s_logger.debug("Cleanup called for " + seq);
+ map.remove(seq);
+ return canBeCleanup ? 0 : -1;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String getCleanupProcedure() {
+ return "No cleanup necessary";
+ }
+ }
+
+ @Local(value=ClusterManager.class)
+ public static class MockClusterManager implements ClusterManager {
+ String _name;
+ ClusterManagerListener _listener;
+
+ @Override
+ public boolean configure(String name, Map params) throws ConfigurationException {
+ _name = name;
+ return true;
+ }
+
+ @Override
+ public boolean start() {
+ return true;
+ }
+
+ @Override
+ public boolean stop() {
+ return true;
+ }
+
+ @Override
+ public String getName() {
+ return _name;
+ }
+
+ @Override
+ public Answer[] execute(String strPeer, long agentId, Command[] cmds, boolean stopOnError) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public long executeAsync(String strPeer, long agentId, Command[] cmds, boolean stopOnError, Listener listener) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public boolean forwardAnswer(String targetPeer, long agentId, long seq, Answer[] answers) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public Answer[] sendToAgent(Long hostId, Command[] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public long sendToAgent(Long hostId, Command[] cmds, boolean stopOnError, Listener listener) throws AgentUnavailableException {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public Boolean propagateAgentEvent(long agentId, Event event) throws AgentUnavailableException {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public int getHeartbeatThreshold() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public long getManagementNodeId() {
+ return ManagementServerNode.getManagementServerId();
+ }
+
+ @Override
+ public boolean isManagementNodeAlive(long msid) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public boolean pingManagementNode(long msid) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public long getCurrentRunId() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public String getSelfPeerName() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public String getSelfNodeIP() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public String getPeerName(long agentHostId) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public void registerListener(ClusterManagerListener listener) {
+ _listener = listener;
+ }
+
+ @Override
+ public void unregisterListener(ClusterManagerListener listener) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public ManagementServerHostVO getPeer(String peerName) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public void broadcast(long agentId, Command[] cmds) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ public void triggerTakeover(long msId) {
+ ManagementServerHostVO node = new ManagementServerHostVO();
+ node.setMsid(msId);
+
+ List lst = new ArrayList();
+ lst.add(node);
+
+ _listener.onManagementNodeLeft(lst, ManagementServerNode.getManagementServerId());
+ }
+
+ protected MockClusterManager() {
+ }
+ }
+
+}
diff --git a/utils/src/com/cloud/utils/CleanupDelegate.java b/utils/src/com/cloud/utils/CleanupDelegate.java
index 545dcddfb9f..4cf9109395b 100644
--- a/utils/src/com/cloud/utils/CleanupDelegate.java
+++ b/utils/src/com/cloud/utils/CleanupDelegate.java
@@ -1,5 +1,6 @@
package com.cloud.utils;
public interface CleanupDelegate {
+
boolean cleanup(T itemContext, M managerContext);
}