From 780816ee9f9461f1eaa6bc5133354f99b06cd208 Mon Sep 17 00:00:00 2001 From: Hiroki Ohashi Date: Thu, 2 Oct 2014 17:21:06 +0900 Subject: [PATCH] CLOUDSTACK-7539: [S3] Parallel deployment makes reference count of a cache in nfs secondary staging store negative(-1) --- .../manager/StorageCacheManagerImpl.java | 97 +++++++++++++++++-- 1 file changed, 89 insertions(+), 8 deletions(-) diff --git a/engine/storage/cache/src/org/apache/cloudstack/storage/cache/manager/StorageCacheManagerImpl.java b/engine/storage/cache/src/org/apache/cloudstack/storage/cache/manager/StorageCacheManagerImpl.java index 3a502d12ae0..6be35812b12 100644 --- a/engine/storage/cache/src/org/apache/cloudstack/storage/cache/manager/StorageCacheManagerImpl.java +++ b/engine/storage/cache/src/org/apache/cloudstack/storage/cache/manager/StorageCacheManagerImpl.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Date; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -40,6 +41,7 @@ import org.apache.cloudstack.engine.subsystem.api.storage.DataObjectInStore; import org.apache.cloudstack.engine.subsystem.api.storage.DataStore; import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager; import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine; +import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine.State; import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine.Event; import org.apache.cloudstack.engine.subsystem.api.storage.Scope; import org.apache.cloudstack.engine.subsystem.api.storage.StorageCacheManager; @@ -60,6 +62,8 @@ import com.cloud.utils.db.QueryBuilder; import com.cloud.utils.db.SearchCriteria; import com.cloud.utils.exception.CloudRuntimeException; +import com.cloud.agent.api.to.DataObjectType; + public class StorageCacheManagerImpl implements StorageCacheManager, Manager { private static final Logger s_logger = Logger.getLogger(StorageCacheManagerImpl.class); @Inject @@ -78,6 +82,9 @@ public class StorageCacheManagerImpl implements StorageCacheManager, Manager { int workers; ScheduledExecutorService executors; int cacheReplaceMentInterval; + private static final Object templateLock = new Object(); + private static final Object volumeLock = new Object(); + private static final Object snapshotLock = new Object(); @Override public DataStore getCacheStorage(Scope scope) { @@ -216,15 +223,82 @@ public class StorageCacheManagerImpl implements StorageCacheManager, Manager { @Override public DataObject createCacheObject(DataObject data, DataStore store) { - DataObjectInStore obj = objectInStoreMgr.findObject(data, store); - if (obj != null && obj.getState() == ObjectInDataStoreStateMachine.State.Ready) { - s_logger.debug("there is already one in the cache store"); - DataObject dataObj = objectInStoreMgr.get(data, store); - dataObj.incRefCount(); - return dataObj; - } + DataObject objOnCacheStore; + final Object lock; + final DataObjectType type = data.getType(); + final String typeName; + final DataStoreRole role = store.getRole(); + final long storeId = store.getId(); + final long dataId = data.getId(); - DataObject objOnCacheStore = store.create(data); + /* + * Make sure any thread knows own lock type. + */ + if (type == DataObjectType.TEMPLATE) { + lock = templateLock; + typeName = "template"; + } else if (type == DataObjectType.VOLUME) { + lock = volumeLock; + typeName = "volume"; + } else if (type == DataObjectType.SNAPSHOT) { + lock = snapshotLock; + typeName = "snapshot"; + } else { + String msg = "unsupported DataObject comes, then can't acquire correct lock object"; + throw new CloudRuntimeException(msg); + } + s_logger.debug("check " + typeName + " cache entry(id: " + dataId + ") on store(id: " + storeId + ")"); + + synchronized (lock) { + DataObjectInStore obj = objectInStoreMgr.findObject(data, store); + if (obj != null) { + State st = obj.getState(); + + long miliSeconds = 10000; + long timeoutSeconds = 3600; + long timeoutMiliSeconds = timeoutSeconds * 1000; + Date now = new Date(); + long expiredEpoch = now.getTime() + timeoutMiliSeconds; + Date expiredDate = new Date(expiredEpoch); + + /* + * Waiting for completion of cache copy. + */ + while (st == ObjectInDataStoreStateMachine.State.Allocated || + st == ObjectInDataStoreStateMachine.State.Creating || + st == ObjectInDataStoreStateMachine.State.Copying) { + + /* + * Threads must release lock within waiting for cache copy and + * must be waken up at completion. + */ + s_logger.debug("waiting cache copy completion type: " + typeName + ", id: " + obj.getObjectId() + ", lock: " + lock.hashCode()); + try { + lock.wait(miliSeconds); + } catch (InterruptedException e) {} + s_logger.debug("waken up"); + + now = new Date(); + if (now.after(expiredDate)) { + String msg = "Waiting time exceeds timeout limit(" + timeoutSeconds + " s)"; + throw new CloudRuntimeException(msg); + } + + obj = objectInStoreMgr.findObject(data, store); + st = obj.getState(); + } + + if (st == ObjectInDataStoreStateMachine.State.Ready) { + s_logger.debug("there is already one in the cache store"); + DataObject dataObj = objectInStoreMgr.get(data, store); + dataObj.incRefCount(); + return dataObj; + } + } + + s_logger.debug("create " + typeName + " cache entry(id: " + dataId + ") on store(id: " + storeId + ")"); + objOnCacheStore = store.create(data); + } AsyncCallFuture future = new AsyncCallFuture(); CopyCommandResult result = null; @@ -251,6 +325,13 @@ public class StorageCacheManagerImpl implements StorageCacheManager, Manager { if (result == null) { objOnCacheStore.processEvent(Event.OperationFailed); } + synchronized (lock) { + /* + * Wake up all threads waiting for cache copy. + */ + s_logger.debug("wake up all waiting threads(lock: " + lock.hashCode() + ")"); + lock.notifyAll(); + } } return null; }