From 89d6e7ed66e92ded84e85d57a7b2681fd088c20b Mon Sep 17 00:00:00 2001 From: Edison Su Date: Mon, 28 Oct 2013 17:31:49 -0700 Subject: [PATCH] CLOUDSTACK-4817: fix s3 multipart uplaod Conflicts: plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/XenServerStorageProcessor.java --- .../resource/XenServerStorageProcessor.java | 11 +-- scripts/vm/hypervisor/xenserver/s3xen | 68 +++++++++++++++++-- 2 files changed, 71 insertions(+), 8 deletions(-) diff --git a/plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/XenServerStorageProcessor.java b/plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/XenServerStorageProcessor.java index 1496108b19b..5a19aee2468 100644 --- a/plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/XenServerStorageProcessor.java +++ b/plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/XenServerStorageProcessor.java @@ -18,6 +18,11 @@ */ package com.cloud.hypervisor.xen.resource; + +import static com.cloud.utils.ReflectUtil.flattenProperties; +import static com.google.common.collect.Lists.newArrayList; + + import java.io.File; import java.net.URI; import java.util.Arrays; @@ -82,9 +87,6 @@ import com.xensource.xenapi.VDI; import com.xensource.xenapi.VM; import com.xensource.xenapi.VMGuestMetrics; -import static com.cloud.utils.ReflectUtil.flattenProperties; -import static com.google.common.collect.Lists.newArrayList; - public class XenServerStorageProcessor implements StorageProcessor { private static final Logger s_logger = Logger.getLogger(XenServerStorageProcessor.class); protected CitrixResourceBase hypervisorResource; @@ -1091,11 +1093,12 @@ public class XenServerStorageProcessor implements StorageProcessor { S3Utils.ClientOptions.class)); // https workaround for Introspector bug that does not // recognize Boolean accessor methods ... + parameters.addAll(Arrays.asList("operation", "put", "filename", dir + "/" + filename, "iSCSIFlag", iSCSIFlag.toString(), "bucket", s3.getBucketName(), "key", key, "https", s3.isHttps() != null ? s3.isHttps().toString() - : "null")); + : "null", "maxSingleUploadSizeInBytes", String.valueOf(s3.getMaxSingleUploadSizeInBytes()))); final String result = hypervisorResource.callHostPluginAsync(connection, "s3xen", "s3", wait, parameters.toArray(new String[parameters.size()])); diff --git a/scripts/vm/hypervisor/xenserver/s3xen b/scripts/vm/hypervisor/xenserver/s3xen index 372a6daaddc..bf81bbd34a6 100644 --- a/scripts/vm/hypervisor/xenserver/s3xen +++ b/scripts/vm/hypervisor/xenserver/s3xen @@ -34,6 +34,7 @@ import base64 import hmac import traceback import urllib2 +from xml.dom.minidom import parseString import XenAPIPlugin sys.path.extend(["/opt/xensource/sm/"]) @@ -260,15 +261,73 @@ class S3Client(object): sha).digest())[:-1] return signature, request_date + + def getText(self, nodelist): + rc = [] + for node in nodelist: + if node.nodeType == node.TEXT_NODE: + rc.append(node.data) + return ''.join(rc) - def put(self, bucket, key, src_filename): + def multiUpload(self, bucket, key, src_fileName, chunkSize=5 * 1024 * 1024): + uploadId={} + def readInitalMultipart(response): + data = response.read() + xmlResult = parseString(data) + result = xmlResult.getElementsByTagName("InitiateMultipartUploadResult")[0] + upload = result.getElementsByTagName("UploadId")[0] + uploadId["0"] = upload.childNodes[0].data + + self.do_operation('POST', bucket, key + "?uploads", fn_read=readInitalMultipart) + + fileSize = os.path.getsize(src_fileName) + parts = fileSize / chunkSize + ((fileSize % chunkSize) and 1) + part = 1 + srcFile = open(src_fileName, 'rb') + etags = [] + while part <= parts: + offset = part - 1 + size = min(fileSize - offset * chunkSize, chunkSize) + headers = { + self.HEADER_CONTENT_LENGTH: size + } + def send_body(connection): + srcFile.seek(offset * chunkSize) + block = srcFile.read(size) + connection.send(block) + def read_multiPart(response): + etag = response.getheader('ETag') + etags.append((part, etag)) + self.do_operation("PUT", bucket, "%s?partNumber=%s&uploadId=%s"%(key, part, uploadId["0"]), headers, send_body, read_multiPart) + part = part + 1 + srcFile.close() + + data = [] + partXml = "%i%s" + for etag in etags: + data.append(partXml%etag) + msg = "%s"%("".join(data)) + size = len(msg) + headers = { + self.HEADER_CONTENT_LENGTH: size + } + def send_complete_multipart(connection): + connection.send(msg) + self.do_operation("POST", bucket, "%s?uploadId=%s"%(key, uploadId["0"]), headers, send_complete_multipart) + + def put(self, bucket, key, src_filename, maxSingleUpload): if not os.path.isfile(src_filename): raise Exception( "Attempt to put " + src_filename + " that does not exist.") + size = os.path.getsize(src_filename) + if size > maxSingleUpload or maxSingleUpload == 0: + return self.multiUpload(bucket, key, src_filename) + headers = { self.HEADER_CONTENT_MD5: compute_md5(src_filename), + self.HEADER_CONTENT_TYPE: 'application/octet-stream', self.HEADER_CONTENT_LENGTH: str(os.stat(src_filename).st_size), } @@ -323,6 +382,7 @@ def parseArguments(args): bucket = args['bucket'] key = args['key'] filename = args['filename'] + maxSingleUploadBytes = int(args["maxSingleUploadSizeInBytes"]) if is_blank(operation): raise ValueError('An operation must be specified.') @@ -336,18 +396,18 @@ def parseArguments(args): if is_blank(filename): raise ValueError('A filename must be specified.') - return client, operation, bucket, key, filename + return client, operation, bucket, key, filename, maxSingleUploadBytes @echo def s3(session, args): - client, operation, bucket, key, filename = parseArguments(args) + client, operation, bucket, key, filename, maxSingleUploadBytes = parseArguments(args) try: if operation == 'put': - client.put(bucket, key, filename) + client.put(bucket, key, filename, maxSingleUploadBytes) elif operation == 'get': client.get(bucket, key, filename) elif operation == 'delete':