# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from __future__ import with_statement # install subprocess.check_output for 2.4 =< python < 2.7 try: from subprocess import check_output except (NameError, ImportError): import subprocess def check_output(*popenargs, **kwargs): r"""Run command with arguments and return its output as a byte string. Backported from Python 2.7 as it's implemented as pure python on stdlib. >>> check_output(['/usr/bin/python', '--version']) Python 2.6.2 """ process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs) output, unused_err = process.communicate() retcode = process.poll() if retcode: cmd = kwargs.get("args") if cmd is None: cmd = popenargs[0] error = subprocess.CalledProcessError(retcode, cmd) error.output = output raise error return output subprocess.check_output = check_output import logging logging.getLogger('paramiko.transport').setLevel(logging.ERROR) from vagrant import Vagrant from unittest import TestCase from paramiko.config import SSHConfig from paramiko.client import SSHClient, AutoAddPolicy from fabric import state from fabric.api import env from fabric.api import run, hide from envassert import file, detect from StringIO import StringIO from nose.plugins.attrib import attr import os.path import sys _defaultVagrantDir = os.path.abspath(os.path.join( os.path.basename(__file__), '..', '..', '..', 'tools', 'vagrant', 'systemvm')) class SystemVM(object): def __init__(self, host='default', vagrantDir=None, controlVagrant=True): global _defaultVagrantDir self.host = host self._controlVagrant = controlVagrant if vagrantDir is None: vagrantDir = _defaultVagrantDir self._vagrant = Vagrant(root=vagrantDir) self._startedVagrant = False self._sshClient = None self._sshConfigStr = None self._sshConfig = None self._sshHostConfig = None def maybeUp(self): if not self._controlVagrant: return state = self._vagrant.status(vm_name=self.host)[0].state if state == Vagrant.NOT_CREATED: self._vagrant.up(vm_name=self.host) self._startedVagrant = True elif state in [Vagrant.POWEROFF, Vagrant.SAVED, Vagrant.ABORTED]: raise Exception( "SystemVM testing does not support resume(), do not use vagrant suspend/halt") elif state == Vagrant.RUNNING: self._startedVagrant = False else: raise Exception("Unrecognized vagrant state %s" % state) def maybeDestroy(self): if not self._controlVagrant or not self._startedVagrant: return self._vagrant.destroy(vm_name=self.host) if self._sshClient is not None: self._sshClient.close() def loadSshConfig(self): if self._sshConfig is None: self._sshConfigStr = self._vagrant.ssh_config(vm_name=self.host) configObj = StringIO(self._sshConfigStr) self._sshConfig = SSHConfig() # noinspection PyTypeChecker self._sshConfig.parse(configObj) self._sshHostConfig = self._sshConfig.lookup(self.host) @property def sshConfig(self): if self._sshConfig is None: self.loadSshConfig() return self._sshConfig @property def sshConfigStr(self): if self._sshConfigStr is None: self.loadSshConfig() return self._sshConfigStr @property def sshClient(self): if self._sshClient is None: self.loadSshConfig() self._sshClient = SSHClient() self._sshClient.set_missing_host_key_policy(AutoAddPolicy()) self._sshClient.connect(self.hostname, self.sshPort, self.sshUser, key_filename=self.sshKey, timeout=10) return self._sshClient @property def hostname(self): return self._sshHostConfig.get('hostname', self.host) @property def sshPort(self): return int(self._sshHostConfig.get('port', 22)) @property def sshUser(self): return self._sshHostConfig.get('user', 'root') @property def sshKey(self): return self._sshHostConfig.get('identityfile', '~/.ssh/id_rsa') class SystemVMTestCase(TestCase): @classmethod def setUpClass(cls): cls.systemvm = SystemVM() cls.systemvm.maybeUp() @classmethod def tearDownClass(cls): # noinspection PyUnresolvedReferences cls.systemvm.maybeDestroy() def setUp(self): self.sshClient = self.systemvm.sshClient # self._env_host_string_orig = env.host_string env.host_string = "%s:%s" % (self.systemvm.hostname, self.systemvm.sshPort) env.user = self.systemvm.sshUser env.port = self.systemvm.sshPort env.key_filename = self.systemvm.sshKey env.use_ssh_config = True env.abort_on_prompts = True env.command_timeout = 10 env.timeout = 5 env.disable_known_hosts = True env.platform_family = detect.detect() state.output.running = False state.output.status = False state.output.stdout = False state.output.stderr = False state.output.warnings = False # this could break down when executing multiple test cases in parallel in the same python process # def tearDown(self): # env.host_string = self._env_host_string_orig def has_line(location, line, ctx=3): with hide("everything"): text = run('cat "%s"' % location) text_len = len(text) pos = text.find(line) if pos < 0: return False, '' start = end = pos newlines = 0 while start > 0: if text[start] == '\n': newlines += 1 if newlines > ctx: break start -= 1 newlines = 0 while end < text_len: if text[end] == '\n': newlines += 1 if newlines > ctx: break end += 1 context = '...\n' + text[start:end].strip() + '\n...' return True, context def print_doc(name, data, target=None): if target is None: target = sys.stdout print >>target, " ", "-" * 4, name, "-" * max(68-4-2-len(name), 0) for line in data.split('\n'): print >>target, " ", line print >>target, " ", "-" * 68