cloudstack/test/integration/smoke/test_certauthority_root.py
Wei Zhou 55c8138a1a
test: fix test_certauthority_root.py (#10762)
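The test does not work with Python 3: importing `verify` from `OpenSSL.crypto` fails with an ImportError, as the log below shows.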
```
2025-04-18T10:43:58.5235913Z 2025-04-18 10:32:20,503 - CRITICAL - EXCEPTION: Failure:: ['Traceback (most recent call last):\n', '  File "/opt/hostedtoolcache/Python/3.10.17/x64/lib/python3.10/unittest/case.py", line 59, in testPartExecutor\n    yield\n', '  File "/opt/hostedtoolcache/Python/3.10.17/x64/lib/python3.10/unittest/case.py", line 591, in run\n    self._callTestMethod(testMethod)\n', '  File "/opt/hostedtoolcache/Python/3.10.17/x64/lib/python3.10/unittest/case.py", line 549, in _callTestMethod\n    method()\n', '  File "/home/runner/.local/lib/python3.10/site-packages/nose/failure.py", line 35, in runTest\n    raise self.exc_val.with_traceback(self.tb)\n', '  File "/home/runner/.local/lib/python3.10/site-packages/nose/loader.py", line 335, in loadTestsFromName\n    module = self.importer.importFromPath(\n', '  File "/home/runner/.local/lib/python3.10/site-packages/nose/importer.py", line 162, in importFromPath\n    return self.importFromDir(dir_path, fqname)\n', '  File "/home/runner/.local/lib/python3.10/site-packages/nose/importer.py", line 198, in importFromDir\n    mod = load_module(part_fqname, fh, filename, desc)\n', '  File "/home/runner/.local/lib/python3.10/site-packages/nose/importer.py", line 128, in load_module\n    spec.loader.exec_module(mod)\n', '  File "<frozen importlib._bootstrap_external>", line 883, in exec_module\n', '  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed\n', '  File "/home/runner/work/cloudstack/cloudstack/test/integration/smoke/test_certauthority_root.py", line 27, in <module>\n    from OpenSSL.crypto import FILETYPE_PEM, verify, X509\n', "ImportError: cannot import name 'verify' from 'OpenSSL.crypto' (unknown location)\n"]
```
2025-04-24 14:13:20 +05:30

227 lines
9.0 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from nose.plugins.attrib import attr
from marvin.cloudstackTestCase import cloudstackTestCase
from marvin.lib.utils import cleanup_resources
from marvin.lib.base import *
from marvin.lib.common import list_hosts
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
class TestCARootProvider(cloudstackTestCase):
    """Smoke tests for the 'root' certificate authority provider.

    Exercises the listCAProviders, listCaCertificate, issueCertificate,
    revokeCertificate and provisionCertificate API commands and checks the
    returned certificates with the ``cryptography`` package.
    """

    @classmethod
    def setUpClass(cls):
        """Fetch the API client, test data and hypervisor type once per class."""
        testClient = super(TestCARootProvider, cls).getClsTestClient()
        cls.apiclient = testClient.getApiClient()
        cls.services = testClient.getParsedTestDataConfig()
        # Use the client fetched above instead of relying on a separately
        # injected ``cls.testClient`` attribute that this class never assigns.
        cls.hypervisor = testClient.getHypervisorInfo()
        cls.cleanup = []

    @classmethod
    def tearDownClass(cls):
        """Release class-level resources registered in ``cls.cleanup``."""
        try:
            cleanup_resources(cls.apiclient, cls.cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e) from e

    def verifySignature(self, caCert, cert):
        """Fail the running test unless ``cert`` was signed by ``caCert``.

        The signature is checked with PKCS#1 v1.5 padding, so this assumes
        the CA key is an RSA key (any other key type raises from ``verify``
        and surfaces as a test error).

        :param caCert: parsed CA certificate whose public key is used
        :param cert: parsed certificate whose signature is verified
        """
        # Imported locally so this class does not depend on a new
        # module-level import.
        from cryptography.exceptions import InvalidSignature

        print("Verifying Certificate")
        caPublicKey = caCert.public_key()
        try:
            caPublicKey.verify(
                cert.signature,
                cert.tbs_certificate_bytes,
                padding.PKCS1v15(),
                cert.signature_hash_algorithm,
            )
        except InvalidSignature as e:
            # A bad signature must fail the test; merely printing it would
            # let an invalid certificate pass unnoticed.
            self.fail(f"Certificate verification failed: {e}")
        print("Certificate is valid!")

    def setUp(self):
        """Fetch per-test API and database clients."""
        self.apiclient = self.testClient.getApiClient()
        self.dbclient = self.testClient.getDbConnection()
        self.cleanup = []

    def tearDown(self):
        """Release per-test resources registered in ``self.cleanup``."""
        try:
            cleanup_resources(self.apiclient, self.cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e) from e

    def getUpSystemVMHosts(self, hostId=None):
        """List enabled SecondaryStorageVM hosts in the Up state.

        :param hostId: optional host id to restrict the listing to
        :return: whatever ``list_hosts`` returns for the filter
        """
        hosts = list_hosts(
            self.apiclient,
            type='SecondaryStorageVM',
            state='Up',
            resourcestate='Enabled',
            id=hostId
        )
        return hosts

    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_list_ca_providers(self):
        """
            Tests default ca providers list
        """
        cmd = listCAProviders.listCAProvidersCmd()
        response = self.apiclient.listCAProviders(cmd)
        self.assertEqual(len(response), 1)
        self.assertEqual(response[0].name, 'root')

    def getCaCertificate(self):
        """Return the PEM text of the 'root' provider's CA certificate."""
        cmd = listCaCertificate.listCaCertificateCmd()
        cmd.provider = 'root'
        response = self.apiclient.listCaCertificate(cmd)
        return response.cacertificates.certificate

    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_list_ca_certificate(self):
        """
            Tests the ca certificate
        """
        certificate = self.getCaCertificate()
        self.assertTrue(len(certificate) > 0)

        cert = x509.load_pem_x509_certificate(certificate.encode(), default_backend())
        self.assertEqual(cert.signature_hash_algorithm.name, 'sha256')
        self.assertEqual(
            cert.issuer.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)[0].value,
            'ca.cloudstack.apache.org'
        )

    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_issue_certificate_without_csr(self):
        """
            Tests issuance of a certificate
        """
        cmd = issueCertificate.issueCertificateCmd()
        cmd.domain = 'apache.org,cloudstack.apache.org'
        cmd.ipaddress = '10.1.1.1,10.2.2.2'
        cmd.provider = 'root'

        response = self.apiclient.issueCertificate(cmd)
        self.assertTrue(len(response.privatekey) > 0)
        self.assertTrue(len(response.cacertificates) > 0)
        self.assertTrue(len(response.certificate) > 0)

        cert = x509.load_pem_x509_certificate(response.certificate.encode(), default_backend())

        # Validate basic certificate attributes
        self.assertEqual(cert.signature_hash_algorithm.name, 'sha256')
        self.assertEqual(
            cert.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)[0].value,
            'apache.org'
        )

        # Validate alternative names
        altNames = cert.extensions.get_extension_for_oid(
            x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME
        )
        dnsNames = altNames.value.get_values_for_type(x509.DNSName)
        for domain in cmd.domain.split(','):
            self.assertIn(domain, dnsNames)
        # Stringify the IP SAN entries once instead of once per address.
        ipAddresses = [str(x) for x in altNames.value.get_values_for_type(x509.IPAddress)]
        for address in cmd.ipaddress.split(','):
            self.assertIn(address, ipAddresses)

        # Validate certificate against CA public key
        caCert = x509.load_pem_x509_certificate(self.getCaCertificate().encode(), default_backend())
        self.verifySignature(caCert, cert)

    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_issue_certificate_with_csr(self):
        """
            Tests issuance of a certificate
        """
        cmd = issueCertificate.issueCertificateCmd()
        cmd.csr = "-----BEGIN CERTIFICATE REQUEST-----\nMIIBHjCByQIBADBkMQswCQYDVQQGEwJJTjELMAkGA1UECAwCSFIxETAPBgNVBAcM\nCEd1cnVncmFtMQ8wDQYDVQQKDAZBcGFjaGUxEzARBgNVBAsMCkNsb3VkU3RhY2sx\nDzANBgNVBAMMBnYtMS1WTTBcMA0GCSqGSIb3DQEBAQUAA0sAMEgCQQD46KFWKYrJ\nF43Y1oqWUfrl4mj4Qm05Bgsi6nuigZv7ufiAKK0nO4iJKdRa2hFMUvBi2/bU3IyY\nNvg7cdJsn4K9AgMBAAGgADANBgkqhkiG9w0BAQUFAANBAIta9glu/ZSjA/ncyXix\nyDOyAKmXXxsRIsdrEuIzakUuJS7C8IG0FjUbDyIaiwWQa5x+Lt4oMqCmpNqRzaGP\nfOo=\n-----END CERTIFICATE REQUEST-----"
        cmd.provider = 'root'

        response = self.apiclient.issueCertificate(cmd)
        # With a caller-supplied CSR the response carries no private key.
        self.assertIsNone(response.privatekey)
        self.assertTrue(len(response.cacertificates) > 0)
        self.assertTrue(len(response.certificate) > 0)

        cert = x509.load_pem_x509_certificate(response.certificate.encode(), default_backend())

        # Validate basic certificate attributes
        self.assertEqual(cert.signature_hash_algorithm.name, 'sha256')
        self.assertEqual(
            cert.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)[0].value,
            'v-1-VM'
        )

        # Validate certificate against CA public key
        caCert = x509.load_pem_x509_certificate(self.getCaCertificate().encode(), default_backend())
        self.verifySignature(caCert, cert)

    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_revoke_certificate(self):
        """
            Tests certificate revocation
        """
        cmd = revokeCertificate.revokeCertificateCmd()
        cmd.serial = 'abc123'  # hex value
        cmd.cn = 'example.com'
        cmd.provider = 'root'

        # The serial is a fixed test constant, so building the SQL text
        # directly is safe here.
        selectQuery = f"select serial, cn from crl where serial='{cmd.serial}'"

        # Remove any row left by an earlier run so the revocation is fresh.
        if self.dbclient.execute(selectQuery):
            self.dbclient.execute(f"delete from crl where serial='{cmd.serial}'")

        response = self.apiclient.revokeCertificate(cmd)
        self.assertTrue(response.success)

        rows = self.dbclient.execute(selectQuery)
        # Fail with a readable message rather than an IndexError when the
        # revocation was not recorded.
        self.assertTrue(rows, "Revoked serial %s not found in crl table" % cmd.serial)
        crl = rows[0]
        self.assertEqual(crl[0], cmd.serial)
        self.assertEqual(crl[1], cmd.cn)

    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_provision_certificate(self):
        """
            Tests certificate provisioning
        """
        hosts = self.getUpSystemVMHosts()
        if not hosts:
            # skipTest() raises by itself; no explicit ``raise`` is needed.
            self.skipTest("No Up systemvm hosts found, skipping test")

        host = hosts[0]

        cmd = provisionCertificate.provisionCertificateCmd()
        cmd.hostid = host.id
        cmd.reconnect = True
        cmd.provider = 'root'

        response = self.apiclient.provisionCertificate(cmd)
        self.assertTrue(response.success)

        if self.hypervisor.lower() == 'simulator':
            # On the simulator the host is expected to no longer be listed
            # as Up after the reconnect.
            hosts = self.getUpSystemVMHosts(host.id)
            self.assertTrue(hosts is None or len(hosts) == 0)
        else:
            def checkHostIsUp(hostId):
                # Poll on the id passed in by wait_until, and only report
                # success once a host is actually listed.
                upHosts = self.getUpSystemVMHosts(hostId)
                return bool(upHosts), upHosts

            result, hosts = wait_until(1, 30, checkHostIsUp, host.id)
            if result:
                self.assertEqual(len(hosts), 1)
            else:
                self.fail("Failed to have systemvm host in Up state after cert provisioning")