mirror of
https://github.com/apache/cloudstack.git
synced 2025-10-26 08:42:29 +01:00
it does not work with python3 ``` 2025-04-18T10:43:58.5235913Z 2025-04-18 10:32:20,503 - CRITICAL - EXCEPTION: Failure:: ['Traceback (most recent call last):\n', ' File "/opt/hostedtoolcache/Python/3.10.17/x64/lib/python3.10/unittest/case.py", line 59, in testPartExecutor\n yield\n', ' File "/opt/hostedtoolcache/Python/3.10.17/x64/lib/python3.10/unittest/case.py", line 591, in run\n self._callTestMethod(testMethod)\n', ' File "/opt/hostedtoolcache/Python/3.10.17/x64/lib/python3.10/unittest/case.py", line 549, in _callTestMethod\n method()\n', ' File "/home/runner/.local/lib/python3.10/site-packages/nose/failure.py", line 35, in runTest\n raise self.exc_val.with_traceback(self.tb)\n', ' File "/home/runner/.local/lib/python3.10/site-packages/nose/loader.py", line 335, in loadTestsFromName\n module = self.importer.importFromPath(\n', ' File "/home/runner/.local/lib/python3.10/site-packages/nose/importer.py", line 162, in importFromPath\n return self.importFromDir(dir_path, fqname)\n', ' File "/home/runner/.local/lib/python3.10/site-packages/nose/importer.py", line 198, in importFromDir\n mod = load_module(part_fqname, fh, filename, desc)\n', ' File "/home/runner/.local/lib/python3.10/site-packages/nose/importer.py", line 128, in load_module\n spec.loader.exec_module(mod)\n', ' File "<frozen importlib._bootstrap_external>", line 883, in exec_module\n', ' File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed\n', ' File "/home/runner/work/cloudstack/cloudstack/test/integration/smoke/test_certauthority_root.py", line 27, in <module>\n from OpenSSL.crypto import FILETYPE_PEM, verify, X509\n', "ImportError: cannot import name 'verify' from 'OpenSSL.crypto' (unknown location)\n"] ```
227 lines
9.0 KiB
Python
227 lines
9.0 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from nose.plugins.attrib import attr
|
|
from marvin.cloudstackTestCase import cloudstackTestCase
|
|
from marvin.lib.utils import cleanup_resources
|
|
from marvin.lib.base import *
|
|
from marvin.lib.common import list_hosts
|
|
|
|
from cryptography import x509
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric import padding
|
|
|
|
|
|
class TestCARootProvider(cloudstackTestCase):
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
testClient = super(TestCARootProvider, cls).getClsTestClient()
|
|
cls.apiclient = testClient.getApiClient()
|
|
cls.services = testClient.getParsedTestDataConfig()
|
|
cls.hypervisor = cls.testClient.getHypervisorInfo()
|
|
cls.cleanup = []
|
|
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
try:
|
|
cleanup_resources(cls.apiclient, cls.cleanup)
|
|
except Exception as e:
|
|
raise Exception("Warning: Exception during cleanup : %s" % e)
|
|
|
|
|
|
def verifySignature(self, caCert, cert):
|
|
print("Verifying Certificate")
|
|
caPublicKey = caCert.public_key()
|
|
try:
|
|
caPublicKey.verify(
|
|
cert.signature,
|
|
cert.tbs_certificate_bytes,
|
|
padding.PKCS1v15(),
|
|
cert.signature_hash_algorithm,
|
|
)
|
|
print("Certificate is valid!")
|
|
except Exception as e:
|
|
print(f"Certificate verification failed: {e}")
|
|
|
|
def setUp(self):
|
|
self.apiclient = self.testClient.getApiClient()
|
|
self.dbclient = self.testClient.getDbConnection()
|
|
self.cleanup = []
|
|
|
|
|
|
def tearDown(self):
|
|
try:
|
|
cleanup_resources(self.apiclient, self.cleanup)
|
|
except Exception as e:
|
|
raise Exception("Warning: Exception during cleanup : %s" % e)
|
|
|
|
|
|
def getUpSystemVMHosts(self, hostId=None):
|
|
hosts = list_hosts(
|
|
self.apiclient,
|
|
type='SecondaryStorageVM',
|
|
state='Up',
|
|
resourcestate='Enabled',
|
|
id=hostId
|
|
)
|
|
return hosts
|
|
|
|
|
|
@attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
|
|
def test_list_ca_providers(self):
|
|
"""
|
|
Tests default ca providers list
|
|
"""
|
|
cmd = listCAProviders.listCAProvidersCmd()
|
|
response = self.apiclient.listCAProviders(cmd)
|
|
self.assertEqual(len(response), 1)
|
|
self.assertEqual(response[0].name, 'root')
|
|
|
|
|
|
def getCaCertificate(self):
|
|
cmd = listCaCertificate.listCaCertificateCmd()
|
|
cmd.provider = 'root'
|
|
response = self.apiclient.listCaCertificate(cmd)
|
|
return response.cacertificates.certificate
|
|
|
|
|
|
@attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
|
|
def test_list_ca_certificate(self):
|
|
"""
|
|
Tests the ca certificate
|
|
"""
|
|
certificate = self.getCaCertificate()
|
|
self.assertTrue(len(certificate) > 0)
|
|
|
|
cert = x509.load_pem_x509_certificate(certificate.encode(), default_backend())
|
|
self.assertEqual(cert.signature_hash_algorithm.name, 'sha256')
|
|
self.assertEqual(cert.issuer.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)[0].value, 'ca.cloudstack.apache.org')
|
|
|
|
|
|
@attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
|
|
def test_issue_certificate_without_csr(self):
|
|
"""
|
|
Tests issuance of a certificate
|
|
"""
|
|
cmd = issueCertificate.issueCertificateCmd()
|
|
cmd.domain = 'apache.org,cloudstack.apache.org'
|
|
cmd.ipaddress = '10.1.1.1,10.2.2.2'
|
|
cmd.provider = 'root'
|
|
|
|
response = self.apiclient.issueCertificate(cmd)
|
|
self.assertTrue(len(response.privatekey) > 0)
|
|
self.assertTrue(len(response.cacertificates) > 0)
|
|
self.assertTrue(len(response.certificate) > 0)
|
|
|
|
cert = x509.load_pem_x509_certificate(response.certificate.encode(), default_backend())
|
|
|
|
# Validate basic certificate attributes
|
|
self.assertEqual(cert.signature_hash_algorithm.name, 'sha256')
|
|
self.assertEqual(cert.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)[0].value, 'apache.org')
|
|
|
|
# Validate alternative names
|
|
altNames = cert.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
|
|
for domain in cmd.domain.split(','):
|
|
self.assertTrue(domain in altNames.value.get_values_for_type(x509.DNSName))
|
|
for address in cmd.ipaddress.split(','):
|
|
self.assertTrue(address in [str(x) for x in altNames.value.get_values_for_type(x509.IPAddress)])
|
|
|
|
# Validate certificate against CA public key
|
|
caCert = x509.load_pem_x509_certificate(self.getCaCertificate().encode(), default_backend())
|
|
self.verifySignature(caCert, cert)
|
|
|
|
|
|
@attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
|
|
def test_issue_certificate_with_csr(self):
|
|
"""
|
|
Tests issuance of a certificate
|
|
"""
|
|
cmd = issueCertificate.issueCertificateCmd()
|
|
cmd.csr = "-----BEGIN CERTIFICATE REQUEST-----\nMIIBHjCByQIBADBkMQswCQYDVQQGEwJJTjELMAkGA1UECAwCSFIxETAPBgNVBAcM\nCEd1cnVncmFtMQ8wDQYDVQQKDAZBcGFjaGUxEzARBgNVBAsMCkNsb3VkU3RhY2sx\nDzANBgNVBAMMBnYtMS1WTTBcMA0GCSqGSIb3DQEBAQUAA0sAMEgCQQD46KFWKYrJ\nF43Y1oqWUfrl4mj4Qm05Bgsi6nuigZv7ufiAKK0nO4iJKdRa2hFMUvBi2/bU3IyY\nNvg7cdJsn4K9AgMBAAGgADANBgkqhkiG9w0BAQUFAANBAIta9glu/ZSjA/ncyXix\nyDOyAKmXXxsRIsdrEuIzakUuJS7C8IG0FjUbDyIaiwWQa5x+Lt4oMqCmpNqRzaGP\nfOo=\n-----END CERTIFICATE REQUEST-----"
|
|
cmd.provider = 'root'
|
|
|
|
response = self.apiclient.issueCertificate(cmd)
|
|
self.assertTrue(response.privatekey is None)
|
|
self.assertTrue(len(response.cacertificates) > 0)
|
|
self.assertTrue(len(response.certificate) > 0)
|
|
cert = x509.load_pem_x509_certificate(response.certificate.encode(), default_backend())
|
|
|
|
# Validate basic certificate attributes
|
|
self.assertEqual(cert.signature_hash_algorithm.name, 'sha256')
|
|
self.assertEqual(cert.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)[0].value, 'v-1-VM')
|
|
|
|
# Validate certificate against CA public key
|
|
caCert = x509.load_pem_x509_certificate(self.getCaCertificate().encode(), default_backend())
|
|
self.verifySignature(caCert, cert)
|
|
|
|
|
|
@attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
|
|
def test_revoke_certificate(self):
|
|
"""
|
|
Tests certificate revocation
|
|
"""
|
|
cmd = revokeCertificate.revokeCertificateCmd()
|
|
cmd.serial = 'abc123' # hex value
|
|
cmd.cn = 'example.com'
|
|
cmd.provider = 'root'
|
|
serials = self.dbclient.execute(f"select serial, cn from crl where serial='{cmd.serial}'")
|
|
if len(serials) > 0:
|
|
self.dbclient.execute(f"delete from crl where serial='{cmd.serial}'")
|
|
|
|
response = self.apiclient.revokeCertificate(cmd)
|
|
self.assertTrue(response.success)
|
|
|
|
crl = self.dbclient.execute("select serial, cn from crl where serial='%s'" % cmd.serial)[0]
|
|
self.assertEqual(crl[0], cmd.serial)
|
|
self.assertEqual(crl[1], cmd.cn)
|
|
|
|
|
|
@attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
|
|
def test_provision_certificate(self):
|
|
"""
|
|
Tests certificate provisioning
|
|
"""
|
|
hosts = self.getUpSystemVMHosts()
|
|
if not hosts or len(hosts) < 1:
|
|
raise self.skipTest("No Up systemvm hosts found, skipping test")
|
|
|
|
host = hosts[0]
|
|
|
|
cmd = provisionCertificate.provisionCertificateCmd()
|
|
cmd.hostid = host.id
|
|
cmd.reconnect = True
|
|
cmd.provider = 'root'
|
|
|
|
response = self.apiclient.provisionCertificate(cmd)
|
|
self.assertTrue(response.success)
|
|
|
|
if self.hypervisor.lower() == 'simulator':
|
|
hosts = self.getUpSystemVMHosts(host.id)
|
|
self.assertTrue(hosts is None or len(hosts) == 0)
|
|
else:
|
|
def checkHostIsUp(hostId):
|
|
hosts = self.getUpSystemVMHosts(host.id)
|
|
return (hosts is not None), hosts
|
|
result, hosts = wait_until(1, 30, checkHostIsUp, host.id)
|
|
if result:
|
|
self.assertTrue(len(hosts) == 1)
|
|
else:
|
|
self.fail("Failed to have systemvm host in Up state after cert provisioning")
|