volume upload: added httpcomponents server and removed nio server

removing nio server as it is currently handling only https connections
and the parsing logic is also specific to agent communication.

current limitation of httpcomponents server is that the entire file is
read in memory. need figure out how to read it in chunks and send it
through a inputstreamreader to save on secondary storage.
This commit is contained in:
Rajani Karuturi 2015-01-16 17:10:57 +05:30
parent 1f1c96d2ee
commit ebaa4dd38e
6 changed files with 153 additions and 13 deletions

View File

@ -61,8 +61,8 @@
<cs.gson.version>1.7.2</cs.gson.version>
<cs.guava.version>14.0-rc1</cs.guava.version>
<cs.xapi.version>6.2.0-3.1</cs.xapi.version>
<cs.httpclient.version>3.1</cs.httpclient.version>
<cs.httpcore.version>4.2.1</cs.httpcore.version>
<cs.httpclient.version>4.3.6</cs.httpclient.version>
<cs.httpcore.version>4.4</cs.httpcore.version>
<cs.mysql.version>5.1.21</cs.mysql.version>
<cs.xstream.version>1.3.1</cs.xstream.version>
<cs.xmlrpc.version>3.1.3</cs.xmlrpc.version>
@ -323,7 +323,7 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${cs.httpcore.version}</version>
<version>${cs.httpclient.version}</version>
</dependency>
<dependency>
<groupId>com.thoughtworks.xstream</groupId>

View File

@ -50,6 +50,7 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>${cs.httpcore.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cloudstack</groupId>

View File

@ -53,6 +53,17 @@
<artifactId>cloud-server</artifactId>
<version>${project.version}</version>
</dependency>
<!-- dependencies for starting a post upload server on ssvm -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>${cs.httpcore.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore-nio</artifactId>
<version>${cs.httpcore.version}</version>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -32,8 +32,10 @@ import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.security.MessageDigest;
@ -48,19 +50,48 @@ import javax.naming.ConfigurationException;
import com.cloud.utils.nio.HandlerFactory;
import com.cloud.utils.nio.Link;
import com.cloud.utils.nio.NioServer;
import com.cloud.utils.nio.Task;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URLEncodedUtils;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.nio.DefaultHttpServerIODispatch;
import org.apache.http.impl.nio.DefaultNHttpServerConnection;
import org.apache.http.impl.nio.DefaultNHttpServerConnectionFactory;
import org.apache.http.impl.nio.reactor.DefaultListeningIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.NHttpConnectionFactory;
import org.apache.http.nio.NHttpServerConnection;
import org.apache.http.nio.protocol.BasicAsyncRequestConsumer;
import org.apache.http.nio.protocol.BasicAsyncResponseProducer;
import org.apache.http.nio.protocol.HttpAsyncExchange;
import org.apache.http.nio.protocol.HttpAsyncRequestConsumer;
import org.apache.http.nio.protocol.HttpAsyncRequestHandler;
import org.apache.http.nio.protocol.HttpAsyncService;
import org.apache.http.nio.protocol.UriHttpAsyncRequestHandlerMapper;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.ListeningIOReactor;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.HttpProcessorBuilder;
import org.apache.http.protocol.ResponseConnControl;
import org.apache.http.protocol.ResponseContent;
import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;
import org.apache.http.util.EntityUtils;
import org.apache.log4j.Logger;
import com.amazonaws.services.s3.model.S3ObjectSummary;
@ -1309,9 +1340,61 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
private void startNioServerForPostUpload() {
//TODO: make port configurable.
NioServer server = new NioServer("PostUploadServer", 8210, 15, this);
s_logger.info("Listening on 8210 with 15 workers");
server.start();
int port = 8210;
// Create HTTP protocol processing chain
HttpProcessor httpproc = HttpProcessorBuilder.create()
.add(new ResponseDate())
.add(new ResponseServer("HTTP/1.1"))
.add(new ResponseContent())
.add(new ResponseConnControl()).build();
// Create request handler registry
UriHttpAsyncRequestHandlerMapper reqistry = new UriHttpAsyncRequestHandlerMapper();
// Register the default handler for all URIs
reqistry.register("/upload*", new PostUploadRequestHandler());
// Create server-side HTTP protocol handler
HttpAsyncService protocolHandler = new HttpAsyncService(httpproc, reqistry) {
@Override
public void connected(final NHttpServerConnection conn) {
s_logger.info(conn + ": connection open");
super.connected(conn);
}
@Override
public void closed(final NHttpServerConnection conn) {
s_logger.info(conn + ": connection closed");
super.closed(conn);
}
};
// Create HTTP connection factory
NHttpConnectionFactory<DefaultNHttpServerConnection> connFactory;
connFactory = new DefaultNHttpServerConnectionFactory(
ConnectionConfig.DEFAULT);
// Create server-side I/O event dispatch
IOEventDispatch ioEventDispatch = new DefaultHttpServerIODispatch(protocolHandler, connFactory);
// Set I/O reactor defaults
IOReactorConfig config = IOReactorConfig.custom()
.setIoThreadCount(15)
.setSoTimeout(3000)
.setConnectTimeout(3000)
.build();
try {
// Create server-side I/O reactor
ListeningIOReactor ioReactor = new DefaultListeningIOReactor(config);
// Listen of the given port
ioReactor.listen(new InetSocketAddress(port));
// Ready to go!
ioReactor.execute(ioEventDispatch);
} catch (InterruptedIOException ex) {
s_logger.info("Interrupted");
} catch (IOException e) {
s_logger.info("I/O error: " + e.getMessage());
}
s_logger.info("Shutdown");
}
private void savePostUploadPSK(String psk) {
@ -2529,4 +2612,54 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
// TODO Auto-generated method stub
}
}
//TODO: move this class to a separate file
private class PostUploadRequestHandler implements HttpAsyncRequestHandler<HttpRequest> {
public void handleInternal(HttpRequest httpRequest, HttpResponse httpResponse, HttpContext httpContext) throws HttpException, IOException {
s_logger.info(""); // empty line before each request
s_logger.info(httpRequest.getRequestLine());
s_logger.info("-------- HEADERS --------");
for (Header header : httpRequest.getAllHeaders()) {
s_logger.info(header.getName() + " : " + header.getValue());
}
s_logger.info("--------");
HttpEntity entity = null;
if (httpRequest instanceof HttpEntityEnclosingRequest) {
entity = ((HttpEntityEnclosingRequest) httpRequest).getEntity();
}
if (entity != null) {
s_logger.info(entity.getContentType());
s_logger.info(entity.getContentLength());
s_logger.info(entity.isChunked());
}
// For some reason, just putting the incoming entity into
// the response will not work. We have to buffer the message.
byte[] data;
if (entity == null) {
data = new byte[0];
} else {
data = EntityUtils.toByteArray(entity);
}
s_logger.info(new String(data));
//TODO: post request is available in data. Need to parse and save file.
httpResponse.setEntity(new StringEntity("upload successful"));
}
@Override
public HttpAsyncRequestConsumer<HttpRequest> processRequest(HttpRequest request, HttpContext context) throws HttpException, IOException {
return new BasicAsyncRequestConsumer();
}
@Override
public void handle(HttpRequest request, HttpAsyncExchange httpExchange, HttpContext context) throws HttpException, IOException {
HttpResponse response = httpExchange.getResponse();
handleInternal(request, response, context);
httpExchange.submitResponse(new BasicAsyncResponseProducer(response));
}
}
}

View File

@ -171,7 +171,7 @@
RewriteEngine Off
RewriteCond %{HTTPS} =on
RewriteCond %{REQUEST_METHOD} =POST
RewriteRule ^/upload/(.*) https://127.0.0.1:8210/upload?uuid=$1 [P,L]
RewriteRule ^/upload/(.*) http://127.0.0.1:8210/upload?uuid=$1 [P,L]
</VirtualHost>
</IfModule>

View File

@ -64,11 +64,6 @@
<artifactId>selenium-java-client-driver</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>${cs.httpclient.version}</version>
</dependency>
</dependencies>
<build>
<defaultGoal>compile</defaultGoal>