CLOUDSTACK-4817: fix s3 multipart uplaod

Conflicts:

	plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/XenServerStorageProcessor.java
This commit is contained in:
Edison Su 2013-10-28 17:31:49 -07:00
parent 2cac1aaa0f
commit 89d6e7ed66
2 changed files with 71 additions and 8 deletions

View File

@ -18,6 +18,11 @@
*/ */
package com.cloud.hypervisor.xen.resource; package com.cloud.hypervisor.xen.resource;
import static com.cloud.utils.ReflectUtil.flattenProperties;
import static com.google.common.collect.Lists.newArrayList;
import java.io.File; import java.io.File;
import java.net.URI; import java.net.URI;
import java.util.Arrays; import java.util.Arrays;
@ -82,9 +87,6 @@ import com.xensource.xenapi.VDI;
import com.xensource.xenapi.VM; import com.xensource.xenapi.VM;
import com.xensource.xenapi.VMGuestMetrics; import com.xensource.xenapi.VMGuestMetrics;
import static com.cloud.utils.ReflectUtil.flattenProperties;
import static com.google.common.collect.Lists.newArrayList;
public class XenServerStorageProcessor implements StorageProcessor { public class XenServerStorageProcessor implements StorageProcessor {
private static final Logger s_logger = Logger.getLogger(XenServerStorageProcessor.class); private static final Logger s_logger = Logger.getLogger(XenServerStorageProcessor.class);
protected CitrixResourceBase hypervisorResource; protected CitrixResourceBase hypervisorResource;
@ -1091,11 +1093,12 @@ public class XenServerStorageProcessor implements StorageProcessor {
S3Utils.ClientOptions.class)); S3Utils.ClientOptions.class));
// https workaround for Introspector bug that does not // https workaround for Introspector bug that does not
// recognize Boolean accessor methods ... // recognize Boolean accessor methods ...
parameters.addAll(Arrays.asList("operation", "put", "filename", parameters.addAll(Arrays.asList("operation", "put", "filename",
dir + "/" + filename, "iSCSIFlag", dir + "/" + filename, "iSCSIFlag",
iSCSIFlag.toString(), "bucket", s3.getBucketName(), iSCSIFlag.toString(), "bucket", s3.getBucketName(),
"key", key, "https", s3.isHttps() != null ? s3.isHttps().toString() "key", key, "https", s3.isHttps() != null ? s3.isHttps().toString()
: "null")); : "null", "maxSingleUploadSizeInBytes", String.valueOf(s3.getMaxSingleUploadSizeInBytes())));
final String result = hypervisorResource.callHostPluginAsync(connection, "s3xen", final String result = hypervisorResource.callHostPluginAsync(connection, "s3xen",
"s3", wait, "s3", wait,
parameters.toArray(new String[parameters.size()])); parameters.toArray(new String[parameters.size()]));

View File

@ -34,6 +34,7 @@ import base64
import hmac import hmac
import traceback import traceback
import urllib2 import urllib2
from xml.dom.minidom import parseString
import XenAPIPlugin import XenAPIPlugin
sys.path.extend(["/opt/xensource/sm/"]) sys.path.extend(["/opt/xensource/sm/"])
@ -260,15 +261,73 @@ class S3Client(object):
sha).digest())[:-1] sha).digest())[:-1]
return signature, request_date return signature, request_date
def getText(self, nodelist):
rc = []
for node in nodelist:
if node.nodeType == node.TEXT_NODE:
rc.append(node.data)
return ''.join(rc)
def put(self, bucket, key, src_filename): def multiUpload(self, bucket, key, src_fileName, chunkSize=5 * 1024 * 1024):
uploadId={}
def readInitalMultipart(response):
data = response.read()
xmlResult = parseString(data)
result = xmlResult.getElementsByTagName("InitiateMultipartUploadResult")[0]
upload = result.getElementsByTagName("UploadId")[0]
uploadId["0"] = upload.childNodes[0].data
self.do_operation('POST', bucket, key + "?uploads", fn_read=readInitalMultipart)
fileSize = os.path.getsize(src_fileName)
parts = fileSize / chunkSize + ((fileSize % chunkSize) and 1)
part = 1
srcFile = open(src_fileName, 'rb')
etags = []
while part <= parts:
offset = part - 1
size = min(fileSize - offset * chunkSize, chunkSize)
headers = {
self.HEADER_CONTENT_LENGTH: size
}
def send_body(connection):
srcFile.seek(offset * chunkSize)
block = srcFile.read(size)
connection.send(block)
def read_multiPart(response):
etag = response.getheader('ETag')
etags.append((part, etag))
self.do_operation("PUT", bucket, "%s?partNumber=%s&uploadId=%s"%(key, part, uploadId["0"]), headers, send_body, read_multiPart)
part = part + 1
srcFile.close()
data = []
partXml = "<Part><PartNumber>%i</PartNumber><ETag>%s</ETag></Part>"
for etag in etags:
data.append(partXml%etag)
msg = "<CompleteMultipartUpload>%s</CompleteMultipartUpload>"%("".join(data))
size = len(msg)
headers = {
self.HEADER_CONTENT_LENGTH: size
}
def send_complete_multipart(connection):
connection.send(msg)
self.do_operation("POST", bucket, "%s?uploadId=%s"%(key, uploadId["0"]), headers, send_complete_multipart)
def put(self, bucket, key, src_filename, maxSingleUpload):
if not os.path.isfile(src_filename): if not os.path.isfile(src_filename):
raise Exception( raise Exception(
"Attempt to put " + src_filename + " that does not exist.") "Attempt to put " + src_filename + " that does not exist.")
size = os.path.getsize(src_filename)
if size > maxSingleUpload or maxSingleUpload == 0:
return self.multiUpload(bucket, key, src_filename)
headers = { headers = {
self.HEADER_CONTENT_MD5: compute_md5(src_filename), self.HEADER_CONTENT_MD5: compute_md5(src_filename),
self.HEADER_CONTENT_TYPE: 'application/octet-stream', self.HEADER_CONTENT_TYPE: 'application/octet-stream',
self.HEADER_CONTENT_LENGTH: str(os.stat(src_filename).st_size), self.HEADER_CONTENT_LENGTH: str(os.stat(src_filename).st_size),
} }
@ -323,6 +382,7 @@ def parseArguments(args):
bucket = args['bucket'] bucket = args['bucket']
key = args['key'] key = args['key']
filename = args['filename'] filename = args['filename']
maxSingleUploadBytes = int(args["maxSingleUploadSizeInBytes"])
if is_blank(operation): if is_blank(operation):
raise ValueError('An operation must be specified.') raise ValueError('An operation must be specified.')
@ -336,18 +396,18 @@ def parseArguments(args):
if is_blank(filename): if is_blank(filename):
raise ValueError('A filename must be specified.') raise ValueError('A filename must be specified.')
return client, operation, bucket, key, filename return client, operation, bucket, key, filename, maxSingleUploadBytes
@echo @echo
def s3(session, args): def s3(session, args):
client, operation, bucket, key, filename = parseArguments(args) client, operation, bucket, key, filename, maxSingleUploadBytes = parseArguments(args)
try: try:
if operation == 'put': if operation == 'put':
client.put(bucket, key, filename) client.put(bucket, key, filename, maxSingleUploadBytes)
elif operation == 'get': elif operation == 'get':
client.get(bucket, key, filename) client.get(bucket, key, filename)
elif operation == 'delete': elif operation == 'delete':