// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.network.security;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import com.cloud.utils.Profiler;

import junit.framework.TestCase;

/**
 * Multi-threaded tests that drive a shared {@link LocalSecurityGroupWorkQueue}
 * from concurrent producer and consumer threads and then check the resulting
 * queue size and job counts.
 */
public class SecurityGroupQueueTest extends TestCase {

    /** Queue instance shared by every test and every worker thread in this class. */
    public final static SecurityGroupWorkQueue queue = new LocalSecurityGroupWorkQueue();

    /**
     * Worker that submits the VM ids {@code 1..maxVmId} to the shared queue in a
     * single {@code submitWorkForVms} call and records the value that call returns.
     */
    public static class Producer implements Runnable {
        int _maxVmId = 0;
        int _newWorkQueued = 0;
        Set<Long> vmIds = new HashSet<Long>();

        /**
         * @param maxVmId highest VM id to submit; ids start at 1, so a value
         *                below 1 yields an empty set
         */
        public Producer(int maxVmId) {
            this._maxVmId = maxVmId;
            for (long i = 1; i <= _maxVmId; i++) {
                vmIds.add(i);
            }
        }

        /** Submits the prepared id set and stores the queue's return value. */
        @Override
        public void run() {
            _newWorkQueued = queue.submitWorkForVms(vmIds);
        }

        /** @return the value returned by {@code submitWorkForVms} in {@link #run()} */
        public int getNewWork() {
            return _newWorkQueued;
        }

        /** @return the {@code maxVmId} this producer was created with */
        public int getTotalWork() {
            return _maxVmId;
        }
    }

    /**
     * Worker that asks the shared queue for a fixed number of jobs and records
     * how many it actually received.
     */
    public static class Consumer implements Runnable {
        private int _numJobsToDequeue = 0;
        private int _numJobsDequeued = 0;

        /**
         * @param numJobsToDequeu number of jobs to request from the queue
         */
        public Consumer(int numJobsToDequeu) {
            this._numJobsToDequeue = numJobsToDequeu;
        }

        /**
         * Requests the configured number of jobs. If the thread is interrupted
         * while waiting, the interrupt flag is restored and the dequeued count
         * stays at zero, which the calling test then detects through its
         * assertions.
         */
        @Override
        public void run() {
            // Only the size is needed, so the element type is left as a wildcard.
            List<?> result = new ArrayList<Object>();
            try {
                result = queue.getWork(_numJobsToDequeue);
            } catch (InterruptedException e) {
                // Preserve the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            this._numJobsDequeued = result.size();
        }

        /** @return the number of jobs this consumer was asked to dequeue */
        int getNumJobsToDequeue() {
            return _numJobsToDequeue;
        }

        /** @return the number of jobs returned by the queue in {@link #run()} */
        int getNumJobsDequeued() {
            return _numJobsDequeued;
        }
    }

    /**
     * Waits for every given thread to finish. If the waiting thread is
     * interrupted, its interrupt flag is restored and the test fails at once,
     * because assertions made against still-running workers would be unreliable.
     */
    private static void joinAll(Thread... threads) {
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                fail("Interrupted while waiting for worker threads to finish");
            }
        }
    }

    /**
     * Starts 50 producers where producer {@code i} submits the ids
     * {@code 1..i+1}, so the union of all submitted ids is {@code 1..50}.
     * The assertion expects the queue to end up holding exactly 50 entries.
     */
    public void testNumJobsEqToNumVms1() {
        queue.clear();
        final int numProducers = 50;
        Thread[] pThreads = new Thread[numProducers];
        Producer[] producers = new Producer[numProducers];

        for (int i = 0; i < numProducers; i++) {
            producers[i] = new Producer(i + 1);
            pThreads[i] = new Thread(producers[i]);
            pThreads[i].start();
        }
        joinAll(pThreads);

        System.out.println("Num Vms= " + numProducers + " Queue size = " + queue.size());
        assertEquals(numProducers, queue.size());
    }

    /**
     * Starts {@code numProducers} producers that all submit the same ids
     * {@code 1..maxVmId}, times the run, and asserts that the queue ends up
     * with exactly {@code maxVmId} entries.
     *
     * @param numProducers number of concurrent producer threads
     * @param maxVmId      highest VM id each producer submits
     */
    protected void testNumJobsEqToNumVms2(int numProducers, int maxVmId) {
        queue.clear();
        Thread[] pThreads = new Thread[numProducers];
        Producer[] producers = new Producer[numProducers];

        Profiler p = new Profiler();
        p.start();
        for (int i = 0; i < numProducers; i++) {
            producers[i] = new Producer(maxVmId);
            pThreads[i] = new Thread(producers[i]);
            pThreads[i].start();
        }
        joinAll(pThreads);
        p.stop();

        System.out.println("Num Vms= " + maxVmId + " Queue size = " + queue.size() + " time=" + p.getDuration() + " ms");
        assertEquals(maxVmId, queue.size());
    }

    /**
     * Runs {@link #testNumJobsEqToNumVms2(int, int)} over several combinations
     * of producer count and VM id range.
     */
    public void testNumJobsEqToNumVms3() {
        testNumJobsEqToNumVms2(50, 20000);
        testNumJobsEqToNumVms2(400, 5000);
        testNumJobsEqToNumVms2(1, 1);
        testNumJobsEqToNumVms2(1, 1000000);
        testNumJobsEqToNumVms2(750, 1);
    }

    /**
     * Starts {@code numConsumers} consumers that each request one job, then
     * {@code numProducers} producers that each submit the ids
     * {@code 1..maxVmId}. After all threads finish it asserts that every
     * consumer received exactly one job in total and that the queue size equals
     * the jobs reported as queued minus the jobs dequeued.
     *
     * <p>Consumers are started before any producer; this presumably relies on
     * {@code getWork} blocking until work is available (confirm against the
     * queue implementation).
     *
     * @param numConsumers number of consumer threads, each dequeuing one job
     * @param numProducers number of producer threads
     * @param maxVmId      highest VM id each producer submits
     */
    protected void _testDequeueOneJob(final int numConsumers, final int numProducers, final int maxVmId) {
        queue.clear();
        Thread[] pThreads = new Thread[numProducers];
        Thread[] cThreads = new Thread[numConsumers];
        Consumer[] consumers = new Consumer[numConsumers];
        Producer[] producers = new Producer[numProducers];

        for (int i = 0; i < numConsumers; i++) {
            consumers[i] = new Consumer(1);
            cThreads[i] = new Thread(consumers[i]);
            cThreads[i].start();
        }
        for (int i = 0; i < numProducers; i++) {
            producers[i] = new Producer(maxVmId);
            pThreads[i] = new Thread(producers[i]);
            pThreads[i].start();
        }

        joinAll(cThreads);
        joinAll(pThreads);

        int totalDequeued = 0;
        for (int i = 0; i < numConsumers; i++) {
            totalDequeued += consumers[i].getNumJobsDequeued();
        }
        int totalQueued = 0;
        for (int i = 0; i < numProducers; i++) {
            totalQueued += producers[i].getNewWork();
        }

        System.out.println("Total jobs dequeued = " + totalDequeued + ", num queued=" + totalQueued + " queue current size=" + queue.size());
        // JUnit's assertEquals takes (expected, actual).
        assertEquals(numConsumers, totalDequeued);
        assertEquals(totalQueued - totalDequeued, queue.size());
    }

    /**
     * Runs one producer/consumer round, then dequeues a single additional job
     * and asserts that the queue shrank by exactly one entry.
     */
    public void testDequeueOneJobAgain() {
        _testDequeueOneJob(10, 10, 1000);
        int queueSize = queue.size();

        Thread cThread = new Thread(new Consumer(1));
        cThread.start();
        joinAll(cThread);

        assertEquals(queueSize - 1, queue.size());
    }

    /**
     * Runs {@link #_testDequeueOneJob(int, int, int)} over several combinations
     * of consumer count, producer count and VM id range.
     */
    public void testDequeueOneJob() {
        _testDequeueOneJob(10, 10, 1000);
        _testDequeueOneJob(1, 10, 1000);
        _testDequeueOneJob(10, 1, 1000);
        _testDequeueOneJob(10, 1, 10);
    }
}