cloudstack/server/test/com/cloud/async/TestSyncQueueManager.java
Brian Spindler 469c4dd139 Removing ^M's from code.
Signed-off-by: Prasanna Santhanam <tsp@apache.org>
2013-04-04 00:53:20 +05:30

209 lines
6.8 KiB
Java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.async;
import java.util.List;
import javax.inject.Inject;
import junit.framework.TestCase;
import org.apache.log4j.Logger;
import org.junit.Assert;
public class TestSyncQueueManager extends TestCase {
public static final Logger s_logger = Logger.getLogger(TestSyncQueueManager.class.getName());
private volatile int count = 0;
private volatile long expectingCurrent = 1;
@Inject SyncQueueManager mgr;
public void leftOverItems() {
List<SyncQueueItemVO> l = mgr.getActiveQueueItems(1L, false);
if(l != null && l.size() > 0) {
for(SyncQueueItemVO item : l) {
s_logger.info("Left over item: " + item.toString());
mgr.purgeItem(item.getId());
}
}
}
public void dequeueFromOneQueue() {
final int totalRuns = 5000;
final SyncQueueVO queue = mgr.queue("vm_instance", 1L, "Async-job", 1, 1);
for(int i = 1; i < totalRuns; i++)
mgr.queue("vm_instance", 1L, "Async-job", i+1, 1);
count = 0;
expectingCurrent = 1;
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
while(count < totalRuns) {
SyncQueueItemVO item = mgr.dequeueFromOne(queue.getId(), 1L);
if(item != null) {
s_logger.info("Thread 1 process item: " + item.toString());
Assert.assertEquals(expectingCurrent, item.getContentId().longValue());
expectingCurrent++;
count++;
mgr.purgeItem(item.getId());
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
}
}
);
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
while(count < totalRuns) {
SyncQueueItemVO item = mgr.dequeueFromOne(queue.getId(), 1L);
if(item != null) {
s_logger.info("Thread 2 process item: " + item.toString());
Assert.assertEquals(expectingCurrent, item.getContentId().longValue());
expectingCurrent++;
count++;
mgr.purgeItem(item.getId());
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
}
}
);
thread1.start();
thread2.start();
try {
thread1.join();
} catch (InterruptedException e) {
}
try {
thread2.join();
} catch (InterruptedException e) {
}
Assert.assertEquals(totalRuns, count);
}
public void dequeueFromAnyQueue() {
// simulate 30 queues
final int queues = 30;
final int totalRuns = 100;
final int itemsPerRun = 20;
for(int q = 1; q <= queues; q++)
for(int i = 0; i < totalRuns; i++)
mgr.queue("vm_instance", q, "Async-job", i+1, 1);
count = 0;
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
while(count < totalRuns*queues) {
List<SyncQueueItemVO> l = mgr.dequeueFromAny(1L, itemsPerRun);
if(l != null && l.size() > 0) {
s_logger.info("Thread 1 get " + l.size() + " dequeued items");
for(SyncQueueItemVO item : l) {
s_logger.info("Thread 1 process item: " + item.toString());
count++;
mgr.purgeItem(item.getId());
}
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
}
}
);
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
while(count < totalRuns*queues) {
List<SyncQueueItemVO> l = mgr.dequeueFromAny(1L, itemsPerRun);
if(l != null && l.size() > 0) {
s_logger.info("Thread 2 get " + l.size() + " dequeued items");
for(SyncQueueItemVO item : l) {
s_logger.info("Thread 2 process item: " + item.toString());
count++;
mgr.purgeItem(item.getId());
}
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
}
}
);
thread1.start();
thread2.start();
try {
thread1.join();
} catch (InterruptedException e) {
}
try {
thread2.join();
} catch (InterruptedException e) {
}
Assert.assertEquals(queues*totalRuns, count);
}
public void testPopulateQueueData() {
final int queues = 30000;
final int totalRuns = 100;
for(int q = 1; q <= queues; q++)
for(int i = 0; i < totalRuns; i++)
mgr.queue("vm_instance", q, "Async-job", i+1, 1);
}
public void testSyncQueue() {
mgr.queue("vm_instance", 1, "Async-job", 1, 1);
mgr.queue("vm_instance", 1, "Async-job", 2, 1);
mgr.queue("vm_instance", 1, "Async-job", 3, 1);
mgr.dequeueFromAny(100L, 1);
List<SyncQueueItemVO> l = mgr.getBlockedQueueItems(100000, false);
for(SyncQueueItemVO item : l) {
System.out.println("Blocked item. " + item.getContentType() + "-" + item.getContentId());
mgr.purgeItem(item.getId());
}
}
}