Support Multi part upload for S3 using TransferManager.

This commit is contained in:
Min Chen 2013-06-05 09:40:33 -07:00
parent e92cd6d632
commit 4e404953ad
2 changed files with 262 additions and 291 deletions

View File

@ -16,20 +16,12 @@
// under the License. // under the License.
package com.cloud.storage.template; package com.cloud.storage.template;
import static com.cloud.utils.StringUtils.join; import static com.cloud.utils.StringUtils.join;
import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.Date; import java.util.Date;
import org.apache.cloudstack.storage.command.DownloadCommand.ResourceType; import org.apache.cloudstack.storage.command.DownloadCommand.ResourceType;
@ -51,12 +43,15 @@ import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException; import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.ProgressEvent; import com.amazonaws.services.s3.model.ProgressEvent;
import com.amazonaws.services.s3.model.ProgressListener; import com.amazonaws.services.s3.model.ProgressListener;
import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.StorageClass; import com.amazonaws.services.s3.model.StorageClass;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.Upload;
import com.cloud.agent.api.storage.Proxy; import com.cloud.agent.api.storage.Proxy;
import com.cloud.agent.api.to.S3TO; import com.cloud.agent.api.to.S3TO;
import com.cloud.utils.Pair; import com.cloud.utils.Pair;
@ -68,367 +63,343 @@ import com.cloud.utils.UriUtils;
* *
*/ */
public class S3TemplateDownloader implements TemplateDownloader { public class S3TemplateDownloader implements TemplateDownloader {
public static final Logger s_logger = Logger.getLogger(S3TemplateDownloader.class.getName()); public static final Logger s_logger = Logger.getLogger(S3TemplateDownloader.class.getName());
private static final MultiThreadedHttpConnectionManager s_httpClientManager = new MultiThreadedHttpConnectionManager(); private static final MultiThreadedHttpConnectionManager s_httpClientManager = new MultiThreadedHttpConnectionManager();
private String downloadUrl; private String downloadUrl;
private String installPath; private String installPath;
private String s3Key; private String s3Key;
private String fileName; private String fileName;
public TemplateDownloader.Status status= TemplateDownloader.Status.NOT_STARTED; public TemplateDownloader.Status status = TemplateDownloader.Status.NOT_STARTED;
public String errorString = " "; public String errorString = " ";
private long remoteSize = 0; private long remoteSize = 0;
public long downloadTime = 0; public long downloadTime = 0;
public long totalBytes; public long totalBytes;
private final HttpClient client; private final HttpClient client;
private GetMethod request; private GetMethod request;
private boolean resume = false; private boolean resume = false;
private DownloadCompleteCallback completionCallback; private DownloadCompleteCallback completionCallback;
S3TO s3; private S3TO s3;
boolean inited = true; private boolean inited = true;
private long MAX_TEMPLATE_SIZE_IN_BYTES; private long maxTemplateSizeInByte;
private ResourceType resourceType = ResourceType.TEMPLATE; private ResourceType resourceType = ResourceType.TEMPLATE;
private final HttpMethodRetryHandler myretryhandler; private final HttpMethodRetryHandler myretryhandler;
public S3TemplateDownloader(S3TO storageLayer, String downloadUrl, String installPath,
DownloadCompleteCallback callback, long maxTemplateSizeInBytes, String user, String password, Proxy proxy,
ResourceType resourceType) {
this.s3 = storageLayer;
this.downloadUrl = downloadUrl;
this.installPath = installPath;
this.status = TemplateDownloader.Status.NOT_STARTED;
this.resourceType = resourceType;
this.maxTemplateSizeInByte = maxTemplateSizeInBytes;
this.totalBytes = 0;
this.client = new HttpClient(s_httpClientManager);
public S3TemplateDownloader (S3TO storageLayer, String downloadUrl, String installPath, DownloadCompleteCallback callback, long maxTemplateSizeInBytes, String user, String password, Proxy proxy, ResourceType resourceType) { myretryhandler = new HttpMethodRetryHandler() {
this.s3 = storageLayer; @Override
this.downloadUrl = downloadUrl; public boolean retryMethod(final HttpMethod method, final IOException exception, int executionCount) {
this.installPath = installPath; if (executionCount >= 2) {
this.status = TemplateDownloader.Status.NOT_STARTED; // Do not retry if over max retry count
this.resourceType = resourceType; return false;
this.MAX_TEMPLATE_SIZE_IN_BYTES = maxTemplateSizeInBytes; }
if (exception instanceof NoHttpResponseException) {
// Retry if the server dropped connection on us
return true;
}
if (!method.isRequestSent()) {
// Retry if the request has not been sent fully or
// if it's OK to retry methods that have been sent
return true;
}
// otherwise do not retry
return false;
}
};
this.totalBytes = 0; try {
this.client = new HttpClient(s_httpClientManager); this.request = new GetMethod(downloadUrl);
this.request.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, myretryhandler);
this.completionCallback = callback;
myretryhandler = new HttpMethodRetryHandler() { Pair<String, Integer> hostAndPort = UriUtils.validateUrl(downloadUrl);
@Override
public boolean retryMethod(
final HttpMethod method,
final IOException exception,
int executionCount) {
if (executionCount >= 2) {
// Do not retry if over max retry count
return false;
}
if (exception instanceof NoHttpResponseException) {
// Retry if the server dropped connection on us
return true;
}
if (!method.isRequestSent()) {
// Retry if the request has not been sent fully or
// if it's OK to retry methods that have been sent
return true;
}
// otherwise do not retry
return false;
}
};
try {
this.request = new GetMethod(downloadUrl);
this.request.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, myretryhandler);
this.completionCallback = callback;
Pair<String, Integer> hostAndPort = UriUtils.validateUrl(downloadUrl);
this.fileName = StringUtils.substringAfterLast(downloadUrl, "/"); this.fileName = StringUtils.substringAfterLast(downloadUrl, "/");
if (proxy != null) { if (proxy != null) {
client.getHostConfiguration().setProxy(proxy.getHost(), proxy.getPort()); client.getHostConfiguration().setProxy(proxy.getHost(), proxy.getPort());
if (proxy.getUserName() != null) { if (proxy.getUserName() != null) {
Credentials proxyCreds = new UsernamePasswordCredentials(proxy.getUserName(), proxy.getPassword()); Credentials proxyCreds = new UsernamePasswordCredentials(proxy.getUserName(), proxy.getPassword());
client.getState().setProxyCredentials(AuthScope.ANY, proxyCreds); client.getState().setProxyCredentials(AuthScope.ANY, proxyCreds);
} }
} }
if ((user != null) && (password != null)) { if ((user != null) && (password != null)) {
client.getParams().setAuthenticationPreemptive(true); client.getParams().setAuthenticationPreemptive(true);
Credentials defaultcreds = new UsernamePasswordCredentials(user, password); Credentials defaultcreds = new UsernamePasswordCredentials(user, password);
client.getState().setCredentials(new AuthScope(hostAndPort.first(), hostAndPort.second(), AuthScope.ANY_REALM), defaultcreds); client.getState().setCredentials(
s_logger.info("Added username=" + user + ", password=" + password + "for host " + hostAndPort.first() + ":" + hostAndPort.second()); new AuthScope(hostAndPort.first(), hostAndPort.second(), AuthScope.ANY_REALM), defaultcreds);
} else { s_logger.info("Added username=" + user + ", password=" + password + "for host " + hostAndPort.first()
s_logger.info("No credentials configured for host=" + hostAndPort.first() + ":" + hostAndPort.second()); + ":" + hostAndPort.second());
} } else {
} catch (IllegalArgumentException iae) { s_logger.info("No credentials configured for host=" + hostAndPort.first() + ":" + hostAndPort.second());
errorString = iae.getMessage(); }
status = TemplateDownloader.Status.UNRECOVERABLE_ERROR; } catch (IllegalArgumentException iae) {
inited = false; errorString = iae.getMessage();
} catch (Exception ex){ status = TemplateDownloader.Status.UNRECOVERABLE_ERROR;
errorString = "Unable to start download -- check url? "; inited = false;
status = TemplateDownloader.Status.UNRECOVERABLE_ERROR; } catch (Exception ex) {
s_logger.warn("Exception in constructor -- " + ex.toString()); errorString = "Unable to start download -- check url? ";
} catch (Throwable th) { status = TemplateDownloader.Status.UNRECOVERABLE_ERROR;
s_logger.warn("throwable caught ", th); s_logger.warn("Exception in constructor -- " + ex.toString());
} } catch (Throwable th) {
} s_logger.warn("throwable caught ", th);
}
}
@Override
public long download(boolean resume, DownloadCompleteCallback callback) {
switch (status) {
case ABORTED:
case UNRECOVERABLE_ERROR:
case DOWNLOAD_FINISHED:
return 0;
default:
@Override }
public long download(boolean resume, DownloadCompleteCallback callback) {
switch (status) {
case ABORTED:
case UNRECOVERABLE_ERROR:
case DOWNLOAD_FINISHED:
return 0;
default:
} try {
// execute get method
int responseCode = HttpStatus.SC_OK;
int bytes=0; if ((responseCode = client.executeMethod(request)) != HttpStatus.SC_OK) {
try { status = TemplateDownloader.Status.UNRECOVERABLE_ERROR;
// execute get method errorString = " HTTP Server returned " + responseCode + " (expected 200 OK) ";
int responseCode = HttpStatus.SC_OK; return 0; // FIXME: retry?
if ((responseCode = client.executeMethod(request)) != HttpStatus.SC_OK) { }
status = TemplateDownloader.Status.UNRECOVERABLE_ERROR; // get the total size of file
errorString = " HTTP Server returned " + responseCode + " (expected 200 OK) ";
return 0; //FIXME: retry?
}
// get the total size of file
Header contentLengthHeader = request.getResponseHeader("Content-Length"); Header contentLengthHeader = request.getResponseHeader("Content-Length");
boolean chunked = false; boolean chunked = false;
long remoteSize2 = 0; long remoteSize2 = 0;
if (contentLengthHeader == null) { if (contentLengthHeader == null) {
Header chunkedHeader = request.getResponseHeader("Transfer-Encoding"); Header chunkedHeader = request.getResponseHeader("Transfer-Encoding");
if (chunkedHeader == null || !"chunked".equalsIgnoreCase(chunkedHeader.getValue())) { if (chunkedHeader == null || !"chunked".equalsIgnoreCase(chunkedHeader.getValue())) {
status = TemplateDownloader.Status.UNRECOVERABLE_ERROR; status = TemplateDownloader.Status.UNRECOVERABLE_ERROR;
errorString=" Failed to receive length of download "; errorString = " Failed to receive length of download ";
return 0; //FIXME: what status do we put here? Do we retry? return 0; // FIXME: what status do we put here? Do we retry?
} else if ("chunked".equalsIgnoreCase(chunkedHeader.getValue())){ } else if ("chunked".equalsIgnoreCase(chunkedHeader.getValue())) {
chunked = true; chunked = true;
} }
} else { } else {
remoteSize2 = Long.parseLong(contentLengthHeader.getValue()); remoteSize2 = Long.parseLong(contentLengthHeader.getValue());
} }
if (remoteSize == 0) { if (remoteSize == 0) {
remoteSize = remoteSize2; remoteSize = remoteSize2;
} }
if (remoteSize > MAX_TEMPLATE_SIZE_IN_BYTES) { if (remoteSize > maxTemplateSizeInByte) {
s_logger.info("Remote size is too large: " + remoteSize + " , max=" + MAX_TEMPLATE_SIZE_IN_BYTES); s_logger.info("Remote size is too large: " + remoteSize + " , max=" + maxTemplateSizeInByte);
status = Status.UNRECOVERABLE_ERROR; status = Status.UNRECOVERABLE_ERROR;
errorString = "Download file size is too large"; errorString = "Download file size is too large";
return 0; return 0;
} }
if (remoteSize == 0) { if (remoteSize == 0) {
remoteSize = MAX_TEMPLATE_SIZE_IN_BYTES; remoteSize = maxTemplateSizeInByte;
} }
InputStream in = !chunked?new BufferedInputStream(request.getResponseBodyAsStream()) InputStream in = !chunked ? new BufferedInputStream(request.getResponseBodyAsStream())
: new ChunkedInputStream(request.getResponseBodyAsStream()); : new ChunkedInputStream(request.getResponseBodyAsStream());
s_logger.info("Starting download from " + getDownloadUrl() + " to s3 bucket " + s3.getBucketName() + " remoteSize=" + remoteSize + " , max size=" + MAX_TEMPLATE_SIZE_IN_BYTES); s_logger.info("Starting download from " + getDownloadUrl() + " to s3 bucket " + s3.getBucketName()
+ " remoteSize=" + remoteSize + " , max size=" + maxTemplateSizeInByte);
Date start = new Date(); Date start = new Date();
// compute s3 key // compute s3 key
s3Key = join(asList(installPath, fileName), S3Utils.SEPARATOR); s3Key = join(asList(installPath, fileName), S3Utils.SEPARATOR);
// multi-part upload using S3 api to handle > 5G input stream
TransferManager tm = new TransferManager(S3Utils.acquireClient(s3));
// download using S3 API // download using S3 API
ObjectMetadata metadata = new ObjectMetadata(); ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(remoteSize); metadata.setContentLength(remoteSize);
PutObjectRequest putObjectRequest = new PutObjectRequest( PutObjectRequest putObjectRequest = new PutObjectRequest(s3.getBucketName(), s3Key, in, metadata)
s3.getBucketName(), s3Key, in, metadata)
.withStorageClass(StorageClass.ReducedRedundancy); .withStorageClass(StorageClass.ReducedRedundancy);
// register progress listenser // register progress listenser
putObjectRequest putObjectRequest.setProgressListener(new ProgressListener() {
.setProgressListener(new ProgressListener() { @Override
@Override public void progressChanged(ProgressEvent progressEvent) {
public void progressChanged( // s_logger.debug(progressEvent.getBytesTransfered()
ProgressEvent progressEvent) { // + " number of byte transferd "
// s_logger.debug(progressEvent.getBytesTransfered() // + new Date());
// + " number of byte transferd " totalBytes += progressEvent.getBytesTransfered();
// + new Date()); if (progressEvent.getEventCode() == ProgressEvent.COMPLETED_EVENT_CODE) {
totalBytes += progressEvent.getBytesTransfered(); s_logger.info("download completed");
if (progressEvent.getEventCode() == ProgressEvent.COMPLETED_EVENT_CODE) { status = TemplateDownloader.Status.DOWNLOAD_FINISHED;
s_logger.info("download completed"); } else if (progressEvent.getEventCode() == ProgressEvent.FAILED_EVENT_CODE) {
status = TemplateDownloader.Status.DOWNLOAD_FINISHED; status = TemplateDownloader.Status.UNRECOVERABLE_ERROR;
} else if (progressEvent.getEventCode() == ProgressEvent.FAILED_EVENT_CODE){ } else if (progressEvent.getEventCode() == ProgressEvent.CANCELED_EVENT_CODE) {
status = TemplateDownloader.Status.UNRECOVERABLE_ERROR; status = TemplateDownloader.Status.ABORTED;
} else if (progressEvent.getEventCode() == ProgressEvent.CANCELED_EVENT_CODE){ } else {
status = TemplateDownloader.Status.ABORTED; status = TemplateDownloader.Status.IN_PROGRESS;
} else{ }
status = TemplateDownloader.Status.IN_PROGRESS; }
}
} });
// TransferManager processes all transfers asynchronously,
// so this call will return immediately.
Upload upload = tm.upload(putObjectRequest);
upload.waitForCompletion();
});
S3Utils.putObject(s3, putObjectRequest);
while (status != TemplateDownloader.Status.DOWNLOAD_FINISHED &&
status != TemplateDownloader.Status.UNRECOVERABLE_ERROR &&
status != TemplateDownloader.Status.ABORTED ){
// wait for completion
}
// finished or aborted // finished or aborted
Date finish = new Date(); Date finish = new Date();
String downloaded = "(incomplete download)"; String downloaded = "(incomplete download)";
if (totalBytes >= remoteSize) { if (totalBytes >= remoteSize) {
status = TemplateDownloader.Status.DOWNLOAD_FINISHED; status = TemplateDownloader.Status.DOWNLOAD_FINISHED;
downloaded = "(download complete remote=" + remoteSize + "bytes)"; downloaded = "(download complete remote=" + remoteSize + "bytes)";
} else { } else {
errorString = "Downloaded " + totalBytes + " bytes " + downloaded; errorString = "Downloaded " + totalBytes + " bytes " + downloaded;
} }
downloadTime += finish.getTime() - start.getTime(); downloadTime += finish.getTime() - start.getTime();
return totalBytes; return totalBytes;
}catch (HttpException hte) { } catch (HttpException hte) {
status = TemplateDownloader.Status.UNRECOVERABLE_ERROR; status = TemplateDownloader.Status.UNRECOVERABLE_ERROR;
errorString = hte.getMessage(); errorString = hte.getMessage();
} catch (IOException ioe) { } catch (IOException ioe) {
status = TemplateDownloader.Status.UNRECOVERABLE_ERROR; //probably a file write error? // probably a file write error
errorString = ioe.getMessage(); status = TemplateDownloader.Status.UNRECOVERABLE_ERROR;
} catch (AmazonClientException ex) { errorString = ioe.getMessage();
status = TemplateDownloader.Status.UNRECOVERABLE_ERROR; // S3 api exception } catch (AmazonClientException ex) {
// S3 api exception
status = TemplateDownloader.Status.UNRECOVERABLE_ERROR;
errorString = ex.getMessage(); errorString = ex.getMessage();
} finally { } catch (InterruptedException e) {
// close input stream // S3 upload api exception
request.releaseConnection(); status = TemplateDownloader.Status.UNRECOVERABLE_ERROR;
errorString = e.getMessage();
} finally {
// close input stream
request.releaseConnection();
if (callback != null) { if (callback != null) {
callback.downloadComplete(status); callback.downloadComplete(status);
} }
} }
return 0; return 0;
} }
public String getDownloadUrl() { public String getDownloadUrl() {
return downloadUrl; return downloadUrl;
} }
@Override
@Override
public TemplateDownloader.Status getStatus() { public TemplateDownloader.Status getStatus() {
return status; return status;
} }
@Override
@Override
public long getDownloadTime() { public long getDownloadTime() {
return downloadTime; return downloadTime;
} }
@Override
@Override
public long getDownloadedBytes() { public long getDownloadedBytes() {
return totalBytes; return totalBytes;
} }
@Override @Override
@SuppressWarnings("fallthrough") @SuppressWarnings("fallthrough")
public boolean stopDownload() { public boolean stopDownload() {
switch (getStatus()) { switch (getStatus()) {
case IN_PROGRESS: case IN_PROGRESS:
if (request != null) { if (request != null) {
request.abort(); request.abort();
} }
status = TemplateDownloader.Status.ABORTED; status = TemplateDownloader.Status.ABORTED;
return true; return true;
case UNKNOWN: case UNKNOWN:
case NOT_STARTED: case NOT_STARTED:
case RECOVERABLE_ERROR: case RECOVERABLE_ERROR:
case UNRECOVERABLE_ERROR: case UNRECOVERABLE_ERROR:
case ABORTED: case ABORTED:
status = TemplateDownloader.Status.ABORTED; status = TemplateDownloader.Status.ABORTED;
case DOWNLOAD_FINISHED: case DOWNLOAD_FINISHED:
try { try {
S3Utils.deleteObject(s3, s3.getBucketName(), s3Key); S3Utils.deleteObject(s3, s3.getBucketName(), s3Key);
} catch (Exception ex) { } catch (Exception ex) {
// ignore delete exception if it is not there // ignore delete exception if it is not there
} }
return true; return true;
default: default:
return true; return true;
} }
} }
@Override @Override
public int getDownloadPercent() { public int getDownloadPercent() {
if (remoteSize == 0) { if (remoteSize == 0) {
return 0; return 0;
} }
return (int)(100.0*totalBytes/remoteSize); return (int) (100.0 * totalBytes / remoteSize);
} }
@Override @Override
public void run() { public void run() {
try { try {
download(resume, completionCallback); download(resume, completionCallback);
} catch (Throwable t) { } catch (Throwable t) {
s_logger.warn("Caught exception during download "+ t.getMessage(), t); s_logger.warn("Caught exception during download " + t.getMessage(), t);
errorString = "Failed to install: " + t.getMessage(); errorString = "Failed to install: " + t.getMessage();
status = TemplateDownloader.Status.UNRECOVERABLE_ERROR; status = TemplateDownloader.Status.UNRECOVERABLE_ERROR;
} }
} }
@Override @Override
public void setStatus(TemplateDownloader.Status status) { public void setStatus(TemplateDownloader.Status status) {
this.status = status; this.status = status;
} }
public boolean isResume() {
return resume;
}
@Override
public String getDownloadError() {
return errorString;
}
public boolean isResume() { @Override
return resume; public String getDownloadLocalPath() {
} return this.s3Key;
}
@Override @Override
public String getDownloadError() {
return errorString;
}
@Override
public String getDownloadLocalPath() {
return this.s3Key;
}
@Override
public void setResume(boolean resume) { public void setResume(boolean resume) {
this.resume = resume; this.resume = resume;
} }
@Override
@Override
public long getMaxTemplateSizeInBytes() { public long getMaxTemplateSizeInBytes() {
return this.MAX_TEMPLATE_SIZE_IN_BYTES; return this.maxTemplateSizeInByte;
} }
public static void main(String[] args) { @Override
String url ="http://dev.mysql.com/get/Downloads/MySQL-5.0/mysql-noinstall-5.0.77-win32.zip/from/http://mirror.services.wisc.edu/mysql/"; public void setDownloadError(String error) {
try { errorString = error;
URI uri = new java.net.URI(url); }
} catch (URISyntaxException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
TemplateDownloader td = new S3TemplateDownloader(null, url,"/tmp/mysql", null, TemplateDownloader.DEFAULT_MAX_TEMPLATE_SIZE_IN_BYTES, null, null, null, null);
long bytes = td.download(true, null);
if (bytes > 0) {
System.out.println("Downloaded (" + bytes + " bytes)" + " in " + td.getDownloadTime()/1000 + " secs");
} else {
System.out.println("Failed download");
}
} @Override
public boolean isInited() {
return inited;
}
@Override public ResourceType getResourceType() {
public void setDownloadError(String error) { return resourceType;
errorString = error; }
}
@Override
public boolean isInited() {
return inited;
}
public ResourceType getResourceType() {
return resourceType;
}
} }

View File

@ -73,7 +73,7 @@ public final class S3Utils {
super(); super();
} }
private static AmazonS3 acquireClient(final ClientOptions clientOptions) { public static AmazonS3 acquireClient(final ClientOptions clientOptions) {
final AWSCredentials credentials = new BasicAWSCredentials( final AWSCredentials credentials = new BasicAWSCredentials(
clientOptions.getAccessKey(), clientOptions.getSecretKey()); clientOptions.getAccessKey(), clientOptions.getSecretKey());