diff --git a/utils/pom.xml b/utils/pom.xml index 206eb1896a6..9e2358680f3 100755 --- a/utils/pom.xml +++ b/utils/pom.xml @@ -208,7 +208,6 @@ com/cloud/utils/testcase/*TestCase* com/cloud/utils/db/*Test* - com/cloud/utils/testcase/NioTest.java diff --git a/utils/src/test/java/com/cloud/utils/testcase/NioTest.java b/utils/src/test/java/com/cloud/utils/testcase/NioTest.java index d8510cfcac2..7516cfc42e8 100644 --- a/utils/src/test/java/com/cloud/utils/testcase/NioTest.java +++ b/utils/src/test/java/com/cloud/utils/testcase/NioTest.java @@ -19,14 +19,7 @@ package com.cloud.utils.testcase; -import java.nio.channels.ClosedChannelException; -import java.util.Random; - -import junit.framework.TestCase; - -import org.apache.log4j.Logger; -import org.junit.Assert; - +import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.exception.NioConnectionException; import com.cloud.utils.nio.HandlerFactory; import com.cloud.utils.nio.Link; @@ -34,131 +27,190 @@ import com.cloud.utils.nio.NioClient; import com.cloud.utils.nio.NioServer; import com.cloud.utils.nio.Task; import com.cloud.utils.nio.Task.Type; +import org.apache.log4j.Logger; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; -/** - * - * - * - * - */ +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; -public class NioTest extends TestCase { +public class NioTest { - private static final Logger s_logger = Logger.getLogger(NioTest.class); + private static final Logger LOGGER = Logger.getLogger(NioTest.class); - private NioServer _server; - private NioClient _client; + final private int totalTestCount = 10; + private int completedTestCount = 0; - private Link _clientLink; + private NioServer server; + private List clients = new ArrayList<>(); + private List maliciousClients = new ArrayList<>(); - private int _testCount; - private int _completedCount; + private ExecutorService clientExecutor = Executors.newFixedThreadPool(totalTestCount, new NamedThreadFactory("NioClientHandler"));; + private ExecutorService maliciousExecutor = Executors.newFixedThreadPool(5*totalTestCount, new NamedThreadFactory("MaliciousNioClientHandler"));; + + private Random randomGenerator = new Random(); + private byte[] testBytes; private boolean isTestsDone() { boolean result; synchronized (this) { - result = _testCount == _completedCount; + result = totalTestCount == completedTestCount; } return result; } - private void getOneMoreTest() { - synchronized (this) { - _testCount++; - } - } - private void oneMoreTestDone() { synchronized (this) { - _completedCount++; + completedTestCount++; } } - @Override + @Before public void setUp() { - s_logger.info("Test"); + LOGGER.info("Setting up Benchmark Test"); - _testCount = 0; - _completedCount = 0; + completedTestCount = 0; + testBytes = new byte[1000000]; + randomGenerator.nextBytes(testBytes); - _server = new NioServer("NioTestServer", 7777, 5, new NioTestServer()); + // Server configured with one worker + server = new NioServer("NioTestServer", 7777, 1, new NioTestServer()); try { - _server.start(); + server.start(); } catch (final NioConnectionException e) { - fail(e.getMessage()); + Assert.fail(e.getMessage()); } - _client = new NioClient("NioTestServer", "127.0.0.1", 7777, 5, new NioTestClient()); - try { - _client.start(); - } catch (final NioConnectionException e) { - fail(e.getMessage()); - } - - while (_clientLink == null) { - try { - s_logger.debug("Link is not up! Waiting ..."); - Thread.sleep(1000); - } catch (final InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + // 5 malicious clients per valid client + for (int i = 0; i < totalTestCount; i++) { + for (int j = 0; j < 5; j++) { + final NioClient maliciousClient = new NioMaliciousClient("NioMaliciousTestClient-" + i, "127.0.0.1", 7777, 1, new NioMaliciousTestClient()); + maliciousClients.add(maliciousClient); + maliciousExecutor.submit(new ThreadedNioClient(maliciousClient)); } + final NioClient client = new NioClient("NioTestClient-" + i, "127.0.0.1", 7777, 1, new NioTestClient()); + clients.add(client); + clientExecutor.submit(new ThreadedNioClient(client)); } } - @Override + @After public void tearDown() { - while (!isTestsDone()) { - try { - s_logger.debug(_completedCount + "/" + _testCount + " tests done. Waiting for completion"); - Thread.sleep(1000); - } catch (final InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } stopClient(); stopServer(); } protected void stopClient() { - _client.stop(); - s_logger.info("Client stopped."); + for (NioClient client : clients) { + client.stop(); + } + for (NioClient maliciousClient : maliciousClients) { + maliciousClient.stop(); + } + LOGGER.info("Clients stopped."); } protected void stopServer() { - _server.stop(); - s_logger.info("Server stopped."); + server.stop(); + LOGGER.info("Server stopped."); } - protected void setClientLink(final Link link) { - _clientLink = link; - } - - Random randomGenerator = new Random(); - - byte[] _testBytes; - + @Test public void testConnection() { - _testBytes = new byte[1000000]; - randomGenerator.nextBytes(_testBytes); - try { - getOneMoreTest(); - _clientLink.send(_testBytes); - s_logger.info("Client: Data sent"); - getOneMoreTest(); - _clientLink.send(_testBytes); - s_logger.info("Client: Data sent"); - } catch (final ClosedChannelException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + final long currentTime = System.currentTimeMillis(); + while (!isTestsDone()) { + if (System.currentTimeMillis() - currentTime > 600000) { + Assert.fail("Failed to complete test within 600s"); + } + try { + LOGGER.debug(completedTestCount + "/" + totalTestCount + " tests done. Waiting for completion"); + Thread.sleep(1000); + } catch (final InterruptedException e) { + Assert.fail(e.getMessage()); + } } + LOGGER.debug(completedTestCount + "/" + totalTestCount + " tests done."); } protected void doServerProcess(final byte[] data) { oneMoreTestDone(); - Assert.assertArrayEquals(_testBytes, data); - s_logger.info("Verify done."); + Assert.assertArrayEquals(testBytes, data); + LOGGER.info("Verify data received by server done."); + } + + public byte[] getTestBytes() { + return testBytes; + } + + public class ThreadedNioClient implements Runnable { + final private NioClient client; + ThreadedNioClient(final NioClient client) { + this.client = client; + } + + @Override + public void run() { + try { + client.start(); + } catch (NioConnectionException e) { + Assert.fail(e.getMessage()); + } + } + } + + public class NioMaliciousClient extends NioClient { + + public NioMaliciousClient(String name, String host, int port, int workers, HandlerFactory factory) { + super(name, host, port, workers, factory); + } + + @Override + protected void init() throws IOException { + _selector = Selector.open(); + try { + _clientConnection = SocketChannel.open(); + LOGGER.info("Connecting to " + _host + ":" + _port); + final InetSocketAddress peerAddr = new InetSocketAddress(_host, _port); + _clientConnection.connect(peerAddr); + // Hang in there don't do anything + Thread.sleep(3600000); + } catch (final IOException e) { + _selector.close(); + throw e; + } catch (InterruptedException e) { + LOGGER.debug(e.getMessage()); + } + } + } + + public class NioMaliciousTestClient implements HandlerFactory { + + @Override + public Task create(final Type type, final Link link, final byte[] data) { + return new NioMaliciousTestClientHandler(type, link, data); + } + + public class NioMaliciousTestClientHandler extends Task { + + public NioMaliciousTestClientHandler(final Type type, final Link link, final byte[] data) { + super(type, link, data); + } + + @Override + public void doTask(final Task task) { + LOGGER.info("Malicious Client: Received task " + task.getType().toString()); + } + } } public class NioTestClient implements HandlerFactory { @@ -177,18 +229,23 @@ public class NioTest extends TestCase { @Override public void doTask(final Task task) { if (task.getType() == Task.Type.CONNECT) { - s_logger.info("Client: Received CONNECT task"); - setClientLink(task.getLink()); + LOGGER.info("Client: Received CONNECT task"); + try { + LOGGER.info("Sending data to server"); + task.getLink().send(getTestBytes()); + } catch (ClosedChannelException e) { + LOGGER.error(e.getMessage()); + e.printStackTrace(); + } } else if (task.getType() == Task.Type.DATA) { - s_logger.info("Client: Received DATA task"); + LOGGER.info("Client: Received DATA task"); } else if (task.getType() == Task.Type.DISCONNECT) { - s_logger.info("Client: Received DISCONNECT task"); + LOGGER.info("Client: Received DISCONNECT task"); stopClient(); } else if (task.getType() == Task.Type.OTHER) { - s_logger.info("Client: Received OTHER task"); + LOGGER.info("Client: Received OTHER task"); } } - } } @@ -208,15 +265,15 @@ public class NioTest extends TestCase { @Override public void doTask(final Task task) { if (task.getType() == Task.Type.CONNECT) { - s_logger.info("Server: Received CONNECT task"); + LOGGER.info("Server: Received CONNECT task"); } else if (task.getType() == Task.Type.DATA) { - s_logger.info("Server: Received DATA task"); + LOGGER.info("Server: Received DATA task"); doServerProcess(task.getData()); } else if (task.getType() == Task.Type.DISCONNECT) { - s_logger.info("Server: Received DISCONNECT task"); + LOGGER.info("Server: Received DISCONNECT task"); stopServer(); } else if (task.getType() == Task.Type.OTHER) { - s_logger.info("Server: Received OTHER task"); + LOGGER.info("Server: Received OTHER task"); } }