This commit is contained in:
Frank 2011-01-06 20:16:51 -08:00
parent c94b6acadf
commit 9d2916ffcc
23 changed files with 814 additions and 129 deletions

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
build/replace.properties
build/build.number build/build.number
bin/ bin/
cloudstack-proprietary/ cloudstack-proprietary/

View File

@ -78,7 +78,6 @@ import com.cloud.utils.script.Script;
**/ **/
public class Agent implements HandlerFactory, IAgentControl { public class Agent implements HandlerFactory, IAgentControl {
private static final Logger s_logger = Logger.getLogger(Agent.class.getName()); private static final Logger s_logger = Logger.getLogger(Agent.class.getName());
public enum ExitStatus { public enum ExitStatus {
Normal(0), // Normal status = 0. Normal(0), // Normal status = 0.
Upgrade(65), // Exiting for upgrade. Upgrade(65), // Exiting for upgrade.

View File

@ -0,0 +1,41 @@
package com.cloud.network.ovs;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
public class OvsCreateGreTunnelAnswer extends Answer {
String hostIp;
String remoteIp;
String bridge;
String key;
public OvsCreateGreTunnelAnswer(Command cmd, boolean success, String details) {
super(cmd, success, details);
}
public OvsCreateGreTunnelAnswer(Command cmd, boolean success,
String details, String hostIp, String remoteIp, String bridge,
String key) {
super(cmd, success, details);
this.hostIp = hostIp;
this.remoteIp = remoteIp;
this.bridge = bridge;
this.key = key;
}
public String getHostIp() {
return hostIp;
}
public String getRemoteIp() {
return remoteIp;
}
public String getBridge() {
return bridge;
}
public String getKey() {
return key;
}
}

View File

@ -0,0 +1,20 @@
package com.cloud.network.ovs;
import com.cloud.agent.api.Command;
public class OvsDeleteFlowCommand extends Command {
String vmName;
@Override
public boolean executeInSequence() {
return true;
}
public String getVmName() {
return vmName;
}
public OvsDeleteFlowCommand(String vmName) {
this.vmName = vmName;
}
}

View File

@ -0,0 +1,24 @@
package com.cloud.network.ovs;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
public class OvsSetTagAndFlowAnswer extends Answer {
Long vmId;
Long seqno;
public OvsSetTagAndFlowAnswer(Command cmd, boolean success, String details) {
super(cmd, success, details);
OvsSetTagAndFlowCommand c = (OvsSetTagAndFlowCommand)cmd;
this.vmId = c.getVmId();
this.seqno = Long.parseLong(c.getSeqNo());
}
public Long getVmId() {
return vmId;
}
public Long getSeqNo() {
return seqno;
}
}

View File

@ -5,12 +5,18 @@ import com.cloud.agent.api.Command;
public class OvsSetTagAndFlowCommand extends Command { public class OvsSetTagAndFlowCommand extends Command {
String vlans; String vlans;
String vmName; String vmName;
String seqno;
Long vmId;
@Override @Override
public boolean executeInSequence() { public boolean executeInSequence() {
return true; return true;
} }
public String getSeqNo() {
return seqno;
}
public String getVlans() { public String getVlans() {
return vlans; return vlans;
} }
@ -19,8 +25,14 @@ public class OvsSetTagAndFlowCommand extends Command {
return vmName; return vmName;
} }
public OvsSetTagAndFlowCommand(String vmName, String vlans) { public Long getVmId() {
return vmId;
}
public OvsSetTagAndFlowCommand(String vmName, String vlans, String seqno, Long vmId) {
this.vmName = vmName; this.vmName = vmName;
this.vlans = vlans; this.vlans = vlans;
this.seqno = seqno;
this.vmId = vmId;
} }
} }

View File

@ -6,3 +6,4 @@ DBHOST=localhost
AGENTLOGDIR=logs AGENTLOGDIR=logs
AGENTLOG=logs/agent.log AGENTLOG=logs/agent.log
MSMNTDIR=/mnt MSMNTDIR=/mnt
DBROOTPW=

View File

@ -0,0 +1,26 @@
package com.cloud.agent.api;
import java.util.List;
import java.util.Map;
import com.cloud.host.Host;
import com.cloud.utils.Pair;
import com.cloud.vm.State;
public class PingRoutingWithOvsCommand extends PingRoutingCommand {
List<Pair<String, Long>> states;
protected PingRoutingWithOvsCommand() {
super();
}
public PingRoutingWithOvsCommand(Host.Type type, long id,
Map<String, State> states, List<Pair<String, Long>> ovsStates) {
super(type, id, states);
this.states = ovsStates;
}
public List<Pair<String, Long>> getStates() {
return states;
}
}

View File

@ -94,6 +94,7 @@ import com.cloud.agent.api.ModifyStoragePoolCommand;
import com.cloud.agent.api.PingCommand; import com.cloud.agent.api.PingCommand;
import com.cloud.agent.api.PingRoutingCommand; import com.cloud.agent.api.PingRoutingCommand;
import com.cloud.agent.api.PingRoutingWithNwGroupsCommand; import com.cloud.agent.api.PingRoutingWithNwGroupsCommand;
import com.cloud.agent.api.PingRoutingWithOvsCommand;
import com.cloud.agent.api.PingTestCommand; import com.cloud.agent.api.PingTestCommand;
import com.cloud.agent.api.PoolEjectCommand; import com.cloud.agent.api.PoolEjectCommand;
import com.cloud.agent.api.PrepareForMigrationAnswer; import com.cloud.agent.api.PrepareForMigrationAnswer;
@ -159,7 +160,10 @@ import com.cloud.network.Networks;
import com.cloud.network.Networks.BroadcastDomainType; import com.cloud.network.Networks.BroadcastDomainType;
import com.cloud.network.Networks.IsolationType; import com.cloud.network.Networks.IsolationType;
import com.cloud.network.Networks.TrafficType; import com.cloud.network.Networks.TrafficType;
import com.cloud.network.ovs.OvsCreateGreTunnelAnswer;
import com.cloud.network.ovs.OvsCreateGreTunnelCommand; import com.cloud.network.ovs.OvsCreateGreTunnelCommand;
import com.cloud.network.ovs.OvsDeleteFlowCommand;
import com.cloud.network.ovs.OvsSetTagAndFlowAnswer;
import com.cloud.network.ovs.OvsSetTagAndFlowCommand; import com.cloud.network.ovs.OvsSetTagAndFlowCommand;
import com.cloud.resource.ServerResource; import com.cloud.resource.ServerResource;
import com.cloud.storage.Storage; import com.cloud.storage.Storage;
@ -247,6 +251,7 @@ public abstract class CitrixResourceBase implements ServerResource {
protected StorageLayer _storage; protected StorageLayer _storage;
protected boolean _canBridgeFirewall = false; protected boolean _canBridgeFirewall = false;
protected boolean _isOvs = false;
protected HashMap<StoragePoolType, StoragePoolResource> _pools = new HashMap<StoragePoolType, StoragePoolResource>(5); protected HashMap<StoragePoolType, StoragePoolResource> _pools = new HashMap<StoragePoolType, StoragePoolResource>(5);
public enum SRType { public enum SRType {
@ -449,6 +454,8 @@ public abstract class CitrixResourceBase implements ServerResource {
return execute((OvsCreateGreTunnelCommand)cmd); return execute((OvsCreateGreTunnelCommand)cmd);
} else if (cmd instanceof OvsSetTagAndFlowCommand) { } else if (cmd instanceof OvsSetTagAndFlowCommand) {
return execute((OvsSetTagAndFlowCommand)cmd); return execute((OvsSetTagAndFlowCommand)cmd);
} else if (cmd instanceof OvsDeleteFlowCommand) {
return execute((OvsDeleteFlowCommand)cmd);
} else { } else {
return Answer.createUnsupportedCommandAnswer(cmd); return Answer.createUnsupportedCommandAnswer(cmd);
} }
@ -3312,9 +3319,12 @@ public abstract class CitrixResourceBase implements ServerResource {
if (newStates == null) { if (newStates == null) {
newStates = new HashMap<String, State>(); newStates = new HashMap<String, State>();
} }
if (!_canBridgeFirewall) { if (!_canBridgeFirewall && !_isOvs) {
return new PingRoutingCommand(getType(), id, newStates); return new PingRoutingCommand(getType(), id, newStates);
} else { } else if (_isOvs) {
List<Pair<String, Long>>ovsStates = ovsFullSyncStates();
return new PingRoutingWithOvsCommand(getType(), id, newStates, ovsStates);
}else {
HashMap<String, Pair<Long, Long>> nwGrpStates = syncNetworkGroups(conn, id); HashMap<String, Pair<Long, Long>> nwGrpStates = syncNetworkGroups(conn, id);
return new PingRoutingWithNwGroupsCommand(getType(), id, newStates, nwGrpStates); return new PingRoutingWithNwGroupsCommand(getType(), id, newStates, nwGrpStates);
} }
@ -3820,7 +3830,61 @@ public abstract class CitrixResourceBase implements ServerResource {
return Boolean.valueOf(callHostPlugin(conn, "vmops", "can_bridge_firewall", "host_uuid", _host.uuid)); return Boolean.valueOf(callHostPlugin(conn, "vmops", "can_bridge_firewall", "host_uuid", _host.uuid));
} }
private Answer execute(OvsSetTagAndFlowCommand cmd) { private Answer execute(OvsDeleteFlowCommand cmd) {
_isOvs = true;
Connection conn = getConnection();
try {
String nwName = Networks.BroadcastScheme.VSwitch.toString();
Network nw = getNetworkByName(conn, nwName);
assert nw!= null : "Why there is no vswith network ???";
String bridge = nw.getBridge(conn);
String result = callHostPlugin(conn, "vmops", "ovs_delete_flow", "bridge", bridge,
"vmName", cmd.getVmName());
if (result.equalsIgnoreCase("SUCCESS")) {
return new Answer(cmd, true, "success to delete flows for " + cmd.getVmName());
} else {
return new Answer(cmd, false, result);
}
} catch (Exception e) {
e.printStackTrace();
}
return new Answer(cmd, false, "failed to delete flow for " + cmd.getVmName());
}
private List<Pair<String, Long>> ovsFullSyncStates() {
Connection conn = getConnection();
try {
String result = callHostPlugin(conn, "vmops", "ovs_get_vm_log", "host_uuid", _host.uuid);
String [] logs = result != null ?result.split(";"): new String [0];
List<Pair<String, Long>> states = new ArrayList<Pair<String, Long>>();
for (String log: logs){
String [] info = log.split(",");
if (info.length != 4) {
s_logger.warn("Wrong element number in ovs log");
continue;
}
//','.join([bridge, vmName, vmId, seqno])
try {
states.add(new Pair<String,Long>(info[0], Long.parseLong(info[3])));
} catch (NumberFormatException nfe) {
states.add(new Pair<String,Long>(info[0], -1L));
}
}
return states;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
private OvsSetTagAndFlowAnswer execute(OvsSetTagAndFlowCommand cmd) {
_isOvs = true;
Connection conn = getConnection(); Connection conn = getConnection();
try { try {
String nwName = Networks.BroadcastScheme.VSwitch.toString(); String nwName = Networks.BroadcastScheme.VSwitch.toString();
@ -3833,48 +3897,47 @@ public abstract class CitrixResourceBase implements ServerResource {
* plugin side * plugin side
*/ */
String result = callHostPlugin(conn, "vmops", "ovs_set_tag_and_flow", "bridge", bridge, String result = callHostPlugin(conn, "vmops", "ovs_set_tag_and_flow", "bridge", bridge,
"vmName", cmd.getVmName(), "vlans", cmd.getVlans()); "vmName", cmd.getVmName(), "vlans", cmd.getVlans(), "seqno", cmd.getSeqNo());
s_logger.debug("set flow for " + cmd.getVmName() + " " + result); s_logger.debug("set flow for " + cmd.getVmName() + " " + result);
if (result.equalsIgnoreCase("SUCCESS")) { if (result.equalsIgnoreCase("SUCCESS")) {
return new Answer(cmd, true, "Set flow for " + cmd.getVmName() return new OvsSetTagAndFlowAnswer(cmd, true, result);
+ " success, vlans:" + cmd.getVlans());
} else { } else {
return new Answer(cmd, false, "Set flow for " + cmd.getVmName() return new OvsSetTagAndFlowAnswer(cmd, false, result);
+ " failed, vlans:" + cmd.getVlans());
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
return new Answer(cmd, false, "Set flow for " + cmd.getVmName() return new OvsSetTagAndFlowAnswer(cmd, false, "EXCEPTION");
+ " failed, vlans:" + cmd.getVlans());
} }
private Answer execute(OvsCreateGreTunnelCommand cmd) {
private OvsCreateGreTunnelAnswer execute(OvsCreateGreTunnelCommand cmd) {
_isOvs = true;
Connection conn = getConnection(); Connection conn = getConnection();
String bridge = "unkonwn";
try { try {
String nwName = Networks.BroadcastScheme.VSwitch.toString(); //TODO: we may store vswtich network to _host
Network nw = getNetworkByName(conn, nwName); Network nw = setupvSwitchNetwork(conn);
if (nw == null) { bridge = nw.getBridge(conn);
nw = setupvSwitchNetwork(conn);
}
String result = callHostPlugin(conn, "vmops", "vlanRemapUtils", String result = callHostPlugin(conn, "vmops", "vlanRemapUtils",
"op", "createGRE", "bridge", nw.getBridge(conn), "op", "createGRE", "bridge", bridge,
"remoteIP", cmd.getRemoteIp(), "greKey", cmd.getKey()); "remoteIP", cmd.getRemoteIp(), "greKey", cmd.getKey());
if (result.equalsIgnoreCase("SUCCESS")) { if (result.equalsIgnoreCase("SUCCESS") || result.equalsIgnoreCase("TUNNEL_EXISTED")) {
return new Answer(cmd, true, "create gre tunnel to " return new OvsCreateGreTunnelAnswer(cmd, true, result);
+ cmd.getRemoteIp() + " success");
} else { } else {
return new Answer(cmd, false, "create gre tunnel to " return new OvsCreateGreTunnelAnswer(cmd, false, result,
+ cmd.getRemoteIp() + " failed"); _host.ip, cmd.getRemoteIp(), bridge, cmd.getKey());
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
return new Answer(cmd, false, "create gre tunnel to " + cmd.getRemoteIp() + " failed"); return new OvsCreateGreTunnelAnswer(cmd, false, "EXCEPTION", _host.ip,
cmd.getRemoteIp(), bridge, cmd.getKey());
} }
private Answer execute(SecurityIngressRulesCmd cmd) { private Answer execute(SecurityIngressRulesCmd cmd) {

View File

@ -29,6 +29,9 @@ errors = \
"SWITCH_NOT_RUN" : "SWITCH_NOT_RUN", \ "SWITCH_NOT_RUN" : "SWITCH_NOT_RUN", \
"NO_VSCTL" : "NO_VSCTL", \ "NO_VSCTL" : "NO_VSCTL", \
"COMMAND_FAILED" : "COMMAND_FAILED", \ "COMMAND_FAILED" : "COMMAND_FAILED", \
"TUNNEL_EXISTED" : "TUNNEL_EXISTED", \
"NO_INPORT" : "NO_INPORT", \
"NO_OFPORT" : "NO_OFPORT", \
"ERR_ARGS_NUM" : "ERR_ARGS_NUM", \ "ERR_ARGS_NUM" : "ERR_ARGS_NUM", \
"ERROR_OP" : "ERROR_OP", \ "ERROR_OP" : "ERROR_OP", \
@ -137,23 +140,23 @@ def isUUID (uuid):
return 0 return 0
#FIXME: better check method
def checkGREInterface (bridge, remoteIP, greKey): def checkGREInterface (bridge, remoteIP, greKey):
listIfaces = [vsctlPath, "list interface"] ports = getPortsOnBridge(bridge)
res = doCmd (listIfaces, True) if ports == None:
return 0
start = False for i in ports:
num = 0 ifaces = getInterfacesOnPort(i)
keyStr = "key=%s" % greKey if ifaces == None:
uuid = '' continue
for i in res:
if "_uuid" in i:
(x, uuid) = i.split(":")
uuid = strip(uuid)
if "options" in i and remoteIP in i and keyStr in i: for j in ifaces:
log("WARNING: GRE tunnel for remote_ip=%s key=%s already here" % \ if j == '[]':
(remoteIP, greKey)) continue
options = getFieldOfInterface(j, "options")
if remoteIP in options and greKey in options:
log("WARNING: GRE tunnel for remote_ip=%s key=%s already here, \
interface(%s)" % (remoteIP, greKey, j))
return -1 return -1
return 0 return 0
@ -164,8 +167,17 @@ def createGRE (bridge, remoteIP, greKey):
name = "%sgre" % bridge name = "%sgre" % bridge
if checkGREInterface(bridge, remoteIP, greKey) < 0: if checkGREInterface(bridge, remoteIP, greKey) < 0:
result = errors["TUNNEL_EXISTED"]
return 0 return 0
wait = [vsctlPath, "--timeout=30 wait-until bridge %s -- get bridge %s name" % \
(bridge, bridge)]
res = doCmd(wait)
if bridge not in res:
log("WARNIING:Can't find bridge %s for creating tunnel!" % bridge)
result = errors["COMMAND_FAILED"]
return -1
createInterface = [vsctlPath, "create interface", "name=%s" % name, \ createInterface = [vsctlPath, "create interface", "name=%s" % name, \
'type=gre options:"remote_ip=%s key=%s"' % (remoteIP, greKey)] 'type=gre options:"remote_ip=%s key=%s"' % (remoteIP, greKey)]
ifaceUUID = doCmd (createInterface) ifaceUUID = doCmd (createInterface)
@ -180,9 +192,18 @@ def createGRE (bridge, remoteIP, greKey):
result = errors["COMMAND_FAILED"]; result = errors["COMMAND_FAILED"];
return -1 return -1
addBridge = [vsctlPath, "add bridge %s" % bridge, "port %s" % portUUID] addBridge = [vsctlPath, "add bridge %s" % bridge, "ports %s" % portUUID]
doCmd (addBridge) doCmd (addBridge)
wait = [vsctlPath, "--timeout=5 wait-until port %s -- get port %s name" % \
(name, name)]
res = doCmd(wait)
if name in res:
result = errors["SUCCESS"]
return 0 return 0
else:
result = errors["COMMAND_FAILED"]
return -1
######################## End GRE creation utils ########################## ######################## End GRE creation utils ##########################
######################## Flow creation utils ########################## ######################## Flow creation utils ##########################
@ -263,7 +284,7 @@ def getOfPortsByType(bridge, askGre):
portUuids = getPortsOnBridge(bridge) portUuids = getPortsOnBridge(bridge)
if portUuids == None: if portUuids == None:
log("WARNING:No ports on bridge %s" % bridge) log("WARNING:No ports on bridge %s" % bridge)
return -1 return []
OfPorts = [] OfPorts = []
for i in portUuids: for i in portUuids:
@ -293,28 +314,14 @@ def getNoneGreOfPort(bridge):
def getGreOfPorts(bridge): def getGreOfPorts(bridge):
return getOfPortsByType(bridge, True) return getOfPortsByType(bridge, True)
def findInPort():
listIface = [vsctlPath, "list interface"]
res = doCmd (listIface, True)
inport = []
port = ""
for i in res:
if "ofport" in i:
(x, port) = i.split(":")
port = port.lstrip().rstrip()
if "type" in i:
(x, type) = i.split(":")
type = type.lstrip().rstrip()
if type == "gre":
inport.append (port)
return inport
def formatFlow(inPort, vlan, mac, outPut): def formatFlow(inPort, vlan, mac, outPut):
flow = "in_port=%s dl_vlan=%s dl_dst=%s idle_timeout=0 hard_timeout=0 \ flow = "in_port=%s dl_vlan=%s dl_dst=%s idle_timeout=0 hard_timeout=0 \
actions=strip_vlan,output:%s" % (inPort, vlan, mac, outPut) priority=10000 actions=strip_vlan,output:%s" % (inPort, vlan, mac, outPut)
return flow
def formatDropFlow(inPort, vlan):
flow = "in_port=%s dl_vlan=%s priority=0 idle_timeout=0 hard_timeout=0 \
actions=drop" % (inPort, vlan)
return flow return flow
def delFlow(mac): def delFlow(mac):
@ -327,6 +334,21 @@ def delARPFlow(vlan):
delFlow = ["ovs-ofctl del-flows %s" % bridge, '"%s"' % param] delFlow = ["ovs-ofctl del-flows %s" % bridge, '"%s"' % param]
doCmd(delFlow) doCmd(delFlow)
def delDHCPFlow(vlan):
param = "dl_type=0x0800 nw_proto=6 tp_src=547 dl_vlan=%s" % vlan
delFlow = ["ovs-ofctl del-flows %s" % bridge, '"%s"' % param]
doCmd(delFlow)
def formatDHCPFlow(bridge, inPort, vlan, ports):
outputs = ''
for i in ports:
str = "output:%s," % i
outputs += str
outputs = outputs[:-1]
flow = "in_port=%s dl_vlan=%s dl_type=0x0800 nw_proto=6 tp_src=547 idle_timeout=0 hard_timeout=0 \
priority=10000 actions=strip_vlan,%s" % (inPort, vlan, outputs)
return flow
def formatARPFlow(bridge, inPort, vlan, ports): def formatARPFlow(bridge, inPort, vlan, ports):
outputs = '' outputs = ''
for i in ports: for i in ports:
@ -335,21 +357,25 @@ def formatARPFlow(bridge, inPort, vlan, ports):
outputs = outputs[:-1] outputs = outputs[:-1]
flow = "in_port=%s dl_vlan=%s dl_type=0x0806 idle_timeout=0 hard_timeout=0 \ flow = "in_port=%s dl_vlan=%s dl_type=0x0806 idle_timeout=0 hard_timeout=0 \
actions=strip_vlan,%s" % (inPort, vlan, outputs) priority=10000 actions=strip_vlan,%s" % (inPort, vlan, outputs)
return flow return flow
def createFlow (bridge, vifName, mac, remap): def createFlow (bridge, vifName, mac, remap):
global result
inport = getGreOfPorts(bridge) inport = getGreOfPorts(bridge)
if len(inport) == 0: if len(inport) == 0:
log("WARNING: no inport found") log("WARNING: no inport found")
result = errors["NO_INPORT"]
return -1 return -1
output = getVifPort(bridge, vifName) output = getVifPort(bridge, vifName)
if output == None: if output == None:
log("WARNING: cannot find ofport for %s" % vifName) log("WARNING: cannot find ofport for %s" % vifName)
result = errors["NO_OFPORT"]
return -1 return -1
if output == '[]': if output == '[]':
log("WARNING: ofport is [] for %s" % vifName) log("WARNING: ofport is [] for %s" % vifName)
result = errors["NO_OFPORT"]
return -1 return -1
#del old flow here, if any, but in normal there should be no old flow #del old flow here, if any, but in normal there should be no old flow
@ -363,23 +389,36 @@ def createFlow (bridge, vifName, mac, remap):
noneGreOfPorts = getNoneGreOfPort(bridge) noneGreOfPorts = getNoneGreOfPort(bridge)
isARP = True isARP = True
if len(noneGreOfPorts) == 0: if len(noneGreOfPorts) == 0:
log("WARNING: no none GRE ofports found, no ARP flow will be created") log("WARNING: no none GRE ofports found, no ARP flow and DHCP flow will be created")
isARP = False isARP = False
for j in remap.split("/"): for j in remap.split("/"):
delARPFlow(j) delARPFlow(j)
delDHCPFlow(j)
for i in inport: for i in inport:
flow = formatDropFlow(i, j)
param = bridge + ' "%s"' % flow
dropflow = ["ovs-ofctl add-flow", param]
doCmd (dropflow)
flow = formatFlow(i, j, mac, output) flow = formatFlow(i, j, mac, output)
param = bridge + ' "%s"' % flow param = bridge + ' "%s"' % flow
addflow = ["ovs-ofctl add-flow", param] addflow = ["ovs-ofctl add-flow", param]
doCmd (addflow) doCmd (addflow)
if isARP == True: if isARP == True:
flow = formatARPFlow(bridge, i, j, noneGreOfPorts) flow = formatARPFlow(bridge, i, j, noneGreOfPorts)
param = bridge + ' "%s"' % flow param = bridge + ' "%s"' % flow
addflow = ["ovs-ofctl add-flow", param] addflow = ["ovs-ofctl add-flow", param]
doCmd (addflow) doCmd (addflow)
flow = formatDHCPFlow(bridge, i, j, noneGreOfPorts)
param = bridge + ' "%s"' % flow
addflow = ["ovs-ofctl add-flow", param]
doCmd (addflow)
result = errors["SUCCESS"]
return 0 return 0
######################## End Flow creation utils ########################## ######################## End Flow creation utils ##########################
@ -402,20 +441,24 @@ def setTag(bridge, vifName, vlan):
return 0 return 0
def doCreateGRE(bridge, remoteIP, key): def doCreateGRE(bridge, remoteIP, key):
global result
if createGRE(bridge, remoteIP, key) < 0: if createGRE(bridge, remoteIP, key) < 0:
log("WARNING: create GRE tunnel on %s for %s failed" % (bridge, \ log("create GRE tunnel on %s for %s failed" % (bridge, \
remoteIP)) remoteIP))
else: else:
log("WARNING: create GRE tunnel on %s for %s success" % (bridge, \ log("WARNING: create GRE tunnel on %s for %s success" % (bridge, \
remoteIP)) remoteIP))
print result
def doCreateFlow (bridge, vifName, mac, remap): def doCreateFlow (bridge, vifName, mac, remap):
global result
if createFlow(bridge, vifName, mac, remap) < 0: if createFlow(bridge, vifName, mac, remap) < 0:
log ("Create flow failed(bridge=%s, vifName=%s, mac=%s,\ log ("Create flow failed(bridge=%s, vifName=%s, mac=%s,\
remap=%s" % (bridge, vifName, mac, remap)) remap=%s" % (bridge, vifName, mac, remap))
else: else:
log ("Create flow success(bridge=%s, vifName=%s, mac=%s,\ log ("Create flow success(bridge=%s, vifName=%s, mac=%s,\
remap=%s" % (bridge, vifName, mac, remap)) remap=%s" % (bridge, vifName, mac, remap))
print result
def doSetTag (bridge, vifName, tag): def doSetTag (bridge, vifName, tag):
setTag(bridge, vifName, tag) setTag(bridge, vifName, tag)
@ -469,6 +512,7 @@ if __name__ == "__main__":
remoteIP = sys.argv[3] remoteIP = sys.argv[3]
key = sys.argv[4] key = sys.argv[4]
doCreateGRE(bridge, remoteIP, key) doCreateGRE(bridge, remoteIP, key)
sys.exit(0)
elif op == "createFlow": elif op == "createFlow":
checkArgNum(6) checkArgNum(6)
bridge = sys.argv[2] bridge = sys.argv[2]
@ -476,6 +520,7 @@ if __name__ == "__main__":
mac = sys.argv[4] mac = sys.argv[4]
remap = sys.argv[5] remap = sys.argv[5]
doCreateFlow(bridge, vifName, mac, remap) doCreateFlow(bridge, vifName, mac, remap)
sys.exit(0)
elif op == "deleteFlow": elif op == "deleteFlow":
checkArgNum(6) checkArgNum(6)
bridge = sys.argv[2] bridge = sys.argv[2]

View File

@ -554,18 +554,186 @@ def default_ebtables_rules(vm_name, vif, vm_ip, vm_mac):
util.SMlog("Failed to program default ebtables OUT rules") util.SMlog("Failed to program default ebtables OUT rules")
return 'false' return 'false'
def ovs_set_tag_and_flow(session, args): def format_ovs_vm_log_name(vmName):
def get_vif_field(name, field): vm_name = "ovs-%s" % vmName;
return session.xenapi.VIF.get_record(name).get(field) logfilename = "/var/run/cloud/" + vm_name +".log"
return logfilename
def remove_ovs_log_for_vm(vmName):
logfilename = format_ovs_vm_log_name(vmName)
result = True
try:
os.remove(logfilename)
except:
util.SMlog("Failed to delete ovs log file " + logfilename)
result = False
return result
def ovs_get_info_from_log(vmName, num):
logfilename = format_ovs_vm_log_name(vmName)
try:
lines = [line.rstrip() for line in open(logfilename)]
return lines[num]
except:
util.SMlog("Failed to open ovs log %s" % logfilename);
remove_ovs_log_for_vm(vmName)
return None
def ovs_get_common_info_from_log(vmName):
return ovs_get_info_from_log(vmName, 0)
def ovs_get_nic_info_from_log(vmName):
return ovs_get_info_from_log(vmName, 1)
def ovs_get_mac_info_from_log(vmName):
return ovs_get_info_from_log(vmName, 2)
def ovs_get_vlans_info_from_log(vmName):
return ovs_get_info_from_log(vmName, 3)
def ovs_parse_common_info_from_log(vmName, num):
info = ovs_get_common_info_from_log(vmName)
if info == None:
return None
return info.split(",")[num]
def ovs_get_bridge_from_log(vmName):
return ovs_parse_common_info_from_log(vmName, 1)
def ovs_get_vm_id_from_log(vmName):
return ovs_parse_common_info_from_log(vmName, 2)
def ovs_get_seqno_from_log(vmName):
return ovs_parse_common_info_from_log(vmName, 3)
def ovs_handle_rebooted_vm(session, vmName):
curr_domid = '-1'
(curr_domid, vifrs, hostuuid) = ovs_get_domid_vifrs_hostuuid(session, vmName)
old_id = ovs_get_vm_id_from_log(vmName)
if curr_domid == old_id:
util.SMlog("OvsInfo:%s is normal" % vmName)
return True
util.SMlog("%s rebooted, reset flow for it" % vmName)
try:
vlans = ovs_get_vlans_info_from_log(vmName)
bridge = ovs_get_bridge_from_log(vmName)
except Exception, e:
util.SMlog(e.__str__())
util.SMlog("ovs get info from %s failed" % \
format_ovs_vm_log_name(vmName))
return False
i = 0
if vlans == None:
util.SMlog("OVSErr: cannot get vlans for %s" % vmName)
return False
tag = vlans.split('/')[0]
nics = []
macs = []
for vifr in vifrs:
vifName = "vif" + curr_domid + "." + vifr[0]
vlanRemapUtils(session, {"op":"setTag", "vifName":vifName, "bridge":bridge, "tag":tag})
vlanRemapUtils(session, {"op":"createFlow", "vifName":vifName, "bridge":bridge, "mac":vifr[1], "remap":vlans})
nics.append(vifName)
macs.append(vifr[1])
i += 1
seqno = ovs_get_seqno_from_log(vmName)
ovs_write_vm_log(bridge, vmName, curr_domid, seqno, nics, macs, vlans)
#see if there is rebooted vm to handle
ovs_get_vm_log(session, {"host_uuid":hostuuid})
return True
@echo
def ovs_get_vm_log(session, args):
host_uuid = args.pop('host_uuid')
try:
session = get_xapi_session()
thishost = session.xenapi.host.get_by_uuid(host_uuid)
hostrec = session.xenapi.host.get_record(thishost)
vms = hostrec.get('resident_VMs')
except:
util.SMlog("Failed to get host from uuid " + host_uuid)
return ' '
result = []
try:
for name in [session.xenapi.VM.get_name_label(x) for x in vms]:
if 1 not in [ name.startswith(c) for c in ['r-', 'i-'] ]:
continue
ovs_handle_rebooted_vm(session, name)
if name.startswith('i-'):
info = ovs_get_common_info_from_log(name)
result.append(info)
except Exception, e:
util.SMlog(e.__str__())
util.SMlog("OVs failed to get rule logs, better luck next time!")
return ";".join(result)
def ovs_write_vm_log(bridge, vmName, vmId, seqno, vifNames, macs, vlans):
logfilename = format_ovs_vm_log_name(vmName)
util.SMlog("Writing ovs log to " + logfilename)
logf = open(logfilename, 'w')
output = ','.join([vmName, bridge, vmId, seqno])
result = True
try:
logf.write(output)
logf.write('\n')
output = ','.join(vifNames)
logf.write(output)
logf.write('\n')
output = ','.join(macs)
logf.write(output)
logf.write('\n')
logf.write(vlans)
logf.write('\n')
except:
util.SMlog("Failed to write to ovs log file " + logfilename)
result = False
logf.close()
return result
def ovs_delete_flow(session, args):
bridge = args.pop('bridge') bridge = args.pop('bridge')
vm_name = args.pop('vmName') vm_name = args.pop('vmName')
vlanStr = args.pop('vlans')
nicStr = ovs_get_nic_info_from_log(vm_name)
macStr = ovs_get_mac_info_from_log(vm_name)
vlanStr = ovs_get_vlans_info_from_log(vm_name)
if nicStr == None or macStr == None or vlanStr == None:
return 'ERROR_LOG'
nics = nicStr.split(',')
macs = macStr.split(',')
if len(nics) != len(macs):
return 'ERROR_LOG'
i = 0
for nic in nics:
vlanRemapUtils(session, {"op":"deleteFlow", "bridge":bridge, \
"vifName":nic, "mac":macs[i], "remap":vlanStr})
i += 1
return 'SUCCESS'
def ovs_get_domid_vifrs_hostuuid(session, vm_name):
def get_vif_field(name, field):
return session.xenapi.VIF.get_record(name).get(field)
try: try:
vm = session.xenapi.VM.get_by_name_label(vm_name) vm = session.xenapi.VM.get_by_name_label(vm_name)
if len(vm) != 1: if len(vm) != 1:
return 'false' return 'NO_VM'
vm_rec = session.xenapi.VM.get_record(vm[0]) vm_rec = session.xenapi.VM.get_record(vm[0])
vm_vifs = vm_rec.get('VIFs') vm_vifs = vm_rec.get('VIFs')
vifrs = [] vifrs = []
@ -573,13 +741,27 @@ def ovs_set_tag_and_flow(session, args):
rec = (get_vif_field(vif, 'device'), get_vif_field(vif, 'MAC')) rec = (get_vif_field(vif, 'device'), get_vif_field(vif, 'MAC'))
vifrs.append(rec) vifrs.append(rec)
domid = vm_rec.get('domid') domid = vm_rec.get('domid')
host = vm_rec.get('resident_on')
host_rec = session.xenapi.host.get_record(host)
uuid = host_rec.get('uuid')
util.SMlog("OVSINFO: (domid:%s, uuid:%s)" % (domid, uuid))
return (domid, vifrs, uuid)
except: except:
util.SMlog("### Failed to get domid or vif list for vm ##" + vm_name) util.SMlog("### Failed to get domid or vif list for vm ##" + vm_name)
return 'false' return (-1, [])
def ovs_set_tag_and_flow(session, args):
bridge = args.pop('bridge')
vm_name = args.pop('vmName')
vlanStr = args.pop('vlans')
seqno = args.pop('seqno')
(domid, vifrs, hostuuid) = ovs_get_domid_vifrs_hostuuid(session, vm_name)
if domid == '-1': if domid == '-1':
util.SMlog("### Failed to get domid for vm (-1): " + vm_name) util.SMlog("### Failed to get domid for vm (-1): " + vm_name)
return 'false' return 'NO_DOMID'
if len(vifrs) == 0: if len(vifrs) == 0:
return 'SUCCESS' return 'SUCCESS'
@ -587,12 +769,20 @@ def ovs_set_tag_and_flow(session, args):
if vlanStr.startswith("/"): vlanStr = vlanStr[1:] if vlanStr.startswith("/"): vlanStr = vlanStr[1:]
if vlanStr.endswith("/"): vlanStr = vlanStr[:-1] if vlanStr.endswith("/"): vlanStr = vlanStr[:-1]
vlans = vlanStr.split("/") vlans = vlanStr.split("/")
vifNames = []
macs = []
for vifr in vifrs: for vifr in vifrs:
vifName = "vif" + domid + "." + vifr[0] vifName = "vif" + domid + "." + vifr[0]
vifNames.append(vifName)
mac = vifr[1] mac = vifr[1]
macs.append(mac)
vlanRemapUtils(session, {"op":"setTag", "vifName":vifName, "bridge":bridge, "tag":vlans[0]}) vlanRemapUtils(session, {"op":"setTag", "vifName":vifName, "bridge":bridge, "tag":vlans[0]})
vlanRemapUtils(session, {"op":"createFlow", "vifName":vifName, "bridge":bridge, "mac":mac, "remap":vlanStr}) vlanRemapUtils(session, {"op":"createFlow", "vifName":vifName, "bridge":bridge, "mac":mac, "remap":vlanStr})
res = ovs_write_vm_log(bridge, vm_name, domid, seqno, vifNames, macs, vlanStr)
if res == 'false':
return 'CREATE_LOG_FAILED'
return 'SUCCESS' return 'SUCCESS'
@echo @echo
@ -938,6 +1128,7 @@ def cleanup_rules(session, args):
util.SMlog("Failed to cleanup rules !") util.SMlog("Failed to cleanup rules !")
return '-1'; return '-1';
@echo @echo
def check_rule_log_for_vm(vmName, vmID, vmIP, domID, signature, seqno): def check_rule_log_for_vm(vmName, vmID, vmIP, domID, signature, seqno):
vm_name = vmName; vm_name = vmName;
@ -1107,5 +1298,5 @@ def network_rules(session, args):
if __name__ == "__main__": if __name__ == "__main__":
XenAPIPlugin.dispatch({"pingtest": pingtest, "setup_iscsi":setup_iscsi, "gethostvmstats": gethostvmstats, "getvncport": getvncport, "getgateway": getgateway, "preparemigration": preparemigration, "setIptables": setIptables, "pingdomr": pingdomr, "pingxenserver": pingxenserver, "ipassoc": ipassoc, "vm_data": vm_data, "savePassword": savePassword, "saveDhcpEntry": saveDhcpEntry, "setFirewallRule": setFirewallRule, "setLoadBalancerRule": setLoadBalancerRule, "createFile": createFile, "deleteFile": deleteFile, "networkUsage": networkUsage, "network_rules":network_rules, "can_bridge_firewall":can_bridge_firewall, "default_network_rules":default_network_rules, "destroy_network_rules_for_vm":destroy_network_rules_for_vm, "default_network_rules_systemvm":default_network_rules_systemvm, "get_rule_logs_for_vms":get_rule_logs_for_vms, "setLinkLocalIP":setLinkLocalIP, "lt2p_vpn":lt2p_vpn,"cleanup_rules":cleanup_rules,"vlanRemapUtils":vlanRemapUtils, "ovs_set_tag_and_flow":ovs_set_tag_and_flow}) XenAPIPlugin.dispatch({"pingtest": pingtest, "setup_iscsi":setup_iscsi, "gethostvmstats": gethostvmstats, "getvncport": getvncport, "getgateway": getgateway, "preparemigration": preparemigration, "setIptables": setIptables, "pingdomr": pingdomr, "pingxenserver": pingxenserver, "ipassoc": ipassoc, "vm_data": vm_data, "savePassword": savePassword, "saveDhcpEntry": saveDhcpEntry, "setFirewallRule": setFirewallRule, "setLoadBalancerRule": setLoadBalancerRule, "createFile": createFile, "deleteFile": deleteFile, "networkUsage": networkUsage, "network_rules":network_rules, "can_bridge_firewall":can_bridge_firewall, "default_network_rules":default_network_rules, "destroy_network_rules_for_vm":destroy_network_rules_for_vm, "default_network_rules_systemvm":default_network_rules_systemvm, "get_rule_logs_for_vms":get_rule_logs_for_vms, "setLinkLocalIP":setLinkLocalIP, "lt2p_vpn":lt2p_vpn, "vlanRemapUtils":vlanRemapUtils, "ovs_set_tag_and_flow":ovs_set_tag_and_flow, "ovs_get_vm_log":ovs_get_vm_log,"ovs_delete_flow":ovs_delete_flow,"cleanup_rules":cleanup_rules})

View File

@ -63,7 +63,7 @@ public class OvsElement extends AdapterBase implements NetworkElement {
} }
if (network.getTrafficType() == Networks.TrafficType.Guest) { if (network.getTrafficType() == Networks.TrafficType.Guest) {
_ovsNetworkMgr.CheckAndUpdateDhcpFlow(network); _ovsNetworkMgr.CheckAndUpdateDhcpFlow(network, vm.getVirtualMachine());
} }
return true; return true;
} }

View File

@ -0,0 +1,114 @@
package com.cloud.network.ovs;
import java.util.HashSet;
import java.util.Set;
import org.apache.log4j.Logger;
import com.cloud.agent.Listener;
import com.cloud.agent.api.AgentControlAnswer;
import com.cloud.agent.api.AgentControlCommand;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.PingRoutingWithOvsCommand;
import com.cloud.agent.api.StartupCommand;
import com.cloud.exception.ConnectionException;
import com.cloud.host.HostVO;
import com.cloud.host.Status;
import com.cloud.network.ovs.dao.OvsWorkDao;
import com.cloud.network.ovs.dao.OvsWorkVO.Step;
public class OvsListener implements Listener {
public static final Logger s_logger = Logger.getLogger(OvsListener.class.getName());
OvsNetworkManager _ovsNetworkMgr;
OvsWorkDao _workDao;
public OvsListener(OvsNetworkManager ovsMgr, OvsWorkDao workDao) {
this._ovsNetworkMgr = ovsMgr;
this._workDao = workDao;
}
@Override
public boolean processAnswers(long agentId, long seq, Answer[] answers) {
Set<Long> failedFlowVms = new HashSet<Long>();
for (Answer ans: answers) {
if (ans instanceof OvsCreateGreTunnelAnswer) {
OvsCreateGreTunnelAnswer r = (OvsCreateGreTunnelAnswer)ans;
String s = String.format("(hostIP:%1$s, remoteIP:%2$s, bridge:%3$s, greKey:%4$s)",
r.getHostIp(), r.getRemoteIp(),
r.getBridge(), r.getKey());
if (!r.getResult()) {
s_logger.warn("Create GRE tunnel failed due to " + r.getDetails() + s);
} else {
s_logger.info("Create GRE tunnel success" + s);
}
} else if (ans instanceof OvsSetTagAndFlowAnswer) {
OvsSetTagAndFlowAnswer r = (OvsSetTagAndFlowAnswer)ans;
if (!r.getResult()) {
s_logger.warn("Failed to set flow for VM " + r.getVmId());
_workDao.updateStep(r.getVmId(), r.getSeqNo(), Step.Error);
failedFlowVms.add(r.getVmId());
} else {
s_logger.info("Success to set flow for VM " + r.getVmId());
_workDao.updateStep(r.getVmId(), r.getSeqNo(), Step.Done);
}
}
//TODO: handle delete failure
}
if (failedFlowVms.size() > 0) {
_ovsNetworkMgr.scheduleFlowUpdateToHosts(failedFlowVms, false, new Long(10*1000l));
}
return true;
}
@Override
public boolean processCommands(long agentId, long seq, Command[] commands) {
boolean processed = false;
for (Command cmd : commands) {
if (cmd instanceof PingRoutingWithOvsCommand) {
PingRoutingWithOvsCommand ping = (PingRoutingWithOvsCommand)cmd;
if (ping !=null && ping.getStates().size() > 0) {
_ovsNetworkMgr.fullSync(ping.getStates());
}
processed = true;
}
}
return processed;
}
@Override
public AgentControlAnswer processControlCommand(long agentId,
AgentControlCommand cmd) {
return null;
}
@Override
public void processConnect(HostVO host, StartupCommand cmd)
throws ConnectionException {
}
@Override
public boolean processDisconnect(long agentId, Status state) {
return true;
}
@Override
public boolean isRecurring() {
return false;
}
@Override
public int getTimeout() {
return -1;
}
@Override
public boolean processTimeout(long agentId, long seq) {
return true;
}
}

View File

@ -1,13 +1,20 @@
package com.cloud.network.ovs; package com.cloud.network.ovs;
import java.util.List;
import java.util.Set;
import com.cloud.agent.manager.Commands; import com.cloud.agent.manager.Commands;
import com.cloud.deploy.DeployDestination; import com.cloud.deploy.DeployDestination;
import com.cloud.network.Network; import com.cloud.network.Network;
import com.cloud.resource.ServerResource;
import com.cloud.uservm.UserVm; import com.cloud.uservm.UserVm;
import com.cloud.utils.Pair;
import com.cloud.utils.component.Manager; import com.cloud.utils.component.Manager;
import com.cloud.vm.DomainRouterVO; import com.cloud.vm.DomainRouterVO;
import com.cloud.vm.State; import com.cloud.vm.State;
import com.cloud.vm.UserVmVO; import com.cloud.vm.UserVmVO;
import com.cloud.vm.VMInstanceVO;
import com.cloud.vm.VirtualMachine;
import com.cloud.vm.VirtualMachineProfile; import com.cloud.vm.VirtualMachineProfile;
public interface OvsNetworkManager extends Manager { public interface OvsNetworkManager extends Manager {
@ -27,11 +34,15 @@ public interface OvsNetworkManager extends Manager {
VirtualMachineProfile<DomainRouterVO> profile, VirtualMachineProfile<DomainRouterVO> profile,
DeployDestination dest); DeployDestination dest);
public void CheckAndUpdateDhcpFlow(Network nw); public void CheckAndUpdateDhcpFlow(Network nw, VirtualMachine vm);
public void handleVmStateTransition(UserVm userVm, State vmState); public void handleVmStateTransition(VMInstanceVO userVm, State vmState);
public void RouterCheckAndCreateTunnel(Commands cmds, public void RouterCheckAndCreateTunnel(Commands cmds,
VirtualMachineProfile<DomainRouterVO> profile, VirtualMachineProfile<DomainRouterVO> profile,
DeployDestination dest); DeployDestination dest);
public void fullSync(List<Pair<String, Long>> states);
public void scheduleFlowUpdateToHosts(Set<Long> affectedVms, boolean updateSeqno, Long delayMs);
} }

View File

@ -37,6 +37,8 @@ import com.cloud.network.ovs.dao.VlanMappingVO;
import com.cloud.network.ovs.dao.VmFlowLogDao; import com.cloud.network.ovs.dao.VmFlowLogDao;
import com.cloud.network.ovs.dao.VmFlowLogVO; import com.cloud.network.ovs.dao.VmFlowLogVO;
import com.cloud.server.ManagementServer; import com.cloud.server.ManagementServer;
import com.cloud.user.AccountVO;
import com.cloud.user.dao.AccountDao;
import com.cloud.uservm.UserVm; import com.cloud.uservm.UserVm;
import com.cloud.utils.Pair; import com.cloud.utils.Pair;
import com.cloud.utils.component.ComponentLocator; import com.cloud.utils.component.ComponentLocator;
@ -54,6 +56,7 @@ import com.cloud.vm.VirtualMachineProfile;
import com.cloud.vm.dao.DomainRouterDao; import com.cloud.vm.dao.DomainRouterDao;
import com.cloud.vm.dao.NicDao; import com.cloud.vm.dao.NicDao;
import com.cloud.vm.dao.UserVmDao; import com.cloud.vm.dao.UserVmDao;
import com.cloud.vm.dao.VMInstanceDao;
@Local(value={OvsNetworkManager.class}) @Local(value={OvsNetworkManager.class})
public class OvsNetworkManagerImpl implements OvsNetworkManager { public class OvsNetworkManagerImpl implements OvsNetworkManager {
@ -70,10 +73,13 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager {
@Inject OvsWorkDao _workDao; @Inject OvsWorkDao _workDao;
@Inject VmFlowLogDao _flowLogDao; @Inject VmFlowLogDao _flowLogDao;
@Inject UserVmDao _userVMDao; @Inject UserVmDao _userVMDao;
@Inject VMInstanceDao _instanceDao;
@Inject AccountDao _accountDao;
String _name; String _name;
boolean _isEnabled; boolean _isEnabled;
ScheduledExecutorService _executorPool; ScheduledExecutorService _executorPool;
ScheduledExecutorService _cleanupExecutor; ScheduledExecutorService _cleanupExecutor;
OvsListener _ovsListener;
private long _serverId; private long _serverId;
private final long _timeBetweenCleanups = 30; //seconds private final long _timeBetweenCleanups = 30; //seconds
@ -109,6 +115,8 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager {
_serverId = ((ManagementServer)ComponentLocator.getComponent(ManagementServer.Name)).getId(); _serverId = ((ManagementServer)ComponentLocator.getComponent(ManagementServer.Name)).getId();
_executorPool = Executors.newScheduledThreadPool(10, new NamedThreadFactory("OVS")); _executorPool = Executors.newScheduledThreadPool(10, new NamedThreadFactory("OVS"));
_cleanupExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("OVS-Cleanup")); _cleanupExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("OVS-Cleanup"));
_ovsListener = new OvsListener(this, _workDao);
_agentMgr.registerForHostEvents(_ovsListener, true, true, true);
return true; return true;
} }
@ -198,16 +206,17 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager {
if (vm != null && vm.getState() == State.Running) { if (vm != null && vm.getState() == State.Running) {
agentId = vm.getHostId(); agentId = vm.getHostId();
if (agentId != null ) { if (agentId != null ) {
//TODO: set flow here OvsSetTagAndFlowCommand cmd = new OvsSetTagAndFlowCommand(
OvsSetTagAndFlowCommand cmd = new OvsSetTagAndFlowCommand(vm.getName(),vlans); vm.getName(), vlans, seqnum.toString(), vm.getId());
Commands cmds = new Commands(cmd); Commands cmds = new Commands(cmd);
try { try {
//_agentMgr.send(agentId, cmds, _answerListener); _agentMgr.send(agentId, cmds, _ovsListener);
_agentMgr.send(agentId, cmds, null); // TODO: clean dirty in answerListener
//TODO: clean dirty in answerListener
} catch (AgentUnavailableException e) { } catch (AgentUnavailableException e) {
s_logger.debug("Unable to send updates for vm: " + userVmId + "(agentid=" + agentId + ")"); s_logger.debug("Unable to send updates for vm: "
_workDao.updateStep(work.getInstanceId(), seqnum, Step.Done); + userVmId + "(agentid=" + agentId + ")");
_workDao.updateStep(work.getInstanceId(), seqnum,
Step.Done);
} }
} }
} }
@ -283,6 +292,7 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager {
vlans.add(new Long(vo.getVlan())); vlans.add(new Long(vo.getVlan()));
} }
assert vlans.size() > 0 : "Vlan map can't be null";
StringBuffer buf = new StringBuffer(); StringBuffer buf = new StringBuffer();
for (Long i : vlans) { for (Long i : vlans) {
buf.append("/"); buf.append("/");
@ -331,9 +341,11 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager {
HostVO rHost = _hostDao.findById(i.longValue()); HostVO rHost = _hostDao.findById(i.longValue());
cmds.addCommand( cmds.addCommand(
0, new OvsCreateGreTunnelCommand(rHost.getPrivateIpAddress(), "1")); 0, new OvsCreateGreTunnelCommand(rHost.getPrivateIpAddress(), "1"));
_agentMgr.send(i.longValue(), new OvsCreateGreTunnelCommand(myIp, "1")); Commands cmd2s = new Commands( new OvsCreateGreTunnelCommand(myIp, "1"));
_agentMgr.send(i.longValue(), cmd2s , _ovsListener);
s_logger.debug("Ask host " + i.longValue() + " to create gre tunnel to " + hostId); s_logger.debug("Ask host " + i.longValue() + " to create gre tunnel to " + hostId);
} }
_vlanMappingDirtyDao.markDirty(accountId);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
@ -375,11 +387,15 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager {
assert nic!=null : "Why there is no guest network nic???"; assert nic!=null : "Why there is no guest network nic???";
String vlans = parseVlanAndMapping(nic.getBroadcastUri().toASCIIString()); String vlans = parseVlanAndMapping(nic.getBroadcastUri().toASCIIString());
cmds.addCommand(new OvsSetTagAndFlowCommand(instance.getName(), vlans)); VmFlowLogVO log = _flowLogDao.findOrNewByVmId(instance.getId(), instance.getName());
cmds.addCommand(new OvsSetTagAndFlowCommand(instance.getName(), vlans,
Long.toString(log.getLogsequence()), instance.getId()));
} }
//FIXME: if at this router is not start, this will hang 10 secs due to host
//plugin cannot found vif for router.
@Override @Override
public void CheckAndUpdateDhcpFlow(Network nw) { public void CheckAndUpdateDhcpFlow(Network nw, VirtualMachine vm) {
if (!_isEnabled) { if (!_isEnabled) {
return; return;
} }
@ -396,8 +412,9 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager {
try { try {
String vlans = getVlanMapping(accountId); String vlans = getVlanMapping(accountId);
VmFlowLogVO log = _flowLogDao.findOrNewByVmId(vm.getId(), vm.getName());
_agentMgr.send(router.getHostId(), new OvsSetTagAndFlowCommand( _agentMgr.send(router.getHostId(), new OvsSetTagAndFlowCommand(
router.getName(), vlans)); router.getName(), vlans, Long.toString(log.getLogsequence()), vm.getId()));
s_logger.debug("ask router " + router.getName() + " on host " s_logger.debug("ask router " + router.getName() + " on host "
+ router.getHostId() + " update vlan map to " + vlans); + router.getHostId() + " update vlan map to " + vlans);
} catch (Exception e) { } catch (Exception e) {
@ -405,7 +422,9 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager {
} }
} }
//TODO: handle router
@DB @DB
@Override
public void scheduleFlowUpdateToHosts(Set<Long> affectedVms, boolean updateSeqno, Long delayMs) { public void scheduleFlowUpdateToHosts(Set<Long> affectedVms, boolean updateSeqno, Long delayMs) {
if (!_isEnabled) { if (!_isEnabled) {
return; return;
@ -430,11 +449,7 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager {
s_logger.warn("Ovs failed to acquire lock on vm id " + vmId); s_logger.warn("Ovs failed to acquire lock on vm id " + vmId);
continue; continue;
} }
log = _flowLogDao.findByVmId(vmId); log = _flowLogDao.findOrNewByVmId(vmId, vm.getName());
if (log == null) {
log = new VmFlowLogVO(vmId);
log = _flowLogDao.persist(log);
}
if (log != null && updateSeqno){ if (log != null && updateSeqno){
log.incrLogsequence(); log.incrLogsequence();
@ -461,8 +476,8 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager {
} }
} }
protected Set<Long> getAffectedVms(UserVm userVm) { protected Set<Long> getAffectedVms(VMInstanceVO instance) {
long accountId = userVm.getAccountId(); long accountId = instance.getAccountId();
if (!_vlanMappingDirtyDao.isDirty(accountId)) { if (!_vlanMappingDirtyDao.isDirty(accountId)) {
return null; return null;
} }
@ -475,36 +490,47 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager {
return affectedVms; return affectedVms;
} }
protected void handleVmStateChange(UserVm userVm) { protected void handleVmStateChange(VMInstanceVO instance) {
Set<Long> affectedVms = getAffectedVms(userVm); Set<Long> affectedVms = getAffectedVms(instance);
scheduleFlowUpdateToHosts(affectedVms, true, null); scheduleFlowUpdateToHosts(affectedVms, true, null);
_vlanMappingDirtyDao.clean(instance.getAccountId());
s_logger.debug("Clean dirty for account " + instance.getAccountId());
} }
//TODO: think about lock //TODO: think about lock
protected void checkAndRemove(UserVm userVm) { @DB
long accountId = userVm.getAccountId(); protected void checkAndRemove(VMInstanceVO instance) {
long hostId = userVm.getHostId(); long accountId = instance.getAccountId();
long hostId = instance.getHostId();
final Transaction txn = Transaction.currentTxn(); final Transaction txn = Transaction.currentTxn();
txn.start(); txn.start();
VlanMappingVO vo = _vlanMappingDao.findByAccountIdAndHostId(accountId, hostId); VlanMappingVO vo = _vlanMappingDao.findByAccountIdAndHostId(accountId, hostId);
if (vo.unref() == 0) { if (vo.unref() == 0) {
_vlanMappingDao.remove(vo.getId()); _vlanMappingDao.remove(vo.getId());
s_logger.debug(userVm.getName() + " is the last one on host " s_logger.debug(instance.getName() + " is the last one on host "
+ hostId + " for account " + accountId + hostId + " for account " + accountId
+ ", remove vlan in ovs_host_vlan_alloc"); + ", remove vlan in ovs_host_vlan_alloc");
_vlanMappingDirtyDao.markDirty(accountId); _vlanMappingDirtyDao.markDirty(accountId);
} else { } else {
_vlanMappingDao.update(vo.getId(), vo); _vlanMappingDao.update(vo.getId(), vo);
s_logger.debug(userVm.getName() s_logger.debug(instance.getName()
+ " reduces reference count of (account,host) = (" + " reduces reference count of (account,host) = ("
+ accountId + "," + hostId + ") to " + vo.getRef()); + accountId + "," + hostId + ") to " + vo.getRef());
} }
_flowLogDao.deleteByVmId(instance.getId());
txn.commit(); txn.commit();
try {
Commands cmds = new Commands(new OvsDeleteFlowCommand(instance.getName()));
_agentMgr.send(hostId, cmds, _ovsListener);
} catch (Exception e) {
e.printStackTrace();
}
} }
@Override @Override
public void handleVmStateTransition(UserVm userVm, State vmState) { public void handleVmStateTransition(VMInstanceVO instance, State vmState) {
if (!_isEnabled) { if (!_isEnabled) {
return; return;
} }
@ -519,12 +545,12 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager {
case Unknown: case Unknown:
return; return;
case Running: case Running:
handleVmStateChange(userVm); handleVmStateChange(instance);
break; break;
case Stopping: case Stopping:
case Stopped: case Stopped:
checkAndRemove(userVm); checkAndRemove(instance);
handleVmStateChange(userVm); handleVmStateChange(instance);
break; break;
} }
@ -557,4 +583,53 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager {
applyDefaultFlow(cmds, profile.getVirtualMachine(), dest); applyDefaultFlow(cmds, profile.getVirtualMachine(), dest);
} }
@Override
public void fullSync(List<Pair<String, Long>> states) {
if (!_isEnabled) {
return;
}
//TODO:debug code, remove in future
List<AccountVO> accounts = _accountDao.listAll();
for (AccountVO acnt : accounts) {
if (_vlanMappingDirtyDao.isDirty(acnt.getId())) {
s_logger.warn("Vlan mapping for account "
+ acnt.getAccountName() + " id " + acnt.getId()
+ " is dirty");
}
}
if (states.size() ==0) {
s_logger.info("Nothing to do, Ovs fullsync is happy");
return;
}
Set<Long>vmIds = new HashSet<Long>();
for (Pair<String, Long>state : states) {
if (state.second() == -1) {
s_logger.warn("Ovs fullsync get wrong seqno for " + state.first());
continue;
}
VmFlowLogVO log = _flowLogDao.findByName(state.first());
if (log.getLogsequence() != state.second()) {
s_logger.debug("Ovs fullsync detected unmatch seq number for " + state.first() + ", run sync");
VMInstanceVO vo = _instanceDao.findById(log.getInstanceId());
if (vo == null) {
s_logger.warn("Ovs can't find " + state.first() + " in vm_instance!");
continue;
}
if (vo.getType() != VirtualMachine.Type.User && vo.getType() != VirtualMachine.Type.DomainRouter) {
s_logger.warn("Ovs fullsync: why we sync a " + vo.getType().toString() + " VM???");
continue;
}
vmIds.add(new Long(vo.getId()));
}
}
if (vmIds.size() > 0) {
scheduleFlowUpdateToHosts(vmIds, true, null);
}
}
} }

View File

@ -5,6 +5,7 @@ import java.util.List;
import javax.ejb.Local; import javax.ejb.Local;
import com.cloud.ha.HaWorkVO; import com.cloud.ha.HaWorkVO;
import com.cloud.network.ovs.dao.OvsWorkVO.Step; import com.cloud.network.ovs.dao.OvsWorkVO.Step;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.Filter; import com.cloud.utils.db.Filter;
import com.cloud.utils.db.GenericDaoBase; import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.SearchBuilder; import com.cloud.utils.db.SearchBuilder;
@ -115,6 +116,7 @@ public class OvsWorkDaoImpl extends GenericDaoBase<OvsWorkVO, Long> implements
} }
@Override @Override
@DB
public void updateStep(Long vmId, Long logSequenceNumber, Step step) { public void updateStep(Long vmId, Long logSequenceNumber, Step step) {
final Transaction txn = Transaction.currentTxn(); final Transaction txn = Transaction.currentTxn();
txn.start(); txn.start();

View File

@ -4,4 +4,7 @@ import com.cloud.utils.db.GenericDao;
public interface VmFlowLogDao extends GenericDao<VmFlowLogVO, Long> { public interface VmFlowLogDao extends GenericDao<VmFlowLogVO, Long> {
VmFlowLogVO findByVmId(long vmId); VmFlowLogVO findByVmId(long vmId);
VmFlowLogVO findOrNewByVmId(long vmId, String name);
VmFlowLogVO findByName(String name);
void deleteByVmId(long vmId);
} }

View File

@ -9,6 +9,7 @@ import javax.ejb.Local;
public class VmFlowLogDaoImpl extends GenericDaoBase<VmFlowLogVO, Long> public class VmFlowLogDaoImpl extends GenericDaoBase<VmFlowLogVO, Long>
implements VmFlowLogDao { implements VmFlowLogDao {
private SearchBuilder<VmFlowLogVO> VmIdSearch; private SearchBuilder<VmFlowLogVO> VmIdSearch;
private SearchBuilder<VmFlowLogVO> VmNameSearch;
@Override @Override
public VmFlowLogVO findByVmId(long vmId) { public VmFlowLogVO findByVmId(long vmId) {
@ -23,5 +24,34 @@ public class VmFlowLogDaoImpl extends GenericDaoBase<VmFlowLogVO, Long>
SearchCriteria.Op.EQ); SearchCriteria.Op.EQ);
VmIdSearch.done(); VmIdSearch.done();
VmNameSearch = createSearchBuilder();
VmNameSearch.and("name", VmNameSearch.entity().getName(),
SearchCriteria.Op.EQ);
VmNameSearch.done();
}
@Override
public VmFlowLogVO findOrNewByVmId(long vmId, String name) {
VmFlowLogVO log = findByVmId(vmId);
if (log == null) {
log = new VmFlowLogVO(vmId, name);
log = persist(log);
}
return log;
}
@Override
public void deleteByVmId(long vmId) {
SearchCriteria<VmFlowLogVO> sc = VmIdSearch.create();
sc.setParameters("vmId", vmId);
expunge(sc);
}
@Override
public VmFlowLogVO findByName(String name) {
SearchCriteria<VmFlowLogVO> sc = VmNameSearch.create();
sc.setParameters("name", name);
return findOneIncludingRemovedBy(sc);
} }
} }

View File

@ -28,13 +28,17 @@ public class VmFlowLogVO {
@Column(name="logsequence") @Column(name="logsequence")
long logsequence; long logsequence;
@Column(name="vm_name", updatable=false, nullable=false, length=255)
protected String name = null;
protected VmFlowLogVO() { protected VmFlowLogVO() {
} }
public VmFlowLogVO(Long instanceId) { public VmFlowLogVO(Long instanceId, String name) {
super(); super();
this.instanceId = instanceId; this.instanceId = instanceId;
this.name = name;
} }
public Long getId() { public Long getId() {
@ -56,4 +60,8 @@ public class VmFlowLogVO {
public void incrLogsequence() { public void incrLogsequence() {
logsequence++; logsequence++;
} }
public String getName() {
return name;
}
} }

View File

@ -738,11 +738,6 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian
} }
} }
@Override
public void completeStartCommand(final DomainRouterVO router) {
_itMgr.stateTransitTo(router, VirtualMachine.Event.AgentReportRunning, router.getHostId());
}
@Override @Override
public void completeStopCommand(final DomainRouterVO router) { public void completeStopCommand(final DomainRouterVO router) {
completeStopCommand(router, VirtualMachine.Event.AgentReportStopped); completeStopCommand(router, VirtualMachine.Event.AgentReportStopped);
@ -1359,6 +1354,8 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian
return false; return false;
} }
DomainRouterVO router = profile.getVirtualMachine();
_ovsNetworkMgr.handleVmStateTransition(router, State.Running);
return true; return true;
} }
@ -1367,6 +1364,9 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian
if (answer != null && answer.length > 0) { if (answer != null && answer.length > 0) {
processStopOrRebootAnswer(profile.getVirtualMachine(), answer[0]); processStopOrRebootAnswer(profile.getVirtualMachine(), answer[0]);
} }
DomainRouterVO router = profile.getVirtualMachine();
_ovsNetworkMgr.handleVmStateTransition(router, State.Stopped);
} }
@Override @Override
@ -1870,4 +1870,8 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian
return routersToStop; return routersToStop;
} }
@Override
public void completeStartCommand(DomainRouterVO router) {
_itMgr.stateTransitTo(router, VirtualMachine.Event.AgentReportRunning, router.getHostId());
}
} }

View File

@ -1129,13 +1129,6 @@ public class UserVmManagerImpl implements UserVmManager, UserVmService, Manager
} }
} }
@Override
public void completeStartCommand(UserVmVO vm) {
_itMgr.stateTransitTo(vm, VirtualMachine.Event.AgentReportRunning, vm.getHostId());
_networkGroupMgr.handleVmStateTransition(vm, State.Running);
_ovsNetworkMgr.handleVmStateTransition(vm, State.Running);
}
@Override @Override
public void completeStopCommand(UserVmVO instance) { public void completeStopCommand(UserVmVO instance) {
completeStopCommand(1L, instance, VirtualMachine.Event.AgentReportStopped); completeStopCommand(1L, instance, VirtualMachine.Event.AgentReportStopped);
@ -1166,8 +1159,6 @@ public class UserVmManagerImpl implements UserVmManager, UserVmService, Manager
} }
txn.commit(); txn.commit();
_networkGroupMgr.handleVmStateTransition(vm, State.Stopped);
_ovsNetworkMgr.handleVmStateTransition(vm, State.Stopped);
} catch (Throwable th) { } catch (Throwable th) {
s_logger.error("Error during stop: ", th); s_logger.error("Error during stop: ", th);
throw new CloudRuntimeException("Error during stop: ", th); throw new CloudRuntimeException("Error during stop: ", th);
@ -2408,6 +2399,9 @@ public class UserVmManagerImpl implements UserVmManager, UserVmService, Manager
@Override @Override
public boolean finalizeStart(Commands cmds, VirtualMachineProfile<UserVmVO> profile, DeployDestination dest, ReservationContext context) { public boolean finalizeStart(Commands cmds, VirtualMachineProfile<UserVmVO> profile, DeployDestination dest, ReservationContext context) {
UserVmVO vm = profile.getVirtualMachine();
_networkGroupMgr.handleVmStateTransition(vm, State.Running);
_ovsNetworkMgr.handleVmStateTransition(vm, State.Running);
return true; return true;
} }
@ -2460,6 +2454,9 @@ public class UserVmManagerImpl implements UserVmManager, UserVmService, Manager
@Override @Override
public void finalizeStop(VirtualMachineProfile<UserVmVO> profile, long hostId, String reservationId, Answer...answer) { public void finalizeStop(VirtualMachineProfile<UserVmVO> profile, long hostId, String reservationId, Answer...answer) {
UserVmVO vm = profile.getVirtualMachine();
_networkGroupMgr.handleVmStateTransition(vm, State.Stopped);
_ovsNetworkMgr.handleVmStateTransition(vm, State.Stopped);
} }
public String generateRandomPassword() { public String generateRandomPassword() {
@ -2521,6 +2518,11 @@ public class UserVmManagerImpl implements UserVmManager, UserVmService, Manager
} }
} }
@Override
public void completeStartCommand(UserVmVO vm) {
_itMgr.stateTransitTo(vm, VirtualMachine.Event.AgentReportRunning, vm.getHostId());
}
@Override @Override
public List<UserVmVO> searchForUserVMs(ListVMsCmd cmd) throws InvalidParameterValueException, PermissionDeniedException { public List<UserVmVO> searchForUserVMs(ListVMsCmd cmd) throws InvalidParameterValueException, PermissionDeniedException {

View File

@ -31,6 +31,7 @@ import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager; import com.cloud.agent.AgentManager;
import com.cloud.agent.AgentManager.OnError; import com.cloud.agent.AgentManager.OnError;
import com.cloud.agent.api.Answer; import com.cloud.agent.api.Answer;
import com.cloud.agent.api.StartAnswer;
import com.cloud.agent.api.StartCommand; import com.cloud.agent.api.StartCommand;
import com.cloud.agent.api.StopAnswer; import com.cloud.agent.api.StopAnswer;
import com.cloud.agent.api.StopCommand; import com.cloud.agent.api.StopCommand;
@ -388,6 +389,17 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Cluster
} }
} }
private Answer getStartAnswer(Answer[] answers) {
for (Answer ans : answers) {
if (ans instanceof StartAnswer) {
return ans;
}
}
assert 1 == 0 : "Why there is no Start Answer???";
return null;
}
@Override @Override
public <T extends VMInstanceVO> T advanceStart(T vm, Map<String, Object> params, User caller, Account account, HypervisorType hyperType) throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { public <T extends VMInstanceVO> T advanceStart(T vm, Map<String, Object> params, User caller, Account account, HypervisorType hyperType) throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException {
State state = vm.getState(); State state = vm.getState();
@ -495,7 +507,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Cluster
vmGuru.finalizeDeployment(cmds, vmProfile, dest, context); vmGuru.finalizeDeployment(cmds, vmProfile, dest, context);
try { try {
Answer[] answers = _agentMgr.send(dest.getHost().getId(), cmds); Answer[] answers = _agentMgr.send(dest.getHost().getId(), cmds);
if (answers[0].getResult() && vmGuru.finalizeStart(cmds, vmProfile, dest, context)) { if (getStartAnswer(answers).getResult() && vmGuru.finalizeStart(cmds, vmProfile, dest, context)) {
if (!stateTransitTo(vm, Event.OperationSucceeded, dest.getHost().getId())) { if (!stateTransitTo(vm, Event.OperationSucceeded, dest.getHost().getId())) {
throw new CloudRuntimeException("Unable to transition to a new state."); throw new CloudRuntimeException("Unable to transition to a new state.");
} }

View File

@ -1344,6 +1344,7 @@ CREATE TABLE `cloud`.`ovs_vm_flow_log` (
`instance_id` bigint unsigned NOT NULL COMMENT 'vm instance that needs flows to be synced.', `instance_id` bigint unsigned NOT NULL COMMENT 'vm instance that needs flows to be synced.',
`created` datetime NOT NULL COMMENT 'time the entry was requested', `created` datetime NOT NULL COMMENT 'time the entry was requested',
`logsequence` bigint unsigned COMMENT 'seq number to be sent to agent, uniquely identifies flow update', `logsequence` bigint unsigned COMMENT 'seq number to be sent to agent, uniquely identifies flow update',
`vm_name` varchar(255) NOT NULL COMMENT 'vm name',
PRIMARY KEY (`id`) PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8; ) ENGINE=InnoDB DEFAULT CHARSET=utf8;