diff --git a/plugins/hypervisors/vmware/src/com/cloud/hypervisor/vmware/resource/VmwareContextFactory.java b/plugins/hypervisors/vmware/src/com/cloud/hypervisor/vmware/resource/VmwareContextFactory.java index 11163e90d6f..3ed5939aac5 100644 --- a/plugins/hypervisors/vmware/src/com/cloud/hypervisor/vmware/resource/VmwareContextFactory.java +++ b/plugins/hypervisors/vmware/src/com/cloud/hypervisor/vmware/resource/VmwareContextFactory.java @@ -77,7 +77,6 @@ public class VmwareContextFactory { context.registerStockObject("noderuninfo", String.format("%d-%d", s_clusterMgr.getManagementNodeId(), s_clusterMgr.getCurrentRunId())); context.setPoolInfo(s_pool, VmwareContextPool.composePoolKey(vCenterAddress, vCenterUserName)); - s_pool.registerOutstandingContext(context); return context; } diff --git a/plugins/hypervisors/vmware/src/com/cloud/hypervisor/vmware/resource/VmwareResource.java b/plugins/hypervisors/vmware/src/com/cloud/hypervisor/vmware/resource/VmwareResource.java index 1ec859bff3d..e7155c1e68c 100644 --- a/plugins/hypervisors/vmware/src/com/cloud/hypervisor/vmware/resource/VmwareResource.java +++ b/plugins/hypervisors/vmware/src/com/cloud/hypervisor/vmware/resource/VmwareResource.java @@ -5481,7 +5481,7 @@ public class VmwareResource implements StoragePoolResource, ServerResource, Vmwa if (s_logger.isTraceEnabled()) { s_logger.trace("Recycling threadlocal context to pool"); } - context.getPool().returnContext(context); + context.getPool().registerContext(context); } } diff --git a/plugins/hypervisors/vmware/src/com/cloud/storage/resource/VmwareSecondaryStorageContextFactory.java b/plugins/hypervisors/vmware/src/com/cloud/storage/resource/VmwareSecondaryStorageContextFactory.java index ee0273ca1ba..6e19ba67bf5 100644 --- a/plugins/hypervisors/vmware/src/com/cloud/storage/resource/VmwareSecondaryStorageContextFactory.java +++ b/plugins/hypervisors/vmware/src/com/cloud/storage/resource/VmwareSecondaryStorageContextFactory.java @@ -49,7 +49,6 @@ public class VmwareSecondaryStorageContextFactory { assert (context != null); context.setPoolInfo(s_pool, VmwareContextPool.composePoolKey(vCenterAddress, vCenterUserName)); - s_pool.registerOutstandingContext(context); return context; } diff --git a/plugins/hypervisors/vmware/src/com/cloud/storage/resource/VmwareSecondaryStorageResourceHandler.java b/plugins/hypervisors/vmware/src/com/cloud/storage/resource/VmwareSecondaryStorageResourceHandler.java index 9a8c37a098a..d0d5964bfa9 100644 --- a/plugins/hypervisors/vmware/src/com/cloud/storage/resource/VmwareSecondaryStorageResourceHandler.java +++ b/plugins/hypervisors/vmware/src/com/cloud/storage/resource/VmwareSecondaryStorageResourceHandler.java @@ -236,7 +236,7 @@ public class VmwareSecondaryStorageResourceHandler implements SecondaryStorageRe VmwareContext context = currentContext.get(); currentContext.set(null); assert (context.getPool() != null); - context.getPool().returnContext(context); + context.getPool().registerContext(context); } } diff --git a/vmware-base/src/com/cloud/hypervisor/vmware/util/VmwareContext.java b/vmware-base/src/com/cloud/hypervisor/vmware/util/VmwareContext.java index daf29eae5fd..9b477aef42b 100644 --- a/vmware-base/src/com/cloud/hypervisor/vmware/util/VmwareContext.java +++ b/vmware-base/src/com/cloud/hypervisor/vmware/util/VmwareContext.java @@ -666,7 +666,7 @@ public class VmwareContext { s_logger.warn("Unexpected exception: ", e); } finally { if (_pool != null) { - _pool.unregisterOutstandingContext(this); + _pool.unregisterContext(this); } unregisterOutstandingContext(); } diff --git a/vmware-base/src/com/cloud/hypervisor/vmware/util/VmwareContextPool.java b/vmware-base/src/com/cloud/hypervisor/vmware/util/VmwareContextPool.java index c97c01f32c3..b2e7636e0ae 100644 --- a/vmware-base/src/com/cloud/hypervisor/vmware/util/VmwareContextPool.java +++ b/vmware-base/src/com/cloud/hypervisor/vmware/util/VmwareContextPool.java @@ -16,28 +16,30 @@ // under the License. package com.cloud.hypervisor.vmware.util; +import com.google.common.base.Strings; +import org.apache.cloudstack.managed.context.ManagedContextTimerTask; +import org.apache.log4j.Logger; +import org.joda.time.Duration; + import java.util.ArrayList; -import java.util.HashMap; +import java.util.Iterator; import java.util.List; -import java.util.Map; +import java.util.Queue; import java.util.Timer; import java.util.TimerTask; - -import org.apache.log4j.Logger; - -import org.apache.cloudstack.managed.context.ManagedContextTimerTask; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; public class VmwareContextPool { private static final Logger s_logger = Logger.getLogger(VmwareContextPool.class); - private static final long DEFAULT_CHECK_INTERVAL = 10000; + private static final Duration DEFAULT_CHECK_INTERVAL = Duration.millis(10000L); private static final int DEFAULT_IDLE_QUEUE_LENGTH = 128; - private List _outstandingRegistry = new ArrayList(); - - private Map> _pool; + private final ConcurrentMap> _pool; private int _maxIdleQueueLength = DEFAULT_IDLE_QUEUE_LENGTH; - private long _idleCheckIntervalMs = DEFAULT_CHECK_INTERVAL; + private Duration _idleCheckInterval = DEFAULT_CHECK_INTERVAL; private Timer _timer = new Timer(); @@ -45,76 +47,77 @@ public class VmwareContextPool { this(DEFAULT_IDLE_QUEUE_LENGTH, DEFAULT_CHECK_INTERVAL); } - public VmwareContextPool(int maxIdleQueueLength) { - this(maxIdleQueueLength, DEFAULT_CHECK_INTERVAL); - } - - public VmwareContextPool(int maxIdleQueueLength, long idleCheckIntervalMs) { - _pool = new HashMap>(); + public VmwareContextPool(int maxIdleQueueLength, Duration idleCheckInterval) { + _pool = new ConcurrentHashMap>(); _maxIdleQueueLength = maxIdleQueueLength; - _idleCheckIntervalMs = idleCheckIntervalMs; + _idleCheckInterval = idleCheckInterval; - _timer.scheduleAtFixedRate(getTimerTask(), _idleCheckIntervalMs, _idleCheckIntervalMs); + _timer.scheduleAtFixedRate(getTimerTask(), _idleCheckInterval.getMillis(), _idleCheckInterval.getMillis()); } - public void registerOutstandingContext(VmwareContext context) { - assert (context != null); - synchronized (this) { - _outstandingRegistry.add(context); + public VmwareContext getContext(final String vCenterAddress, final String vCenterUserName) { + final String poolKey = composePoolKey(vCenterAddress, vCenterUserName).intern(); + if (Strings.isNullOrEmpty(poolKey)) { + return null; } - } - - public void unregisterOutstandingContext(VmwareContext context) { - assert (context != null); - synchronized (this) { - _outstandingRegistry.remove(context); - } - } - - public VmwareContext getContext(String vCenterAddress, String vCenterUserName) { - String poolKey = composePoolKey(vCenterAddress, vCenterUserName); - synchronized (this) { - List l = _pool.get(poolKey); - if (l == null) - return null; - - if (l.size() > 0) { - VmwareContext context = l.remove(0); - context.setPoolInfo(this, poolKey); - - if (s_logger.isTraceEnabled()) - s_logger.trace("Return a VmwareContext from the idle pool: " + poolKey + ". current pool size: " + l.size() + ", outstanding count: " + - VmwareContext.getOutstandingContextCount()); + synchronized (poolKey) { + final Queue ctxList = _pool.get(poolKey); + if (ctxList != null && !ctxList.isEmpty()) { + final VmwareContext context = ctxList.remove(); + if (context != null) { + context.setPoolInfo(this, poolKey); + } + if (s_logger.isTraceEnabled()) { + s_logger.trace("Return a VmwareContext from the idle pool: " + poolKey + ". current pool size: " + ctxList.size() + ", outstanding count: " + + VmwareContext.getOutstandingContextCount()); + } return context; } - - // TODO, we need to control the maximum number of outstanding VmwareContext object in the future return null; } } - public void returnContext(VmwareContext context) { + public void registerContext(final VmwareContext context) { assert (context.getPool() == this); assert (context.getPoolKey() != null); - synchronized (this) { - List l = _pool.get(context.getPoolKey()); - if (l == null) { - l = new ArrayList(); - _pool.put(context.getPoolKey(), l); + + final String poolKey = context.getPoolKey().intern(); + synchronized (poolKey) { + Queue ctxQueue = _pool.get(poolKey); + + if (ctxQueue == null) { + ctxQueue = new ConcurrentLinkedQueue<>(); + _pool.put(poolKey, ctxQueue); } - if (l.size() < _maxIdleQueueLength) { - context.clearStockObjects(); - l.add(context); + if (ctxQueue.size() >= _maxIdleQueueLength) { + final VmwareContext oldestContext = ctxQueue.remove(); + if (oldestContext != null) { + try { + oldestContext.close(); + } catch (Throwable t) { + s_logger.error("Unexpected exception caught while trying to purge oldest VmwareContext", t); + } + } + } + context.clearStockObjects(); + ctxQueue.add(context); - if (s_logger.isTraceEnabled()) - s_logger.trace("Recycle VmwareContext into idle pool: " + context.getPoolKey() + ", current idle pool size: " + l.size() + ", outstanding count: " + - VmwareContext.getOutstandingContextCount()); - } else { - if (s_logger.isTraceEnabled()) - s_logger.trace("VmwareContextPool queue exceeds limits, queue size: " + l.size()); - context.close(); + if (s_logger.isTraceEnabled()) { + s_logger.trace("Recycle VmwareContext into idle pool: " + context.getPoolKey() + ", current idle pool size: " + ctxQueue.size() + ", outstanding count: " + + VmwareContext.getOutstandingContextCount()); + } + } + } + + public void unregisterContext(final VmwareContext context) { + assert (context != null); + final String poolKey = context.getPoolKey().intern(); + final Queue ctxList = _pool.get(poolKey); + synchronized (poolKey) { + if (!Strings.isNullOrEmpty(poolKey) && ctxList != null && ctxList.contains(context)) { + ctxList.remove(context); } } } @@ -124,8 +127,6 @@ public class VmwareContextPool { @Override protected void runInContext() { try { - // doIdleCheck(); - doKeepAlive(); } catch (Throwable e) { s_logger.error("Unexpected exception", e); @@ -134,35 +135,30 @@ public class VmwareContextPool { }; } - private void getKeepAliveCheckContexts(List l, int batchSize) { - synchronized (this) { - int size = Math.min(_outstandingRegistry.size(), batchSize); - while (size > 0) { - VmwareContext context = _outstandingRegistry.remove(0); - l.add(context); - - _outstandingRegistry.add(context); - size--; - } - } - } - private void doKeepAlive() { - List l = new ArrayList(); - int batchSize = (int)(_idleCheckIntervalMs / 1000); // calculate batch size at 1 request/sec rate - getKeepAliveCheckContexts(l, batchSize); - - for (VmwareContext context : l) { - try { - context.idleCheck(); - } catch (Throwable e) { - s_logger.warn("Exception caught during VmwareContext idle check, close and discard the context", e); - context.close(); + final List closableCtxList = new ArrayList<>(); + for (final Queue ctxQueue : _pool.values()) { + for (Iterator iterator = ctxQueue.iterator(); iterator.hasNext();) { + final VmwareContext context = iterator.next(); + if (context == null) { + iterator.remove(); + continue; + } + try { + context.idleCheck(); + } catch (Throwable e) { + s_logger.warn("Exception caught during VmwareContext idle check, close and discard the context", e); + closableCtxList.add(context); + iterator.remove(); + } } } + for (final VmwareContext context : closableCtxList) { + context.close(); + } } - public static String composePoolKey(String vCenterAddress, String vCenterUserName) { + public static String composePoolKey(final String vCenterAddress, final String vCenterUserName) { assert (vCenterUserName != null); assert (vCenterAddress != null); return vCenterUserName + "@" + vCenterAddress; diff --git a/vmware-base/test/com/cloud/hypervisor/vmware/util/VmwareContextPoolTest.java b/vmware-base/test/com/cloud/hypervisor/vmware/util/VmwareContextPoolTest.java new file mode 100644 index 00000000000..5b8d47d0bb4 --- /dev/null +++ b/vmware-base/test/com/cloud/hypervisor/vmware/util/VmwareContextPoolTest.java @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.hypervisor.vmware.util; + +import com.cloud.utils.concurrency.NamedThreadFactory; +import org.joda.time.Duration; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class VmwareContextPoolTest { + + private class PoolClient implements Runnable { + private final VmwareContextPool pool; + private volatile Boolean canRun = true; + private int counter = 0; + + public PoolClient(final VmwareContextPool pool) { + this.pool = pool; + } + + public int count() { + return counter; + } + + public void stop() { + canRun = false; + } + + @Override + public void run() { + final String poolKey = pool.composePoolKey(vmwareAddress, vmwareUsername); + while (canRun) { + pool.registerContext(createDummyContext(pool, poolKey)); + counter++; + } + } + } + + private VmwareContextPool vmwareContextPool; + private VmwareContext vmwareContext; + private String vmwareAddress = "address"; + private String vmwareUsername = "username"; + + private int contextLength = 10; + private Duration idleCheckInterval = Duration.millis(1000L); + + public VmwareContext createDummyContext(final VmwareContextPool pool, final String poolKey) { + VmwareClient vimClient = new VmwareClient("someAddress"); + VmwareContext context = new VmwareContext(vimClient, "someAddress"); + context.setPoolInfo(pool, poolKey); + return context; + } + + @Before + public void setUp() throws Exception { + final String poolKey = vmwareContextPool.composePoolKey(vmwareAddress, vmwareUsername); + vmwareContextPool = new VmwareContextPool(contextLength, idleCheckInterval); + vmwareContext = createDummyContext(vmwareContextPool, poolKey); + } + + @Test + public void testRegisterContext() throws Exception { + vmwareContextPool.registerContext(vmwareContext); + Assert.assertEquals(vmwareContextPool.getContext(vmwareAddress, vmwareUsername), vmwareContext); + } + + @Test + public void testUnregisterContext() throws Exception { + vmwareContextPool.unregisterContext(vmwareContext); + Assert.assertNull(vmwareContextPool.getContext(vmwareAddress, vmwareUsername)); + } + + @Test + public void testComposePoolKey() throws Exception { + Assert.assertEquals(vmwareContextPool.composePoolKey(vmwareAddress, vmwareUsername), vmwareUsername + "@" + vmwareAddress); + } + + @Test + public void testMultithreadedPoolClients() throws Exception { + vmwareContextPool = Mockito.spy(vmwareContextPool); + final ExecutorService executor = Executors.newFixedThreadPool(10, new NamedThreadFactory("VmwareContextPoolClients")); + final List clients = new ArrayList<>(); + for (int i = 0; i < 50; i++) { + final PoolClient client = new PoolClient(vmwareContextPool); + clients.add(client); + executor.submit(client); + } + Thread.sleep(1000); + executor.shutdown(); + int totalRegistrations = 0; + for (final PoolClient client : clients) { + client.stop(); + totalRegistrations += client.count(); + } + Mockito.verify(vmwareContextPool, Mockito.atLeast(totalRegistrations)).registerContext(Mockito.any(VmwareContext.class)); + Assert.assertEquals(vmwareContextPool.composePoolKey(vmwareAddress, vmwareUsername), + vmwareContextPool.getContext(vmwareAddress, vmwareUsername).getPoolKey()); + } +} \ No newline at end of file