CLOUDSTACK-7539: [S3] Parallel deployment makes reference count of a cache in nfs secondary staging store negative(-1)

This commit is contained in:
Hiroki Ohashi 2014-10-02 17:21:06 +09:00 committed by Daan Hoogland
parent bcc8182f2f
commit 780816ee9f

View File

@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -40,6 +41,7 @@ import org.apache.cloudstack.engine.subsystem.api.storage.DataObjectInStore;
import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine.State;
import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine.Event;
import org.apache.cloudstack.engine.subsystem.api.storage.Scope;
import org.apache.cloudstack.engine.subsystem.api.storage.StorageCacheManager;
@ -60,6 +62,8 @@ import com.cloud.utils.db.QueryBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.agent.api.to.DataObjectType;
public class StorageCacheManagerImpl implements StorageCacheManager, Manager {
private static final Logger s_logger = Logger.getLogger(StorageCacheManagerImpl.class);
@Inject
@ -78,6 +82,9 @@ public class StorageCacheManagerImpl implements StorageCacheManager, Manager {
int workers;
ScheduledExecutorService executors;
int cacheReplaceMentInterval;
private static final Object templateLock = new Object();
private static final Object volumeLock = new Object();
private static final Object snapshotLock = new Object();
@Override
public DataStore getCacheStorage(Scope scope) {
@ -216,15 +223,82 @@ public class StorageCacheManagerImpl implements StorageCacheManager, Manager {
@Override
public DataObject createCacheObject(DataObject data, DataStore store) {
DataObjectInStore obj = objectInStoreMgr.findObject(data, store);
if (obj != null && obj.getState() == ObjectInDataStoreStateMachine.State.Ready) {
s_logger.debug("there is already one in the cache store");
DataObject dataObj = objectInStoreMgr.get(data, store);
dataObj.incRefCount();
return dataObj;
}
DataObject objOnCacheStore;
final Object lock;
final DataObjectType type = data.getType();
final String typeName;
final DataStoreRole role = store.getRole();
final long storeId = store.getId();
final long dataId = data.getId();
DataObject objOnCacheStore = store.create(data);
/*
* Make sure any thread knows own lock type.
*/
if (type == DataObjectType.TEMPLATE) {
lock = templateLock;
typeName = "template";
} else if (type == DataObjectType.VOLUME) {
lock = volumeLock;
typeName = "volume";
} else if (type == DataObjectType.SNAPSHOT) {
lock = snapshotLock;
typeName = "snapshot";
} else {
String msg = "unsupported DataObject comes, then can't acquire correct lock object";
throw new CloudRuntimeException(msg);
}
s_logger.debug("check " + typeName + " cache entry(id: " + dataId + ") on store(id: " + storeId + ")");
synchronized (lock) {
DataObjectInStore obj = objectInStoreMgr.findObject(data, store);
if (obj != null) {
State st = obj.getState();
long miliSeconds = 10000;
long timeoutSeconds = 3600;
long timeoutMiliSeconds = timeoutSeconds * 1000;
Date now = new Date();
long expiredEpoch = now.getTime() + timeoutMiliSeconds;
Date expiredDate = new Date(expiredEpoch);
/*
* Waiting for completion of cache copy.
*/
while (st == ObjectInDataStoreStateMachine.State.Allocated ||
st == ObjectInDataStoreStateMachine.State.Creating ||
st == ObjectInDataStoreStateMachine.State.Copying) {
/*
* Threads must release lock within waiting for cache copy and
* must be waken up at completion.
*/
s_logger.debug("waiting cache copy completion type: " + typeName + ", id: " + obj.getObjectId() + ", lock: " + lock.hashCode());
try {
lock.wait(miliSeconds);
} catch (InterruptedException e) {}
s_logger.debug("waken up");
now = new Date();
if (now.after(expiredDate)) {
String msg = "Waiting time exceeds timeout limit(" + timeoutSeconds + " s)";
throw new CloudRuntimeException(msg);
}
obj = objectInStoreMgr.findObject(data, store);
st = obj.getState();
}
if (st == ObjectInDataStoreStateMachine.State.Ready) {
s_logger.debug("there is already one in the cache store");
DataObject dataObj = objectInStoreMgr.get(data, store);
dataObj.incRefCount();
return dataObj;
}
}
s_logger.debug("create " + typeName + " cache entry(id: " + dataId + ") on store(id: " + storeId + ")");
objOnCacheStore = store.create(data);
}
AsyncCallFuture<CopyCommandResult> future = new AsyncCallFuture<CopyCommandResult>();
CopyCommandResult result = null;
@ -251,6 +325,13 @@ public class StorageCacheManagerImpl implements StorageCacheManager, Manager {
if (result == null) {
objOnCacheStore.processEvent(Event.OperationFailed);
}
synchronized (lock) {
/*
* Wake up all threads waiting for cache copy.
*/
s_logger.debug("wake up all waiting threads(lock: " + lock.hashCode() + ")");
lock.notifyAll();
}
}
return null;
}