298 lines
8.5 KiB
Python

#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Version @VERSION@
#
# A plugin for executing the S3 scripts needed by CloudStack
from __future__ import with_statement
from copy import copy
from datetime import datetime
from httplib import *
from string import join
import os
import sys
import time
import hashlib
import base64
import hmac
import traceback
import urllib2
import XenAPIPlugin
sys.path.extend(["/opt/xensource/sm/"])
import util
# Literal string that to_none() treats as "no value".
# NOTE(review): presumably how the caller serializes a missing argument --
# confirm against the code that invokes this plugin.
NULL = 'null'
# Value conversion utility functions ...
def to_none(value):
    """Return value unchanged, or None when it is None or the 'null' marker.

    The comparison against the marker ignores surrounding whitespace.
    """
    if value is None:
        return None
    if value.strip() == NULL:
        return None
    return value
def to_bool(value):
    """Convert a string flag to a boolean.

    A missing value (None or the 'null' marker) counts as True, as do the
    strings 'true' and 'True'; everything else is False.
    """
    flag = to_none(value)
    return flag is None or flag in ('true', 'True')
def to_integer(value, default):
    """Return int(value), or default when value is None or the 'null' marker."""
    if to_none(value) is None:
        return default
    return int(value)
def optional_str_value(value, default):
    """Return value when is_not_blank() accepts it, otherwise default."""
    if is_not_blank(value):
        return value
    return default
def is_not_blank(value):
    """Return True when value holds real content.

    A value is blank when it is None, the 'null' marker (see to_none), or
    consists solely of whitespace.
    """
    # strip() must be *called*: comparing the bound method itself to ''
    # is always unequal, which made whitespace-only strings look non-blank.
    return to_none(value) is not None and value.strip() != ''
def get_optional_key(map, key, default=''):
    """Look up key in map, falling back to default when the key is absent."""
    if key not in map:
        return default
    return map[key]
def log(message):
    """Write message to the SM log, wrapped in the VMOPS marker."""
    line = '#### VMOPS %s ####' % message
    util.SMlog(line)
def echo(fn):
    """Decorator that logs entry to and exit from fn, including its result.

    The wrapped function's arguments and return value pass through
    unchanged.
    """
    from functools import wraps

    @wraps(fn)
    def wrapped(*v, **k):
        name = fn.__name__
        log("enter %s ####" % name)
        res = fn(*v, **k)
        # Both values must be supplied as one tuple; without the parentheses
        # the format receives a single argument and raises TypeError.
        log("exit %s with result %s" % (name, res))
        return res
    return wrapped
def require_str_value(value, error_message):
    """Return value, raising ValueError(error_message) when it is blank."""
    if not is_not_blank(value):
        raise ValueError(error_message)
    return value
def retry(max_attempts, fn):
    """Call fn() until it succeeds or max_attempts calls have failed.

    Parameters:
        max_attempts: maximum number of times fn is called. Values below 1
            are treated as 1 so that fn is always executed at least once
            (previously the call was skipped silently and None returned).
        fn: zero-argument callable to execute.

    Returns whatever fn returns on its first successful call.

    Raises the exception of the final attempt once all attempts have failed.
    """
    max_attempts = max(1, max_attempts)
    attempts = 1
    while True:
        log("Attempting execution {0}/{1} of {2}".
            format(attempts, max_attempts, fn.__name__))
        try:
            return fn()
        except Exception:
            # Only ordinary errors are retried; KeyboardInterrupt and
            # SystemExit propagate immediately.
            if attempts >= max_attempts:
                raise
            log("Execution {0}/{1} of {2} failed: {3}".format(
                attempts, max_attempts, fn.__name__,
                traceback.format_exc()))
            attempts = attempts + 1
def compute_md5(filename, buffer_size=8192):
    """Return the base64-encoded MD5 digest of the file's contents.

    Parameters:
        filename: path of the file to hash.
        buffer_size: number of bytes read per chunk, so large files are
            never loaded into memory at once.

    The result carries no trailing newline, making it usable directly as a
    Content-MD5 header value.
    """
    hasher = hashlib.md5()
    with open(filename, 'rb') as source:
        data = source.read(buffer_size)
        # Truthiness ends the loop on an empty read for both str and bytes.
        while data:
            hasher.update(data)
            data = source.read(buffer_size)
    # b64encode emits no line breaks, so nothing needs to be sliced off.
    return base64.b64encode(hasher.digest())
class S3Client(object):
    """Small S3 REST client supporting PUT, GET and DELETE of one object.

    Every request is signed with the configured access/secret key pair (see
    sign_request) and executed through retry().
    """

    DEFAULT_END_POINT = 's3.amazonaws.com'
    # NOTE(review): connection_timeout is stored but not used by any method
    # of this class. socket_timeout is assigned to the httplib connection,
    # whose timeout is expressed in seconds; 50000 looks like a millisecond
    # value -- confirm the intended unit with the caller.
    DEFAULT_CONNECTION_TIMEOUT = 50000
    DEFAULT_SOCKET_TIMEOUT = 50000
    DEFAULT_MAX_ERROR_RETRY = 3

    HEADER_CONTENT_MD5 = 'Content-MD5'
    HEADER_CONTENT_TYPE = 'Content-Type'
    HEADER_CONTENT_LENGTH = 'Content-Length'

    def __init__(self, access_key, secret_key, end_point=None,
                 https_flag=None, connection_timeout=None, socket_timeout=None,
                 max_error_retry=None):
        """Validate and store the client configuration.

        access_key and secret_key are mandatory (ValueError when blank);
        every other setting falls back to the matching DEFAULT_* constant,
        and https_flag is converted with to_bool().
        """
        self.access_key = require_str_value(
            access_key, 'An access key must be specified.')
        self.secret_key = require_str_value(
            secret_key, 'A secret key must be specified.')
        self.end_point = optional_str_value(end_point, self.DEFAULT_END_POINT)
        self.https_flag = to_bool(https_flag)
        self.connection_timeout = to_integer(
            connection_timeout, self.DEFAULT_CONNECTION_TIMEOUT)
        self.socket_timeout = to_integer(
            socket_timeout, self.DEFAULT_SOCKET_TIMEOUT)
        self.max_error_retry = to_integer(
            max_error_retry, self.DEFAULT_MAX_ERROR_RETRY)

    def build_canocialized_resource(self, bucket, key):
        """Return the request path '/<bucket>/<key>' used as URI and in signing."""
        return '/{bucket}/{key}'.format(bucket=bucket, key=key)

    # The two functions below are plain functions rather than methods: they
    # are only referenced as default argument values of do_operation, which
    # are evaluated while the class body executes.
    def noop_send_body():
        """Default body provider: the request carries no body."""
        pass

    def noop_read(response):
        """Default response handler: read and return the whole response body."""
        return response.read()

    def do_operation(
            self, method, bucket, key, input_headers=None,
            fn_send_body=noop_send_body, fn_read=noop_read):
        """Sign and execute one HTTP request against the end point.

        Parameters:
            method: HTTP verb, e.g. 'PUT'.
            bucket, key: identify the target object.
            input_headers: optional extra headers; the mapping passed in is
                copied, never modified.
            fn_send_body: zero-argument callable producing the request body;
                it is called once per attempt.
            fn_read: callable receiving the response object; its return
                value becomes the result of this method.

        The request is attempted up to max_error_retry times and the
        connection is always closed afterwards.
        """
        headers = {} if input_headers is None else copy(input_headers)
        headers['Expect'] = '100-continue'

        uri = self.build_canocialized_resource(bucket, key)
        signature, request_date = self.sign_request(method, uri, headers)
        headers['Authorization'] = "AWS {0}:{1}".format(
            self.access_key, signature)
        headers['Date'] = request_date

        connection = HTTPSConnection(self.end_point) \
            if self.https_flag else HTTPConnection(self.end_point)
        connection.timeout = self.socket_timeout

        def perform_request():
            connection.request(method, uri, fn_send_body(), headers)
            response = connection.getresponse()
            log("Sent {0} request to {1} {2} with headers {3}. "
                "Got response status {4}: {5}".
                format(method, self.end_point, uri, headers,
                       response.status, response.reason))
            return fn_read(response)

        try:
            return retry(self.max_error_retry, perform_request)
        finally:
            connection.close()

    def sign_request(self, operation, canocialized_resource, headers):
        """Compute the request signature and the date it was signed for.

        See http://bit.ly/MMC5de for more information regarding the creation
        of AWS authorization tokens and header signing.

        The string to sign is made of the operation, the Content-MD5 and
        Content-Type headers (empty when absent), the request date and the
        canonicalized resource, joined by newlines. It is signed with
        HMAC-SHA1 using the secret key and base64-encoded.

        Returns a (signature, request_date) tuple.
        """
        request_date = datetime.utcnow(
        ).strftime('%a, %d %b %Y %H:%M:%S +0000')

        content_hash = get_optional_key(headers, self.HEADER_CONTENT_MD5)
        content_type = get_optional_key(headers, self.HEADER_CONTENT_TYPE)

        string_to_sign = '\n'.join(
            [operation, content_hash, content_type, request_date,
             canocialized_resource])

        signature = base64.b64encode(
            hmac.new(self.secret_key, string_to_sign.encode('utf8'),
                     hashlib.sha1).digest())

        return signature, request_date

    def put(self, bucket, key, src_filename):
        """Upload the file src_filename to bucket under key."""
        headers = {
            self.HEADER_CONTENT_MD5: compute_md5(src_filename),
            self.HEADER_CONTENT_TYPE: 'application/octet-stream',
            self.HEADER_CONTENT_LENGTH: os.stat(src_filename).st_size,
        }

        # A fresh handle is opened for each attempt; remember all of them so
        # that none is left open once the operation has finished.
        opened = []

        def send_body():
            body = open(src_filename, 'rb')
            opened.append(body)
            return body

        try:
            self.do_operation('PUT', bucket, key, headers, send_body)
        finally:
            for body in opened:
                body.close()

    def get(self, bucket, key, target_filename):
        """Download the object at bucket/key into the file target_filename."""
        def read(response):
            # Stream in fixed-size blocks instead of buffering the object.
            with open(target_filename, 'wb') as target:
                while True:
                    block = response.read(8192)
                    if not block:
                        break
                    target.write(block)

        return self.do_operation('GET', bucket, key, fn_read=read)

    def delete(self, bucket, key):
        """Delete the object at bucket/key; returns the response body."""
        return self.do_operation('DELETE', bucket, key)
def parseArguments(args):
    """Build the S3 client and extract the operation parameters from args.

    Returns a (client, operation, bucket, key, filename) tuple.

    Raises KeyError when an expected entry is missing from args and
    ValueError when a credential, the operation, bucket, key or filename is
    blank.
    """
    # The keys in the args map will correspond to the properties defined on
    # the com.cloud.utils.S3Utils#ClientOptions interface
    client = S3Client(
        args['accessKey'], args['secretKey'], args['endPoint'],
        args['isHttps'], args['connectionTimeout'], args['socketTimeout'])

    operation = args['operation']
    bucket = args['bucket']
    key = args['key']
    filename = args['filename']

    # The file defines is_not_blank only; there is no is_blank helper.
    if not is_not_blank(operation):
        raise ValueError('An operation must be specified.')

    if not is_not_blank(bucket):
        raise ValueError('A bucket must be specified.')

    if not is_not_blank(key):
        raise ValueError('A key must be specified.')

    if not is_not_blank(filename):
        raise ValueError('A filename must be specified.')

    return client, operation, bucket, key, filename
@echo
def s3(session, args):
    """Plugin entry point: run the S3 operation described by args.

    Supported operations are 'put', 'get' and 'delete'. Returns the string
    'true' on success and 'false' when the operation fails or is not
    supported; the failure is logged together with its traceback. Errors
    raised while parsing args are not caught here.
    """
    client, operation, bucket, key, filename = parseArguments(args)

    try:
        if operation == 'put':
            client.put(bucket, key, filename)
        elif operation == 'get':
            client.get(bucket, key, filename)
        elif operation == 'delete':
            # S3Client.delete() takes the bucket and key only.
            client.delete(bucket, key)
        else:
            raise RuntimeError(
                "S3 plugin does not support operation {0}.".format(operation))

        return 'true'

    except Exception:
        log("Operation {0} on file {1} from/in bucket {2} key {3}.".format(
            operation, filename, bucket, key))
        log(traceback.format_exc())
        return 'false'
if __name__ == "__main__":
    # Plugin function names mapped to the callables that implement them.
    handlers = {"s3": s3}
    XenAPIPlugin.dispatch(handlers)