stackmaid is now taskmanager

This commit is contained in:
Alex Huang 2011-03-31 13:06:54 -07:00
parent 5401ee84ba
commit 075fba5899
25 changed files with 871 additions and 222 deletions

View File

@ -85,6 +85,7 @@ import com.cloud.api.commands.UpdateHostCmd;
import com.cloud.capacity.Capacity;
import com.cloud.capacity.CapacityVO;
import com.cloud.capacity.dao.CapacityDao;
import com.cloud.cluster.StackMaid;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.dc.ClusterDetailsDao;
import com.cloud.dc.ClusterDetailsVO;
@ -127,7 +128,6 @@ import com.cloud.host.dao.HostTagsDao;
import com.cloud.hypervisor.Hypervisor;
import com.cloud.hypervisor.Hypervisor.HypervisorType;
import com.cloud.hypervisor.kvm.resource.KvmDummyResourceBase;
import com.cloud.maid.StackMaid;
import com.cloud.maint.UpgradeManager;
import com.cloud.network.IPAddressVO;
import com.cloud.network.NetworkManager;

View File

@ -84,13 +84,13 @@ import com.cloud.api.response.ListResponse;
import com.cloud.async.AsyncJob;
import com.cloud.async.AsyncJobManager;
import com.cloud.async.AsyncJobVO;
import com.cloud.cluster.StackMaid;
import com.cloud.configuration.ConfigurationVO;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.domain.Domain;
import com.cloud.domain.DomainVO;
import com.cloud.event.EventUtils;
import com.cloud.exception.CloudAuthenticationException;
import com.cloud.maid.StackMaid;
import com.cloud.server.ManagementServer;
import com.cloud.user.Account;
import com.cloud.user.AccountService;

View File

@ -32,8 +32,8 @@ import javax.servlet.http.HttpSession;
import org.apache.log4j.Logger;
import com.cloud.cluster.StackMaid;
import com.cloud.exception.CloudAuthenticationException;
import com.cloud.maid.StackMaid;
import com.cloud.server.ManagementServer;
import com.cloud.user.Account;
import com.cloud.user.AccountService;

View File

@ -46,11 +46,11 @@ import com.cloud.async.dao.AsyncJobDao;
import com.cloud.cluster.ClusterManager;
import com.cloud.cluster.ClusterManagerListener;
import com.cloud.cluster.ManagementServerHostVO;
import com.cloud.cluster.StackMaid;
import com.cloud.configuration.Config;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.exception.InvalidParameterValueException;
import com.cloud.exception.PermissionDeniedException;
import com.cloud.maid.StackMaid;
import com.cloud.user.Account;
import com.cloud.user.UserContext;
import com.cloud.user.dao.AccountDao;

View File

@ -0,0 +1,45 @@
/**
* Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.cluster;
/**
* CleanupMaid is implemented by tasks that need to perform cleanup.
*
* An implementation can contain the actual information about the current
* state of the task. The state is serialized and stored. When cleanup is
* required, the CleanupMaid is instantiated from the stored data and
* cleanup() is called.
*
*/
public interface CleanupMaid {
/**
* Cleans up according to the state that was stored.
*
* @return 0 indicates cleanup was successful. A negative number
* indicates the cleanup was unsuccessful and must not be retried. A positive
* number indicates the cleanup was unsuccessful and should be retried in
* this many seconds.
*/
int cleanup();
/**
* If cleanup is unsuccessful and not to be retried, the cleanup procedure
* returned here is recorded.
*
* @return the cleanup procedure to be recorded.
*/
String getCleanupProcedure();
}

View File

@ -51,7 +51,6 @@ import com.cloud.utils.events.SubscriptionMgr;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.exception.ExceptionUtil;
import com.cloud.utils.mgmt.JmxUtil;
import com.cloud.utils.net.MacAddress;
import com.cloud.utils.net.NetUtils;
import com.google.gson.Gson;
@ -90,7 +89,7 @@ public class ClusterManagerImpl implements ClusterManager {
// _msid is the unique persistent identifier that peer name is based upon
//
private Long _mshostId = null;
protected long _msid = MacAddress.getMacAddress().toLong();
protected long _msid = ManagementServerNode.getManagementServerId();
protected long _runId = System.currentTimeMillis();
private boolean _peerScanInited = false;

View File

@ -0,0 +1,37 @@
/**
* Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.cluster;
import com.cloud.utils.component.SystemIntegrityChecker;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.net.MacAddress;
/**
 * Holds the id of this management server node and, as a
 * {@link SystemIntegrityChecker}, verifies that a usable id was obtained.
 */
public class ManagementServerNode implements SystemIntegrityChecker {
    // Computed once, when the class is initialized, from the MAC address.
    private static final long s_nodeId;

    static {
        s_nodeId = MacAddress.getMacAddress().toLong();
    }

    /**
     * @return the id of this management server node.
     */
    public static long getManagementServerId() {
        return s_nodeId;
    }

    /**
     * Passes only when the node id is a positive number.
     *
     * @throws CloudRuntimeException if the node id is zero or negative.
     */
    @Override
    public void check() {
        if (s_nodeId > 0) {
            return;
        }
        throw new CloudRuntimeException("Unable to get the management server node id");
    }
}

View File

@ -1,12 +1,12 @@
package com.cloud.maid;
package com.cloud.cluster;
import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;
import com.cloud.maid.dao.StackMaidDao;
import com.cloud.maid.dao.StackMaidDaoImpl;
import com.cloud.cluster.dao.StackMaidDao;
import com.cloud.cluster.dao.StackMaidDaoImpl;
import com.cloud.serializer.SerializerHelper;
import com.cloud.utils.CleanupDelegate;
import com.cloud.utils.db.Transaction;
@ -99,7 +99,7 @@ public class StackMaid {
public void exitCleanup(long currentMsid) {
if(currentSeq > 0) {
StackMaidVO maid = null;
TaskVO maid = null;
while((maid = maidDao.popCleanupDelegate(currentMsid)) != null) {
doCleanup(maid);
}
@ -109,7 +109,7 @@ public class StackMaid {
context.clear();
}
public static boolean doCleanup(StackMaidVO maid) {
public static boolean doCleanup(TaskVO maid) {
if(maid.getDelegate() != null) {
try {
Class<?> clz = Class.forName(maid.getDelegate());

View File

@ -0,0 +1,42 @@
package com.cloud.cluster;
import com.cloud.utils.component.Manager;
/**
* TaskManager helps business logic deal with clustering failover.
* Say you're writing code that introduces an inconsistent state over
* a long period of time. What happens if the server dies in the middle
* of your operation? Who will come back to clean up this state? TaskManager
* will help with doing this. You can create a task and update the state
* with different content during your process. If the server dies, TaskManager
* will automatically clean up the tasks if there is a clustered server
* running elsewhere. If there are no clustered servers, then TaskManager will
* clean up when the dead server resumes.
*
*/
public interface TaskManager extends Manager {
/**
* Adds a task. The context describes what the task is and, being a
* {@link CleanupMaid}, is also what performs the cleanup.
*
* @param context context information to be stored; it is also responsible
* for cleanup if the process was interrupted.
* @return task id.
*/
long addTask(CleanupMaid context);
/**
* Updates the task with new context.
*
* @param taskId id of the task to update.
* @param updatedContext new updated context.
*/
void updateTask(long taskId, CleanupMaid updatedContext);
/**
* Removes the task as it is completed.
*
* @param taskId id of the task that completed.
*/
void taskCompleted(long taskId);
}

View File

@ -0,0 +1,230 @@
package com.cloud.cluster;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.ejb.Local;
import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import com.cloud.cluster.dao.StackMaidDao;
import com.cloud.configuration.Config;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.serializer.SerializerHelper;
import com.cloud.utils.DateUtil;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.component.ComponentLocator;
import com.cloud.utils.component.Inject;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.GlobalLock;
/**
 * TaskManager implementation that stores tasks through {@link StackMaidDao}
 * and runs their cleanup on a single-threaded scheduler. It registers itself
 * as a cluster listener so that it can take over the tasks of management
 * server nodes that leave the cluster.
 */
@Local(value=TaskManager.class)
public class TaskManagerImpl implements TaskManager, ClusterManagerListener {
    private static final Logger s_logger = Logger.getLogger(TaskManagerImpl.class);

    private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds
    // Only referenced by the disabled GC scheduling in start().
    private static final int GC_INTERVAL = 10000; // 10 seconds

    // Seconds between cleanup retries; values <= 0 disable retrying.
    private int _cleanupRetryInterval;
    private String _name;

    @Inject
    private StackMaidDao _maidDao;
    @Inject
    private ClusterManager _clusterMgr;

    // Id of this management server node, set in configure().
    long _msId;

    // Only referenced by the disabled GC scheduling in start().
    private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("TaskMgr-Heartbeat"));
    private final ScheduledExecutorService _cleanupScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Task-Cleanup"));

    protected TaskManagerImpl() {
    }

    /**
     * Initializes StackMaid with this node's id, registers for cluster
     * events, reads the cleanup retry interval (default 600 seconds) and
     * calls takeover with this node as both source and target.
     *
     * @param name name of this manager.
     * @param xmlParams parameters handed to the configuration lookup.
     * @return always true.
     */
    @Override
    public boolean configure(String name, Map<String, Object> xmlParams) throws ConfigurationException {
        _name = name;
        if (s_logger.isInfoEnabled()) {
            s_logger.info("Start configuring StackMaidManager : " + name);
        }

        final long nodeId = ManagementServerNode.getManagementServerId();
        StackMaid.init(nodeId);
        _msId = nodeId;

        _clusterMgr.registerListener(this);

        final ConfigurationDao configDao = ComponentLocator.getCurrentLocator().getDao(ConfigurationDao.class);
        final Map<String, String> configs = configDao.getConfiguration(xmlParams);
        final String retryInterval = configs.get(Config.TaskCleanupRetryInterval.key());
        _cleanupRetryInterval = NumbersUtil.parseInt(retryInterval, 600);

        _maidDao.takeover(_msId, _msId);
        return true;
    }

    /**
     * Runs the StackMaid cleanup on every entry and expunges the entries
     * whose cleanup reported success.
     */
    private void cleanupLeftovers(List<TaskVO> leftovers) {
        for (TaskVO leftover : leftovers) {
            if (!StackMaid.doCleanup(leftover)) {
                continue;
            }
            _maidDao.expunge(leftover.getId());
        }
    }

    /**
     * Builds the runnable that, under the "StackMaidManagerGC" global lock,
     * cleans up entries created more than two hours (7200000 ms) ago.
     */
    @DB
    private Runnable getGCTask() {
        return new Runnable() {
            @Override
            public void run() {
                final GlobalLock gcLock = GlobalLock.getInternLock("StackMaidManagerGC");
                try {
                    // Skip this round when the lock cannot be obtained in time.
                    if (!gcLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
                        return;
                    }
                    try {
                        reallyRun();
                    } finally {
                        gcLock.unlock();
                    }
                } finally {
                    gcLock.releaseRef();
                }
            }

            public void reallyRun() {
                try {
                    final long cutoffMillis = DateUtil.currentGMTTime().getTime() - 7200000;
                    final List<TaskVO> expired = _maidDao.listLeftoversByCutTime(new Date(cutoffMillis));
                    cleanupLeftovers(expired);
                } catch (Throwable e) {
                    s_logger.error("Unexpected exception when trying to execute queue item, ", e);
                }
            }
        };
    }

    /**
     * Schedules one CleanupTask to run after the retry interval, or after
     * 600 seconds when the interval is not positive.
     *
     * @return always true.
     */
    @Override
    public boolean start() {
        // Startup leftover cleanup and periodic GC are currently disabled:
        //        try {
        //            List<TaskVO> l = _maidDao.listLeftoversByMsid(_clusterMgr.getManagementNodeId());
        //            cleanupLeftovers(l);
        //        } catch (Throwable e) {
        //            s_logger.error("Unexpected exception " + e.getMessage(), e);
        //        }
        //
        //        _heartbeatScheduler.scheduleAtFixedRate(getGCTask(), GC_INTERVAL, GC_INTERVAL, TimeUnit.MILLISECONDS);
        final int initialDelay;
        if (_cleanupRetryInterval > 0) {
            initialDelay = _cleanupRetryInterval;
        } else {
            initialDelay = 600;
        }
        _cleanupScheduler.schedule(new CleanupTask(), initialDelay, TimeUnit.SECONDS);
        return true;
    }

    @Override
    public boolean stop() {
        return true;
    }

    @Override
    public String getName() {
        return _name;
    }

    @Override
    public void onManagementNodeJoined(List<ManagementServerHostVO> nodeList, long selfNodeId) {
        // Nothing to do
    }

    /**
     * Takes over the tasks of every node that left and, for each node that
     * actually had something taken over, queues a CleanupTask.
     */
    @Override
    public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
        for (ManagementServerHostVO departed : nodeList) {
            if (!_maidDao.takeover(departed.getMsid(), selfNodeId)) {
                continue;
            }
            s_logger.info("Taking over from " + departed.getMsid());
            _cleanupScheduler.execute(new CleanupTask());
        }
    }

    /**
     * Stores the context under this node's id, with sequence 0 and the
     * context's class name as the delegate.
     *
     * @return the id returned by the DAO for the stored task.
     */
    @Override
    public long addTask(CleanupMaid context) {
        final String delegateClass = context.getClass().getName();
        return _maidDao.pushCleanupDelegate(_msId, 0, delegateClass, context);
    }

    /**
     * Replaces the delegate class name and serialized context of a task.
     */
    @Override
    public void updateTask(long taskId, CleanupMaid updatedContext) {
        final TaskVO changes = _maidDao.createForUpdate();
        final String delegateClass = updatedContext.getClass().getName();
        changes.setDelegate(delegateClass);
        final String serialized = SerializerHelper.toSerializedStringOld(updatedContext);
        changes.setContext(serialized);
        _maidDao.update(taskId, changes);
    }

    @Override
    public void taskCompleted(long taskId) {
        _maidDao.remove(taskId);
    }

    /**
     * Deserializes the task's context into its CleanupMaid and runs it.
     *
     * @param task the stored task to clean up.
     * @return true when the task is finished with (cleanup returned 0 or a
     *         negative value) and has been removed; false when cleanup
     *         returned a positive value and the task was left in place.
     */
    protected boolean cleanup(TaskVO task) {
        s_logger.info("Cleaning up " + task);

        final CleanupMaid maid = (CleanupMaid)SerializerHelper.fromSerializedString(task.getContext());
        assert maid.getClass().getName().equals(task.getDelegate()) : "Deserializer says " + maid.getClass().getName() + " but it's suppose to be " + task.getDelegate();

        final int outcome = maid.cleanup();
        if (outcome > 0) {
            s_logger.error("Unable to cleanup " + task.getId());
            return false;
        }

        if (outcome == 0) {
            s_logger.info("Successfully cleaned up " + task.getId());
        } else {
            // Negative: failed for good, so record the manual procedure.
            s_logger.warn("Unsuccessful in cleaning up " + task + ". Procedure to cleanup manaully: " + maid.getCleanupProcedure());
        }
        taskCompleted(task.getId());
        return true;
    }

    /**
     * Cleans up every task listed for this node. When some tasks ask for a
     * retry, it reschedules itself after the retry interval, or, when
     * retrying is disabled, logs the manual cleanup procedure of each.
     * Tasks whose cleanup throws are logged and not added to the retry list.
     */
    class CleanupTask implements Runnable {
        public CleanupTask() {
        }

        @Override
        public void run() {
            try {
                final List<TaskVO> pending = _maidDao.listCleanupTasks(_msId);
                final List<TaskVO> unfinished = new ArrayList<TaskVO>();

                for (TaskVO task : pending) {
                    try {
                        final boolean done = cleanup(task);
                        if (!done) {
                            unfinished.add(task);
                        }
                    } catch (Exception e) {
                        s_logger.warn("Unable to clean up " + task, e);
                    }
                }

                if (unfinished.isEmpty()) {
                    return;
                }

                if (_cleanupRetryInterval > 0) {
                    _cleanupScheduler.schedule(this, _cleanupRetryInterval, TimeUnit.SECONDS);
                    return;
                }

                for (TaskVO task : unfinished) {
                    s_logger.warn("Cleanup procedure for " + task + ": " + ((CleanupMaid)SerializerHelper.fromSerializedString(task.getContext())).getCleanupProcedure());
                }
            } catch (Exception e) {
                s_logger.error("Unable to cleanup all of the tasks for " + _msId, e);
            }
        }
    }
}

View File

@ -1,4 +1,4 @@
package com.cloud.maid;
package com.cloud.cluster;
import java.util.Date;
@ -13,12 +13,12 @@ import com.cloud.utils.db.GenericDao;
@Entity
@Table(name="stack_maid")
public class StackMaidVO {
public class TaskVO {
@Id
@GeneratedValue(strategy=GenerationType.IDENTITY)
@Column(name="id")
private Long id = null;
private long id;
@Column(name="msid")
private long msid;
@ -38,17 +38,13 @@ public class StackMaidVO {
@Column(name=GenericDao.CREATED_COLUMN)
private Date created;
public StackMaidVO() {
public TaskVO() {
}
public Long getId() {
public long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public long getMsid() {
return msid;
}
@ -96,4 +92,9 @@ public class StackMaidVO {
public void setCreated(Date created) {
this.created = created;
}
@Override
public String toString() {
return new StringBuilder("Task[").append(id).append("-").append(context).append("-").append(delegate).append("]").toString();
}
}

View File

@ -0,0 +1,30 @@
package com.cloud.cluster.dao;
import java.util.Date;
import java.util.List;
import com.cloud.cluster.TaskVO;
import com.cloud.utils.db.GenericDao;
/**
* Data access for the stored cleanup entries ({@link TaskVO}).
*/
public interface StackMaidDao extends GenericDao<TaskVO, Long> {
/**
* Stores a cleanup entry for the given management server.
*
* @param msid management server id the entry belongs to.
* @param seq sequence number of the entry.
* @param delegateClzName name of the class responsible for the cleanup.
* @param context context object stored with the entry.
* @return id of the stored entry.
*/
public long pushCleanupDelegate(long msid, int seq, String delegateClzName, Object context);
/**
* Removes and returns a stored entry for the given management server.
* NOTE(review): presumably the most recently pushed entry of the calling
* thread - confirm against the implementation.
*
* @param msid management server id.
* @return the removed entry, or null if there is none.
*/
public TaskVO popCleanupDelegate(long msid);
/**
* Removes the entries stored for the given management server.
*
* @param msid management server id.
*/
public void clearStack(long msid);
/**
* Lists the entries still stored for the given management server.
*
* @param msid management server id.
*/
public List<TaskVO> listLeftoversByMsid(long msid);
/**
* Lists the stored entries selected by the given cut time.
* NOTE(review): presumably the entries created before it - confirm
* against the implementation.
*
* @param cutTime cut time to select entries by.
*/
public List<TaskVO> listLeftoversByCutTime(Date cutTime);
/**
* Take over the task items of another management server so that they can
* be cleaned up. This method changes the management server id of all of
* the tasks to this management server and marks the thread id as 0. It does
* not return the tasks themselves.
* NOTE(review): presumably listCleanupTasks is the way to fetch them
* afterwards - confirm.
*
* @param takeOverMsid management server id to take over.
* @param selfId the management server id of this node.
* @return true if at least one task was taken over, false otherwise.
*/
boolean takeover(long takeOverMsid, long selfId);
/**
* Lists the tasks of the given management server that are to be cleaned up.
* NOTE(review): presumably those whose thread id was reset to 0 by
* takeover - confirm against the implementation.
*
* @param selfId the management server id of this node.
*/
List<TaskVO> listCleanupTasks(long selfId);
}

View File

@ -1,4 +1,4 @@
package com.cloud.maid.dao;
package com.cloud.cluster.dao;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@ -12,7 +12,7 @@ import javax.ejb.Local;
import org.apache.log4j.Logger;
import com.cloud.maid.StackMaidVO;
import com.cloud.cluster.TaskVO;
import com.cloud.serializer.SerializerHelper;
import com.cloud.utils.DateUtil;
import com.cloud.utils.db.DB;
@ -20,14 +20,16 @@ import com.cloud.utils.db.Filter;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.Transaction;
@Local(value = { StackMaidDao.class })
public class StackMaidDaoImpl extends GenericDaoBase<StackMaidVO, Long> implements StackMaidDao {
private static final Logger s_logger = Logger.getLogger(StackMaidDaoImpl.class.getName());
@Local(value = { StackMaidDao.class }) @DB(txn=false)
public class StackMaidDaoImpl extends GenericDaoBase<TaskVO, Long> implements StackMaidDao {
private static final Logger s_logger = Logger.getLogger(StackMaidDaoImpl.class);
private SearchBuilder<StackMaidVO> popSearch;
private SearchBuilder<StackMaidVO> clearSearch;
private SearchBuilder<TaskVO> popSearch;
private SearchBuilder<TaskVO> clearSearch;
private final SearchBuilder<TaskVO> AllFieldsSearch;
public StackMaidDaoImpl() {
popSearch = createSearchBuilder();
@ -36,64 +38,76 @@ public class StackMaidDaoImpl extends GenericDaoBase<StackMaidVO, Long> implemen
clearSearch = createSearchBuilder();
clearSearch.and("msid", clearSearch.entity().getMsid(), SearchCriteria.Op.EQ);
AllFieldsSearch = createSearchBuilder();
AllFieldsSearch.and("msid", AllFieldsSearch.entity().getMsid(), Op.EQ);
AllFieldsSearch.and("thread", AllFieldsSearch.entity().getThreadId(), Op.EQ);
AllFieldsSearch.done();
}
@Override
public boolean takeover(long takeOverMsid, long selfId) {
TaskVO task = createForUpdate();
task.setMsid(selfId);
task.setThreadId(0);
SearchCriteria<TaskVO> sc = AllFieldsSearch.create();
sc.setParameters("msid", takeOverMsid);
return update(task, sc) > 0;
}
@Override
public List<TaskVO> listCleanupTasks(long msId) {
SearchCriteria<TaskVO> sc = AllFieldsSearch.create();
sc.setParameters("msid", msId);
sc.setParameters("thread", 0);
return this.search(sc, null);
}
@DB
public void pushCleanupDelegate(long msid, int seq, String delegateClzName, Object context) {
@Override
public long pushCleanupDelegate(long msid, int seq, String delegateClzName, Object context) {
TaskVO delegateItem = new TaskVO();
delegateItem.setMsid(msid);
delegateItem.setThreadId(Thread.currentThread().getId());
delegateItem.setSeq(seq);
delegateItem.setDelegate(delegateClzName);
delegateItem.setContext(SerializerHelper.toSerializedStringOld(context));
delegateItem.setCreated(DateUtil.currentGMTTime());
super.persist(delegateItem);
return delegateItem.getId();
}
@Override
public TaskVO popCleanupDelegate(long msid) {
SearchCriteria<TaskVO> sc = popSearch.create();
sc.setParameters("msid", msid);
sc.setParameters("threadId", Thread.currentThread().getId());
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
try {
StackMaidVO delegateItem = new StackMaidVO();
delegateItem.setMsid(msid);
delegateItem.setThreadId(Thread.currentThread().getId());
delegateItem.setSeq(seq);
delegateItem.setDelegate(delegateClzName);
delegateItem.setContext(SerializerHelper.toSerializedStringOld(context));
delegateItem.setCreated(DateUtil.currentGMTTime());
super.persist(delegateItem);
} finally {
txn.close();
}
}
@DB
public StackMaidVO popCleanupDelegate(long msid) {
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
try {
SearchCriteria<StackMaidVO> sc = popSearch.create();
sc.setParameters("msid", msid);
sc.setParameters("threadId", Thread.currentThread().getId());
Filter filter = new Filter(StackMaidVO.class, "seq", false, 0L, (long)1);
List<StackMaidVO> l = listIncludingRemovedBy(sc, filter);
if(l != null && l.size() > 0) {
expunge(l.get(0).getId());
return l.get(0);
}
} finally {
txn.close();
}
Filter filter = new Filter(TaskVO.class, "seq", false, 0L, (long)1);
List<TaskVO> l = listIncludingRemovedBy(sc, filter);
if(l != null && l.size() > 0) {
expunge(l.get(0).getId());
return l.get(0);
}
return null;
}
@DB
@Override
public void clearStack(long msid) {
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
try {
SearchCriteria<StackMaidVO> sc = clearSearch.create();
sc.setParameters("msid", msid);
expunge(sc);
} finally {
txn.close();
}
SearchCriteria<TaskVO> sc = clearSearch.create();
sc.setParameters("msid", msid);
expunge(sc);
}
@Override
@DB
public List<StackMaidVO> listLeftoversByMsid(long msid) {
List<StackMaidVO> l = new ArrayList<StackMaidVO>();
public List<TaskVO> listLeftoversByMsid(long msid) {
List<TaskVO> l = new ArrayList<TaskVO>();
String sql = "select * from stack_maid where msid=? order by msid asc, thread_id asc, seq desc";
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
@ -116,10 +130,11 @@ public class StackMaidDaoImpl extends GenericDaoBase<StackMaidVO, Long> implemen
return l;
}
@Override
@DB
public List<StackMaidVO> listLeftoversByCutTime(Date cutTime) {
public List<TaskVO> listLeftoversByCutTime(Date cutTime) {
List<StackMaidVO> l = new ArrayList<StackMaidVO>();
List<TaskVO> l = new ArrayList<TaskVO>();
String sql = "select * from stack_maid where created < ? order by msid asc, thread_id asc, seq desc";
Transaction txn = Transaction.open(Transaction.CLOUD_DB);

View File

@ -224,7 +224,9 @@ public enum Config {
VmOpCleanupWait("Advanced", ManagementServer.class, Long.class, "vm.op.cleanup.wait", "3600", "Time (in seconds) to wait before cleanuping up any vm work items", "Seconds"),
VmOpCancelInterval("Advanced", ManagementServer.class, Long.class, "vm.op.cancel.interval", "3600", "Time (in seconds) to wait before cancelling a operation", "Seconds"),
DefaultPageSize("Advanced", ManagementServer.class, Long.class, "default.page.size", "500", "Default page size for API list* commands", null);
DefaultPageSize("Advanced", ManagementServer.class, Long.class, "default.page.size", "500", "Default page size for API list* commands", null),
TaskCleanupRetryInterval("Advanced", ManagementServer.class, Integer.class, "task.cleanup.retry.interval", "600", "Time (in seconds) to wait before retrying cleanup of tasks if the cleanup failed previously. 0 means to never retry.", "Seconds");
private final String _category;

View File

@ -38,7 +38,10 @@ import com.cloud.capacity.dao.CapacityDaoImpl;
import com.cloud.certificate.dao.CertificateDaoImpl;
import com.cloud.cluster.ClusterManagerImpl;
import com.cloud.cluster.DummyClusterManagerImpl;
import com.cloud.cluster.ManagementServerNode;
import com.cloud.cluster.TaskManagerImpl;
import com.cloud.cluster.dao.ManagementServerHostDaoImpl;
import com.cloud.cluster.dao.StackMaidDaoImpl;
import com.cloud.configuration.dao.ConfigurationDaoImpl;
import com.cloud.configuration.dao.ResourceCountDaoImpl;
import com.cloud.configuration.dao.ResourceLimitDaoImpl;
@ -63,8 +66,6 @@ import com.cloud.host.dao.DetailsDaoImpl;
import com.cloud.host.dao.HostDaoImpl;
import com.cloud.host.dao.HostTagsDaoImpl;
import com.cloud.hypervisor.HypervisorGuruManagerImpl;
import com.cloud.maid.StackMaidManagerImpl;
import com.cloud.maid.dao.StackMaidDaoImpl;
import com.cloud.maint.UpgradeManagerImpl;
import com.cloud.maint.dao.AgentUpgradeDaoImpl;
import com.cloud.network.NetworkManagerImpl;
@ -156,6 +157,7 @@ public class DefaultComponentLibrary extends ComponentLibraryBase implements Com
/**
 * @return the integrity checkers for this library: the management server
 *         node check first, then the database upgrade check.
 */
@Override
public List<SystemIntegrityChecker> getSystemIntegrityCheckers() {
    final SystemIntegrityChecker nodeCheck = new ManagementServerNode();
    final SystemIntegrityChecker upgradeCheck = new DatabaseUpgradeChecker();

    final ArrayList<SystemIntegrityChecker> integrityCheckers = new ArrayList<SystemIntegrityChecker>();
    integrityCheckers.add(nodeCheck);
    integrityCheckers.add(upgradeCheck);
    return integrityCheckers;
}
@ -271,7 +273,7 @@ public class DefaultComponentLibrary extends ComponentLibraryBase implements Com
}
protected void populateManagers() {
addManager("StackMaidManager", StackMaidManagerImpl.class);
addManager("StackMaidManager", TaskManagerImpl.class);
addManager("agent manager", AgentManagerImpl.class);
addManager("account manager", AccountManagerImpl.class);
addManager("configuration manager", ConfigurationManagerImpl.class);

View File

@ -53,6 +53,7 @@ import com.cloud.api.commands.DestroyConsoleProxyCmd;
import com.cloud.certificate.CertificateVO;
import com.cloud.certificate.dao.CertificateDao;
import com.cloud.cluster.ClusterManager;
import com.cloud.cluster.StackMaid;
import com.cloud.configuration.Config;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.dc.DataCenter;
@ -80,7 +81,6 @@ import com.cloud.info.ConsoleProxyStatus;
import com.cloud.info.RunningHostCountInfo;
import com.cloud.info.RunningHostInfoAgregator;
import com.cloud.info.RunningHostInfoAgregator.ZoneHostInfo;
import com.cloud.maid.StackMaid;
import com.cloud.network.NetworkManager;
import com.cloud.network.NetworkVO;
import com.cloud.network.Networks.TrafficType;

View File

@ -37,6 +37,7 @@ import com.cloud.agent.AgentManager;
import com.cloud.alert.AlertManager;
import com.cloud.cluster.ClusterManagerListener;
import com.cloud.cluster.ManagementServerHostVO;
import com.cloud.cluster.StackMaid;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.dc.ClusterDetailsDao;
import com.cloud.dc.DataCenterVO;
@ -55,7 +56,6 @@ import com.cloud.host.Host;
import com.cloud.host.HostVO;
import com.cloud.host.Status;
import com.cloud.host.dao.HostDao;
import com.cloud.maid.StackMaid;
import com.cloud.server.ManagementServer;
import com.cloud.storage.StorageManager;
import com.cloud.storage.dao.GuestOSCategoryDao;

View File

@ -1,6 +0,0 @@
package com.cloud.maid;
import com.cloud.utils.component.Manager;
/**
* Manager interface for the stack maid component. It declares no methods of
* its own beyond those inherited from {@link Manager}.
*/
public interface StackMaidManager extends Manager {
}

View File

@ -1,108 +0,0 @@
package com.cloud.maid;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.ejb.Local;
import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import com.cloud.cluster.ClusterManager;
import com.cloud.maid.dao.StackMaidDao;
import com.cloud.utils.DateUtil;
import com.cloud.utils.component.Inject;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.GlobalLock;
/**
 * StackMaidManager implementation: cleans up this node's leftover entries
 * when started and then periodically garbage-collects old entries.
 */
@Local(value = { StackMaidManager.class })
public class StackMaidManagerImpl implements StackMaidManager {
    private static final Logger s_logger = Logger.getLogger(StackMaidManagerImpl.class);

    private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds
    private static final int GC_INTERVAL = 10000; // 10 seconds

    private String _name;

    @Inject
    private StackMaidDao _maidDao;

    @Inject
    private ClusterManager _clusterMgr;

    private final ScheduledExecutorService _heartbeatScheduler =
        Executors.newScheduledThreadPool(1, new NamedThreadFactory("StackMaidMgr-Heartbeat"));

    /**
     * Records the manager name and initializes StackMaid with the
     * management node id obtained from the cluster manager.
     *
     * @return always true.
     */
    @Override
    public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
        _name = name;

        if (s_logger.isInfoEnabled()) {
            s_logger.info("Start configuring StackMaidManager : " + name);
        }

        StackMaid.init(_clusterMgr.getManagementNodeId());
        return true;
    }

    /**
     * Runs the StackMaid cleanup on every entry and expunges the entries
     * whose cleanup reported success.
     */
    private void cleanupLeftovers(List<StackMaidVO> leftovers) {
        for (StackMaidVO leftover : leftovers) {
            if (!StackMaid.doCleanup(leftover)) {
                continue;
            }
            _maidDao.expunge(leftover.getId());
        }
    }

    /**
     * Builds the runnable that, under the "StackMaidManagerGC" global lock,
     * cleans up entries created more than two hours (7200000 ms) ago.
     */
    @DB
    private Runnable getGCTask() {
        return new Runnable() {
            public void run() {
                final GlobalLock gcLock = GlobalLock.getInternLock("StackMaidManagerGC");
                try {
                    // Skip this round when the lock cannot be obtained in time.
                    if (!gcLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
                        return;
                    }
                    try {
                        reallyRun();
                    } finally {
                        gcLock.unlock();
                    }
                } finally {
                    gcLock.releaseRef();
                }
            }

            public void reallyRun() {
                try {
                    final long cutoffMillis = DateUtil.currentGMTTime().getTime() - 7200000;
                    final List<StackMaidVO> expired = _maidDao.listLeftoversByCutTime(new Date(cutoffMillis));
                    cleanupLeftovers(expired);
                } catch (Throwable e) {
                    s_logger.error("Unexpected exception when trying to execute queue item, ", e);
                }
            }
        };
    }

    /**
     * Cleans up the entries left over for this management node (failures
     * are logged, not propagated) and starts the periodic GC task.
     *
     * @return always true.
     */
    public boolean start() {
        try {
            final List<StackMaidVO> mine = _maidDao.listLeftoversByMsid(_clusterMgr.getManagementNodeId());
            cleanupLeftovers(mine);
        } catch (Throwable e) {
            s_logger.error("Unexpected exception " + e.getMessage(), e);
        }

        final Runnable gcTask = getGCTask();
        _heartbeatScheduler.scheduleAtFixedRate(gcTask, GC_INTERVAL, GC_INTERVAL, TimeUnit.MILLISECONDS);
        return true;
    }

    public boolean stop() {
        return true;
    }

    public String getName() {
        return _name;
    }
}

View File

@ -1,16 +0,0 @@
package com.cloud.maid.dao;
import java.util.Date;
import java.util.List;
import com.cloud.maid.StackMaidVO;
import com.cloud.utils.db.GenericDao;
/**
* Data access for the stored cleanup entries ({@link StackMaidVO}).
*/
public interface StackMaidDao extends GenericDao<StackMaidVO, Long> {
/**
* Stores a cleanup entry for the given management server.
*
* @param msid management server id the entry belongs to.
* @param seq sequence number of the entry.
* @param delegateClzName name of the class responsible for the cleanup.
* @param context context object stored with the entry.
*/
public void pushCleanupDelegate(long msid, int seq, String delegateClzName, Object context);
/**
* Removes and returns a stored entry for the given management server.
* NOTE(review): presumably the most recently pushed entry of the calling
* thread - confirm against the implementation.
*
* @param msid management server id.
* @return the removed entry, or null if there is none.
*/
public StackMaidVO popCleanupDelegate(long msid);
/**
* Removes the entries stored for the given management server.
*
* @param msid management server id.
*/
public void clearStack(long msid);
/**
* Lists the entries still stored for the given management server.
*
* @param msid management server id.
*/
public List<StackMaidVO> listLeftoversByMsid(long msid);
/**
* Lists the stored entries selected by the given cut time.
* NOTE(review): presumably the entries created before it - confirm
* against the implementation.
*
* @param cutTime cut time to select entries by.
*/
public List<StackMaidVO> listLeftoversByCutTime(Date cutTime);
}

View File

@ -6,7 +6,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import com.cloud.maid.StackMaid;
import com.cloud.cluster.StackMaid;
import com.cloud.utils.Pair;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.GlobalLock;

View File

@ -60,6 +60,7 @@ import com.cloud.agent.manager.Commands;
import com.cloud.alert.AlertManager;
import com.cloud.capacity.CapacityManager;
import com.cloud.cluster.ClusterManager;
import com.cloud.cluster.StackMaid;
import com.cloud.configuration.Config;
import com.cloud.configuration.ConfigurationManager;
import com.cloud.configuration.ResourceCount.ResourceType;
@ -95,7 +96,6 @@ import com.cloud.host.dao.HostDao;
import com.cloud.hypervisor.Hypervisor.HypervisorType;
import com.cloud.hypervisor.HypervisorGuru;
import com.cloud.hypervisor.HypervisorGuruManager;
import com.cloud.maid.StackMaid;
import com.cloud.network.NetworkManager;
import com.cloud.network.NetworkVO;
import com.cloud.org.Cluster;

View File

@ -26,10 +26,10 @@ import org.apache.log4j.Logger;
import junit.framework.Assert;
import com.cloud.async.AsyncJobVO;
import com.cloud.maid.StackMaid;
import com.cloud.maid.StackMaidVO;
import com.cloud.maid.dao.StackMaidDao;
import com.cloud.maid.dao.StackMaidDaoImpl;
import com.cloud.cluster.StackMaid;
import com.cloud.cluster.TaskVO;
import com.cloud.cluster.dao.StackMaidDao;
import com.cloud.cluster.dao.StackMaidDaoImpl;
import com.cloud.serializer.Param;
import com.cloud.utils.ActionDelegate;
import com.cloud.utils.Pair;
@ -216,7 +216,7 @@ public class TestAsync extends Log4jEnabledTestCase {
dao.pushCleanupDelegate(1L, 1, "delegate2", new Long(100));
dao.pushCleanupDelegate(1L, 2, "delegate3", null);
StackMaidVO item = dao.popCleanupDelegate(1L);
TaskVO item = dao.popCleanupDelegate(1L);
Assert.assertTrue(item.getDelegate().equals("delegate3"));
Assert.assertTrue(item.getContext() == null);
@ -283,8 +283,8 @@ public class TestAsync extends Log4jEnabledTestCase {
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
StackMaidDao dao = new StackMaidDaoImpl();
List<StackMaidVO> l = dao.listLeftoversByMsid(1L);
for(StackMaidVO maid : l) {
List<TaskVO> l = dao.listLeftoversByMsid(1L);
for(TaskVO maid : l) {
s_logger.info("" + maid.getThreadId() + " " + maid.getDelegate() + " " + maid.getContext());
}

View File

@ -0,0 +1,375 @@
/**
* Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.cluster;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.ejb.Local;
import javax.naming.ConfigurationException;
import junit.framework.TestCase;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import com.cloud.agent.Listener;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.cluster.dao.StackMaidDao;
import com.cloud.cluster.dao.StackMaidDaoImpl;
import com.cloud.configuration.Config;
import com.cloud.configuration.DefaultInterceptorLibrary;
import com.cloud.configuration.dao.ConfigurationDaoImpl;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.exception.OperationTimedoutException;
import com.cloud.host.Status.Event;
import com.cloud.serializer.SerializerHelper;
import com.cloud.utils.component.ComponentLocator;
import com.cloud.utils.component.MockComponentLocator;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.exception.CloudRuntimeException;
public class TaskManagerTest extends TestCase {
private final static Logger s_logger = Logger.getLogger(TaskManagerTest.class);
/**
 * Prepares a clean environment for every test: activates a mock component locator wired
 * with the StackMaid and Configuration DAOs plus {@link MockClusterManager}, forgets the
 * maids tracked by {@link MockMaid#map}, and deletes every row of the stack_maid table.
 *
 * @throws CloudRuntimeException if the stack_maid table cannot be emptied
 */
@Override
@Before
public void setUp() {
    MockComponentLocator locator = new MockComponentLocator("management-server");
    locator.addDao("StackMaidDao", StackMaidDaoImpl.class);
    locator.addDao("ConfigurationDao", ConfigurationDaoImpl.class);
    locator.addManager("ClusterManager", MockClusterManager.class);
    locator.makeActive(new DefaultInterceptorLibrary());

    MockMaid.map.clear();

    s_logger.info("Cleaning up the database");
    Connection conn = Transaction.getStandaloneConnection();
    try {
        // Nested try/finally so that both the statement and the connection are released
        // even when one of the JDBC calls fails part-way through.
        try {
            conn.setAutoCommit(true);
            PreparedStatement stmt = conn.prepareStatement("DELETE FROM stack_maid");
            try {
                stmt.executeUpdate();
            } finally {
                stmt.close();
            }
        } finally {
            conn.close();
        }
    } catch (SQLException e) {
        throw new CloudRuntimeException("Unable to setup database", e);
    }
}
/**
 * Performs no per-test cleanup; database and {@code MockMaid.map} state are reset in
 * {@code setUp()} instead.
 *
 * NOTE(review): several tests start a TaskManagerImpl without stopping it afterwards;
 * consider stopping those instances here - confirm whether leftover managers can
 * interfere with later tests.
 */
@Override
@After
public void tearDown() throws Exception {
}
/**
 * Walks one task through add, update and complete, checking after each step what the
 * StackMaid DAO returns for the task id: the delegate class name and serialized maid
 * must match after add and update, and the row must be gone after completion.
 *
 * NOTE(review): the manager started here is never stopped - confirm whether that is
 * intended.
 *
 * @throws Exception if the task manager or the DAO fails unexpectedly
 */
public void testCompleteCase() throws Exception {
    ComponentLocator locator = ComponentLocator.getCurrentLocator();

    TaskManagerImpl taskMgr = ComponentLocator.inject(TaskManagerImpl.class);
    assertTrue(taskMgr.configure("TaskManager", new HashMap<String, Object>()));
    assertTrue(taskMgr.start());

    MockMaid delegate = new MockMaid();
    delegate.setValue("first");
    long taskId = taskMgr.addTask(delegate);

    StackMaidDao maidDao = locator.getDao(StackMaidDao.class);

    // After add: the row exists and carries the maid in its initial state.
    TaskVO task = maidDao.findById(taskId);
    assertNotNull("Task " + taskId + " was not persisted", task);
    assertEquals(MockMaid.class.getName(), task.getDelegate());
    MockMaid retrieved = (MockMaid)SerializerHelper.fromSerializedString(task.getContext());
    assertEquals(delegate.getValue(), retrieved.getValue());

    // After update: the same row now carries the modified maid.
    delegate.setValue("second");
    taskMgr.updateTask(taskId, delegate);

    task = maidDao.findById(taskId);
    assertNotNull("Task " + taskId + " disappeared after update", task);
    assertEquals(MockMaid.class.getName(), task.getDelegate());
    retrieved = (MockMaid)SerializerHelper.fromSerializedString(task.getContext());
    assertEquals(delegate.getValue(), retrieved.getValue());

    // After completion: the row is removed.
    taskMgr.taskCompleted(taskId);
    assertNull(maidDao.findById(taskId));
}
/**
 * Simulates a management server restart: a task is added, the manager is stopped while
 * the maid is still tracked in {@link MockMaid#map}, and a second manager instance is
 * started with {@code Config.TaskCleanupRetryInterval} set to "1". The test then polls
 * (at most five one-second sleeps) until {@link MockMaid#cleanup()} has removed the maid
 * from the map.
 *
 * NOTE(review): the second manager is never stopped - confirm whether that is intended.
 *
 * @throws Exception if the task manager or the DAO fails unexpectedly
 */
public void testSimulatedReboot() throws Exception {
    ComponentLocator locator = ComponentLocator.getCurrentLocator();

    TaskManagerImpl taskMgr = ComponentLocator.inject(TaskManagerImpl.class);
    assertTrue(taskMgr.configure("TaskManager", new HashMap<String, Object>()));
    assertTrue(taskMgr.start());

    MockMaid maid = new MockMaid();
    maid.setValue("first");
    long taskId = taskMgr.addTask(maid);

    StackMaidDao maidDao = locator.getDao(StackMaidDao.class);
    TaskVO task = maidDao.findById(taskId);
    assertNotNull("Task " + taskId + " was not persisted", task);
    assertEquals(MockMaid.class.getName(), task.getDelegate());
    MockMaid retrieved = (MockMaid)SerializerHelper.fromSerializedString(task.getContext());
    assertEquals(maid.getValue(), retrieved.getValue());

    // Stopping the manager must not run the cleanup: the maid is still registered.
    taskMgr.stop();
    assertNotNull(MockMaid.map.get(maid.getSeq()));

    // "Reboot": a fresh manager instance with a one-unit cleanup retry interval.
    taskMgr = ComponentLocator.inject(TaskManagerImpl.class);
    HashMap<String, Object> params = new HashMap<String, Object>();
    params.put(Config.TaskCleanupRetryInterval.key(), "1");
    assertTrue(taskMgr.configure("TaskManager", params));
    assertTrue(taskMgr.start());

    // Wait for the leftover task to be cleaned up; cleanup() removes the maid from the map.
    int i = 0;
    while (MockMaid.map.get(maid.getSeq()) != null && i++ < 5) {
        Thread.sleep(1000);
    }

    assertNull(MockMaid.map.get(maid.getSeq()));
}
/**
 * Simulates taking over the tasks of a departed management server: a task is added,
 * its stack_maid row is reassigned to another management server id, and the mock
 * cluster manager then reports that server as having left. The test polls until
 * {@link MockMaid#cleanup()} has removed the maid from {@link MockMaid#map}.
 *
 * NOTE(review): the manager started here is never stopped, and the polling loop allows
 * up to 500 one-second sleeps whereas testSimulatedReboot allows 5 - confirm both are
 * intended.
 *
 * @throws Exception if the task manager, the DAO or JDBC fails unexpectedly
 */
public void testTakeover() throws Exception {
    // Id of the fictitious management server whose tasks get taken over.
    final long departedMsid = 1234;

    ComponentLocator locator = ComponentLocator.getCurrentLocator();

    TaskManagerImpl taskMgr = ComponentLocator.inject(TaskManagerImpl.class);
    assertTrue(taskMgr.configure("TaskManager", new HashMap<String, Object>()));
    assertTrue(taskMgr.start());

    MockMaid delegate = new MockMaid();
    delegate.setValue("first");
    long taskId = taskMgr.addTask(delegate);

    StackMaidDao maidDao = locator.getDao(StackMaidDao.class);
    TaskVO task = maidDao.findById(taskId);
    assertNotNull("Task " + taskId + " was not persisted", task);
    assertEquals(MockMaid.class.getName(), task.getDelegate());
    MockMaid retrieved = (MockMaid)SerializerHelper.fromSerializedString(task.getContext());
    assertEquals(delegate.getValue(), retrieved.getValue());

    // Hand every row owned by this server over to the departed server id.
    Connection conn = Transaction.getStandaloneConnection();
    try {
        conn.setAutoCommit(true);
        PreparedStatement stmt = conn.prepareStatement("update stack_maid set msid=? where msid=?");
        try {
            stmt.setLong(1, departedMsid);
            stmt.setLong(2, ManagementServerNode.getManagementServerId());
            stmt.executeUpdate();
        } finally {
            // Closed here so the statement is released even when the update fails.
            stmt.close();
        }
    } finally {
        conn.close();
    }

    MockClusterManager clusterMgr = (MockClusterManager)locator.getManager(ClusterManager.class);
    clusterMgr.triggerTakeover(departedMsid);

    // Wait for the taken-over task to be cleaned up; cleanup() removes the maid from the map.
    int i = 0;
    while (MockMaid.map.get(delegate.getSeq()) != null && i++ < 500) {
        Thread.sleep(1000);
    }

    assertNull(MockMaid.map.get(delegate.getSeq()));
}
/**
 * Test maid that records itself in a static registry on construction and removes itself
 * from that registry when {@link #cleanup()} is called, which lets a test observe
 * whether cleanup has run for a given sequence number.
 */
public static class MockMaid implements CleanupMaid {
    /** Next sequence number to hand out; only touched through {@link #nextSeq()}. */
    private static int s_seq = 1;

    /** Maids that have been constructed and not yet cleaned up, keyed by sequence number. */
    public static final Map<Integer, MockMaid> map = new ConcurrentHashMap<Integer, MockMaid>();

    int seq;
    boolean canBeCleanup;
    String value;

    /** Creates a maid that reports successful cleanup and registers it in {@link #map}. */
    protected MockMaid() {
        canBeCleanup = true;
        seq = nextSeq();
        map.put(seq, this);
    }

    /**
     * Hands out the next sequence number. Synchronized because the registry is already
     * shared across threads; NOTE(review): whether maids are also constructed off the
     * test thread (e.g. during deserialization) should be confirmed.
     */
    private static synchronized int nextSeq() {
        return s_seq++;
    }

    /** @return the sequence number under which this maid was registered */
    public int getSeq() {
        return seq;
    }

    /** @return the payload value set through {@link #setValue(String)}, possibly null */
    public String getValue() {
        return value;
    }

    /** @param value payload the tests use to verify the serialized state */
    public void setValue(String value) {
        this.value = value;
    }

    /** @param canBeCleanup whether {@link #cleanup()} should report 0 (true) or -1 (false) */
    public void setCanBeCleanup(boolean canBeCleanup) {
        this.canBeCleanup = canBeCleanup;
    }

    /**
     * Removes the entry for this maid's sequence number from {@link #map}.
     *
     * @return 0 when the maid is flagged as cleanable, -1 otherwise
     */
    @Override
    public int cleanup() {
        s_logger.debug("Cleanup called for " + seq);
        map.remove(seq);
        return canBeCleanup ? 0 : -1;
    }

    /** @return a fixed description stating that no manual cleanup is needed */
    @Override
    public String getCleanupProcedure() {
        return "No cleanup necessary";
    }
}
@Local(value=ClusterManager.class)
public static class MockClusterManager implements ClusterManager {
String _name;
ClusterManagerListener _listener;
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
_name = name;
return true;
}
@Override
public boolean start() {
return true;
}
@Override
public boolean stop() {
return true;
}
@Override
public String getName() {
return _name;
}
@Override
public Answer[] execute(String strPeer, long agentId, Command[] cmds, boolean stopOnError) {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public long executeAsync(String strPeer, long agentId, Command[] cmds, boolean stopOnError, Listener listener) {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers) {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public boolean forwardAnswer(String targetPeer, long agentId, long seq, Answer[] answers) {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public Answer[] sendToAgent(Long hostId, Command[] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public long sendToAgent(Long hostId, Command[] cmds, boolean stopOnError, Listener listener) throws AgentUnavailableException {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public Boolean propagateAgentEvent(long agentId, Event event) throws AgentUnavailableException {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public int getHeartbeatThreshold() {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public long getManagementNodeId() {
return ManagementServerNode.getManagementServerId();
}
@Override
public boolean isManagementNodeAlive(long msid) {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public boolean pingManagementNode(long msid) {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public long getCurrentRunId() {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public String getSelfPeerName() {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public String getSelfNodeIP() {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public String getPeerName(long agentHostId) {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public void registerListener(ClusterManagerListener listener) {
_listener = listener;
}
@Override
public void unregisterListener(ClusterManagerListener listener) {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public ManagementServerHostVO getPeer(String peerName) {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public void broadcast(long agentId, Command[] cmds) {
throw new UnsupportedOperationException("Not implemented");
}
public void triggerTakeover(long msId) {
ManagementServerHostVO node = new ManagementServerHostVO();
node.setMsid(msId);
List<ManagementServerHostVO> lst = new ArrayList<ManagementServerHostVO>();
lst.add(node);
_listener.onManagementNodeLeft(lst, ManagementServerNode.getManagementServerId());
}
protected MockClusterManager() {
}
}
}

View File

@ -1,5 +1,6 @@
package com.cloud.utils;
/**
 * Callback through which a cleanup action is carried out for one item.
 *
 * @param <T> type of the per-item context passed to {@link #cleanup}
 * @param <M> type of the manager context passed to {@link #cleanup}
 */
public interface CleanupDelegate<T, M> {
/**
 * Performs the cleanup for one item.
 *
 * @param itemContext context describing the item to clean up
 * @param managerContext context supplied by the caller that drives the cleanup
 * @return NOTE(review): presumably true when the cleanup succeeded - confirm against
 *         the implementations and callers
 */
boolean cleanup(T itemContext, M managerContext);
}