From 0c1ae20e77152418e294f88532a7c6ca416e08a5 Mon Sep 17 00:00:00 2001 From: Edison Su Date: Fri, 12 Jul 2013 18:01:39 -0700 Subject: [PATCH] add inital swift support Conflicts: server/src/com/cloud/resource/ResourceManagerImpl.java server/test/com/cloud/resource/MockResourceManagerImpl.java --- api/src/com/cloud/agent/api/to/SwiftTO.java | 8 +- .../com/cloud/resource/ResourceService.java | 15 - .../api/command/admin/swift/AddSwiftCmd.java | 118 - .../command/admin/swift/ListSwiftsCmd.java | 70 - .../DownloadSystemTemplateCommand.java | 129 -- .../schema/src/com/cloud/storage/SwiftVO.java | 2 +- .../src/com/cloud/storage/dao/SwiftDao.java | 31 - .../com/cloud/storage/dao/SwiftDaoImpl.java | 58 - engine/storage/image/pom.xml | 1 + .../MockLocalNfsSecondaryStorageResource.java | 57 +- .../image/BaseImageStoreDriverImpl.java | 30 +- .../driver/SwiftImageStoreDriverImpl.java | 47 +- scripts/storage/secondary/swift | 2008 +++++++---------- .../cloud/capacity/CapacityManagerImpl.java | 3 - .../ConfigurationManagerImpl.java | 31 - .../cloud/resource/ResourceManagerImpl.java | 33 - .../cloud/server/ManagementServerImpl.java | 7 - .../storage/snapshot/SnapshotManagerImpl.java | 3 - .../com/cloud/storage/swift/SwiftManager.java | 57 - .../cloud/storage/swift/SwiftManagerImpl.java | 298 --- .../template/HypervisorTemplateAdapter.java | 3 +- .../resource/MockResourceManagerImpl.java | 52 +- .../ChildTestConfiguration.java | 14 - services/secondary-storage/pom.xml | 1 + .../LocalNfsSecondaryStorageResource.java | 62 +- .../resource/NfsSecondaryStorageResource.java | 11 +- 26 files changed, 893 insertions(+), 2256 deletions(-) delete mode 100644 api/src/org/apache/cloudstack/api/command/admin/swift/AddSwiftCmd.java delete mode 100644 api/src/org/apache/cloudstack/api/command/admin/swift/ListSwiftsCmd.java delete mode 100644 engine/api/src/org/apache/cloudstack/storage/command/DownloadSystemTemplateCommand.java delete mode 100644 engine/schema/src/com/cloud/storage/dao/SwiftDao.java delete mode 100644 engine/schema/src/com/cloud/storage/dao/SwiftDaoImpl.java delete mode 100644 server/src/com/cloud/storage/swift/SwiftManager.java delete mode 100644 server/src/com/cloud/storage/swift/SwiftManagerImpl.java diff --git a/api/src/com/cloud/agent/api/to/SwiftTO.java b/api/src/com/cloud/agent/api/to/SwiftTO.java index e1697f91041..5d74003216e 100644 --- a/api/src/com/cloud/agent/api/to/SwiftTO.java +++ b/api/src/com/cloud/agent/api/to/SwiftTO.java @@ -25,15 +25,18 @@ public class SwiftTO implements DataStoreTO { String userName; String key; + String container; public SwiftTO() { } - public SwiftTO(Long id, String url, String account, String userName, String key) { + public SwiftTO(Long id, String url, String account, String userName, String key, + String container) { this.id = id; this.url = url; this.account = account; this.userName = userName; this.key = key; + this.container = container; } public Long getId() { @@ -61,6 +64,9 @@ public class SwiftTO implements DataStoreTO { return DataStoreRole.Image; } + public String getContainer() { + return this.container; + } } diff --git a/api/src/com/cloud/resource/ResourceService.java b/api/src/com/cloud/resource/ResourceService.java index ce0df635bfe..25298cc6251 100755 --- a/api/src/com/cloud/resource/ResourceService.java +++ b/api/src/com/cloud/resource/ResourceService.java @@ -27,10 +27,6 @@ import org.apache.cloudstack.api.command.admin.host.PrepareForMaintenanceCmd; import org.apache.cloudstack.api.command.admin.host.ReconnectHostCmd; import org.apache.cloudstack.api.command.admin.host.UpdateHostCmd; import org.apache.cloudstack.api.command.admin.host.UpdateHostPasswordCmd; -import org.apache.cloudstack.api.command.admin.storage.AddS3Cmd; -import org.apache.cloudstack.api.command.admin.storage.ListS3sCmd; -import org.apache.cloudstack.api.command.admin.swift.AddSwiftCmd; -import org.apache.cloudstack.api.command.admin.swift.ListSwiftsCmd; import com.cloud.exception.DiscoveryException; import com.cloud.exception.InvalidParameterValueException; @@ -38,9 +34,6 @@ import com.cloud.exception.ResourceInUseException; import com.cloud.host.Host; import com.cloud.hypervisor.Hypervisor.HypervisorType; import com.cloud.org.Cluster; -import com.cloud.storage.S3; -import com.cloud.storage.Swift; -import com.cloud.utils.Pair; import com.cloud.utils.fsm.NoTransitionException; public interface ResourceService { @@ -97,16 +90,8 @@ public interface ResourceService { Cluster getCluster(Long clusterId); - Swift discoverSwift(AddSwiftCmd addSwiftCmd) throws DiscoveryException; - - S3 discoverS3(AddS3Cmd cmd) throws DiscoveryException; - List getSupportedHypervisorTypes(long zoneId, boolean forVirtualRouter, Long podId); - Pair, Integer> listSwifts(ListSwiftsCmd cmd); - - List listS3s(ListS3sCmd cmd); - boolean releaseHostReservation(Long hostId); } diff --git a/api/src/org/apache/cloudstack/api/command/admin/swift/AddSwiftCmd.java b/api/src/org/apache/cloudstack/api/command/admin/swift/AddSwiftCmd.java deleted file mode 100644 index 462b5297ce5..00000000000 --- a/api/src/org/apache/cloudstack/api/command/admin/swift/AddSwiftCmd.java +++ /dev/null @@ -1,118 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -package org.apache.cloudstack.api.command.admin.swift; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.cloudstack.api.APICommand; -import org.apache.cloudstack.api.ApiConstants; -import org.apache.cloudstack.api.ApiErrorCode; -import org.apache.cloudstack.api.BaseCmd; -import org.apache.cloudstack.api.Parameter; -import org.apache.cloudstack.api.ServerApiException; -import org.apache.cloudstack.api.command.admin.storage.AddImageStoreCmd; -import org.apache.cloudstack.api.response.ImageStoreResponse; -import org.apache.log4j.Logger; - -import com.cloud.exception.DiscoveryException; -import com.cloud.storage.ImageStore; -import com.cloud.user.Account; - -@APICommand(name = "addSwift", description = "Adds Swift.", responseObject = ImageStoreResponse.class, since="3.0.0") -public class AddSwiftCmd extends BaseCmd { - public static final Logger s_logger = Logger.getLogger(AddSwiftCmd.class.getName()); - private static final String s_name = "addswiftresponse"; - - ///////////////////////////////////////////////////// - //////////////// API parameters ///////////////////// - ///////////////////////////////////////////////////// - - @Parameter(name = ApiConstants.URL, type = CommandType.STRING, required = true, description = "the URL for swift") - private String url; - - @Parameter(name = ApiConstants.ACCOUNT, type = CommandType.STRING, description = "the account for swift") - private String account; - - @Parameter(name = ApiConstants.USERNAME, type = CommandType.STRING, description = "the username for swift") - private String username; - - @Parameter(name = ApiConstants.KEY, type = CommandType.STRING, description = " key for the user for swift") - private String key; - - ///////////////////////////////////////////////////// - /////////////////// Accessors /////////////////////// - ///////////////////////////////////////////////////// - - public String getUrl() { - return url; - } - - ///////////////////////////////////////////////////// - /////////////// API Implementation/////////////////// - ///////////////////////////////////////////////////// - - public String getAccount() { - return account; - } - - public String getUsername() { - return username; - } - - public String getKey() { - return key; - } - - @Override - public String getCommandName() { - return s_name; - } - - @Override - public long getEntityOwnerId() { - return Account.ACCOUNT_ID_SYSTEM; - } - - @Override - public void execute(){ - AddImageStoreCmd cmd = new AddImageStoreCmd(); - cmd.setProviderName("Swift"); - cmd.setUrl(this.getUrl()); - Map details = new HashMap(); - details.put(ApiConstants.ACCOUNT, this.getAccount()); - details.put(ApiConstants.USERNAME, this.getUsername()); - details.put(ApiConstants.KEY, this.getKey()); - - - try{ - ImageStore result = _storageService.discoverImageStore(cmd); - ImageStoreResponse storeResponse = null; - if (result != null ) { - storeResponse = _responseGenerator.createImageStoreResponse(result); - storeResponse.setResponseName(getCommandName()); - storeResponse.setObjectName("secondarystorage"); - this.setResponseObject(storeResponse); - } else { - throw new ServerApiException(ApiErrorCode.INTERNAL_ERROR, "Failed to add secondary storage"); - } - } catch (DiscoveryException ex) { - s_logger.warn("Exception: ", ex); - throw new ServerApiException(ApiErrorCode.INTERNAL_ERROR, ex.getMessage()); - } - } -} diff --git a/api/src/org/apache/cloudstack/api/command/admin/swift/ListSwiftsCmd.java b/api/src/org/apache/cloudstack/api/command/admin/swift/ListSwiftsCmd.java deleted file mode 100644 index b0408f43792..00000000000 --- a/api/src/org/apache/cloudstack/api/command/admin/swift/ListSwiftsCmd.java +++ /dev/null @@ -1,70 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -package org.apache.cloudstack.api.command.admin.swift; - -import org.apache.cloudstack.api.APICommand; -import org.apache.cloudstack.api.ApiConstants; -import org.apache.cloudstack.api.BaseListCmd; -import org.apache.cloudstack.api.Parameter; -import org.apache.cloudstack.api.command.admin.storage.ListImageStoresCmd; -import org.apache.cloudstack.api.response.ImageStoreResponse; -import org.apache.cloudstack.api.response.ListResponse; -import org.apache.log4j.Logger; - -import com.cloud.user.Account; - -@APICommand(name = "listSwifts", description = "List Swift.", responseObject = ImageStoreResponse.class, since="3.0.0") -public class ListSwiftsCmd extends BaseListCmd { - public static final Logger s_logger = Logger.getLogger(ListSwiftsCmd.class.getName()); - private static final String s_name = "listswiftsresponse"; - - ///////////////////////////////////////////////////// - //////////////// API parameters ///////////////////// - ///////////////////////////////////////////////////// - - @Parameter(name = ApiConstants.ID, type = CommandType.LONG, description = "the id of the swift") - private Long id; - - ///////////////////////////////////////////////////// - /////////////// API Implementation/////////////////// - ///////////////////////////////////////////////////// - - public Long getId() { - return id; - } - - - @Override - public String getCommandName() { - return s_name; - } - - @Override - public long getEntityOwnerId() { - return Account.ACCOUNT_ID_SYSTEM; - } - - @Override - public void execute(){ - - ListImageStoresCmd cmd = new ListImageStoresCmd(); - cmd.setProvider("Swift"); - ListResponse response = _queryService.searchForImageStores(cmd); - response.setResponseName(getCommandName()); - this.setResponseObject(response); - } -} diff --git a/engine/api/src/org/apache/cloudstack/storage/command/DownloadSystemTemplateCommand.java b/engine/api/src/org/apache/cloudstack/storage/command/DownloadSystemTemplateCommand.java deleted file mode 100644 index 9528ff788d8..00000000000 --- a/engine/api/src/org/apache/cloudstack/storage/command/DownloadSystemTemplateCommand.java +++ /dev/null @@ -1,129 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -package org.apache.cloudstack.storage.command; - -import com.cloud.agent.api.Command; -import com.cloud.agent.api.storage.PasswordAuth; -import com.cloud.agent.api.storage.Proxy; -import com.cloud.agent.api.to.DataStoreTO; -import com.cloud.template.VirtualMachineTemplate; - -public class DownloadSystemTemplateCommand extends Command { - - private PasswordAuth auth; - private Proxy _proxy; - private DataStoreTO _store; - private Long resourceId; - private Long accountId; - private String url; - private Long maxDownloadSizeInBytes; - private String name; - - protected DownloadSystemTemplateCommand() { - super(); - } - - public DownloadSystemTemplateCommand(DataStoreTO store, String secUrl, VirtualMachineTemplate template, - Long maxDownloadSizeInBytes) { - super(); - this._store = store; - this.accountId = template.getAccountId(); - this.url = secUrl; - this.maxDownloadSizeInBytes = maxDownloadSizeInBytes; - this.resourceId = template.getId(); - this.name = template.getUniqueName(); - } - - public DownloadSystemTemplateCommand(DataStoreTO store, String secUrl, String url, VirtualMachineTemplate template, - String user, String passwd, Long maxDownloadSizeInBytes) { - super(); - this._store = store; - this.accountId = template.getAccountId(); - this.url = secUrl; - this.maxDownloadSizeInBytes = maxDownloadSizeInBytes; - this.resourceId = template.getId(); - auth = new PasswordAuth(user, passwd); - this.name = template.getUniqueName(); - } - - public PasswordAuth getAuth() { - return auth; - } - - public void setCreds(String userName, String passwd) { - auth = new PasswordAuth(userName, passwd); - } - - public Proxy getProxy() { - return _proxy; - } - - public void setProxy(Proxy proxy) { - _proxy = proxy; - } - - public Long getMaxDownloadSizeInBytes() { - return maxDownloadSizeInBytes; - } - - public DataStoreTO getDataStore() { - return _store; - } - - public void setDataStore(DataStoreTO _store) { - this._store = _store; - } - - public Long getResourceId() { - return resourceId; - } - - public void setResourceId(Long resourceId) { - this.resourceId = resourceId; - } - - public Long getAccountId() { - return accountId; - } - - public void setAccountId(Long accountId) { - this.accountId = accountId; - } - - public String getUrl() { - return url; - } - - public void setUrl(String url) { - this.url = url; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - @Override - public boolean executeInSequence() { - // TODO Auto-generated method stub - return false; - } - -} diff --git a/engine/schema/src/com/cloud/storage/SwiftVO.java b/engine/schema/src/com/cloud/storage/SwiftVO.java index 4136a224b3c..1389242d21c 100644 --- a/engine/schema/src/com/cloud/storage/SwiftVO.java +++ b/engine/schema/src/com/cloud/storage/SwiftVO.java @@ -99,7 +99,7 @@ public class SwiftVO implements Swift, InternalIdentity { @Override public SwiftTO toSwiftTO() { - return new SwiftTO(getId(), getUrl(), getAccount(), getUserName(), getKey()); + return null; } @Override diff --git a/engine/schema/src/com/cloud/storage/dao/SwiftDao.java b/engine/schema/src/com/cloud/storage/dao/SwiftDao.java deleted file mode 100644 index 72c5219a768..00000000000 --- a/engine/schema/src/com/cloud/storage/dao/SwiftDao.java +++ /dev/null @@ -1,31 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -package com.cloud.storage.dao; - -import com.cloud.agent.api.to.SwiftTO; -import com.cloud.storage.SwiftVO; -import com.cloud.utils.db.GenericDao; - -/** - * - * - */ - -public interface SwiftDao extends GenericDao { - - SwiftTO getSwiftTO(Long swiftId); -} diff --git a/engine/schema/src/com/cloud/storage/dao/SwiftDaoImpl.java b/engine/schema/src/com/cloud/storage/dao/SwiftDaoImpl.java deleted file mode 100644 index 07146902959..00000000000 --- a/engine/schema/src/com/cloud/storage/dao/SwiftDaoImpl.java +++ /dev/null @@ -1,58 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -package com.cloud.storage.dao; - -import java.util.Collections; -import java.util.List; - -import javax.ejb.Local; - -import org.apache.log4j.Logger; -import org.springframework.stereotype.Component; - -import com.cloud.agent.api.to.SwiftTO; -import com.cloud.storage.SwiftVO; -import com.cloud.utils.db.GenericDaoBase; - -/** - * - * - */ -@Component -@Local(value = { SwiftDao.class }) -public class SwiftDaoImpl extends GenericDaoBase implements SwiftDao { - public static final Logger s_logger = Logger.getLogger(SwiftDaoImpl.class.getName()); - - @Override - public SwiftTO getSwiftTO(Long swiftId) { - if (swiftId != null) { - SwiftVO swift = findById(swiftId); - if (swift != null) { - return swift.toSwiftTO(); - } - return null; - } - - List swiftVOs = listAll(); - if (swiftVOs == null || swiftVOs.size() < 1) { - return null; - } else { - Collections.shuffle(swiftVOs); - return swiftVOs.get(0).toSwiftTO(); - } - } -} \ No newline at end of file diff --git a/engine/storage/image/pom.xml b/engine/storage/image/pom.xml index c4cf14ca9b5..c4d2d1b2542 100644 --- a/engine/storage/image/pom.xml +++ b/engine/storage/image/pom.xml @@ -36,6 +36,7 @@ mockito-all 1.9.5 + javax.inject javax.inject diff --git a/engine/storage/integration-test/test/org/apache/cloudstack/storage/MockLocalNfsSecondaryStorageResource.java b/engine/storage/integration-test/test/org/apache/cloudstack/storage/MockLocalNfsSecondaryStorageResource.java index 4d932964a64..75239c633b8 100644 --- a/engine/storage/integration-test/test/org/apache/cloudstack/storage/MockLocalNfsSecondaryStorageResource.java +++ b/engine/storage/integration-test/test/org/apache/cloudstack/storage/MockLocalNfsSecondaryStorageResource.java @@ -27,7 +27,6 @@ import java.util.List; import javax.naming.ConfigurationException; -import org.apache.cloudstack.storage.command.DownloadSystemTemplateCommand; import org.apache.cloudstack.storage.resource.NfsSecondaryStorageResource; import org.apache.cloudstack.storage.template.DownloadManagerImpl; import org.springframework.stereotype.Component; @@ -75,61 +74,9 @@ public class MockLocalNfsSecondaryStorageResource extends NfsSecondaryStorageRes @Override public Answer executeRequest(Command cmd) { - if (cmd instanceof DownloadSystemTemplateCommand) { - return execute((DownloadSystemTemplateCommand) cmd); - } else { - // return Answer.createUnsupportedCommandAnswer(cmd); - return super.executeRequest(cmd); - } + // return Answer.createUnsupportedCommandAnswer(cmd); + return super.executeRequest(cmd); } - private Answer execute(DownloadSystemTemplateCommand cmd) { - DataStoreTO dstore = cmd.getDataStore(); - if (dstore instanceof S3TO) { - // TODO: how to handle download progress for S3 - S3TO s3 = (S3TO) cmd.getDataStore(); - String url = cmd.getUrl(); - String user = null; - String password = null; - if (cmd.getAuth() != null) { - user = cmd.getAuth().getUserName(); - password = new String(cmd.getAuth().getPassword()); - } - // get input stream from the given url - InputStream in = UriUtils.getInputStreamFromUrl(url, user, password); - URL urlObj; - try { - urlObj = new URL(url); - } catch (MalformedURLException e) { - throw new CloudRuntimeException("URL is incorrect: " + url); - } - final String bucket = s3.getBucketName(); - // convention is no / in the end for install path based on S3Utils - // implementation. - String path = determineS3TemplateDirectory(cmd.getAccountId(), cmd.getResourceId(), cmd.getName()); - // template key is - // TEMPLATE_ROOT_DIR/account_id/template_id/template_name - String key = join(asList(path, urlObj.getFile()), S3Utils.SEPARATOR); - S3Utils.putObject(s3, in, bucket, key); - List s3Obj = S3Utils.getDirectory(s3, bucket, path); - if (s3Obj == null || s3Obj.size() == 0) { - return new Answer(cmd, false, "Failed to download to S3 bucket: " + bucket + " with key: " + key); - } else { - return new DownloadAnswer(null, 100, null, Status.DOWNLOADED, path, path, s3Obj.get(0).getSize(), s3Obj - .get(0).getSize(), s3Obj.get(0).getETag()); - } - } else if (dstore instanceof NfsTO) { - return new Answer(cmd, false, "Nfs needs to be pre-installed with system vm templates"); - } else if (dstore instanceof SwiftTO) { - // TODO: need to move code from - // execute(uploadTemplateToSwiftFromSecondaryStorageCommand) here, - // but we need to handle - // source is url, most likely we need to modify our existing - // swiftUpload python script. - return new Answer(cmd, false, "Swift is not currently support DownloadCommand"); - } else { - return new Answer(cmd, false, "Unsupported image data store: " + dstore); - } - } } diff --git a/engine/storage/src/org/apache/cloudstack/storage/image/BaseImageStoreDriverImpl.java b/engine/storage/src/org/apache/cloudstack/storage/image/BaseImageStoreDriverImpl.java index 86462472743..f77226df2e0 100644 --- a/engine/storage/src/org/apache/cloudstack/storage/image/BaseImageStoreDriverImpl.java +++ b/engine/storage/src/org/apache/cloudstack/storage/image/BaseImageStoreDriverImpl.java @@ -20,8 +20,10 @@ package org.apache.cloudstack.storage.image; import com.cloud.agent.api.Answer; import com.cloud.agent.api.storage.DownloadAnswer; +import com.cloud.agent.api.storage.Proxy; import com.cloud.agent.api.to.DataObjectType; import com.cloud.agent.api.to.DataTO; +import com.cloud.configuration.dao.ConfigurationDao; import com.cloud.storage.VMTemplateStorageResourceAssoc; import com.cloud.storage.VMTemplateVO; import com.cloud.storage.VolumeVO; @@ -46,12 +48,14 @@ import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO; import org.apache.log4j.Logger; import javax.inject.Inject; +import java.net.URI; +import java.net.URISyntaxException; import java.util.Date; public abstract class BaseImageStoreDriverImpl implements ImageStoreDriver { private static final Logger s_logger = Logger.getLogger(BaseImageStoreDriverImpl.class); @Inject - VMTemplateDao _templateDao; + protected VMTemplateDao _templateDao; @Inject DownloadMonitor _downloadMonitor; @Inject @@ -62,6 +66,22 @@ public abstract class BaseImageStoreDriverImpl implements ImageStoreDriver { TemplateDataStoreDao _templateStoreDao; @Inject EndPointSelector _epSelector; + @Inject + ConfigurationDao configDao; + protected String _proxy = null; + + protected Proxy getHttpProxy() { + if (_proxy == null) { + return null; + } + try { + URI uri = new URI(_proxy); + Proxy prx = new Proxy(uri); + return prx; + } catch (URISyntaxException e) { + return null; + } + } @Override public DataTO getTO(DataObject data) { @@ -77,6 +97,14 @@ public abstract class BaseImageStoreDriverImpl implements ImageStoreDriver { } } + protected Long getMaxTemplateSizeInBytes() { + try { + return Long.parseLong(configDao.getValue("max.template.iso.size")) * 1024L * 1024L * 1024L; + } catch (NumberFormatException e) { + return null; + } + } + @Override public void createAsync(DataStore dataStore, DataObject data, AsyncCompletionCallback callback) { CreateContext context = new CreateContext(callback, data); diff --git a/plugins/storage/image/swift/src/org/apache/cloudstack/storage/datastore/driver/SwiftImageStoreDriverImpl.java b/plugins/storage/image/swift/src/org/apache/cloudstack/storage/datastore/driver/SwiftImageStoreDriverImpl.java index d6d6cd283f1..d9718507823 100644 --- a/plugins/storage/image/swift/src/org/apache/cloudstack/storage/datastore/driver/SwiftImageStoreDriverImpl.java +++ b/plugins/storage/image/swift/src/org/apache/cloudstack/storage/datastore/driver/SwiftImageStoreDriverImpl.java @@ -19,13 +19,26 @@ package org.apache.cloudstack.storage.datastore.driver; import java.util.Map; +import java.util.Timer; import javax.inject.Inject; +import com.cloud.agent.api.storage.DownloadAnswer; +import com.cloud.agent.api.to.DataObjectType; +import com.cloud.storage.download.DownloadListener; +import com.cloud.storage.template.TemplateConstants; +import com.cloud.storage.upload.UploadListener; +import com.cloud.template.VirtualMachineTemplate; +import com.cloud.utils.component.ComponentContext; import org.apache.cloudstack.api.ApiConstants; -import org.apache.cloudstack.engine.subsystem.api.storage.DataStore; +import org.apache.cloudstack.engine.subsystem.api.storage.*; +import org.apache.cloudstack.framework.async.AsyncCallbackDispatcher; +import org.apache.cloudstack.framework.async.AsyncCompletionCallback; +import org.apache.cloudstack.storage.command.DownloadCommand; +import org.apache.cloudstack.storage.command.DownloadProgressCommand; import org.apache.cloudstack.storage.datastore.db.ImageStoreDetailsDao; import org.apache.cloudstack.storage.image.BaseImageStoreDriverImpl; import org.apache.cloudstack.storage.image.store.ImageStoreImpl; +import org.apache.cloudstack.storage.to.TemplateObjectTO; import org.apache.log4j.Logger; import com.cloud.agent.api.to.DataStoreTO; @@ -38,13 +51,15 @@ public class SwiftImageStoreDriverImpl extends BaseImageStoreDriverImpl { @Inject ImageStoreDetailsDao _imageStoreDetailsDao; + @Inject + EndPointSelector _epSelector; @Override public DataStoreTO getStoreTO(DataStore store) { ImageStoreImpl imgStore = (ImageStoreImpl) store; Map details = _imageStoreDetailsDao.getDetails(imgStore.getId()); return new SwiftTO(imgStore.getId(), imgStore.getUri(), details.get(ApiConstants.ACCOUNT), - details.get(ApiConstants.USERNAME), details.get(ApiConstants.KEY)); + details.get(ApiConstants.USERNAME), details.get(ApiConstants.KEY), details.get(ApiConstants.S3_BUCKET_NAME)); } @Override @@ -52,4 +67,32 @@ public class SwiftImageStoreDriverImpl extends BaseImageStoreDriverImpl { throw new UnsupportedServiceException("Extract entity url is not yet supported for Swift image store provider"); } + @Override + public void createAsync(DataStore dataStore, DataObject data, AsyncCompletionCallback callback) { + Long maxTemplateSizeInBytes = getMaxTemplateSizeInBytes(); + VirtualMachineTemplate tmpl = _templateDao.findById(data.getId()); + DownloadCommand dcmd = new DownloadCommand((TemplateObjectTO)(data.getTO()), maxTemplateSizeInBytes); + dcmd.setProxy(getHttpProxy()); + + EndPoint ep = _epSelector.select(data); + if (ep == null) { + s_logger.warn("There is no secondary storage VM for downloading template to image store " + dataStore.getName()); + return; + } + + CreateContext context = new CreateContext(callback, data); + AsyncCallbackDispatcher caller = AsyncCallbackDispatcher + .create(this); + caller.setContext(context); + + if (data.getType() == DataObjectType.TEMPLATE) { + caller.setCallback(caller.getTarget().createTemplateAsyncCallback(null, null)); + } else if (data.getType() == DataObjectType.VOLUME) { + caller.setCallback(caller.getTarget().createVolumeAsyncCallback(null, null)); + } + ep.sendMessageAsync(dcmd, caller); + + + } + } diff --git a/scripts/storage/secondary/swift b/scripts/storage/secondary/swift index 4138db8b17e..8224b4d393b 100755 --- a/scripts/storage/secondary/swift +++ b/scripts/storage/secondary/swift @@ -1,5 +1,5 @@ #!/usr/bin/python -u -# Copyright (c) 2010-2011 OpenStack, LLC. +# Copyright (c) 2010-2012 OpenStack, LLC. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,896 +13,51 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. +import signal +import socket +import logging from errno import EEXIST, ENOENT from hashlib import md5 -from optparse import OptionParser -from os import environ, listdir, makedirs, utime +from optparse import OptionParser, SUPPRESS_HELP +from os import environ, listdir, makedirs, utime, _exit as os_exit from os.path import basename, dirname, getmtime, getsize, isdir, join -from Queue import Empty, Queue +from Queue import Queue +from random import shuffle from sys import argv, exc_info, exit, stderr, stdout -from threading import enumerate as threading_enumerate, Thread -from time import sleep +from threading import Thread +from time import sleep, time, gmtime, strftime from traceback import format_exception - - -# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # -# Inclusion of swift.common.client for convenience of single file distribution - -import socket -from cStringIO import StringIO -from re import compile, DOTALL -from tokenize import generate_tokens, STRING, NAME, OP -from urllib import quote as _quote, unquote -from urlparse import urlparse, urlunparse +from urllib import quote, unquote try: - from eventlet.green.httplib import HTTPException, HTTPSConnection + import simplejson as json except ImportError: - from httplib import HTTPException, HTTPSConnection + import json -try: - from eventlet import sleep -except ImportError: - from time import sleep - -try: - from swift.common.bufferedhttp \ - import BufferedHTTPConnection as HTTPConnection -except ImportError: - try: - from eventlet.green.httplib import HTTPConnection - except ImportError: - from httplib import HTTPConnection +from swiftclient import Connection, ClientException, HTTPException, utils +from swiftclient.version import version_info -def quote(value, safe='/'): +def get_conn(options): """ - Patched version of urllib.quote that encodes utf8 strings before quoting + Return a connection building it from the options. """ - if isinstance(value, unicode): - value = value.encode('utf8') - return _quote(value, safe) - - -# look for a real json parser first -try: - # simplejson is popular and pretty good - from simplejson import loads as json_loads -except ImportError: - try: - # 2.6 will have a json module in the stdlib - from json import loads as json_loads - except ImportError: - # fall back on local parser otherwise - comments = compile(r'/\*.*\*/|//[^\r\n]*', DOTALL) - - def json_loads(string): - ''' - Fairly competent json parser exploiting the python tokenizer and - eval(). -- From python-cloudfiles - - _loads(serialized_json) -> object - ''' - try: - res = [] - consts = {'true': True, 'false': False, 'null': None} - string = '(' + comments.sub('', string) + ')' - for type, val, _junk, _junk, _junk in \ - generate_tokens(StringIO(string).readline): - if (type == OP and val not in '[]{}:,()-') or \ - (type == NAME and val not in consts): - raise AttributeError() - elif type == STRING: - res.append('u') - res.append(val.replace('\\/', '/')) - else: - res.append(val) - return eval(''.join(res), {}, consts) - except Exception: - raise AttributeError() - - -class ClientException(Exception): - - def __init__(self, msg, http_scheme='', http_host='', http_port='', - http_path='', http_query='', http_status=0, http_reason='', - http_device=''): - Exception.__init__(self, msg) - self.msg = msg - self.http_scheme = http_scheme - self.http_host = http_host - self.http_port = http_port - self.http_path = http_path - self.http_query = http_query - self.http_status = http_status - self.http_reason = http_reason - self.http_device = http_device - - def __str__(self): - a = self.msg - b = '' - if self.http_scheme: - b += '%s://' % self.http_scheme - if self.http_host: - b += self.http_host - if self.http_port: - b += ':%s' % self.http_port - if self.http_path: - b += self.http_path - if self.http_query: - b += '?%s' % self.http_query - if self.http_status: - if b: - b = '%s %s' % (b, self.http_status) - else: - b = str(self.http_status) - if self.http_reason: - if b: - b = '%s %s' % (b, self.http_reason) - else: - b = '- %s' % self.http_reason - if self.http_device: - if b: - b = '%s: device %s' % (b, self.http_device) - else: - b = 'device %s' % self.http_device - return b and '%s: %s' % (a, b) or a - - -def http_connection(url, proxy=None): - """ - Make an HTTPConnection or HTTPSConnection - - :param url: url to connect to - :param proxy: proxy to connect through, if any; None by default; str of the - format 'http://127.0.0.1:8888' to set one - :returns: tuple of (parsed url, connection object) - :raises ClientException: Unable to handle protocol scheme - """ - parsed = urlparse(url) - proxy_parsed = urlparse(proxy) if proxy else None - if parsed.scheme == 'http': - conn = HTTPConnection((proxy_parsed if proxy else parsed).netloc) - elif parsed.scheme == 'https': - conn = HTTPSConnection((proxy_parsed if proxy else parsed).netloc) - else: - raise ClientException('Cannot handle protocol scheme %s for url %s' % - (parsed.scheme, repr(url))) - if proxy: - conn._set_tunnel(parsed.hostname, parsed.port) - return parsed, conn - - -def get_auth(url, user, key, snet=False): - """ - Get authentication/authorization credentials. - - The snet parameter is used for Rackspace's ServiceNet internal network - implementation. In this function, it simply adds *snet-* to the beginning - of the host name for the returned storage URL. With Rackspace Cloud Files, - use of this network path causes no bandwidth charges but requires the - client to be running on Rackspace's ServiceNet network. - - :param url: authentication/authorization URL - :param user: user to authenticate as - :param key: key or password for authorization - :param snet: use SERVICENET internal network (see above), default is False - :returns: tuple of (storage URL, auth token) - :raises ClientException: HTTP GET request to auth URL failed - """ - parsed, conn = http_connection(url) - conn.request('GET', parsed.path, '', - {'X-Auth-User': user, 'X-Auth-Key': key}) - resp = conn.getresponse() - resp.read() - if resp.status < 200 or resp.status >= 300: - raise ClientException('Auth GET failed', http_scheme=parsed.scheme, - http_host=conn.host, http_port=conn.port, - http_path=parsed.path, http_status=resp.status, - http_reason=resp.reason) - url = resp.getheader('x-storage-url') - if snet: - parsed = list(urlparse(url)) - # Second item in the list is the netloc - parsed[1] = 'snet-' + parsed[1] - url = urlunparse(parsed) - return url, resp.getheader('x-storage-token', - resp.getheader('x-auth-token')) - - -def get_account(url, token, marker=None, limit=None, prefix=None, - http_conn=None, full_listing=False): - """ - Get a listing of containers for the account. - - :param url: storage URL - :param token: auth token - :param marker: marker query - :param limit: limit query - :param prefix: prefix query - :param http_conn: HTTP connection object (If None, it will create the - conn object) - :param full_listing: if True, return a full listing, else returns a max - of 10000 listings - :returns: a tuple of (response headers, a list of containers) The response - headers will be a dict and all header names will be lowercase. - :raises ClientException: HTTP GET request failed - """ - if not http_conn: - http_conn = http_connection(url) - if full_listing: - rv = get_account(url, token, marker, limit, prefix, http_conn) - listing = rv[1] - while listing: - marker = listing[-1]['name'] - listing = \ - get_account(url, token, marker, limit, prefix, http_conn)[1] - if listing: - rv[1].extend(listing) - return rv - parsed, conn = http_conn - qs = 'format=json' - if marker: - qs += '&marker=%s' % quote(marker) - if limit: - qs += '&limit=%d' % limit - if prefix: - qs += '&prefix=%s' % quote(prefix) - conn.request('GET', '%s?%s' % (parsed.path, qs), '', - {'X-Auth-Token': token}) - resp = conn.getresponse() - resp_headers = {} - for header, value in resp.getheaders(): - resp_headers[header.lower()] = value - if resp.status < 200 or resp.status >= 300: - resp.read() - raise ClientException('Account GET failed', http_scheme=parsed.scheme, - http_host=conn.host, http_port=conn.port, - http_path=parsed.path, http_query=qs, http_status=resp.status, - http_reason=resp.reason) - if resp.status == 204: - resp.read() - return resp_headers, [] - return resp_headers, json_loads(resp.read()) - - -def head_account(url, token, http_conn=None): - """ - Get account stats. - - :param url: storage URL - :param token: auth token - :param http_conn: HTTP connection object (If None, it will create the - conn object) - :returns: a dict containing the response's headers (all header names will - be lowercase) - :raises ClientException: HTTP HEAD request failed - """ - if http_conn: - parsed, conn = http_conn - else: - parsed, conn = http_connection(url) - conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token}) - resp = conn.getresponse() - resp.read() - if resp.status < 200 or resp.status >= 300: - raise ClientException('Account HEAD failed', http_scheme=parsed.scheme, - http_host=conn.host, http_port=conn.port, - http_path=parsed.path, http_status=resp.status, - http_reason=resp.reason) - resp_headers = {} - for header, value in resp.getheaders(): - resp_headers[header.lower()] = value - return resp_headers - - -def post_account(url, token, headers, http_conn=None): - """ - Update an account's metadata. - - :param url: storage URL - :param token: auth token - :param headers: additional headers to include in the request - :param http_conn: HTTP connection object (If None, it will create the - conn object) - :raises ClientException: HTTP POST request failed - """ - if http_conn: - parsed, conn = http_conn - else: - parsed, conn = http_connection(url) - headers['X-Auth-Token'] = token - conn.request('POST', parsed.path, '', headers) - resp = conn.getresponse() - resp.read() - if resp.status < 200 or resp.status >= 300: - raise ClientException('Account POST failed', - http_scheme=parsed.scheme, http_host=conn.host, - http_port=conn.port, http_path=path, http_status=resp.status, - http_reason=resp.reason) - - -def get_container(url, token, container, marker=None, limit=None, - prefix=None, delimiter=None, http_conn=None, - full_listing=False): - """ - Get a listing of objects for the container. - - :param url: storage URL - :param token: auth token - :param container: container name to get a listing for - :param marker: marker query - :param limit: limit query - :param prefix: prefix query - :param delimeter: string to delimit the queries on - :param http_conn: HTTP connection object (If None, it will create the - conn object) - :param full_listing: if True, return a full listing, else returns a max - of 10000 listings - :returns: a tuple of (response headers, a list of objects) The response - headers will be a dict and all header names will be lowercase. - :raises ClientException: HTTP GET request failed - """ - if not http_conn: - http_conn = http_connection(url) - if full_listing: - rv = get_container(url, token, container, marker, limit, prefix, - delimiter, http_conn) - listing = rv[1] - while listing: - if not delimiter: - marker = listing[-1]['name'] - else: - marker = listing[-1].get('name', listing[-1].get('subdir')) - listing = get_container(url, token, container, marker, limit, - prefix, delimiter, http_conn)[1] - if listing: - rv[1].extend(listing) - return rv - parsed, conn = http_conn - path = '%s/%s' % (parsed.path, quote(container)) - qs = 'format=json' - if marker: - qs += '&marker=%s' % quote(marker) - if limit: - qs += '&limit=%d' % limit - if prefix: - qs += '&prefix=%s' % quote(prefix) - if delimiter: - qs += '&delimiter=%s' % quote(delimiter) - conn.request('GET', '%s?%s' % (path, qs), '', {'X-Auth-Token': token}) - resp = conn.getresponse() - if resp.status < 200 or resp.status >= 300: - resp.read() - raise ClientException('Container GET failed', - http_scheme=parsed.scheme, http_host=conn.host, - http_port=conn.port, http_path=path, http_query=qs, - http_status=resp.status, http_reason=resp.reason) - resp_headers = {} - for header, value in resp.getheaders(): - resp_headers[header.lower()] = value - if resp.status == 204: - resp.read() - return resp_headers, [] - return resp_headers, json_loads(resp.read()) - - -def head_container(url, token, container, http_conn=None): - """ - Get container stats. - - :param url: storage URL - :param token: auth token - :param container: container name to get stats for - :param http_conn: HTTP connection object (If None, it will create the - conn object) - :returns: a dict containing the response's headers (all header names will - be lowercase) - :raises ClientException: HTTP HEAD request failed - """ - if http_conn: - parsed, conn = http_conn - else: - parsed, conn = http_connection(url) - path = '%s/%s' % (parsed.path, quote(container)) - conn.request('HEAD', path, '', {'X-Auth-Token': token}) - resp = conn.getresponse() - resp.read() - if resp.status < 200 or resp.status >= 300: - raise ClientException('Container HEAD failed', - http_scheme=parsed.scheme, http_host=conn.host, - http_port=conn.port, http_path=path, http_status=resp.status, - http_reason=resp.reason) - resp_headers = {} - for header, value in resp.getheaders(): - resp_headers[header.lower()] = value - return resp_headers - - -def put_container(url, token, container, headers=None, http_conn=None): - """ - Create a container - - :param url: storage URL - :param token: auth token - :param container: container name to create - :param headers: additional headers to include in the request - :param http_conn: HTTP connection object (If None, it will create the - conn object) - :raises ClientException: HTTP PUT request failed - """ - if http_conn: - parsed, conn = http_conn - else: - parsed, conn = http_connection(url) - path = '%s/%s' % (parsed.path, quote(container)) - if not headers: - headers = {} - headers['X-Auth-Token'] = token - conn.request('PUT', path, '', headers) - resp = conn.getresponse() - resp.read() - if resp.status < 200 or resp.status >= 300: - raise ClientException('Container PUT failed', - http_scheme=parsed.scheme, http_host=conn.host, - http_port=conn.port, http_path=path, http_status=resp.status, - http_reason=resp.reason) - - -def post_container(url, token, container, headers, http_conn=None): - """ - Update a container's metadata. - - :param url: storage URL - :param token: auth token - :param container: container name to update - :param headers: additional headers to include in the request - :param http_conn: HTTP connection object (If None, it will create the - conn object) - :raises ClientException: HTTP POST request failed - """ - if http_conn: - parsed, conn = http_conn - else: - parsed, conn = http_connection(url) - path = '%s/%s' % (parsed.path, quote(container)) - headers['X-Auth-Token'] = token - conn.request('POST', path, '', headers) - resp = conn.getresponse() - resp.read() - if resp.status < 200 or resp.status >= 300: - raise ClientException('Container POST failed', - http_scheme=parsed.scheme, http_host=conn.host, - http_port=conn.port, http_path=path, http_status=resp.status, - http_reason=resp.reason) - - -def delete_container(url, token, container, http_conn=None): - """ - Delete a container - - :param url: storage URL - :param token: auth token - :param container: container name to delete - :param http_conn: HTTP connection object (If None, it will create the - conn object) - :raises ClientException: HTTP DELETE request failed - """ - if http_conn: - parsed, conn = http_conn - else: - parsed, conn = http_connection(url) - path = '%s/%s' % (parsed.path, quote(container)) - conn.request('DELETE', path, '', {'X-Auth-Token': token}) - resp = conn.getresponse() - resp.read() - if resp.status < 200 or resp.status >= 300: - raise ClientException('Container DELETE failed', - http_scheme=parsed.scheme, http_host=conn.host, - http_port=conn.port, http_path=path, http_status=resp.status, - http_reason=resp.reason) - - -def get_object(url, token, container, name, http_conn=None, - resp_chunk_size=None): - """ - Get an object - - :param url: storage URL - :param token: auth token - :param container: container name that the object is in - :param name: object name to get - :param http_conn: HTTP connection object (If None, it will create the - conn object) - :param resp_chunk_size: if defined, chunk size of data to read. NOTE: If - you specify a resp_chunk_size you must fully read - the object's contents before making another - request. - :returns: a tuple of (response headers, the object's contents) The response - headers will be a dict and all header names will be lowercase. - :raises ClientException: HTTP GET request failed - """ - if http_conn: - parsed, conn = http_conn - else: - parsed, conn = http_connection(url) - path = '%s/%s/%s' % (parsed.path, quote(container), quote(name)) - conn.request('GET', path, '', {'X-Auth-Token': token}) - resp = conn.getresponse() - if resp.status < 200 or resp.status >= 300: - resp.read() - raise ClientException('Object GET failed', http_scheme=parsed.scheme, - http_host=conn.host, http_port=conn.port, http_path=path, - http_status=resp.status, http_reason=resp.reason) - if resp_chunk_size: - - def _object_body(): - buf = resp.read(resp_chunk_size) - while buf: - yield buf - buf = resp.read(resp_chunk_size) - object_body = _object_body() - else: - object_body = resp.read() - resp_headers = {} - for header, value in resp.getheaders(): - resp_headers[header.lower()] = value - return resp_headers, object_body - - -def head_object(url, token, container, name, http_conn=None): - """ - Get object info - - :param url: storage URL - :param token: auth token - :param container: container name that the object is in - :param name: object name to get info for - :param http_conn: HTTP connection object (If None, it will create the - conn object) - :returns: a dict containing the response's headers (all header names will - be lowercase) - :raises ClientException: HTTP HEAD request failed - """ - if http_conn: - parsed, conn = http_conn - else: - parsed, conn = http_connection(url) - path = '%s/%s/%s' % (parsed.path, quote(container), quote(name)) - conn.request('HEAD', path, '', {'X-Auth-Token': token}) - resp = conn.getresponse() - resp.read() - if resp.status < 200 or resp.status >= 300: - raise ClientException('Object HEAD failed', http_scheme=parsed.scheme, - http_host=conn.host, http_port=conn.port, http_path=path, - http_status=resp.status, http_reason=resp.reason) - resp_headers = {} - for header, value in resp.getheaders(): - resp_headers[header.lower()] = value - return resp_headers - - -def put_object(url, token=None, container=None, name=None, contents=None, - content_length=None, etag=None, chunk_size=65536, - content_type=None, headers=None, http_conn=None, proxy=None): - """ - Put an object - - :param url: storage URL - :param token: auth token; if None, no token will be sent - :param container: container name that the object is in; if None, the - container name is expected to be part of the url - :param name: object name to put; if None, the object name is expected to be - part of the url - :param contents: a string or a file like object to read object data from; - if None, a zero-byte put will be done - :param content_length: value to send as content-length header; also limits - the amount read from contents; if None, it will be - computed via the contents or chunked transfer - encoding will be used - :param etag: etag of contents; if None, no etag will be sent - :param chunk_size: chunk size of data to write; default 65536 - :param content_type: value to send as content-type header; if None, no - content-type will be set (remote end will likely try - to auto-detect it) - :param headers: additional headers to include in the request, if any - :param http_conn: HTTP connection object (If None, it will create the - conn object) - :param proxy: proxy to connect through, if any; None by default; str of the - format 'http://127.0.0.1:8888' to set one - :returns: etag from server response - :raises ClientException: HTTP PUT request failed - """ - if http_conn: - parsed, conn = http_conn - else: - parsed, conn = http_connection(url, proxy=proxy) - path = parsed.path - if container: - path = '%s/%s' % (path.rstrip('/'), quote(container)) - if name: - path = '%s/%s' % (path.rstrip('/'), quote(name)) - if headers: - headers = dict(headers) - else: - headers = {} - if token: - headers['X-Auth-Token'] = token - if etag: - headers['ETag'] = etag.strip('"') - if content_length is not None: - headers['Content-Length'] = str(content_length) - else: - for n, v in headers.iteritems(): - if n.lower() == 'content-length': - content_length = int(v) - if content_type is not None: - headers['Content-Type'] = content_type - if not contents: - headers['Content-Length'] = '0' - if hasattr(contents, 'read'): - conn.putrequest('PUT', path) - for header, value in headers.iteritems(): - conn.putheader(header, value) - if content_length is None: - conn.putheader('Transfer-Encoding', 'chunked') - conn.endheaders() - chunk = contents.read(chunk_size) - while chunk: - conn.send('%x\r\n%s\r\n' % (len(chunk), chunk)) - chunk = contents.read(chunk_size) - conn.send('0\r\n\r\n') - else: - conn.endheaders() - left = content_length - while left > 0: - size = chunk_size - if size > left: - size = left - chunk = contents.read(size) - conn.send(chunk) - left -= len(chunk) - else: - conn.request('PUT', path, contents, headers) - resp = conn.getresponse() - resp.read() - if resp.status < 200 or resp.status >= 300: - raise ClientException('Object PUT failed', http_scheme=parsed.scheme, - http_host=conn.host, http_port=conn.port, http_path=path, - http_status=resp.status, http_reason=resp.reason) - return resp.getheader('etag', '').strip('"') - - -def post_object(url, token, container, name, headers, http_conn=None): - """ - Update object metadata - - :param url: storage URL - :param token: auth token - :param container: container name that the object is in - :param name: name of the object to update - :param headers: additional headers to include in the request - :param http_conn: HTTP connection object (If None, it will create the - conn object) - :raises ClientException: HTTP POST request failed - """ - if http_conn: - parsed, conn = http_conn - else: - parsed, conn = http_connection(url) - path = '%s/%s/%s' % (parsed.path, quote(container), quote(name)) - headers['X-Auth-Token'] = token - conn.request('POST', path, '', headers) - resp = conn.getresponse() - resp.read() - if resp.status < 200 or resp.status >= 300: - raise ClientException('Object POST failed', http_scheme=parsed.scheme, - http_host=conn.host, http_port=conn.port, http_path=path, - http_status=resp.status, http_reason=resp.reason) - - -def delete_object(url, token=None, container=None, name=None, http_conn=None, - headers=None, proxy=None): - """ - Delete object - - :param url: storage URL - :param token: auth token; if None, no token will be sent - :param container: container name that the object is in; if None, the - container name is expected to be part of the url - :param name: object name to delete; if None, the object name is expected to - be part of the url - :param http_conn: HTTP connection object (If None, it will create the - conn object) - :param headers: additional headers to include in the request - :param proxy: proxy to connect through, if any; None by default; str of the - format 'http://127.0.0.1:8888' to set one - :raises ClientException: HTTP DELETE request failed - """ - if http_conn: - parsed, conn = http_conn - else: - parsed, conn = http_connection(url, proxy=proxy) - path = parsed.path - if container: - path = '%s/%s' % (path.rstrip('/'), quote(container)) - if name: - path = '%s/%s' % (path.rstrip('/'), quote(name)) - if headers: - headers = dict(headers) - else: - headers = {} - if token: - headers['X-Auth-Token'] = token - conn.request('DELETE', path, '', headers) - resp = conn.getresponse() - resp.read() - if resp.status < 200 or resp.status >= 300: - raise ClientException('Object DELETE failed', - http_scheme=parsed.scheme, http_host=conn.host, - http_port=conn.port, http_path=path, http_status=resp.status, - http_reason=resp.reason) - - -class Connection(object): - """Convenience class to make requests that will also retry the request""" - - def __init__(self, authurl, user, key, retries=5, preauthurl=None, - preauthtoken=None, snet=False, starting_backoff=1): - """ - :param authurl: authenitcation URL - :param user: user name to authenticate as - :param key: key/password to authenticate with - :param retries: Number of times to retry the request before failing - :param preauthurl: storage URL (if you have already authenticated) - :param preauthtoken: authentication token (if you have already - authenticated) - :param snet: use SERVICENET internal network default is False - """ - self.authurl = authurl - self.user = user - self.key = key - self.retries = retries - self.http_conn = None - self.url = preauthurl - self.token = preauthtoken - self.attempts = 0 - self.snet = snet - self.starting_backoff = starting_backoff - - def get_auth(self): - return get_auth(self.authurl, self.user, self.key, snet=self.snet) - - def http_connection(self): - return http_connection(self.url) - - def _retry(self, reset_func, func, *args, **kwargs): - self.attempts = 0 - backoff = self.starting_backoff - while self.attempts <= self.retries: - self.attempts += 1 - try: - if not self.url or not self.token: - self.url, self.token = self.get_auth() - self.http_conn = None - if not self.http_conn: - self.http_conn = self.http_connection() - kwargs['http_conn'] = self.http_conn - rv = func(self.url, self.token, *args, **kwargs) - return rv - except (socket.error, HTTPException): - if self.attempts > self.retries: - raise - self.http_conn = None - except ClientException, err: - if self.attempts > self.retries: - raise - if err.http_status == 401: - self.url = self.token = None - if self.attempts > 1: - raise - elif err.http_status == 408: - self.http_conn = None - elif 500 <= err.http_status <= 599: - pass - else: - raise - sleep(backoff) - backoff *= 2 - if reset_func: - reset_func(func, *args, **kwargs) - - def head_account(self): - """Wrapper for :func:`head_account`""" - return self._retry(None, head_account) - - def get_account(self, marker=None, limit=None, prefix=None, - full_listing=False): - """Wrapper for :func:`get_account`""" - # TODO(unknown): With full_listing=True this will restart the entire - # listing with each retry. Need to make a better version that just - # retries where it left off. - return self._retry(None, get_account, marker=marker, limit=limit, - prefix=prefix, full_listing=full_listing) - - def post_account(self, headers): - """Wrapper for :func:`post_account`""" - return self._retry(None, post_account, headers) - - def head_container(self, container): - """Wrapper for :func:`head_container`""" - return self._retry(None, head_container, container) - - def get_container(self, container, marker=None, limit=None, prefix=None, - delimiter=None, full_listing=False): - """Wrapper for :func:`get_container`""" - # TODO(unknown): With full_listing=True this will restart the entire - # listing with each retry. Need to make a better version that just - # retries where it left off. - return self._retry(None, get_container, container, marker=marker, - limit=limit, prefix=prefix, delimiter=delimiter, - full_listing=full_listing) - - def put_container(self, container, headers=None): - """Wrapper for :func:`put_container`""" - return self._retry(None, put_container, container, headers=headers) - - def post_container(self, container, headers): - """Wrapper for :func:`post_container`""" - return self._retry(None, post_container, container, headers) - - def delete_container(self, container): - """Wrapper for :func:`delete_container`""" - return self._retry(None, delete_container, container) - - def head_object(self, container, obj): - """Wrapper for :func:`head_object`""" - return self._retry(None, head_object, container, obj) - - def get_object(self, container, obj, resp_chunk_size=None): - """Wrapper for :func:`get_object`""" - return self._retry(None, get_object, container, obj, - resp_chunk_size=resp_chunk_size) - - def put_object(self, container, obj, contents, content_length=None, - etag=None, chunk_size=65536, content_type=None, - headers=None): - """Wrapper for :func:`put_object`""" - - def _default_reset(*args, **kwargs): - raise ClientException('put_object(%r, %r, ...) failure and no ' - 'ability to reset contents for reupload.' % (container, obj)) - - reset_func = _default_reset - tell = getattr(contents, 'tell', None) - seek = getattr(contents, 'seek', None) - if tell and seek: - orig_pos = tell() - reset_func = lambda *a, **k: seek(orig_pos) - elif not contents: - reset_func = lambda *a, **k: None - - return self._retry(reset_func, put_object, container, obj, contents, - content_length=content_length, etag=etag, chunk_size=chunk_size, - content_type=content_type, headers=headers) - - def post_object(self, container, obj, headers): - """Wrapper for :func:`post_object`""" - return self._retry(None, post_object, container, obj, headers) - - def delete_object(self, container, obj): - """Wrapper for :func:`delete_object`""" - return self._retry(None, delete_object, container, obj) - -# End inclusion of swift.common.client -# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + return Connection(options.auth, + options.user, + options.key, + auth_version=options.auth_version, + os_options=options.os_options, + snet=options.snet, + cacert=options.os_cacert, + insecure=options.insecure, + ssl_compression=options.ssl_compression) def mkdirs(path): try: makedirs(path) - except OSError, err: + except OSError as err: if err.errno != EEXIST: raise @@ -925,39 +80,64 @@ def put_errors_from_threads(threads, error_queue): return was_error +class StopWorkerThreadSignal(object): + pass + + class QueueFunctionThread(Thread): def __init__(self, queue, func, *args, **kwargs): - """ Calls func for each item in queue; func is called with a queued - item as the first arg followed by *args and **kwargs. Use the abort - attribute to have the thread empty the queue (without processing) - and exit. """ + """ + Calls func for each item in queue; func is called with a queued + item as the first arg followed by *args and **kwargs. Use the + PriorityQueue for sending quit signal when Ctrl-C is pressed. + """ Thread.__init__(self) - self.abort = False self.queue = queue self.func = func self.args = args self.kwargs = kwargs self.exc_infos = [] + self.results = [] + self.store_results = kwargs.pop('store_results', False) def run(self): - try: - while True: + while True: + try: + item = self.queue.get() + if isinstance(item, StopWorkerThreadSignal): + break + except: + # This catch is important and it may occur when ctrl-C is + # pressed, in this case simply quit the thread + break + else: try: - item = self.queue.get_nowait() - if not self.abort: - self.func(item, *self.args, **self.kwargs) - self.queue.task_done() - except Empty: - if self.abort: - break - sleep(0.01) - except Exception: - self.exc_infos.append(exc_info()) + self.func(item, *self.args, **self.kwargs) + except Exception: + self.exc_infos.append(exc_info()) + + +def shutdown_worker_threads(queue, thread_list): + """ + Takes a job queue and a list of associated QueueFunctionThread objects, + puts a StopWorkerThreadSignal object into the queue, and waits for the + queue to flush. + """ + for thread in [t for t in thread_list if t.isAlive()]: + queue.put(StopWorkerThreadSignal()) + + while any(map(QueueFunctionThread.is_alive, thread_list)): + sleep(0.05) + + +def immediate_exit(signum, frame): + stderr.write(" Aborted\n") + os_exit(2) st_delete_help = ''' -delete --all OR delete container [--leave-segments] [object] [object] ... +delete [options] --all OR delete container [options] [object] [object] ... Deletes everything in the account (with --all), or everything in a container, or a list of objects depending on the args given. Segments of manifest objects will be deleted as well, unless you specify the @@ -965,12 +145,21 @@ delete --all OR delete container [--leave-segments] [object] [object] ... def st_delete(parser, args, print_queue, error_queue): - parser.add_option('-a', '--all', action='store_true', dest='yes_all', + parser.add_option( + '-a', '--all', action='store_true', dest='yes_all', default=False, help='Indicates that you really want to delete ' 'everything in the account') - parser.add_option('', '--leave-segments', action='store_true', - dest='leave_segments', default=False, help='Indicates that you want ' - 'the segments of manifest objects left alone') + parser.add_option( + '', '--leave-segments', action='store_true', + dest='leave_segments', default=False, + help='Indicates that you want the segments of manifest' + 'objects left alone') + parser.add_option( + '', '--object-threads', type=int, + default=10, help='Number of threads to use for deleting objects') + parser.add_option('', '--container-threads', type=int, + default=10, help='Number of threads to use for ' + 'deleting containers') (options, args) = parse_args(parser, args) args = args[1:] if (not args and not options.yes_all) or (args and options.yes_all): @@ -992,32 +181,34 @@ def st_delete(parser, args, print_queue, error_queue): def _delete_object((container, obj), conn): try: old_manifest = None + query_string = None if not options.leave_segments: try: - old_manifest = conn.head_object(container, obj).get( - 'x-object-manifest') - except ClientException, err: + headers = conn.head_object(container, obj) + old_manifest = headers.get('x-object-manifest') + if utils.config_true_value( + headers.get('x-static-large-object')): + query_string = 'multipart-manifest=delete' + except ClientException as err: if err.http_status != 404: raise - conn.delete_object(container, obj) + conn.delete_object(container, obj, query_string=query_string) if old_manifest: segment_queue = Queue(10000) scontainer, sprefix = old_manifest.split('/', 1) + scontainer = unquote(scontainer) + sprefix = unquote(sprefix).rstrip('/') + '/' for delobj in conn.get_container(scontainer, prefix=sprefix)[1]: segment_queue.put((scontainer, delobj['name'])) if not segment_queue.empty(): - segment_threads = [QueueFunctionThread(segment_queue, + segment_threads = [QueueFunctionThread( + segment_queue, _delete_segment, create_connection()) for _junk in - xrange(10)] + xrange(options.object_threads)] for thread in segment_threads: thread.start() - while not segment_queue.empty(): - sleep(0.01) - for thread in segment_threads: - thread.abort = True - while thread.isAlive(): - thread.join(0.01) + shutdown_worker_threads(segment_queue, segment_threads) put_errors_from_threads(segment_threads, error_queue) if options.verbose: path = options.yes_all and join(container, obj) or obj @@ -1028,7 +219,7 @@ def st_delete(parser, args, print_queue, error_queue): (path, conn.attempts)) else: print_queue.put(path) - except ClientException, err: + except ClientException as err: if err.http_status != 404: raise error_queue.put('Object %s not found' % @@ -1048,97 +239,109 @@ def st_delete(parser, args, print_queue, error_queue): object_queue.put((container, obj)) marker = objects[-1] while not object_queue.empty(): - sleep(0.01) + sleep(0.05) attempts = 1 while True: try: conn.delete_container(container) break - except ClientException, err: + except ClientException as err: if err.http_status != 409: raise if attempts > 10: raise attempts += 1 sleep(1) - except ClientException, err: + except ClientException as err: if err.http_status != 404: raise error_queue.put('Container %s not found' % repr(container)) - url, token = get_auth(options.auth, options.user, options.key, - snet=options.snet) - create_connection = lambda: Connection(options.auth, options.user, - options.key, preauthurl=url, preauthtoken=token, snet=options.snet) - object_threads = [QueueFunctionThread(object_queue, _delete_object, - create_connection()) for _junk in xrange(10)] + create_connection = lambda: get_conn(options) + object_threads = \ + [QueueFunctionThread(object_queue, _delete_object, create_connection()) + for _junk in xrange(options.object_threads)] for thread in object_threads: thread.start() - container_threads = [QueueFunctionThread(container_queue, - _delete_container, create_connection()) for _junk in xrange(10)] + container_threads = \ + [QueueFunctionThread(container_queue, _delete_container, + create_connection()) + for _junk in xrange(options.container_threads)] for thread in container_threads: thread.start() - if not args: - conn = create_connection() - try: - marker = '' - while True: - containers = \ - [c['name'] for c in conn.get_account(marker=marker)[1]] - if not containers: - break - for container in containers: - container_queue.put(container) - marker = containers[-1] - while not container_queue.empty(): - sleep(0.01) - while not object_queue.empty(): - sleep(0.01) - except ClientException, err: - if err.http_status != 404: - raise - error_queue.put('Account not found') - elif len(args) == 1: - if '/' in args[0]: - print >> stderr, 'WARNING: / in container name; you might have ' \ - 'meant %r instead of %r.' % \ - (args[0].replace('/', ' ', 1), args[0]) - conn = create_connection() - _delete_container(args[0], conn) - else: - for obj in args[1:]: - object_queue.put((args[0], obj)) - while not container_queue.empty(): - sleep(0.01) - for thread in container_threads: - thread.abort = True - while thread.isAlive(): - thread.join(0.01) - put_errors_from_threads(container_threads, error_queue) - while not object_queue.empty(): - sleep(0.01) - for thread in object_threads: - thread.abort = True - while thread.isAlive(): - thread.join(0.01) - put_errors_from_threads(object_threads, error_queue) + + try: + if not args: + conn = create_connection() + try: + marker = '' + while True: + containers = [ + c['name'] for c in conn.get_account(marker=marker)[1]] + if not containers: + break + for container in containers: + container_queue.put(container) + marker = containers[-1] + except ClientException as err: + if err.http_status != 404: + raise + error_queue.put('Account not found') + elif len(args) == 1: + if '/' in args[0]: + print >> stderr, 'WARNING: / in container name; you might ' \ + 'have meant %r instead of %r.' % ( + args[0].replace('/', ' ', 1), args[0]) + conn = create_connection() + _delete_container(args[0], conn) + else: + for obj in args[1:]: + object_queue.put((args[0], obj)) + finally: + shutdown_worker_threads(container_queue, container_threads) + put_errors_from_threads(container_threads, error_queue) + + shutdown_worker_threads(object_queue, object_threads) + put_errors_from_threads(object_threads, error_queue) st_download_help = ''' -download --all OR download container [options] [object] [object] ... - Downloads everything in the account (with --all), or everything in a - container, or a list of objects depending on the args given. For a single - object download, you may use the -o [--output] option to - redirect the output to a specific file or if "-" then just redirect to - stdout.'''.strip('\n') +download --all [options] OR download container [options] [object] [object] ... + Downloads everything in the account (with --all), or everything in all + containers in the account matching a prefix (with --all and -p [--prefix]), + or everything in a container, or a subset of a container with -p + [--prefix], or a list of objects depending on the args given. -p or + --prefix is an option that will only download items beginning with that + prefix. For a single object download, you may use the -o [--output] + option to redirect the output to a specific file or if "-" then + just redirect to stdout.'''.strip('\n') -def st_download(options, args, print_queue, error_queue): - parser.add_option('-a', '--all', action='store_true', dest='yes_all', +def st_download(parser, args, print_queue, error_queue): + parser.add_option( + '-a', '--all', action='store_true', dest='yes_all', default=False, help='Indicates that you really want to download ' 'everything in the account') - parser.add_option('-o', '--output', dest='out_file', help='For a single ' + parser.add_option( + '-m', '--marker', dest='marker', + default='', help='Marker to use when starting a container or ' + 'account download') + parser.add_option( + '-p', '--prefix', dest='prefix', + help='Will only download items beginning with the prefix') + parser.add_option( + '-o', '--output', dest='out_file', help='For a single ' 'file download, stream the output to an alternate location ') + parser.add_option( + '', '--object-threads', type=int, + default=10, help='Number of threads to use for downloading objects') + parser.add_option( + '', '--container-threads', type=int, default=10, + help='Number of threads to use for listing containers') + parser.add_option( + '', '--no-download', action='store_true', + default=False, + help="Perform download(s), but don't actually write anything to disk") (options, args) = parse_args(parser, args) args = args[1:] if options.out_file == '-': @@ -1161,8 +364,10 @@ def st_download(options, args, print_queue, error_queue): else: raise Exception("Invalid queue_arg length of %s" % len(queue_arg)) try: + start_time = time() headers, body = \ conn.get_object(container, obj, resp_chunk_size=65536) + header_receipt = time() content_type = headers.get('content-type') if 'content-length' in headers: content_length = int(headers.get('content-length')) @@ -1173,12 +378,13 @@ def st_download(options, args, print_queue, error_queue): if path[:1] in ('/', '\\'): path = path[1:] md5sum = None - make_dir = out_file != "-" + make_dir = not options.no_download and out_file != "-" if content_type.split(';', 1)[0] == 'text/directory': if make_dir and not isdir(path): mkdirs(path) read_length = 0 - if 'x-object-manifest' not in headers: + if 'x-object-manifest' not in headers and \ + 'x-static-large-object' not in headers: md5sum = md5() for chunk in body: read_length += len(chunk) @@ -1188,37 +394,47 @@ def st_download(options, args, print_queue, error_queue): dirpath = dirname(path) if make_dir and dirpath and not isdir(dirpath): mkdirs(dirpath) - if out_file == "-": - fp = stdout - elif out_file: - fp = open(out_file, 'wb') - else: - fp = open(path, 'wb') + if not options.no_download: + if out_file == "-": + fp = stdout + elif out_file: + fp = open(out_file, 'wb') + else: + fp = open(path, 'wb') read_length = 0 - if 'x-object-manifest' not in headers: + if 'x-object-manifest' not in headers and \ + 'x-static-large-object' not in headers: md5sum = md5() for chunk in body: - fp.write(chunk) + if not options.no_download: + fp.write(chunk) read_length += len(chunk) if md5sum: md5sum.update(chunk) - fp.close() + if not options.no_download: + fp.close() if md5sum and md5sum.hexdigest() != etag: error_queue.put('%s: md5sum != etag, %s != %s' % (path, md5sum.hexdigest(), etag)) if content_length is not None and read_length != content_length: error_queue.put('%s: read_length != content_length, %d != %d' % (path, read_length, content_length)) - if 'x-object-meta-mtime' in headers and not options.out_file: + if 'x-object-meta-mtime' in headers and not options.out_file \ + and not options.no_download: + mtime = float(headers['x-object-meta-mtime']) utime(path, (mtime, mtime)) if options.verbose: + finish_time = time() + time_str = 'headers %.3fs, total %.3fs, %.3fs MB/s' % ( + header_receipt - start_time, finish_time - start_time, + float(read_length) / (finish_time - start_time) / 1000000) if conn.attempts > 1: - print_queue.put('%s [after %d attempts' % - (path, conn.attempts)) + print_queue.put('%s [%s after %d attempts]' % + (path, time_str, conn.attempts)) else: - print_queue.put(path) - except ClientException, err: + print_queue.put('%s [%s]' % (path, time_str)) + except ClientException as err: if err.http_status != 404: raise error_queue.put('Object %s not found' % @@ -1226,120 +442,212 @@ def st_download(options, args, print_queue, error_queue): container_queue = Queue(10000) - def _download_container(container, conn): + def _download_container(container, conn, prefix=None): try: - marker = '' + marker = options.marker while True: - objects = [o['name'] for o in - conn.get_container(container, marker=marker)[1]] + objects = [ + o['name'] for o in + conn.get_container(container, marker=marker, + prefix=prefix)[1]] if not objects: break + marker = objects[-1] + shuffle(objects) for obj in objects: object_queue.put((container, obj)) - marker = objects[-1] - except ClientException, err: + except ClientException as err: if err.http_status != 404: raise error_queue.put('Container %s not found' % repr(container)) - url, token = get_auth(options.auth, options.user, options.key, - snet=options.snet) - create_connection = lambda: Connection(options.auth, options.user, - options.key, preauthurl=url, preauthtoken=token, snet=options.snet) - object_threads = [QueueFunctionThread(object_queue, _download_object, - create_connection()) for _junk in xrange(10)] + create_connection = lambda: get_conn(options) + object_threads = [QueueFunctionThread( + object_queue, _download_object, + create_connection()) for _junk in xrange(options.object_threads)] for thread in object_threads: thread.start() - container_threads = [QueueFunctionThread(container_queue, - _download_container, create_connection()) for _junk in xrange(10)] + container_threads = [QueueFunctionThread( + container_queue, + _download_container, create_connection()) + for _junk in xrange(options.container_threads)] for thread in container_threads: thread.start() - if not args: - conn = create_connection() - try: - marker = '' - while True: - containers = [c['name'] - for c in conn.get_account(marker=marker)[1]] - if not containers: - break - for container in containers: - container_queue.put(container) - marker = containers[-1] - except ClientException, err: - if err.http_status != 404: - raise - error_queue.put('Account not found') - elif len(args) == 1: - if '/' in args[0]: - print >> stderr, 'WARNING: / in container name; you might have ' \ - 'meant %r instead of %r.' % \ - (args[0].replace('/', ' ', 1), args[0]) - _download_container(args[0], create_connection()) - else: - if len(args) == 2: - obj = args[1] - object_queue.put((args[0], obj, options.out_file)) + + # We musn't let the main thread die with an exception while non-daemonic + # threads exist or the process with hang and ignore Ctrl-C. So we catch + # anything and tidy up the threads in a finally block. + try: + if not args: + # --all case + conn = create_connection() + try: + marker = options.marker + while True: + containers = [ + c['name'] for c in conn.get_account( + marker=marker, prefix=options.prefix)[1]] + if not containers: + break + marker = containers[-1] + shuffle(containers) + for container in containers: + container_queue.put(container) + except ClientException as err: + if err.http_status != 404: + raise + error_queue.put('Account not found') + elif len(args) == 1: + if '/' in args[0]: + print >> stderr, ('WARNING: / in container name; you might ' + 'have meant %r instead of %r.' % ( + args[0].replace('/', ' ', 1), args[0])) + _download_container(args[0], create_connection(), + options.prefix) else: - for obj in args[1:]: - object_queue.put((args[0], obj)) - while not container_queue.empty(): - sleep(0.01) - for thread in container_threads: - thread.abort = True - while thread.isAlive(): - thread.join(0.01) - put_errors_from_threads(container_threads, error_queue) - while not object_queue.empty(): - sleep(0.01) - for thread in object_threads: - thread.abort = True - while thread.isAlive(): - thread.join(0.01) - put_errors_from_threads(object_threads, error_queue) + if len(args) == 2: + obj = args[1] + object_queue.put((args[0], obj, options.out_file)) + else: + for obj in args[1:]: + object_queue.put((args[0], obj)) + finally: + shutdown_worker_threads(container_queue, container_threads) + put_errors_from_threads(container_threads, error_queue) + + shutdown_worker_threads(object_queue, object_threads) + put_errors_from_threads(object_threads, error_queue) + + +def prt_bytes(bytes, human_flag): + """ + convert a number > 1024 to printable format, either in 4 char -h format as + with ls -lh or return as 12 char right justified string + """ + + if human_flag: + suffix = '' + mods = 'KMGTPEZY' + temp = float(bytes) + if temp > 0: + while (temp > 1023): + temp /= 1024.0 + suffix = mods[0] + mods = mods[1:] + if suffix != '': + if temp >= 10: + bytes = '%3d%s' % (temp, suffix) + else: + bytes = '%.1f%s' % (temp, suffix) + if suffix == '': # must be < 1024 + bytes = '%4s' % bytes + else: + bytes = '%12s' % bytes + + return(bytes) st_list_help = ''' list [options] [container] Lists the containers for the account or the objects for a container. -p or --prefix is an option that will only list items beginning with that prefix. + -l produces output formatted like 'ls -l' and --lh like 'ls -lh'. + -t used with -l or --lh, only report totals -d or --delimiter is option (for container listings only) that will roll up - items with the given delimiter (see Cloud Files general documentation for - what this means). + items with the given delimiter (see http://docs.openstack.org/ + api/openstack-object-storage/1.0/content/list-objects.html) '''.strip('\n') -def st_list(options, args, print_queue, error_queue): - parser.add_option('-p', '--prefix', dest='prefix', help='Will only list ' - 'items beginning with the prefix') - parser.add_option('-d', '--delimiter', dest='delimiter', help='Will roll ' - 'up items with the given delimiter (see Cloud Files general ' - 'documentation for what this means)') +def st_list(parser, args, print_queue, error_queue): + parser.add_option( + '-l', '--long', dest='long', help='Long listing ' + 'similar to ls -l command', action='store_true', default=False) + parser.add_option( + '--lh', dest='human', help='report sizes as human ' + "similar to ls -lh switch, but -h taken", action='store_true', + default=False) + parser.add_option( + '-t', dest='totals', help='used with -l or --ls, only report totals', + action='store_true', default=False) + parser.add_option( + '-p', '--prefix', dest='prefix', + help='Will only list items beginning with the prefix') + parser.add_option( + '-d', '--delimiter', dest='delimiter', + help='Will roll up items with the given delimiter' + ' (see OpenStack Swift API documentation for what this means)') (options, args) = parse_args(parser, args) args = args[1:] if options.delimiter and not args: exit('-d option only allowed for container listings') - if len(args) > 1: + if len(args) > 1 or len(args) == 1 and args[0].find('/') >= 0: error_queue.put('Usage: %s [options] %s' % (basename(argv[0]), st_list_help)) return - conn = Connection(options.auth, options.user, options.key, - snet=options.snet) + + conn = get_conn(options) try: marker = '' + total_count = total_bytes = 0 while True: if not args: items = \ conn.get_account(marker=marker, prefix=options.prefix)[1] else: - items = conn.get_container(args[0], marker=marker, + items = conn.get_container( + args[0], marker=marker, prefix=options.prefix, delimiter=options.delimiter)[1] if not items: break for item in items: - print_queue.put(item.get('name', item.get('subdir'))) - marker = items[-1].get('name', items[-1].get('subdir')) - except ClientException, err: + item_name = item.get('name') + + if not options.long and not options.human: + print_queue.put(item.get('name', item.get('subdir'))) + else: + item_bytes = item.get('bytes') + total_bytes += item_bytes + if len(args) == 0: # listing containers + bytes = prt_bytes(item_bytes, options.human) + count = item.get('count') + total_count += count + try: + meta = conn.head_container(item_name) + utc = gmtime(float(meta.get('x-timestamp'))) + datestamp = strftime('%Y-%m-%d %H:%M:%S', utc) + except ClientException: + datestamp = '????-??-?? ??:??:??' + if not options.totals: + print_queue.put("%5s %s %s %s" % + (count, bytes, datestamp, + item_name)) + else: # list container contents + subdir = item.get('subdir') + if subdir is None: + bytes = prt_bytes(item_bytes, options.human) + date, xtime = item.get('last_modified').split('T') + xtime = xtime.split('.')[0] + else: + bytes = prt_bytes(0, options.human) + date = xtime = '' + item_name = subdir + if not options.totals: + print_queue.put("%s %10s %8s %s" % + (bytes, date, xtime, item_name)) + + marker = items[-1].get('name', items[-1].get('subdir')) + + # report totals + if options.long or options.human: + if len(args) == 0: + print_queue.put("%5s %s" % (prt_bytes(total_count, True), + prt_bytes(total_bytes, + options.human))) + else: + print_queue.put("%s" % (prt_bytes(total_bytes, options.human))) + + except ClientException as err: if err.http_status != 404: raise if not args: @@ -1347,17 +655,20 @@ def st_list(options, args, print_queue, error_queue): else: error_queue.put('Container %s not found' % repr(args[0])) - st_stat_help = ''' stat [container] [object] Displays information for the account, container, or object depending on the - args given (if any).'''.strip('\n') + args given (if any). --lh will print number of objects and total sizes + like 'list --lh' noting number of objs a multiple of 1024'''.strip('\n') -def st_stat(options, args, print_queue, error_queue): +def st_stat(parser, args, print_queue, error_queue): + parser.add_option( + '--lh', dest='human', help="report totals like 'list --lh'", + action='store_true', default=False) (options, args) = parse_args(parser, args) args = args[1:] - conn = Connection(options.auth, options.user, options.key) + conn = get_conn(options) if not args: try: headers = conn.head_account() @@ -1367,17 +678,20 @@ StorageURL: %s Auth Token: %s '''.strip('\n') % (conn.url, conn.token)) container_count = int(headers.get('x-account-container-count', 0)) - object_count = int(headers.get('x-account-object-count', 0)) - bytes_used = int(headers.get('x-account-bytes-used', 0)) + object_count = prt_bytes(headers.get('x-account-object-count', 0), + options.human).lstrip() + bytes_used = prt_bytes(headers.get('x-account-bytes-used', 0), + options.human).lstrip() print_queue.put(''' Account: %s Containers: %d - Objects: %d - Bytes: %d'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], container_count, + Objects: %s + Bytes: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], container_count, object_count, bytes_used)) for key, value in headers.items(): if key.startswith('x-account-meta-'): - print_queue.put('%10s: %s' % ('Meta %s' % + print_queue.put( + '%10s: %s' % ('Meta %s' % key[len('x-account-meta-'):].title(), value)) for key, value in headers.items(): if not key.startswith('x-account-meta-') and key not in ( @@ -1385,7 +699,7 @@ Containers: %d 'x-account-object-count', 'x-account-bytes-used'): print_queue.put( '%10s: %s' % (key.title(), value)) - except ClientException, err: + except ClientException as err: if err.http_status != 404: raise error_queue.put('Account not found') @@ -1396,13 +710,16 @@ Containers: %d (args[0].replace('/', ' ', 1), args[0]) try: headers = conn.head_container(args[0]) - object_count = int(headers.get('x-container-object-count', 0)) - bytes_used = int(headers.get('x-container-bytes-used', 0)) + object_count = prt_bytes( + headers.get('x-container-object-count', 0), + options.human).lstrip() + bytes_used = prt_bytes(headers.get('x-container-bytes-used', 0), + options.human).lstrip() print_queue.put(''' Account: %s Container: %s - Objects: %d - Bytes: %d + Objects: %s + Bytes: %s Read ACL: %s Write ACL: %s Sync To: %s @@ -1414,7 +731,8 @@ Write ACL: %s headers.get('x-container-sync-key', ''))) for key, value in headers.items(): if key.startswith('x-container-meta-'): - print_queue.put('%9s: %s' % ('Meta %s' % + print_queue.put( + '%9s: %s' % ('Meta %s' % key[len('x-container-meta-'):].title(), value)) for key, value in headers.items(): if not key.startswith('x-container-meta-') and key not in ( @@ -1424,7 +742,7 @@ Write ACL: %s 'x-container-sync-key'): print_queue.put( '%9s: %s' % (key.title(), value)) - except ClientException, err: + except ClientException as err: if err.http_status != 404: raise error_queue.put('Container %s not found' % repr(args[0])) @@ -1439,7 +757,8 @@ Write ACL: %s args[1], headers.get('content-type'))) if 'content-length' in headers: print_queue.put('Content Length: %s' % - headers['content-length']) + prt_bytes(headers['content-length'], + options.human).lstrip()) if 'last-modified' in headers: print_queue.put(' Last Modified: %s' % headers['last-modified']) @@ -1450,7 +769,8 @@ Write ACL: %s headers['x-object-manifest']) for key, value in headers.items(): if key.startswith('x-object-meta-'): - print_queue.put('%14s: %s' % ('Meta %s' % + print_queue.put( + '%14s: %s' % ('Meta %s' % key[len('x-object-meta-'):].title(), value)) for key, value in headers.items(): if not key.startswith('x-object-meta-') and key not in ( @@ -1458,7 +778,7 @@ Write ACL: %s 'etag', 'date', 'x-object-manifest'): print_queue.put( '%14s: %s' % (key.title(), value)) - except ClientException, err: + except ClientException as err: if err.http_status != 404: raise error_queue.put('Object %s not found' % @@ -1479,35 +799,41 @@ post [options] [container] [object] post -m Color:Blue -m Size:Large'''.strip('\n') -def st_post(options, args, print_queue, error_queue): - parser.add_option('-r', '--read-acl', dest='read_acl', help='Sets the ' +def st_post(parser, args, print_queue, error_queue): + parser.add_option( + '-r', '--read-acl', dest='read_acl', help='Sets the ' 'Read ACL for containers. Quick summary of ACL syntax: .r:*, ' '.r:-.example.com, .r:www.example.com, account1, account2:user2') - parser.add_option('-w', '--write-acl', dest='write_acl', help='Sets the ' + parser.add_option( + '-w', '--write-acl', dest='write_acl', help='Sets the ' 'Write ACL for containers. Quick summary of ACL syntax: account1, ' 'account2:user2') - parser.add_option('-t', '--sync-to', dest='sync_to', help='Sets the ' + parser.add_option( + '-t', '--sync-to', dest='sync_to', help='Sets the ' 'Sync To for containers, for multi-cluster replication.') - parser.add_option('-k', '--sync-key', dest='sync_key', help='Sets the ' + parser.add_option( + '-k', '--sync-key', dest='sync_key', help='Sets the ' 'Sync Key for containers, for multi-cluster replication.') - parser.add_option('-m', '--meta', action='append', dest='meta', default=[], + parser.add_option( + '-m', '--meta', action='append', dest='meta', default=[], help='Sets a meta data item with the syntax name:value. This option ' 'may be repeated. Example: -m Color:Blue -m Size:Large') + parser.add_option( + '-H', '--header', action='append', dest='header', + default=[], help='Set request headers with the syntax header:value. ' + ' This option may be repeated. Example -H content-type:text/plain ' + '-H "Content-Length: 4000"') (options, args) = parse_args(parser, args) args = args[1:] if (options.read_acl or options.write_acl or options.sync_to or - options.sync_key) and not args: + options.sync_key) and not args: exit('-r, -w, -t, and -k options only allowed for containers') - conn = Connection(options.auth, options.user, options.key) + conn = get_conn(options) if not args: - headers = {} - for item in options.meta: - split_item = item.split(':') - headers['X-Account-Meta-' + split_item[0]] = \ - len(split_item) > 1 and split_item[1] + headers = split_headers(options.meta, 'X-Account-Meta-', error_queue) try: conn.post_account(headers=headers) - except ClientException, err: + except ClientException as err: if err.http_status != 404: raise error_queue.put('Account not found') @@ -1516,11 +842,7 @@ def st_post(options, args, print_queue, error_queue): print >> stderr, 'WARNING: / in container name; you might have ' \ 'meant %r instead of %r.' % \ (args[0].replace('/', ' ', 1), args[0]) - headers = {} - for item in options.meta: - split_item = item.split(':') - headers['X-Container-Meta-' + split_item[0]] = \ - len(split_item) > 1 and split_item[1] + headers = split_headers(options.meta, 'X-Container-Meta-', error_queue) if options.read_acl is not None: headers['X-Container-Read'] = options.read_acl if options.write_acl is not None: @@ -1531,19 +853,17 @@ def st_post(options, args, print_queue, error_queue): headers['X-Container-Sync-Key'] = options.sync_key try: conn.post_container(args[0], headers=headers) - except ClientException, err: + except ClientException as err: if err.http_status != 404: raise conn.put_container(args[0], headers=headers) elif len(args) == 2: - headers = {} - for item in options.meta: - split_item = item.split(':') - headers['X-Object-Meta-' + split_item[0]] = \ - len(split_item) > 1 and split_item[1] + headers = split_headers(options.meta, 'X-Object-Meta-', error_queue) + # add header options to the headers object for the request. + headers.update(split_headers(options.header, '', error_queue)) try: conn.post_object(args[0], args[1], headers=headers) - except ClientException, err: + except ClientException as err: if err.http_status != 404: raise error_queue.put('Object %s not found' % @@ -1558,24 +878,48 @@ upload [options] container file_or_directory [file_or_directory] [...] Uploads to the given container the files and directories specified by the remaining args. -c or --changed is an option that will only upload files that have changed since the last upload. -S or --segment-size - and --leave-segments are options as well (see --help for more). + will upload the files in segments no larger than size. -C or + --segment-container will specify the location of the segments + to . --leave-segments are options as well (see --help for more). '''.strip('\n') -def st_upload(options, args, print_queue, error_queue): - parser.add_option('-c', '--changed', action='store_true', dest='changed', +def st_upload(parser, args, print_queue, error_queue): + parser.add_option( + '-c', '--changed', action='store_true', dest='changed', default=False, help='Will only upload files that have changed since ' 'the last upload') - parser.add_option('-S', '--segment-size', dest='segment_size', help='Will ' + parser.add_option( + '-S', '--segment-size', dest='segment_size', help='Will ' 'upload files in segments no larger than and then create a ' '"manifest" file that will download all the segments as if it were ' - 'the original file. The segments will be uploaded to a ' + 'the original file.') + parser.add_option( + '-C', '--segment-container', dest='segment_container', + help='Will upload the segments into the specified container.' + 'If not specified, the segments will be uploaded to ' '_segments container so as to not pollute the main ' ' listings.') - parser.add_option('', '--leave-segments', action='store_true', + parser.add_option( + '', '--leave-segments', action='store_true', dest='leave_segments', default=False, help='Indicates that you want ' 'the older segments of manifest objects left alone (in the case of ' 'overwrites)') + parser.add_option( + '', '--object-threads', type=int, default=10, + help='Number of threads to use for uploading full objects') + parser.add_option( + '', '--segment-threads', type=int, default=10, + help='Number of threads to use for uploading object segments') + parser.add_option( + '-H', '--header', action='append', dest='header', + default=[], help='Set request headers with the syntax header:value. ' + ' This option may be repeated. Example -H content-type:text/plain ' + '-H "Content-Length: 4000"') + parser.add_option('', '--use-slo', action='store_true', default=False, + help='When used in conjuction with --segment-size will ' + 'create a Static Large Object instead of the default ' + 'Dynamic Large Object.') (options, args) = parse_args(parser, args) args = args[1:] if len(args) < 2: @@ -1590,14 +934,21 @@ def st_upload(options, args, print_queue, error_queue): else: fp = open(job['path'], 'rb') fp.seek(job['segment_start']) - conn.put_object(job.get('container', args[0] + '_segments'), - job['obj'], fp, content_length=job['segment_size']) + seg_container = args[0] + '_segments' + if options.segment_container: + seg_container = options.segment_container + etag = conn.put_object(job.get('container', seg_container), + job['obj'], fp, + content_length=job['segment_size']) + job['segment_location'] = '/%s/%s' % (seg_container, job['obj']) + job['segment_etag'] = etag if options.verbose and 'log_line' in job: if conn.attempts > 1: print_queue.put('%s [after %d attempts]' % (job['log_line'], conn.attempts)) else: print_queue.put(job['log_line']) + return job def _object_job(job, conn): path = job['path'] @@ -1609,7 +960,7 @@ def st_upload(options, args, print_queue, error_queue): obj = obj[2:] if obj.startswith('/'): obj = obj[1:] - put_headers = {'x-object-meta-mtime': str(getmtime(path))} + put_headers = {'x-object-meta-mtime': "%f" % getmtime(path)} if dir_marker: if options.changed: try: @@ -1623,7 +974,7 @@ def st_upload(options, args, print_queue, error_queue): et == 'd41d8cd98f00b204e9800998ecf8427e' and \ mt == put_headers['x-object-meta-mtime']: return - except ClientException, err: + except ClientException as err: if err.http_status != 404: raise conn.put_object(container, obj, '', content_length=0, @@ -1634,6 +985,8 @@ def st_upload(options, args, print_queue, error_queue): # manifest object and need to delete the old segments # ourselves. old_manifest = None + old_slo_manifest_paths = [] + new_slo_manifest_paths = set() if options.changed or not options.leave_segments: try: headers = conn.head_object(container, obj) @@ -1644,73 +997,134 @@ def st_upload(options, args, print_queue, error_queue): return if not options.leave_segments: old_manifest = headers.get('x-object-manifest') - except ClientException, err: + if utils.config_true_value( + headers.get('x-static-large-object')): + headers, manifest_data = conn.get_object( + container, obj, + query_string='multipart-manifest=get') + for old_seg in json.loads(manifest_data): + seg_path = old_seg['name'].lstrip('/') + if isinstance(seg_path, unicode): + seg_path = seg_path.encode('utf-8') + old_slo_manifest_paths.append(seg_path) + except ClientException as err: if err.http_status != 404: raise + # Merge the command line header options to the put_headers + put_headers.update(split_headers(options.header, '', + error_queue)) + # Don't do segment job if object is not big enough if options.segment_size and \ - getsize(path) < options.segment_size: + getsize(path) > int(options.segment_size): + seg_container = container + '_segments' + if options.segment_container: + seg_container = options.segment_container full_size = getsize(path) segment_queue = Queue(10000) - segment_threads = [QueueFunctionThread(segment_queue, - _segment_job, create_connection()) for _junk in - xrange(10)] + segment_threads = [ + QueueFunctionThread( + segment_queue, _segment_job, + create_connection(), store_results=True) + for _junk in xrange(options.segment_threads)] for thread in segment_threads: thread.start() - segment = 0 - segment_start = 0 - while segment_start < full_size: - segment_size = int(options.segment_size) - if segment_start + segment_size > full_size: - segment_size = full_size - segment_start - segment_queue.put({'path': path, - 'obj': '%s/%s/%s/%08d' % (obj, - put_headers['x-object-meta-mtime'], full_size, - segment), - 'segment_start': segment_start, - 'segment_size': segment_size, - 'log_line': '%s segment %s' % (obj, segment)}) - segment += 1 - segment_start += segment_size - while not segment_queue.empty(): - sleep(0.01) - for thread in segment_threads: - thread.abort = True - while thread.isAlive(): - thread.join(0.01) - if put_errors_from_threads(segment_threads, error_queue): - raise ClientException('Aborting manifest creation ' - 'because not all segments could be uploaded. %s/%s' - % (container, obj)) - new_object_manifest = '%s_segments/%s/%s/%s/' % ( - container, obj, put_headers['x-object-meta-mtime'], - full_size) - if old_manifest == new_object_manifest: - old_manifest = None - put_headers['x-object-manifest'] = new_object_manifest - conn.put_object(container, obj, '', content_length=0, - headers=put_headers) + try: + segment = 0 + segment_start = 0 + while segment_start < full_size: + segment_size = int(options.segment_size) + if segment_start + segment_size > full_size: + segment_size = full_size - segment_start + if options.use_slo: + segment_name = '%s/slo/%s/%s/%s/%08d' % ( + obj, put_headers['x-object-meta-mtime'], + full_size, options.segment_size, segment) + else: + segment_name = '%s/%s/%s/%s/%08d' % ( + obj, put_headers['x-object-meta-mtime'], + full_size, options.segment_size, segment) + segment_queue.put( + {'path': path, 'obj': segment_name, + 'segment_start': segment_start, + 'segment_size': segment_size, + 'segment_index': segment, + 'log_line': '%s segment %s' % (obj, segment)}) + segment += 1 + segment_start += segment_size + finally: + shutdown_worker_threads(segment_queue, segment_threads) + if put_errors_from_threads(segment_threads, + error_queue): + raise ClientException( + 'Aborting manifest creation ' + 'because not all segments could be uploaded. ' + '%s/%s' % (container, obj)) + if options.use_slo: + slo_segments = [] + for thread in segment_threads: + slo_segments += thread.results + slo_segments.sort(key=lambda d: d['segment_index']) + for seg in slo_segments: + seg_loc = seg['segment_location'].lstrip('/') + if isinstance(seg_loc, unicode): + seg_loc = seg_loc.encode('utf-8') + new_slo_manifest_paths.add(seg_loc) + + manifest_data = json.dumps([ + {'path': d['segment_location'], + 'etag': d['segment_etag'], + 'size_bytes': d['segment_size']} + for d in slo_segments]) + + put_headers['x-static-large-object'] = 'true' + conn.put_object(container, obj, manifest_data, + headers=put_headers, + query_string='multipart-manifest=put') + else: + new_object_manifest = '%s/%s/%s/%s/%s/' % ( + quote(seg_container), quote(obj), + put_headers['x-object-meta-mtime'], full_size, + options.segment_size) + if old_manifest and old_manifest.rstrip('/') == \ + new_object_manifest.rstrip('/'): + old_manifest = None + put_headers['x-object-manifest'] = new_object_manifest + conn.put_object(container, obj, '', content_length=0, + headers=put_headers) else: - conn.put_object(container, obj, open(path, 'rb'), + conn.put_object( + container, obj, open(path, 'rb'), content_length=getsize(path), headers=put_headers) - if old_manifest: + if old_manifest or old_slo_manifest_paths: segment_queue = Queue(10000) - scontainer, sprefix = old_manifest.split('/', 1) - for delobj in conn.get_container(scontainer, - prefix=sprefix)[1]: - segment_queue.put({'delete': True, - 'container': scontainer, 'obj': delobj['name']}) + if old_manifest: + scontainer, sprefix = old_manifest.split('/', 1) + scontainer = unquote(scontainer) + sprefix = unquote(sprefix).rstrip('/') + '/' + for delobj in conn.get_container(scontainer, + prefix=sprefix)[1]: + segment_queue.put( + {'delete': True, + 'container': scontainer, + 'obj': delobj['name']}) + if old_slo_manifest_paths: + for seg_to_delete in old_slo_manifest_paths: + if seg_to_delete in new_slo_manifest_paths: + continue + scont, sobj = \ + seg_to_delete.split('/', 1) + segment_queue.put( + {'delete': True, + 'container': scont, 'obj': sobj}) if not segment_queue.empty(): - segment_threads = [QueueFunctionThread(segment_queue, - _segment_job, create_connection()) for _junk in - xrange(10)] + segment_threads = [ + QueueFunctionThread( + segment_queue, + _segment_job, create_connection()) + for _junk in xrange(options.segment_threads)] for thread in segment_threads: thread.start() - while not segment_queue.empty(): - sleep(0.01) - for thread in segment_threads: - thread.abort = True - while thread.isAlive(): - thread.join(0.01) + shutdown_worker_threads(segment_queue, segment_threads) put_errors_from_threads(segment_threads, error_queue) if options.verbose: if conn.attempts > 1: @@ -1718,7 +1132,7 @@ def st_upload(options, args, print_queue, error_queue): '%s [after %d attempts]' % (obj, conn.attempts)) else: print_queue.put(obj) - except OSError, err: + except OSError as err: if err.errno != ENOENT: raise error_queue.put('Local file %s not found' % repr(path)) @@ -1735,12 +1149,10 @@ def st_upload(options, args, print_queue, error_queue): else: object_queue.put({'path': subpath}) - url, token = get_auth(options.auth, options.user, options.key, - snet=options.snet) - create_connection = lambda: Connection(options.auth, options.user, - options.key, preauthurl=url, preauthtoken=token, snet=options.snet) - object_threads = [QueueFunctionThread(object_queue, _object_job, - create_connection()) for _junk in xrange(10)] + create_connection = lambda: get_conn(options) + object_threads = [ + QueueFunctionThread(object_queue, _object_job, create_connection()) + for _junk in xrange(options.object_threads)] for thread in object_threads: thread.start() conn = create_connection() @@ -1751,42 +1163,110 @@ def st_upload(options, args, print_queue, error_queue): try: conn.put_container(args[0]) if options.segment_size is not None: - conn.put_container(args[0] + '_segments') - except Exception: - pass + seg_container = args[0] + '_segments' + if options.segment_container: + seg_container = options.segment_container + conn.put_container(seg_container) + except ClientException as err: + msg = ' '.join(str(x) for x in (err.http_status, err.http_reason)) + if err.http_response_content: + if msg: + msg += ': ' + msg += err.http_response_content[:60] + error_queue.put( + 'Error trying to create container %r: %s' % (args[0], msg)) + except Exception as err: + error_queue.put( + 'Error trying to create container %r: %s' % (args[0], err)) + try: for arg in args[1:]: if isdir(arg): _upload_dir(arg) else: object_queue.put({'path': arg}) - while not object_queue.empty(): - sleep(0.01) - for thread in object_threads: - thread.abort = True - while thread.isAlive(): - thread.join(0.01) - put_errors_from_threads(object_threads, error_queue) - except ClientException, err: + except ClientException as err: if err.http_status != 404: raise error_queue.put('Account not found') + finally: + shutdown_worker_threads(object_queue, object_threads) + put_errors_from_threads(object_threads, error_queue) + + +def split_headers(options, prefix='', error_queue=None): + """ + Splits 'Key: Value' strings and returns them as a dictionary. + + :param options: An array of 'Key: Value' strings + :param prefix: String to prepend to all of the keys in the dictionary. + :param error_queue: Queue for thread safe error reporting. + """ + headers = {} + for item in options: + split_item = item.split(':', 1) + if len(split_item) == 2: + headers[(prefix + split_item[0]).title()] = split_item[1] + else: + error_string = "Metadata parameter %s must contain a ':'.\n%s" \ + % (item, st_post_help) + if error_queue: + error_queue.put(error_string) + else: + exit(error_string) + return headers def parse_args(parser, args, enforce_requires=True): if not args: args = ['-h'] (options, args) = parser.parse_args(args) + + if (not (options.auth and options.user and options.key)): + # Use 2.0 auth if none of the old args are present + options.auth_version = '2.0' + + # Use new-style args if old ones not present + if not options.auth and options.os_auth_url: + options.auth = options.os_auth_url + if not options.user and options.os_username: + options.user = options.os_username + if not options.key and options.os_password: + options.key = options.os_password + + # Specific OpenStack options + options.os_options = { + 'tenant_id': options.os_tenant_id, + 'tenant_name': options.os_tenant_name, + 'service_type': options.os_service_type, + 'endpoint_type': options.os_endpoint_type, + 'auth_token': options.os_auth_token, + 'object_storage_url': options.os_storage_url, + 'region_name': options.os_region_name, + } + + if (options.os_options.get('object_storage_url') and + options.os_options.get('auth_token') and + options.auth_version == '2.0'): + return options, args + if enforce_requires and \ not (options.auth and options.user and options.key): exit(''' -Requires ST_AUTH, ST_USER, and ST_KEY environment variables be set or -overridden with -A, -U, or -K.'''.strip('\n')) +Auth version 1.0 requires ST_AUTH, ST_USER, and ST_KEY environment variables +to be set or overridden with -A, -U, or -K. + +Auth version 2.0 requires OS_AUTH_URL, OS_USERNAME, OS_PASSWORD, and +OS_TENANT_NAME OS_TENANT_ID to be set or overridden with --os-auth-url, +--os-username, --os-password, --os-tenant-name or os-tenant-id. Note: +adding "-V 2" is necessary for this.'''.strip('\n')) return options, args if __name__ == '__main__': - parser = OptionParser(version='%prog 1.0', usage=''' + version = version_info.version_string() + parser = OptionParser(version='%%prog %s' % version, + usage=''' Usage: %%prog [options] [args] Commands: @@ -1797,24 +1277,130 @@ Commands: %(st_download_help)s %(st_delete_help)s -Example: +Examples: %%prog -A https://auth.api.rackspacecloud.com/v1.0 -U user -K key stat + + %%prog --os-auth-url https://api.example.com/v2.0 --os-tenant-name tenant \\ + --os-usernameuser --os-password password list + + %%prog --os-auth-token 6ee5eb33efad4e45ab46806eac010566 \\ + --os-storage-url https://10.1.5.2:8080/v1/AUTH_ced809b6a4baea7aeab61a \\ + list + + %%prog list --lh '''.strip('\n') % globals()) parser.add_option('-s', '--snet', action='store_true', dest='snet', default=False, help='Use SERVICENET internal network') parser.add_option('-v', '--verbose', action='count', dest='verbose', default=1, help='Print more info') + parser.add_option('--debug', action='store_true', dest='debug', + default=False, help='Show the curl commands of all http ' + 'queries.') parser.add_option('-q', '--quiet', action='store_const', dest='verbose', const=0, default=1, help='Suppress status output') parser.add_option('-A', '--auth', dest='auth', default=environ.get('ST_AUTH'), help='URL for obtaining an auth token') + parser.add_option('-V', '--auth-version', + dest='auth_version', + default=environ.get('ST_AUTH_VERSION', '1.0'), + type=str, + help='Specify a version for authentication. ' + 'Defaults to 1.0.') parser.add_option('-U', '--user', dest='user', default=environ.get('ST_USER'), - help='User name for obtaining an auth token') + help='User name for obtaining an auth token.') parser.add_option('-K', '--key', dest='key', default=environ.get('ST_KEY'), - help='Key for obtaining an auth token') + help='Key for obtaining an auth token.') + parser.add_option('--os-username', + metavar='', + default=environ.get('OS_USERNAME'), + help='Openstack username. Defaults to env[OS_USERNAME].') + parser.add_option('--os_username', + help=SUPPRESS_HELP) + parser.add_option('--os-password', + metavar='', + default=environ.get('OS_PASSWORD'), + help='Openstack password. Defaults to env[OS_PASSWORD].') + parser.add_option('--os_password', + help=SUPPRESS_HELP) + parser.add_option('--os-tenant-id', + metavar='', + default=environ.get('OS_TENANT_ID'), + help='OpenStack tenant ID. ' + 'Defaults to env[OS_TENANT_ID]') + parser.add_option('--os_tenant_id', + help=SUPPRESS_HELP) + parser.add_option('--os-tenant-name', + metavar='', + default=environ.get('OS_TENANT_NAME'), + help='Openstack tenant name. ' + 'Defaults to env[OS_TENANT_NAME].') + parser.add_option('--os_tenant_name', + help=SUPPRESS_HELP) + parser.add_option('--os-auth-url', + metavar='', + default=environ.get('OS_AUTH_URL'), + help='Openstack auth URL. Defaults to env[OS_AUTH_URL].') + parser.add_option('--os_auth_url', + help=SUPPRESS_HELP) + parser.add_option('--os-auth-token', + metavar='', + default=environ.get('OS_AUTH_TOKEN'), + help='Openstack token. Defaults to env[OS_AUTH_TOKEN]. ' + 'Used with --os-storage-url to bypass the ' + 'usual username/password authentication.') + parser.add_option('--os_auth_token', + help=SUPPRESS_HELP) + parser.add_option('--os-storage-url', + metavar='', + default=environ.get('OS_STORAGE_URL'), + help='Openstack storage URL. ' + 'Defaults to env[OS_STORAGE_URL]. ' + 'Used with --os-auth-token to bypass the ' + 'usual username/password authentication.') + parser.add_option('--os_storage_url', + help=SUPPRESS_HELP) + parser.add_option('--os-region-name', + metavar='', + default=environ.get('OS_REGION_NAME'), + help='Openstack region name. ' + 'Defaults to env[OS_REGION_NAME]') + parser.add_option('--os_region_name', + help=SUPPRESS_HELP) + parser.add_option('--os-service-type', + metavar='', + default=environ.get('OS_SERVICE_TYPE'), + help='Openstack Service type. ' + 'Defaults to env[OS_SERVICE_TYPE]') + parser.add_option('--os_service_type', + help=SUPPRESS_HELP) + parser.add_option('--os-endpoint-type', + metavar='', + default=environ.get('OS_ENDPOINT_TYPE'), + help='Openstack Endpoint type. ' + 'Defaults to env[OS_ENDPOINT_TYPE]') + parser.add_option('--os-cacert', + metavar='', + default=environ.get('OS_CACERT'), + help='Specify a CA bundle file to use in verifying a ' + 'TLS (https) server certificate. ' + 'Defaults to env[OS_CACERT]') + default_val = utils.config_true_value(environ.get('SWIFTCLIENT_INSECURE')) + parser.add_option('--insecure', + action="store_true", dest="insecure", + default=default_val, + help='Allow swiftclient to access insecure keystone ' + 'server. The keystone\'s certificate will not ' + 'be verified. ' + 'Defaults to env[SWIFTCLIENT_INSECURE] ' + '(set to \'true\' to enable).') + parser.add_option('--no-ssl-compression', + action='store_false', dest='ssl_compression', + default=True, + help='Disable SSL compression when using https. ' + 'This may increase performance.') parser.disable_interspersed_args() (options, args) = parse_args(parser, argv[1:], enforce_requires=False) parser.enable_interspersed_args() @@ -1826,6 +1412,12 @@ Example: exit('no such command: %s' % args[0]) exit() + signal.signal(signal.SIGINT, immediate_exit) + + if options.debug: + logger = logging.getLogger("swiftclient") + logging.basicConfig(level=logging.DEBUG) + print_queue = Queue(10000) def _print(item): @@ -1836,9 +1428,12 @@ Example: print_thread = QueueFunctionThread(print_queue, _print) print_thread.start() + error_count = 0 error_queue = Queue(10000) def _error(item): + global error_count + error_count += 1 if isinstance(item, unicode): item = item.encode('utf8') print >> stderr, item @@ -1846,24 +1441,15 @@ Example: error_thread = QueueFunctionThread(error_queue, _error) error_thread.start() + parser.usage = globals()['st_%s_help' % args[0]] try: - parser.usage = globals()['st_%s_help' % args[0]] - try: - globals()['st_%s' % args[0]](parser, argv[1:], print_queue, - error_queue) - except (ClientException, HTTPException, socket.error), err: - error_queue.put(str(err)) - while not print_queue.empty(): - sleep(0.01) - print_thread.abort = True - while print_thread.isAlive(): - print_thread.join(0.01) - while not error_queue.empty(): - sleep(0.01) - error_thread.abort = True - while error_thread.isAlive(): - error_thread.join(0.01) - except (SystemExit, Exception): - for thread in threading_enumerate(): - thread.abort = True - raise + globals()['st_%s' % args[0]](parser, argv[1:], print_queue, + error_queue) + except (ClientException, HTTPException, socket.error) as err: + error_queue.put(str(err)) + finally: + shutdown_worker_threads(print_queue, [print_thread]) + shutdown_worker_threads(error_queue, [error_thread]) + + if error_count: + exit(1) diff --git a/server/src/com/cloud/capacity/CapacityManagerImpl.java b/server/src/com/cloud/capacity/CapacityManagerImpl.java index 101902c9b74..d54624d6efb 100755 --- a/server/src/com/cloud/capacity/CapacityManagerImpl.java +++ b/server/src/com/cloud/capacity/CapacityManagerImpl.java @@ -70,7 +70,6 @@ import com.cloud.storage.VMTemplateVO; import com.cloud.storage.VolumeVO; import com.cloud.storage.dao.VMTemplatePoolDao; import com.cloud.storage.dao.VolumeDao; -import com.cloud.storage.swift.SwiftManager; import com.cloud.uservm.UserVm; import com.cloud.utils.DateUtil; import com.cloud.utils.NumbersUtil; @@ -119,8 +118,6 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager, @Inject StorageManager _storageMgr; @Inject - SwiftManager _swiftMgr; - @Inject ConfigurationManager _configMgr; @Inject HypervisorCapabilitiesDao _hypervisorCapabilitiesDao; diff --git a/server/src/com/cloud/configuration/ConfigurationManagerImpl.java b/server/src/com/cloud/configuration/ConfigurationManagerImpl.java index 8d8b12680ba..c3209a9f466 100755 --- a/server/src/com/cloud/configuration/ConfigurationManagerImpl.java +++ b/server/src/com/cloud/configuration/ConfigurationManagerImpl.java @@ -172,12 +172,8 @@ import com.cloud.service.ServiceOfferingVO; import com.cloud.service.dao.ServiceOfferingDao; import com.cloud.service.dao.ServiceOfferingDetailsDao; import com.cloud.storage.DiskOfferingVO; -import com.cloud.storage.SwiftVO; import com.cloud.storage.dao.DiskOfferingDao; import com.cloud.storage.dao.S3Dao; -import com.cloud.storage.dao.SwiftDao; -import com.cloud.storage.s3.S3Manager; -import com.cloud.storage.swift.SwiftManager; import com.cloud.test.IPRangeConfig; import com.cloud.user.Account; import com.cloud.user.AccountDetailVO; @@ -227,8 +223,6 @@ public class ConfigurationManagerImpl extends ManagerBase implements Configurati @Inject DomainDao _domainDao; @Inject - SwiftDao _swiftDao; - @Inject S3Dao _s3Dao; @Inject ServiceOfferingDao _serviceOfferingDao; @@ -277,10 +271,6 @@ public class ConfigurationManagerImpl extends ManagerBase implements Configurati @Inject PhysicalNetworkDao _physicalNetworkDao; @Inject - SwiftManager _swiftMgr; - @Inject - S3Manager _s3Mgr; - @Inject PhysicalNetworkTrafficTypeDao _trafficTypeDao; @Inject NicDao _nicDao; @@ -720,24 +710,6 @@ public class ConfigurationManagerImpl extends ManagerBase implements Configurati s_logger.error("Configuration variable " + name + " is expecting true or false in stead of " + value); return "Please enter either 'true' or 'false'."; } - if (Config.SwiftEnable.key().equals(name)) { - List stores = this._dataStoreMgr.listImageStores(); - if (stores != null && stores.size() > 0) { - return " can not change " + Config.SwiftEnable.key() + " after you have added secondary storage"; - } - SwiftVO swift = _swiftDao.findById(1L); - if (swift != null) { - return " can not change " + Config.SwiftEnable.key() + " after you have added Swift"; - } - if (this._s3Mgr.isS3Enabled()) { - return String.format("Swift is not supported when S3 is enabled."); - } - } - if (Config.S3Enable.key().equals(name)) { - if (this._swiftMgr.isSwiftEnabled()) { - return String.format("S3-backed Secondary Storage is not supported when Swift is enabled."); - } - } return null; } @@ -1980,9 +1952,6 @@ public class ConfigurationManagerImpl extends ManagerBase implements Configurati // Create default system networks createDefaultSystemNetworks(zone.getId()); - - _swiftMgr.propagateSwiftTmplteOnZone(zone.getId()); - _s3Mgr.propagateTemplatesToZone(zone); txn.commit(); return zone; } catch (Exception ex) { diff --git a/server/src/com/cloud/resource/ResourceManagerImpl.java b/server/src/com/cloud/resource/ResourceManagerImpl.java index b009e63d8e7..df40066041a 100755 --- a/server/src/com/cloud/resource/ResourceManagerImpl.java +++ b/server/src/com/cloud/resource/ResourceManagerImpl.java @@ -43,10 +43,6 @@ import org.apache.cloudstack.api.command.admin.host.PrepareForMaintenanceCmd; import org.apache.cloudstack.api.command.admin.host.ReconnectHostCmd; import org.apache.cloudstack.api.command.admin.host.UpdateHostCmd; import org.apache.cloudstack.api.command.admin.host.UpdateHostPasswordCmd; -import org.apache.cloudstack.api.command.admin.storage.AddS3Cmd; -import org.apache.cloudstack.api.command.admin.storage.ListS3sCmd; -import org.apache.cloudstack.api.command.admin.swift.AddSwiftCmd; -import org.apache.cloudstack.api.command.admin.swift.ListSwiftsCmd; import org.apache.cloudstack.region.dao.RegionDao; import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao; import org.apache.cloudstack.storage.datastore.db.StoragePoolVO; @@ -123,22 +119,17 @@ import com.cloud.org.Grouping.AllocationState; import com.cloud.org.Managed; import com.cloud.service.ServiceOfferingVO; import com.cloud.storage.GuestOSCategoryVO; -import com.cloud.storage.S3; -import com.cloud.storage.S3VO; import com.cloud.storage.StorageManager; import com.cloud.storage.StoragePool; import com.cloud.storage.StoragePoolHostVO; import com.cloud.storage.StoragePoolStatus; import com.cloud.storage.StorageService; -import com.cloud.storage.Swift; -import com.cloud.storage.SwiftVO; import com.cloud.storage.VMTemplateVO; import com.cloud.storage.dao.GuestOSCategoryDao; import com.cloud.storage.dao.StoragePoolHostDao; import com.cloud.storage.dao.VMTemplateDao; import com.cloud.storage.s3.S3Manager; import com.cloud.storage.secondary.SecondaryStorageVmManager; -import com.cloud.storage.swift.SwiftManager; import com.cloud.template.VirtualMachineTemplate; import com.cloud.user.Account; import com.cloud.user.AccountManager; @@ -196,8 +187,6 @@ public class ResourceManagerImpl extends ManagerBase implements ResourceManager, @Inject protected HostDao _hostDao; @Inject - protected SwiftManager _swiftMgr; - @Inject protected S3Manager _s3Mgr; @Inject protected HostDetailsDao _hostDetailsDao; @@ -623,28 +612,6 @@ public class ResourceManagerImpl extends ManagerBase implements ResourceManager, return discoverHostsFull(dcId, null, null, null, url, null, null, "SecondaryStorage", null, null, false); } - @Override - public Swift discoverSwift(AddSwiftCmd cmd) throws DiscoveryException { - return _swiftMgr.addSwift(cmd); - } - - @Override - public Pair, Integer> listSwifts(ListSwiftsCmd cmd) { - Pair, Integer> swifts = _swiftMgr.listSwifts(cmd); - return new Pair, Integer>(swifts.first(), swifts.second()); - } - - @Override - public S3 discoverS3(final AddS3Cmd cmd) throws DiscoveryException { - return _s3Mgr.addS3(cmd); - } - - @Override - public List listS3s(final ListS3sCmd cmd) { - return _s3Mgr.listS3s(cmd); - } - - private List discoverHostsFull(Long dcId, Long podId, Long clusterId, String clusterName, String url, String username, String password, String hypervisorType, List hostTags, Map params, boolean deferAgentCreation) throws IllegalArgumentException, DiscoveryException, InvalidParameterValueException { diff --git a/server/src/com/cloud/server/ManagementServerImpl.java b/server/src/com/cloud/server/ManagementServerImpl.java index 58e4b44ac8b..cdffb16177f 100755 --- a/server/src/com/cloud/server/ManagementServerImpl.java +++ b/server/src/com/cloud/server/ManagementServerImpl.java @@ -154,8 +154,6 @@ import org.apache.cloudstack.api.command.admin.storage.ListStoragePoolsCmd; import org.apache.cloudstack.api.command.admin.storage.ListStorageProvidersCmd; import org.apache.cloudstack.api.command.admin.storage.PreparePrimaryStorageForMaintenanceCmd; import org.apache.cloudstack.api.command.admin.storage.UpdateStoragePoolCmd; -import org.apache.cloudstack.api.command.admin.swift.AddSwiftCmd; -import org.apache.cloudstack.api.command.admin.swift.ListSwiftsCmd; import org.apache.cloudstack.api.command.admin.systemvm.*; import org.apache.cloudstack.api.command.admin.template.PrepareTemplateCmd; import org.apache.cloudstack.api.command.admin.usage.AddTrafficMonitorCmd; @@ -503,7 +501,6 @@ import com.cloud.storage.dao.VolumeDao; import com.cloud.storage.s3.S3Manager; import com.cloud.storage.secondary.SecondaryStorageVmManager; import com.cloud.storage.snapshot.SnapshotManager; -import com.cloud.storage.swift.SwiftManager; import com.cloud.storage.upload.UploadMonitor; import com.cloud.tags.ResourceTagVO; import com.cloud.tags.dao.ResourceTagDao; @@ -601,8 +598,6 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe @Inject private SecondaryStorageVmManager _secStorageVmMgr; @Inject - private SwiftManager _swiftMgr; - @Inject private ServiceOfferingDao _offeringsDao; @Inject private DiskOfferingDao _diskOfferingDao; @@ -2531,8 +2526,6 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe cmdList.add(FindStoragePoolsForMigrationCmd.class); cmdList.add(PreparePrimaryStorageForMaintenanceCmd.class); cmdList.add(UpdateStoragePoolCmd.class); - cmdList.add(AddSwiftCmd.class); - cmdList.add(ListSwiftsCmd.class); cmdList.add(DestroySystemVmCmd.class); cmdList.add(ListSystemVMsCmd.class); cmdList.add(MigrateSystemVMCmd.class); diff --git a/server/src/com/cloud/storage/snapshot/SnapshotManagerImpl.java b/server/src/com/cloud/storage/snapshot/SnapshotManagerImpl.java index ebe06e707a2..a67316f5a1e 100755 --- a/server/src/com/cloud/storage/snapshot/SnapshotManagerImpl.java +++ b/server/src/com/cloud/storage/snapshot/SnapshotManagerImpl.java @@ -99,7 +99,6 @@ import com.cloud.storage.dao.VMTemplateDao; import com.cloud.storage.dao.VolumeDao; import com.cloud.storage.s3.S3Manager; import com.cloud.storage.secondary.SecondaryStorageVmManager; -import com.cloud.storage.swift.SwiftManager; import com.cloud.storage.template.TemplateConstants; import com.cloud.tags.ResourceTagVO; import com.cloud.tags.dao.ResourceTagDao; @@ -179,8 +178,6 @@ public class SnapshotManagerImpl extends ManagerBase implements SnapshotManager, @Inject private ResourceLimitService _resourceLimitMgr; @Inject - private SwiftManager _swiftMgr; - @Inject private S3Manager _s3Mgr; @Inject private SecondaryStorageVmManager _ssvmMgr; diff --git a/server/src/com/cloud/storage/swift/SwiftManager.java b/server/src/com/cloud/storage/swift/SwiftManager.java deleted file mode 100644 index 2abdac7f807..00000000000 --- a/server/src/com/cloud/storage/swift/SwiftManager.java +++ /dev/null @@ -1,57 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -package com.cloud.storage.swift; - -import java.util.List; - -import com.cloud.agent.api.to.SwiftTO; -import org.apache.cloudstack.api.command.admin.swift.AddSwiftCmd; -import org.apache.cloudstack.api.command.admin.swift.ListSwiftsCmd; -import org.apache.cloudstack.api.command.user.iso.DeleteIsoCmd; -import org.apache.cloudstack.api.command.user.template.DeleteTemplateCmd; -import com.cloud.exception.DiscoveryException; -import com.cloud.storage.Swift; -import com.cloud.storage.SwiftVO; -import com.cloud.storage.VMTemplateSwiftVO; -import com.cloud.utils.Pair; -import com.cloud.utils.component.Manager; -public interface SwiftManager extends Manager { - - SwiftTO getSwiftTO(Long swiftId); - - SwiftTO getSwiftTO(); - - Swift addSwift(AddSwiftCmd cmd) throws DiscoveryException; - - boolean isSwiftEnabled(); - - public boolean isTemplateInstalled(Long templateId); - - void deleteIso(DeleteIsoCmd cmd); - - void deleteTemplate(DeleteTemplateCmd cmd); - - void propagateTemplateOnAllZones(Long tmpltId); - - void propagateSwiftTmplteOnZone(Long zoneId); - - Long chooseZoneForTmpltExtract(Long tmpltId); - - Pair, Integer> listSwifts(ListSwiftsCmd cmd); - - VMTemplateSwiftVO findByTmpltId(Long tmpltId); -} diff --git a/server/src/com/cloud/storage/swift/SwiftManagerImpl.java b/server/src/com/cloud/storage/swift/SwiftManagerImpl.java deleted file mode 100644 index 3149f5aad17..00000000000 --- a/server/src/com/cloud/storage/swift/SwiftManagerImpl.java +++ /dev/null @@ -1,298 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -package com.cloud.storage.swift; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.Map; - -import javax.ejb.Local; -import javax.inject.Inject; -import javax.naming.ConfigurationException; - -import org.apache.cloudstack.api.command.admin.swift.ListSwiftsCmd; -import org.apache.cloudstack.api.command.user.iso.DeleteIsoCmd; -import org.apache.cloudstack.api.command.user.template.DeleteTemplateCmd; -import org.apache.log4j.Logger; -import org.springframework.stereotype.Component; - -import com.cloud.agent.AgentManager; -import com.cloud.agent.api.Answer; -import com.cloud.agent.api.to.SwiftTO; -import org.apache.cloudstack.api.command.admin.swift.AddSwiftCmd; -import org.apache.cloudstack.storage.command.DeleteCommand; -import org.apache.cloudstack.storage.to.TemplateObjectTO; - -import com.cloud.configuration.Config; -import com.cloud.configuration.dao.ConfigurationDao; -import com.cloud.dc.DataCenterVO; -import com.cloud.dc.dao.DataCenterDao; -import com.cloud.exception.DiscoveryException; -import com.cloud.host.HostVO; -import com.cloud.host.dao.HostDao; -import com.cloud.storage.SwiftVO; -import com.cloud.storage.VMTemplateHostVO; -import com.cloud.storage.VMTemplateSwiftVO; -import com.cloud.storage.VMTemplateZoneVO; -import com.cloud.storage.dao.SwiftDao; -import com.cloud.storage.dao.VMTemplateHostDao; -import com.cloud.storage.dao.VMTemplateSwiftDao; -import com.cloud.storage.dao.VMTemplateZoneDao; -import com.cloud.utils.Pair; -import com.cloud.utils.component.ManagerBase; -import com.cloud.utils.db.Filter; -import com.cloud.utils.db.SearchCriteria; -import com.cloud.utils.db.SearchCriteria.Op; -import com.cloud.utils.db.SearchCriteria2; -import com.cloud.utils.db.SearchCriteriaService; -import com.cloud.utils.exception.CloudRuntimeException; - -@Component -@Local(value = { SwiftManager.class }) -public class SwiftManagerImpl extends ManagerBase implements SwiftManager { - private static final Logger s_logger = Logger.getLogger(SwiftManagerImpl.class); - - @Inject - private SwiftDao _swiftDao; - @Inject - VMTemplateSwiftDao _vmTmpltSwiftlDao; - @Inject - private ConfigurationDao _configDao; - @Inject - private AgentManager _agentMgr; - @Inject - private DataCenterDao _dcDao; - @Inject - private VMTemplateZoneDao _vmTmpltZoneDao; - @Inject - private VMTemplateHostDao _vmTmpltHostDao; - @Inject - private HostDao _hostDao; - - @Override - public SwiftTO getSwiftTO(Long swiftId) { - return _swiftDao.getSwiftTO(swiftId); - } - - @Override - public SwiftTO getSwiftTO() { - return _swiftDao.getSwiftTO(null); - } - - @Override - public boolean isSwiftEnabled() { - Boolean swiftEnable = Boolean.valueOf(_configDao.getValue(Config.SwiftEnable.key())); - if (swiftEnable) { - return true; - } - return false; - } - - @Override - public boolean isTemplateInstalled(Long templateId) { - - SearchCriteriaService sc = SearchCriteria2.create(VMTemplateSwiftVO.class); - sc.addAnd(sc.getEntity().getTemplateId(), Op.EQ, templateId); - return !sc.list().isEmpty(); - } - - @Override - public SwiftVO addSwift(AddSwiftCmd cmd) throws DiscoveryException { - if (!isSwiftEnabled()) { - throw new DiscoveryException("Swift is not enabled"); - } - SwiftVO swift = new SwiftVO(cmd.getUrl(), cmd.getAccount(), cmd.getUsername(), cmd.getKey()); - swift = _swiftDao.persist(swift); - return swift; - } - - @Override - public boolean start() { - if (s_logger.isInfoEnabled()) { - s_logger.info("Start Swift Manager"); - } - - return true; - } - - @Override - public void deleteIso(DeleteIsoCmd cmd) { - String msg; - SwiftTO swift = getSwiftTO(); - if (swift == null) { - msg = "There is no Swift in this setup"; - s_logger.warn(msg); - throw new CloudRuntimeException(msg); - } - VMTemplateSwiftVO tmpltSwiftRef = _vmTmpltSwiftlDao.findBySwiftTemplate(swift.getId(), cmd.getId()); - if ( tmpltSwiftRef == null ) { - msg = "Delete ISO failed due to cannot find ISO " + cmd.getId() + " in Swift "; - s_logger.warn(msg); - throw new CloudRuntimeException(msg); - } - TemplateObjectTO tmplTO = new TemplateObjectTO(); - tmplTO.setDataStore(swift); - tmplTO.setId(cmd.getId()); - Answer answer = _agentMgr.sendToSSVM(null, new DeleteCommand(tmplTO)); - if (answer == null || !answer.getResult()) { - msg = "Failed to delete " + tmpltSwiftRef + " due to " + ((answer == null) ? "answer is null" : answer.getDetails()); - s_logger.warn(msg); - throw new CloudRuntimeException(msg); - } else { - _vmTmpltSwiftlDao.remove(tmpltSwiftRef.getId()); - s_logger.debug("Deleted template " + cmd.getId() + " in Swift"); - } - } - - @Override - public void deleteTemplate(DeleteTemplateCmd cmd) { - String msg; - SwiftTO swift = getSwiftTO(); - if (swift == null) { - msg = "There is no Swift in this setup"; - s_logger.warn(msg); - throw new CloudRuntimeException(msg); - } - VMTemplateSwiftVO tmpltSwiftRef = _vmTmpltSwiftlDao.findBySwiftTemplate(swift.getId(), cmd.getId()); - if (tmpltSwiftRef == null) { - msg = "Delete Template failed due to cannot find Template" + cmd.getId() + " in Swift "; - s_logger.warn(msg); - throw new CloudRuntimeException(msg); - } - TemplateObjectTO tmplTO = new TemplateObjectTO(); - tmplTO.setId(cmd.getId()); - Answer answer = _agentMgr.sendToSSVM(null, new DeleteCommand(tmplTO)); - if (answer == null || !answer.getResult()) { - msg = "Failed to delete " + tmpltSwiftRef + " due to " + ((answer == null) ? "answer is null" : answer.getDetails()); - s_logger.warn(msg); - throw new CloudRuntimeException(msg); - } else { - _vmTmpltSwiftlDao.remove(tmpltSwiftRef.getId()); - s_logger.debug("Deleted template " + cmd.getId() + " in Swift"); - } - } - - @Override - public void propagateTemplateOnAllZones(Long tmpltId) { - String msg; - SwiftTO swift = getSwiftTO(); - if (swift == null) { - msg = "There is no Swift in this setup"; - s_logger.trace(msg); - return; - } - VMTemplateSwiftVO tmpltSwiftRef = _vmTmpltSwiftlDao.findOneByTemplateId(tmpltId); - if (tmpltSwiftRef != null) { - List dcs = _dcDao.listAll(); - for (DataCenterVO dc : dcs) { - VMTemplateZoneVO tmpltZoneVO = new VMTemplateZoneVO(dc.getId(), tmpltId, new Date()); - try { - _vmTmpltZoneDao.persist(tmpltZoneVO); - } catch (Exception e) { - } - } - } - } - - @Override - public void propagateSwiftTmplteOnZone(Long zoneId) { - String msg; - SwiftTO swift = getSwiftTO(); - if (swift == null) { - msg = "There is no Swift in this setup"; - s_logger.trace(msg); - return; - } - List tmpltIds = new ArrayList(); - List tmpltSwiftRefs = _vmTmpltSwiftlDao.listAll(); - if (tmpltSwiftRefs == null) { - return; - } - for (VMTemplateSwiftVO tmpltSwiftRef : tmpltSwiftRefs) { - Long tmpltId = tmpltSwiftRef.getTemplateId(); - if (!tmpltIds.contains(tmpltId)) { - tmpltIds.add(tmpltId); - VMTemplateZoneVO tmpltZoneVO = new VMTemplateZoneVO(zoneId, tmpltId, new Date()); - try { - _vmTmpltZoneDao.persist(tmpltZoneVO); - } catch (Exception e) { - } - } - } - } - - @Override - public Long chooseZoneForTmpltExtract(Long tmpltId) { - SwiftTO swift = getSwiftTO(); - if (swift == null) { - return null; - } - - List tmpltHosts = _vmTmpltHostDao.listByOnlyTemplateId(tmpltId); - if (tmpltHosts != null) { - Collections.shuffle(tmpltHosts); - for (VMTemplateHostVO tHost : tmpltHosts) { - HostVO host = _hostDao.findById(tHost.getHostId()); - if (host != null) { - return host.getDataCenterId(); - } - throw new CloudRuntimeException("can not find secondary storage host"); - } - } - List dcs = _dcDao.listAll(); - Collections.shuffle(dcs); - return dcs.get(0).getId(); - } - - @Override - public Pair, Integer> listSwifts(ListSwiftsCmd cmd) { - Filter searchFilter = new Filter(SwiftVO.class, "id", Boolean.TRUE, cmd.getStartIndex(), cmd.getPageSizeVal()); - SearchCriteria sc = _swiftDao.createSearchCriteria(); - if (cmd.getId() != null) { - sc.addAnd("id", SearchCriteria.Op.EQ, cmd.getId()); - } - return _swiftDao.searchAndCount(sc, searchFilter); - - } - - @Override - public VMTemplateSwiftVO findByTmpltId(Long tmpltId) { - return _vmTmpltSwiftlDao.findOneByTemplateId(tmpltId); - } - - @Override - public boolean stop() { - if (s_logger.isInfoEnabled()) { - s_logger.info("Stop Swift Manager"); - } - return true; - } - - @Override - public boolean configure(String name, Map params) throws ConfigurationException { - if (s_logger.isInfoEnabled()) { - s_logger.info("Start configuring Swift Manager : " + name); - } - - return true; - } - - protected SwiftManagerImpl() { - } -} diff --git a/server/src/com/cloud/template/HypervisorTemplateAdapter.java b/server/src/com/cloud/template/HypervisorTemplateAdapter.java index 06bc09445d3..65318389c36 100755 --- a/server/src/com/cloud/template/HypervisorTemplateAdapter.java +++ b/server/src/com/cloud/template/HypervisorTemplateAdapter.java @@ -160,8 +160,7 @@ public class HypervisorTemplateAdapter extends TemplateAdapterBase { AsyncCallbackDispatcher caller = AsyncCallbackDispatcher.create(this); caller.setCallback(caller.getTarget().createTemplateAsyncCallBack(null, null)); caller.setContext(context); - this.imageService - .createTemplateAsync(tmpl, imageStore, caller); + this.imageService.createTemplateAsync(tmpl, imageStore, caller); } _resourceLimitMgr.incrementResourceCount(profile.getAccountId(), ResourceType.template); diff --git a/server/test/com/cloud/resource/MockResourceManagerImpl.java b/server/test/com/cloud/resource/MockResourceManagerImpl.java index 18cff80aff5..2c66134f98d 100644 --- a/server/test/com/cloud/resource/MockResourceManagerImpl.java +++ b/server/test/com/cloud/resource/MockResourceManagerImpl.java @@ -26,18 +26,8 @@ import javax.naming.ConfigurationException; import org.apache.cloudstack.api.command.admin.cluster.AddClusterCmd; import org.apache.cloudstack.api.command.admin.cluster.DeleteClusterCmd; -import org.apache.cloudstack.api.command.admin.host.AddHostCmd; -import org.apache.cloudstack.api.command.admin.host.AddSecondaryStorageCmd; -import org.apache.cloudstack.api.command.admin.host.CancelMaintenanceCmd; -import org.apache.cloudstack.api.command.admin.host.PrepareForMaintenanceCmd; -import org.apache.cloudstack.api.command.admin.host.ReconnectHostCmd; -import org.apache.cloudstack.api.command.admin.host.UpdateHostCmd; -import org.apache.cloudstack.api.command.admin.host.UpdateHostPasswordCmd; -import org.apache.cloudstack.api.command.admin.storage.AddS3Cmd; -import org.apache.cloudstack.api.command.admin.storage.ListS3sCmd; -import org.apache.cloudstack.api.command.admin.swift.AddSwiftCmd; -import org.apache.cloudstack.api.command.admin.swift.ListSwiftsCmd; - +import org.apache.cloudstack.api.command.admin.host.*; +import org.apache.cloudstack.api.command.admin.storage.*; import com.cloud.agent.api.StartupCommand; import com.cloud.agent.api.StartupRoutingCommand; import com.cloud.dc.DataCenterVO; @@ -57,8 +47,7 @@ import com.cloud.hypervisor.Hypervisor.HypervisorType; import com.cloud.org.Cluster; import com.cloud.resource.ResourceState.Event; import com.cloud.service.ServiceOfferingVO; -import com.cloud.storage.S3; -import com.cloud.storage.Swift; +import com.cloud.storage.ImageStore; import com.cloud.template.VirtualMachineTemplate; import com.cloud.utils.Pair; import com.cloud.utils.component.ManagerBase; @@ -179,23 +168,6 @@ public class MockResourceManagerImpl extends ManagerBase implements ResourceMana return null; } - /* (non-Javadoc) - * @see com.cloud.resource.ResourceService#discoverSwift(com.cloud.api.commands.AddSwiftCmd) - */ - @Override - public Swift discoverSwift(AddSwiftCmd addSwiftCmd) throws DiscoveryException { - // TODO Auto-generated method stub - return null; - } - - /* (non-Javadoc) - * @see com.cloud.resource.ResourceService#discoverS3(com.cloud.api.commands.AddS3Cmd) - */ - @Override - public S3 discoverS3(AddS3Cmd cmd) throws DiscoveryException { - // TODO Auto-generated method stub - return null; - } /* (non-Javadoc) * @see com.cloud.resource.ResourceService#getSupportedHypervisorTypes(long, boolean, java.lang.Long) @@ -206,24 +178,6 @@ public class MockResourceManagerImpl extends ManagerBase implements ResourceMana return null; } - /* (non-Javadoc) - * @see com.cloud.resource.ResourceService#listSwifts(com.cloud.api.commands.ListSwiftsCmd) - */ - @Override - public Pair, Integer> listSwifts(ListSwiftsCmd cmd) { - // TODO Auto-generated method stub - return null; - } - - /* (non-Javadoc) - * @see com.cloud.resource.ResourceService#listS3s(com.cloud.api.commands.ListS3sCmd) - */ - @Override - public List listS3s(ListS3sCmd cmd) { - // TODO Auto-generated method stub - return null; - } - /* (non-Javadoc) * @see com.cloud.resource.ResourceManager#registerResourceEvent(java.lang.Integer, com.cloud.resource.ResourceListener) */ diff --git a/server/test/org/apache/cloudstack/networkoffering/ChildTestConfiguration.java b/server/test/org/apache/cloudstack/networkoffering/ChildTestConfiguration.java index 6398e202b94..27faa4dcf00 100644 --- a/server/test/org/apache/cloudstack/networkoffering/ChildTestConfiguration.java +++ b/server/test/org/apache/cloudstack/networkoffering/ChildTestConfiguration.java @@ -100,11 +100,8 @@ import com.cloud.storage.dao.DiskOfferingDaoImpl; import com.cloud.storage.dao.S3DaoImpl; import com.cloud.storage.dao.SnapshotDaoImpl; import com.cloud.storage.dao.StoragePoolDetailsDaoImpl; -import com.cloud.storage.dao.SwiftDaoImpl; import com.cloud.storage.dao.VolumeDaoImpl; -import com.cloud.storage.s3.S3Manager; import com.cloud.storage.secondary.SecondaryStorageVmManager; -import com.cloud.storage.swift.SwiftManager; import com.cloud.tags.dao.ResourceTagsDaoImpl; import com.cloud.user.AccountDetailsDao; import com.cloud.user.AccountManager; @@ -126,7 +123,6 @@ import org.apache.cloudstack.region.PortableIpRangeDaoImpl; VolumeDaoImpl.class, HostPodDaoImpl.class, DomainDaoImpl.class, - SwiftDaoImpl.class, ServiceOfferingDaoImpl.class, ServiceOfferingDetailsDaoImpl.class, VlanDaoImpl.class, @@ -224,16 +220,6 @@ public class ChildTestConfiguration { return Mockito.mock(SecondaryStorageVmManager.class); } - @Bean - public SwiftManager swiftMgr() { - return Mockito.mock(SwiftManager.class); - } - - @Bean - public S3Manager s3Mgr() { - return Mockito.mock(S3Manager.class); - } - @Bean public VpcManager vpcMgr() { return Mockito.mock(VpcManager.class); diff --git a/services/secondary-storage/pom.xml b/services/secondary-storage/pom.xml index eb6c0ee9b50..124fa5e085c 100644 --- a/services/secondary-storage/pom.xml +++ b/services/secondary-storage/pom.xml @@ -63,6 +63,7 @@ install src + test org.codehaus.mojo diff --git a/services/secondary-storage/src/org/apache/cloudstack/storage/resource/LocalNfsSecondaryStorageResource.java b/services/secondary-storage/src/org/apache/cloudstack/storage/resource/LocalNfsSecondaryStorageResource.java index 02ff8bc1261..1a6dcf8eee8 100644 --- a/services/secondary-storage/src/org/apache/cloudstack/storage/resource/LocalNfsSecondaryStorageResource.java +++ b/services/secondary-storage/src/org/apache/cloudstack/storage/resource/LocalNfsSecondaryStorageResource.java @@ -32,7 +32,6 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.Executors; -import org.apache.cloudstack.storage.command.DownloadSystemTemplateCommand; import org.apache.log4j.Logger; import org.springframework.stereotype.Component; @@ -77,66 +76,7 @@ public class LocalNfsSecondaryStorageResource extends NfsSecondaryStorageResourc @Override public Answer executeRequest(Command cmd) { - if (cmd instanceof DownloadSystemTemplateCommand) { - return execute((DownloadSystemTemplateCommand) cmd); - } else { - // return Answer.createUnsupportedCommandAnswer(cmd); - return super.executeRequest(cmd); - } - } - - private Answer execute(DownloadSystemTemplateCommand cmd) { - DataStoreTO dstore = cmd.getDataStore(); - if (dstore instanceof S3TO) { - // TODO: how to handle download progress for S3 - S3TO s3 = (S3TO) cmd.getDataStore(); - String url = cmd.getUrl(); - String user = null; - String password = null; - if (cmd.getAuth() != null) { - user = cmd.getAuth().getUserName(); - password = new String(cmd.getAuth().getPassword()); - } - // get input stream from the given url - InputStream in = UriUtils.getInputStreamFromUrl(url, user, password); - URI uri; - URL urlObj; - try { - uri = new URI(url); - urlObj = new URL(url); - } catch (URISyntaxException e) { - throw new CloudRuntimeException("URI is incorrect: " + url); - } catch (MalformedURLException e) { - throw new CloudRuntimeException("URL is incorrect: " + url); - } - - final String bucket = s3.getBucketName(); - // convention is no / in the end for install path based on S3Utils - // implementation. - String path = determineS3TemplateDirectory(cmd.getAccountId(), cmd.getResourceId(), cmd.getName()); - // template key is - // TEMPLATE_ROOT_DIR/account_id/template_id/template_name - String key = join(asList(path, urlObj.getFile()), S3Utils.SEPARATOR); - S3Utils.putObject(s3, in, bucket, key); - List s3Obj = S3Utils.getDirectory(s3, bucket, path); - if (s3Obj == null || s3Obj.size() == 0) { - return new Answer(cmd, false, "Failed to download to S3 bucket: " + bucket + " with key: " + key); - } else { - return new DownloadAnswer(null, 100, null, Status.DOWNLOADED, path, path, s3Obj.get(0).getSize(), s3Obj.get(0).getSize(), s3Obj - .get(0).getETag()); - } - } else if (dstore instanceof NfsTO) { - return new Answer(cmd, false, "Nfs needs to be pre-installed with system vm templates"); - } else if (dstore instanceof SwiftTO) { - // TODO: need to move code from - // execute(uploadTemplateToSwiftFromSecondaryStorageCommand) here, - // but we need to handle - // source is url, most likely we need to modify our existing - // swiftUpload python script. - return new Answer(cmd, false, "Swift is not currently support DownloadCommand"); - } else { - return new Answer(cmd, false, "Unsupported image data store: " + dstore); - } + return super.executeRequest(cmd); } @Override diff --git a/services/secondary-storage/src/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java b/services/secondary-storage/src/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java index 17ec0bbf838..cefc08a131a 100755 --- a/services/secondary-storage/src/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java +++ b/services/secondary-storage/src/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java @@ -519,6 +519,10 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S } } + protected Answer registerTemplateOnSwift(DownloadCommand cmd) { + + return null; + } private Answer execute(DownloadCommand cmd) { DataStoreTO dstore = cmd.getDataStore(); if (dstore instanceof NfsTO || dstore instanceof S3TO) { @@ -560,12 +564,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S * s3Obj.get(0).getSize(), s3Obj .get(0).getETag()); } } */ else if (dstore instanceof SwiftTO) { - // TODO: need to move code from - // execute(uploadTemplateToSwiftFromSecondaryStorageCommand) here, - // but we need to handle - // source is url, most likely we need to modify our existing - // swiftUpload python script. - return new Answer(cmd, false, "Swift is not currently support DownloadCommand"); + return registerTemplateOnSwift(cmd); } else { return new Answer(cmd, false, "Unsupported image data store: " + dstore); }