Enable KVM HA on nfs storage

This commit is contained in:
edison 2010-11-09 22:01:04 -08:00
parent 0b4c865b36
commit 4bc63e5c32
6 changed files with 721 additions and 1 deletions

View File

@ -0,0 +1,198 @@
package com.cloud.agent.resource.computing;
import java.io.File;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.libvirt.Connect;
import org.libvirt.LibvirtException;
import org.libvirt.StoragePool;
import org.libvirt.StoragePoolInfo;
import org.libvirt.StoragePoolInfo.StoragePoolState;
import com.cloud.utils.script.OutputInterpreter;
import com.cloud.utils.script.OutputInterpreter.AllLinesParser;
import com.cloud.utils.script.Script;
public class KVMHABase {
protected Connect _libvirtConnection;
private long _timeout = 60000; /*1 minutes*/
protected static String _heartBeatPath;
protected long _heartBeatUpdateTimeout = 5000; /*5 sec*/
protected long _heartBeatUpdateFreq = 60000;
protected long _heartBeatUpdateMaxRetry = 3;
public static enum PoolType {
PrimaryStorage,
SecondaryStorage
}
public static class NfsStoragePool {
String _poolUUID;
String _poolIp;
String _poolMountSourcePath;
String _mountDestPath;
PoolType _type;
public NfsStoragePool(String poolUUID, String poolIp, String poolSourcePath, String mountDestPath, PoolType type) {
this._poolUUID = poolUUID;
this._poolIp = poolIp;
this._poolMountSourcePath = poolSourcePath;
this._mountDestPath = mountDestPath;
this._type = type;
}
}
protected String checkingMountPoint(NfsStoragePool pool, String poolName) {
String mountSource = pool._poolIp + ":" + pool._poolMountSourcePath;
String mountPaths = Script.runSimpleBashScript("cat /proc/mounts | grep " + mountSource);
String destPath = pool._mountDestPath;
if (mountPaths != null) {
String token[] = mountPaths.split(" ");
String mountType = token[2];
String mountDestPath = token[1];
if (mountType.equalsIgnoreCase("nfs")) {
if (poolName != null && !mountDestPath.startsWith(destPath)) {
/*we need to mount it under poolName*/
Script mount = new Script("/bin/bash", 60000);
mount.add("-c");
mount.add("mount " + mountSource + " " + destPath);
String result = mount.execute();
if (result != null) {
destPath = null;
}
destroyVMs(destPath);
} else if (poolName == null) {
destPath = mountDestPath;
}
}
} else {
/*Can't find the mount point?*/
/*we need to mount it under poolName*/
if (poolName != null) {
Script mount = new Script("/bin/bash", 60000);
mount.add("-c");
mount.add("mount " + mountSource + " " + destPath);
String result = mount.execute();
if (result != null) {
destPath = null;
}
destroyVMs(destPath);
}
}
return destPath;
}
protected String getMountPoint(NfsStoragePool storagePool) {
StoragePool pool = null;
String poolName = null;
try {
pool = _libvirtConnection.storagePoolLookupByUUIDString(storagePool._poolUUID);
if (pool != null) {
StoragePoolInfo spi = pool.getInfo();
if (spi.state != StoragePoolState.VIR_STORAGE_POOL_RUNNING) {
pool.create(0);
} else {
/*Sometimes, the mount point is lost, even libvirt thinks the storage pool still running*/
}
}
poolName = pool.getName();
} catch (LibvirtException e) {
} finally {
try {
if (pool != null) {
pool.free();
}
} catch (LibvirtException e) {
}
}
return checkingMountPoint(storagePool, poolName);
}
protected void destroyVMs(String mountPath) {
/*if there are VMs using disks under this mount path, destroy them*/
Script cmd = new Script("/bin/bash", _timeout);
cmd.add("-c");
cmd.add("ps axu|grep qemu|grep " + mountPath + "* |awk '{print $2}'");
AllLinesParser parser = new OutputInterpreter.AllLinesParser();
String result = cmd.execute(parser);
if (result != null) {
return;
}
String pids[] = parser.getLines().split("\n");
for (String pid : pids) {
Script.runSimpleBashScript("kill -9 " + pid);
}
}
protected String getHBFile(String mountPoint, String hostIP) {
return mountPoint + File.separator + "KVMHA" + File.separator + "hb-" + hostIP;
}
protected String getHBFolder(String mountPoint) {
return mountPoint + File.separator + "KVMHA" + File.separator;
}
protected String runScriptRetry(String cmdString, OutputInterpreter interpreter) {
String result = null;
for (int i = 0; i < 3; i++) {
Script cmd = new Script("/bin/bash", _timeout);
cmd.add("-c");
cmd.add(cmdString);
if (interpreter != null)
result = cmd.execute(interpreter);
else {
result = cmd.execute();
}
if (result == Script.ERR_TIMEOUT) {
continue;
} else if (result == null) {
break;
}
}
return result;
}
public static void main(String[] args) {
NfsStoragePool pool = new KVMHAMonitor.NfsStoragePool(null,null,null,null, PoolType.PrimaryStorage);
KVMHAMonitor haWritter = new KVMHAMonitor(pool, null, "192.168.1.163", null);
Thread ha = new Thread(haWritter);
ha.start();
KVMHAChecker haChecker = new KVMHAChecker(haWritter.getStoragePools(), null, "192.168.1.163");
ExecutorService exe = Executors.newFixedThreadPool(1);
Future<Boolean> future = exe.submit((Callable<Boolean>)haChecker);
try {
for (int i = 0; i < 10; i++) {
System.out.println(future.get());
future = exe.submit((Callable<Boolean>)haChecker);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,65 @@
package com.cloud.agent.resource.computing;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.log4j.Logger;
import org.libvirt.Connect;
import com.cloud.utils.script.OutputInterpreter;
import com.cloud.utils.script.Script;
/**
 * Reads the heartbeat files of a host on every configured NFS pool (via the
 * kvmheartbeat.sh script in read mode) to decide whether that host is alive.
 */
public class KVMHAChecker extends KVMHABase implements Callable<Boolean> {
    private static final Logger s_logger = Logger.getLogger(KVMHAChecker.class);
    private List<NfsStoragePool> _pools;
    private String _hostIP;
    private long _heartBeatCheckerTimeout = 300000; /* 5 minutes */

    public KVMHAChecker(List<NfsStoragePool> pools, Connect conn, String host) {
        this._pools = pools;
        this._libvirtConnection = conn;
        this._hostIP = host;
    }

    /*
     * True means heartbeating is ongoing, or we can't determine its status.
     * False means heartbeating has definitely stopped on every pool.
     */
    private Boolean checkingHB() {
        for (NfsStoragePool pool : _pools) {
            Script cmd = new Script(_heartBeatPath, _heartBeatCheckerTimeout, s_logger);
            cmd.add("-i", pool._poolIp);
            cmd.add("-p", pool._poolMountSourcePath);
            cmd.add("-m", pool._mountDestPath);
            cmd.add("-h", _hostIP);
            cmd.add("-r"); /* read mode: check the heartbeat instead of writing it */
            cmd.add("-t", String.valueOf(_heartBeatUpdateFreq / 1000 * 2));
            OutputInterpreter.OneLineParser parser = new OutputInterpreter.OneLineParser();
            String result = cmd.execute(parser);
            String line = parser.getLine();
            s_logger.debug("pool: " + pool._poolIp);
            s_logger.debug("return: " + result); // BUGFIX: message was misspelled "reture"
            s_logger.debug("parser: " + line);
            /* BUGFIX: guard against a null parser line (script produced no output)
             * before calling contains(); treat "no output" as indeterminate = alive. */
            if (result == null && line != null && line.contains("> DEAD <")) {
                s_logger.debug("read heartbeat failed: " + result);
            } else {
                /* at least one pool shows a live (or indeterminate) heartbeat */
                return true;
            }
        }
        return false;
    }

    @Override
    public Boolean call() throws Exception {
        return checkingHB();
    }
}

View File

@ -0,0 +1,89 @@
package com.cloud.agent.resource.computing;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.libvirt.Connect;
import com.cloud.utils.script.Script;
/**
 * Background writer that periodically stamps this host's heartbeat file on
 * every registered NFS primary storage pool (via kvmheartbeat.sh in write
 * mode), so that other hosts' KVMHACheckers can see this host is alive.
 */
public class KVMHAMonitor extends KVMHABase implements Runnable {
    private static final Logger s_logger = Logger.getLogger(KVMHAMonitor.class);
    /* all access is guarded by synchronized (_storagePool) */
    private Map<String, NfsStoragePool> _storagePool = new HashMap<String, NfsStoragePool>();
    private String _hostIP; /* private ip address */

    public KVMHAMonitor(NfsStoragePool pool, Connect conn, String host, String scriptPath) {
        if (pool != null) {
            this._storagePool.put(pool._poolUUID, pool);
        }
        this._libvirtConnection = conn;
        this._hostIP = host;
        /* FIX: _heartBeatPath is a static field on KVMHABase; assign it without
         * the misleading 'this.' qualifier. It is shared by all HA workers. */
        KVMHABase._heartBeatPath = scriptPath;
    }

    /** Registers a pool so its heartbeat file gets updated every cycle. */
    public void addStoragePool(NfsStoragePool pool) {
        synchronized (_storagePool) {
            this._storagePool.put(pool._poolUUID, pool);
        }
    }

    /** Stops updating the heartbeat on the given pool. */
    public void removeStoragePool(NfsStoragePool pool) {
        synchronized (_storagePool) {
            this._storagePool.remove(pool._poolUUID);
        }
    }

    /** @return a snapshot copy of the currently registered pools */
    public List<NfsStoragePool> getStoragePools() {
        synchronized (_storagePool) {
            return new ArrayList<NfsStoragePool>(_storagePool.values());
        }
    }

    /** One heartbeat-write pass over every registered pool. */
    private class Monitor implements Runnable {
        @Override
        public void run() {
            synchronized (_storagePool) {
                for (NfsStoragePool primaryStoragePool : _storagePool.values()) {
                    Script cmd = new Script(_heartBeatPath, _heartBeatUpdateTimeout, s_logger);
                    cmd.add("-i", primaryStoragePool._poolIp);
                    cmd.add("-p", primaryStoragePool._poolMountSourcePath);
                    cmd.add("-m", primaryStoragePool._mountDestPath);
                    cmd.add("-h", _hostIP);
                    String result = cmd.execute();
                    if (result != null) {
                        s_logger.debug("write heartbeat failed: " + result);
                    }
                }
            }
        }
    }

    @Override
    public void run() {
        /* Each pass runs in a fresh thread so a wedged NFS mount cannot block
         * this loop's scheduling indefinitely. */
        while (true) {
            Thread monitorThread = new Thread(new Monitor());
            monitorThread.start();
            try {
                monitorThread.join();
            } catch (InterruptedException e) {
                /* FIX: restore interrupt status and exit the loop so the HA
                 * monitor thread can be shut down (was silently swallowed,
                 * making the thread unstoppable). */
                Thread.currentThread().interrupt();
                return;
            }
            try {
                Thread.sleep(_heartBeatUpdateFreq);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

View File

@ -46,6 +46,9 @@ import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.ejb.Local;
import javax.naming.ConfigurationException;
@ -85,6 +88,8 @@ import com.cloud.agent.api.DeleteSnapshotBackupAnswer;
import com.cloud.agent.api.DeleteSnapshotBackupCommand;
import com.cloud.agent.api.DeleteSnapshotsDirCommand;
import com.cloud.agent.api.DeleteStoragePoolCommand;
import com.cloud.agent.api.FenceAnswer;
import com.cloud.agent.api.FenceCommand;
import com.cloud.agent.api.GetHostStatsAnswer;
import com.cloud.agent.api.GetHostStatsCommand;
import com.cloud.agent.api.GetStorageStatsAnswer;
@ -142,6 +147,8 @@ import com.cloud.agent.api.storage.DownloadAnswer;
import com.cloud.agent.api.storage.PrimaryStorageDownloadCommand;
import com.cloud.agent.api.to.StorageFilerTO;
import com.cloud.agent.api.to.VolumeTO;
import com.cloud.agent.resource.computing.KVMHABase.NfsStoragePool;
import com.cloud.agent.resource.computing.KVMHABase.PoolType;
import com.cloud.agent.resource.computing.LibvirtStoragePoolDef.poolType;
import com.cloud.agent.resource.computing.LibvirtStorageVolumeDef.volFormat;
import com.cloud.agent.resource.computing.LibvirtVMDef.consoleDef;
@ -191,6 +198,7 @@ import com.cloud.vm.SecondaryStorageVmVO;
import com.cloud.vm.State;
import com.cloud.vm.VirtualMachineName;
/**
* LibvirtComputingResource execute requests on the computing/routing host using the libvirt API
*
@ -222,11 +230,14 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
private String _createvmPath;
private String _manageSnapshotPath;
private String _createTmplPath;
private String _heartBeatPath;
private String _host;
private String _dcId;
private String _pod;
private String _clusterId;
private String _premium;
private long _hvVersion;
private KVMHAMonitor _monitor;
private final String _SSHKEYSPATH = "/root/.ssh";
private final String _SSHPRVKEYPATH = _SSHKEYSPATH + File.separator + "id_rsa.cloud";
private final String _SSHPUBKEYPATH = _SSHKEYSPATH + File.separator + "id_rsa.pub.cloud";
@ -595,6 +606,11 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
throw new ConfigurationException("Unable to find rundomrpre.sh");
}
_heartBeatPath = Script.findScript(kvmScriptsDir, "kvmheartbeat.sh");
if (_heartBeatPath == null) {
throw new ConfigurationException("Unable to find kvmheartbeat.sh");
}
_createvmPath = Script.findScript(storageScriptsDir, "createvm.sh");
if (_createvmPath == null) {
throw new ConfigurationException("Unable to find the createvm.sh");
@ -717,6 +733,18 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
_hvVersion = (_hvVersion % 1000000) / 1000;
} catch (LibvirtException e) {
}
_premium = (String)params.get("premium");
if (_premium == null) {
_premium = "false";
}
if (_premium.equalsIgnoreCase("true")) {
String[] info = NetUtils.getNetworkParams(_privateNic);
_monitor = new KVMHAMonitor(null, _conn, info[0], _heartBeatPath);
Thread ha = new Thread(_monitor);
ha.start();
}
try {
@ -1204,6 +1232,8 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
return execute((NetworkIngressRulesCmd) cmd);
} else if (cmd instanceof DeleteStoragePoolCommand) {
return execute((DeleteStoragePoolCommand) cmd);
} else if (cmd instanceof FenceCommand ) {
return execute((FenceCommand) cmd);
} else if (cmd instanceof RoutingCommand) {
return _virtRouterResource.executeRequest(cmd);
} else {
@ -1221,12 +1251,44 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
StoragePool pool = _conn.storagePoolLookupByUUIDString(cmd.getPool().getUuid());
pool.destroy();
pool.undefine();
if (_premium.equalsIgnoreCase("true")) {
KVMHABase.NfsStoragePool sp = new KVMHABase.NfsStoragePool(cmd.getPool().getUuid(),
cmd.getPool().getHostAddress(),
cmd.getPool().getPath(),
_mountPoint + File.separator + cmd.getPool().getUuid(),
PoolType.PrimaryStorage);
_monitor.removeStoragePool(sp);
}
return new Answer(cmd);
} catch (LibvirtException e) {
return new Answer(cmd, false, e.toString());
}
}
/**
 * Fences a host by checking whether its heartbeat files on the shared NFS
 * pools are still being updated. A live heartbeat means the host may still
 * be running VMs, so fencing is refused.
 *
 * @param cmd carries the IP of the host to be fenced
 * @return success when the heartbeat is dead (safe to fence), failure otherwise
 */
protected FenceAnswer execute(FenceCommand cmd) {
    ExecutorService executors = Executors.newSingleThreadExecutor();
    try {
        List<NfsStoragePool> pools = _monitor.getStoragePools();
        KVMHAChecker ha = new KVMHAChecker(pools, _conn, cmd.getHostIp());
        Future<Boolean> future = executors.submit(ha);
        Boolean result = future.get();
        if (result) {
            return new FenceAnswer(cmd, false, "Heart is still beating...");
        } else {
            return new FenceAnswer(cmd);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // FIX: preserve interrupt status
        s_logger.warn("Unable to fence", e);
        return new FenceAnswer(cmd, false, e.getMessage());
    } catch (ExecutionException e) {
        s_logger.warn("Unable to fence", e);
        return new FenceAnswer(cmd, false, e.getMessage());
    } finally {
        /* BUGFIX: the executor was never shut down, leaking one non-daemon
         * thread per fence request */
        executors.shutdown();
    }
}
/** This resource manages its storage through libvirt storage pools. */
protected Storage.StorageResourceType getStorageResourceType() {
return Storage.StorageResourceType.STORAGE_POOL;
}
@ -1841,6 +1903,14 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
spi.capacity,
spi.allocation,
tInfo);
if (_premium.equalsIgnoreCase("true")) {
KVMHABase.NfsStoragePool pool = new KVMHABase.NfsStoragePool(cmd.getPool().getUuid(),
cmd.getPool().getHostAddress(),
cmd.getPool().getPath(),
_mountPoint + File.separator + cmd.getPool().getUuid(),
PoolType.PrimaryStorage);
_monitor.addStoragePool(pool);
}
try {
storagePool.free();
} catch (LibvirtException e) {
@ -2097,8 +2167,10 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
if (computingHostIp != null) {
result = doPingTest(computingHostIp);
} else {
} else if (cmd.getRouterIp() != null && cmd.getPrivateIp() != null){
result = doPingTest(cmd.getRouterIp(), cmd.getPrivateIp());
} else {
return new Answer(cmd, false, "routerip and private ip is null");
}
if (result != null) {

View File

@ -0,0 +1,134 @@
#!/usr/bin/env bash
# kvmheartbeat.sh -- write (default) or check (-r) a per-host heartbeat file
# kept under <mount point>/KVMHA/hb-<host ip> on an NFS storage pool. Used by
# the KVM HA monitor (writer) and checker (reader) on the agent hosts.

help() {
printf "Usage: $0
-i nfs server ip
-p nfs server path
-m mount point
-h host
-r write/read hb log
-t interval between read hb log\n"
exit 1
}
#set -x

NfsSvrIP=
NfsSvrPath=
MountPoint=
HostIP=
interval=
rflag=0

while getopts 'i:p:m:h:t:r' OPTION
do
  case $OPTION in
  i)
     NfsSvrIP="$OPTARG"
     ;;
  p)
     NfsSvrPath="$OPTARG"
     ;;
  m)
     MountPoint="$OPTARG"
     ;;
  h)
     HostIP="$OPTARG"
     ;;
  r)
     rflag=1
     ;;
  t)
     interval="$OPTARG"
     ;;
  *)
     help
     ;;
  esac
done

if [ -z "$NfsSvrIP" ]
then
  exit 1
fi

# Kill (SIGKILL) every qemu process whose command line references a disk
# under the given mount point.
deleteVMs() {
  local mountPoint=$1
  # BUGFIX: the command substitution's stdout used to be redirected to
  # /dev/null, which left vmPids permanently empty; only stderr is silenced.
  vmPids=$(ps aux | grep qemu | grep "$mountPoint" | awk '{print $2}' 2> /dev/null)
  if [ $? -gt 0 ]
  then
    return
  fi
  if [ -z "$vmPids" ]
  then
    return
  fi
  # BUGFIX: the loop used to iterate over the literal word "vmPids"
  # because the '$' was missing.
  for pid in $vmPids
  do
    kill -9 $pid &> /dev/null
  done
}

# Is the expected NFS export already mounted under $MountPoint?
mounts=$(cat /proc/mounts | grep nfs | grep "$MountPoint")
if [ $? -gt 0 ]
then
  # Not mounted: remount with attribute caching disabled (noac/ac*=0) so
  # heartbeat reads always hit the NFS server, and soft/timeo/retrans so a
  # dead server cannot hang this script forever.
  mount "$NfsSvrIP:$NfsSvrPath" "$MountPoint" -o sync,soft,proto=tcp,acregmin=0,acregmax=0,acdirmin=0,acdirmax=0,noac,timeo=133,retrans=10 &> /dev/null
  if [ $? -gt 0 ]
  then
    exit 1
  fi
  # In write mode, needing a re-mount means this host lost the pool for a
  # while: destroy VMs that were using it so they can be restarted elsewhere.
  if [ "$rflag" == "0" ]
  then
    deleteVMs "$MountPoint"
  fi
fi

hbFolder=$MountPoint/KVMHA/
hbFile=$hbFolder/hb-$HostIP

# Write the current epoch timestamp into this host's heartbeat file.
# Returns 2 when the file cannot be created.
write_hbLog() {
  stat "$hbFile" &> /dev/null
  if [ $? -gt 0 ]
  then
    # first heartbeat on this pool: create the folder and file
    mkdir -p "$hbFolder" &> /dev/null
    touch "$hbFile" &> /dev/null
    if [ $? -gt 0 ]
    then
      return 2
    fi
  fi
  timestamp=$(date +%s)
  echo $timestamp > "$hbFile"
  return $?
}

# Read the heartbeat twice, $interval seconds apart; succeed (0) only when
# the timestamp advanced, i.e. the remote host is still writing it.
check_hbLog() {
  oldTimeStamp=$(cat "$hbFile")
  sleep $interval &> /dev/null
  newTimeStamp=$(cat "$hbFile")
  if [ $newTimeStamp -gt $oldTimeStamp ]
  then
    return 0
  fi
  return 1
}

if [ "$rflag" == "1" ]
then
  check_hbLog
  if [ $? == 0 ]
  then
    echo "=====> ALIVE <====="
  else
    echo "=====> DEAD <======"
  fi
  exit 0
else
  write_hbLog
  exit $?
fi

View File

@ -0,0 +1,162 @@
#!/usr/bin/env bash
# $Id: pingtest.sh 9132 2010-06-04 20:17:43Z manuel $ $HeadURL: svn://svn.lab.vmops.com/repos/vmdev/java/scripts/vm/pingtest.sh $
# pingtest.sh -- ping
#
# Reachability probe used by HA investigation. Exactly one mode is selected:
#   -i <domR ip> -p <vm ip> : ping a VM's private IP from inside its router
#   -h <host ip>            : ping another computing host
#   -g                      : ping this host's default gateway
# Exit 0 = reachable; distinct non-zero codes identify which step failed.
#
usage() {
printf "Usage:\n %s -i <domR eth1 ip> -p <private-ip-address> \n" $(basename $0) >&2
printf " %s -h <computing-agent-host-ip> \n" $(basename $0) >&2
printf " %s -g \n" $(basename $0) >&2
}
# check if gateway domain is up and running
# Sends one ping; on failure waits 1s and retries once.
# NOTE(review): when the first ping succeeds the if-condition is false, and a
# completed if-statement with no executed branch sets $? to 0, so "return $?"
# correctly reports success -- subtle bash behavior; confirm before changing.
check_gw() {
ping -c 1 -n -q $1 > /dev/null
if [ $? -gt 0 ]
then
sleep 1
ping -c 1 -n -q $1 > /dev/null
fi
return $?;
}
# ping the vm's private IP from the domR
# Falls back to arping (ARP-level probe) when the ICMP ping via ssh fails.
# NOTE(review): assumes ./id_rsa exists in the current working directory --
# the caller must run this script from the directory that holds the key.
ping_vm() {
local routerIp=$1
local vmIp=$2
ssh -o StrictHostKeyChecking=no -p 3922 -i ./id_rsa root@$routerIp "ping -c 1 -n -q $vmIp"
# if return code of ping is > 0, the ping failed, return a result
if [ $? -gt 0 ]
then
arping_vm $routerIp $vmIp
return $?
fi
return $?;
}
# arping the vm's private IP from the domR: detects hosts that answer ARP
# but drop ICMP.
arping_vm() {
local routerIp=$1
local vmIp=$2
ssh -o StrictHostKeyChecking=no -p 3922 -i ./id_rsa root@$routerIp "arping -c 1 -q $vmIp"
# if return code of ping is > 0, the ping failed, return a result
if [ $? -gt 0 ]
then
return 1
fi
return $?;
}
# ping the default route
# Used as a sanity check: if even the gateway is unreachable, the failure is
# local, not on the target side.
ping_default_route() {
defaultRoute=`ip route|grep default| awk '{ print $3 }'`
if [ $? -gt 0 ]
then
return $?
fi
if [ -z "$defaultRoute" ]
then
return 1
fi
ping -c 1 -n -q $defaultRoute > /dev/null
return $?
}
# ping the computing host
ping_host() {
ping -c 1 -n -q $1 > /dev/null
if [ $? -gt 0 ]
then
return 1
fi
return $?;
}
iflag=
pflag=
hflag=
gflag=
while getopts 'i:p:h:g' OPTION
do
case $OPTION in
i) iflag=1
domRIp="$OPTARG"
;;
p) pflag=1
privateIp="$OPTARG"
;;
h) hflag=1
hostIp="$OPTARG"
;;
g) gflag=1
;;
?) usage
exit 2
;;
esac
done
# make sure both domRIp and vm private ip are set
# The concatenation equals "1" only when exactly one of -i/-h/-g was given;
# zero or multiple mode flags fall into usage + exit 2.
if [ "$iflag$hflag$gflag" != "1" ]
then
usage
exit 2
fi
if [ "$iflag" == "1" ]
then
# -i additionally requires -p (the VM private IP pinged from the router)
if [ "$pflag" != "1" ]
then
usage
exit 3
fi
fi
if [ "$iflag" == "1" ]
then
# check if gateway domain is up and running
if ! check_gw "$domRIp"
then
printf "Unable to ping the routing domain, exiting\n" >&2
exit 4
fi
if ! ping_vm $domRIp $privateIp
then
printf "Unable to ping the vm, exiting\n" >&2
exit 5
fi
fi
if [ "$hflag" == "1" ]
then
if ! ping_host "$hostIp"
then
# first ping default route to make sure we can get out successfully before returning error
if ! ping_default_route
then
printf "Unable to ping default route, exiting\n" >&2
exit 7
fi
printf "Unable to ping computing host, exiting\n" >&2
exit 6
fi
fi
if [ "$gflag" == "1" ]
then
if ! ping_default_route
then
printf "Unable to ping default route\n" >&2
exit 8
fi
fi
exit 0