diff --git a/.gitignore b/.gitignore index 01a43f2fb9a..6bc922c7d86 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +build/replace.properties build/build.number bin/ cloudstack-proprietary/ diff --git a/agent/src/com/cloud/agent/Agent.java b/agent/src/com/cloud/agent/Agent.java index 821cf6de5f8..25b1df31d69 100755 --- a/agent/src/com/cloud/agent/Agent.java +++ b/agent/src/com/cloud/agent/Agent.java @@ -77,8 +77,7 @@ import com.cloud.utils.script.Script; * **/ public class Agent implements HandlerFactory, IAgentControl { - private static final Logger s_logger = Logger.getLogger(Agent.class.getName()); - + private static final Logger s_logger = Logger.getLogger(Agent.class.getName()); public enum ExitStatus { Normal(0), // Normal status = 0. Upgrade(65), // Exiting for upgrade. diff --git a/api/src/com/cloud/network/ovs/OvsCreateGreTunnelAnswer.java b/api/src/com/cloud/network/ovs/OvsCreateGreTunnelAnswer.java new file mode 100644 index 00000000000..0520f3076fd --- /dev/null +++ b/api/src/com/cloud/network/ovs/OvsCreateGreTunnelAnswer.java @@ -0,0 +1,41 @@ +package com.cloud.network.ovs; + +import com.cloud.agent.api.Answer; +import com.cloud.agent.api.Command; + +public class OvsCreateGreTunnelAnswer extends Answer { + String hostIp; + String remoteIp; + String bridge; + String key; + + public OvsCreateGreTunnelAnswer(Command cmd, boolean success, String details) { + super(cmd, success, details); + } + + public OvsCreateGreTunnelAnswer(Command cmd, boolean success, + String details, String hostIp, String remoteIp, String bridge, + String key) { + super(cmd, success, details); + this.hostIp = hostIp; + this.remoteIp = remoteIp; + this.bridge = bridge; + this.key = key; + } + + public String getHostIp() { + return hostIp; + } + + public String getRemoteIp() { + return remoteIp; + } + + public String getBridge() { + return bridge; + } + + public String getKey() { + return key; + } +} diff --git a/api/src/com/cloud/network/ovs/OvsDeleteFlowCommand.java b/api/src/com/cloud/network/ovs/OvsDeleteFlowCommand.java new file mode 100644 index 00000000000..2765eb789db --- /dev/null +++ b/api/src/com/cloud/network/ovs/OvsDeleteFlowCommand.java @@ -0,0 +1,20 @@ +package com.cloud.network.ovs; + +import com.cloud.agent.api.Command; + +public class OvsDeleteFlowCommand extends Command { + String vmName; + + @Override + public boolean executeInSequence() { + return true; + } + + public String getVmName() { + return vmName; + } + + public OvsDeleteFlowCommand(String vmName) { + this.vmName = vmName; + } +} diff --git a/api/src/com/cloud/network/ovs/OvsSetTagAndFlowAnswer.java b/api/src/com/cloud/network/ovs/OvsSetTagAndFlowAnswer.java new file mode 100644 index 00000000000..5b495c7cdec --- /dev/null +++ b/api/src/com/cloud/network/ovs/OvsSetTagAndFlowAnswer.java @@ -0,0 +1,24 @@ +package com.cloud.network.ovs; + +import com.cloud.agent.api.Answer; +import com.cloud.agent.api.Command; + +public class OvsSetTagAndFlowAnswer extends Answer { + Long vmId; + Long seqno; + + public OvsSetTagAndFlowAnswer(Command cmd, boolean success, String details) { + super(cmd, success, details); + OvsSetTagAndFlowCommand c = (OvsSetTagAndFlowCommand)cmd; + this.vmId = c.getVmId(); + this.seqno = Long.parseLong(c.getSeqNo()); + } + + public Long getVmId() { + return vmId; + } + + public Long getSeqNo() { + return seqno; + } +} diff --git a/api/src/com/cloud/network/ovs/OvsSetTagAndFlowCommand.java b/api/src/com/cloud/network/ovs/OvsSetTagAndFlowCommand.java index 0ed7a67bfa5..c74f9c13e1f 100644 --- a/api/src/com/cloud/network/ovs/OvsSetTagAndFlowCommand.java +++ b/api/src/com/cloud/network/ovs/OvsSetTagAndFlowCommand.java @@ -5,12 +5,18 @@ import com.cloud.agent.api.Command; public class OvsSetTagAndFlowCommand extends Command { String vlans; String vmName; + String seqno; + Long vmId; @Override public boolean executeInSequence() { return true; } + public String getSeqNo() { + return seqno; + } + public String getVlans() { return vlans; } @@ -19,8 +25,14 @@ public class OvsSetTagAndFlowCommand extends Command { return vmName; } - public OvsSetTagAndFlowCommand(String vmName, String vlans) { + public Long getVmId() { + return vmId; + } + + public OvsSetTagAndFlowCommand(String vmName, String vlans, String seqno, Long vmId) { this.vmName = vmName; this.vlans = vlans; + this.seqno = seqno; + this.vmId = vmId; } } diff --git a/build/replace.properties b/build/replace.properties index 0b6b60c363f..61200f70217 100644 --- a/build/replace.properties +++ b/build/replace.properties @@ -6,3 +6,4 @@ DBHOST=localhost AGENTLOGDIR=logs AGENTLOG=logs/agent.log MSMNTDIR=/mnt +DBROOTPW= diff --git a/core/src/com/cloud/agent/api/PingRoutingWithOvsCommand.java b/core/src/com/cloud/agent/api/PingRoutingWithOvsCommand.java new file mode 100644 index 00000000000..c52efe1a2ca --- /dev/null +++ b/core/src/com/cloud/agent/api/PingRoutingWithOvsCommand.java @@ -0,0 +1,26 @@ +package com.cloud.agent.api; + +import java.util.List; +import java.util.Map; + +import com.cloud.host.Host; +import com.cloud.utils.Pair; +import com.cloud.vm.State; + +public class PingRoutingWithOvsCommand extends PingRoutingCommand { + List> states; + + protected PingRoutingWithOvsCommand() { + super(); + } + + public PingRoutingWithOvsCommand(Host.Type type, long id, + Map states, List> ovsStates) { + super(type, id, states); + this.states = ovsStates; + } + + public List> getStates() { + return states; + } +} diff --git a/core/src/com/cloud/hypervisor/xen/resource/CitrixResourceBase.java b/core/src/com/cloud/hypervisor/xen/resource/CitrixResourceBase.java index d51aab03e12..e607e4a08e9 100644 --- a/core/src/com/cloud/hypervisor/xen/resource/CitrixResourceBase.java +++ b/core/src/com/cloud/hypervisor/xen/resource/CitrixResourceBase.java @@ -94,6 +94,7 @@ import com.cloud.agent.api.ModifyStoragePoolCommand; import com.cloud.agent.api.PingCommand; import com.cloud.agent.api.PingRoutingCommand; import com.cloud.agent.api.PingRoutingWithNwGroupsCommand; +import com.cloud.agent.api.PingRoutingWithOvsCommand; import com.cloud.agent.api.PingTestCommand; import com.cloud.agent.api.PoolEjectCommand; import com.cloud.agent.api.PrepareForMigrationAnswer; @@ -159,7 +160,10 @@ import com.cloud.network.Networks; import com.cloud.network.Networks.BroadcastDomainType; import com.cloud.network.Networks.IsolationType; import com.cloud.network.Networks.TrafficType; +import com.cloud.network.ovs.OvsCreateGreTunnelAnswer; import com.cloud.network.ovs.OvsCreateGreTunnelCommand; +import com.cloud.network.ovs.OvsDeleteFlowCommand; +import com.cloud.network.ovs.OvsSetTagAndFlowAnswer; import com.cloud.network.ovs.OvsSetTagAndFlowCommand; import com.cloud.resource.ServerResource; import com.cloud.storage.Storage; @@ -247,6 +251,7 @@ public abstract class CitrixResourceBase implements ServerResource { protected StorageLayer _storage; protected boolean _canBridgeFirewall = false; + protected boolean _isOvs = false; protected HashMap _pools = new HashMap(5); public enum SRType { @@ -449,6 +454,8 @@ public abstract class CitrixResourceBase implements ServerResource { return execute((OvsCreateGreTunnelCommand)cmd); } else if (cmd instanceof OvsSetTagAndFlowCommand) { return execute((OvsSetTagAndFlowCommand)cmd); + } else if (cmd instanceof OvsDeleteFlowCommand) { + return execute((OvsDeleteFlowCommand)cmd); } else { return Answer.createUnsupportedCommandAnswer(cmd); } @@ -3312,9 +3319,12 @@ public abstract class CitrixResourceBase implements ServerResource { if (newStates == null) { newStates = new HashMap(); } - if (!_canBridgeFirewall) { + if (!_canBridgeFirewall && !_isOvs) { return new PingRoutingCommand(getType(), id, newStates); - } else { + } else if (_isOvs) { + List>ovsStates = ovsFullSyncStates(); + return new PingRoutingWithOvsCommand(getType(), id, newStates, ovsStates); + }else { HashMap> nwGrpStates = syncNetworkGroups(conn, id); return new PingRoutingWithNwGroupsCommand(getType(), id, newStates, nwGrpStates); } @@ -3820,7 +3830,61 @@ public abstract class CitrixResourceBase implements ServerResource { return Boolean.valueOf(callHostPlugin(conn, "vmops", "can_bridge_firewall", "host_uuid", _host.uuid)); } - private Answer execute(OvsSetTagAndFlowCommand cmd) { + private Answer execute(OvsDeleteFlowCommand cmd) { + _isOvs = true; + + Connection conn = getConnection(); + try { + String nwName = Networks.BroadcastScheme.VSwitch.toString(); + Network nw = getNetworkByName(conn, nwName); + assert nw!= null : "Why there is no vswith network ???"; + String bridge = nw.getBridge(conn); + String result = callHostPlugin(conn, "vmops", "ovs_delete_flow", "bridge", bridge, + "vmName", cmd.getVmName()); + + if (result.equalsIgnoreCase("SUCCESS")) { + return new Answer(cmd, true, "success to delete flows for " + cmd.getVmName()); + } else { + return new Answer(cmd, false, result); + } + } catch (Exception e) { + e.printStackTrace(); + } + return new Answer(cmd, false, "failed to delete flow for " + cmd.getVmName()); + } + + private List> ovsFullSyncStates() { + Connection conn = getConnection(); + try { + String result = callHostPlugin(conn, "vmops", "ovs_get_vm_log", "host_uuid", _host.uuid); + String [] logs = result != null ?result.split(";"): new String [0]; + List> states = new ArrayList>(); + for (String log: logs){ + String [] info = log.split(","); + if (info.length != 4) { + s_logger.warn("Wrong element number in ovs log"); + continue; + } + + //','.join([bridge, vmName, vmId, seqno]) + try { + states.add(new Pair(info[0], Long.parseLong(info[3]))); + } catch (NumberFormatException nfe) { + states.add(new Pair(info[0], -1L)); + } + } + + return states; + } catch (Exception e) { + e.printStackTrace(); + } + + return null; + } + + private OvsSetTagAndFlowAnswer execute(OvsSetTagAndFlowCommand cmd) { + _isOvs = true; + Connection conn = getConnection(); try { String nwName = Networks.BroadcastScheme.VSwitch.toString(); @@ -3833,48 +3897,47 @@ public abstract class CitrixResourceBase implements ServerResource { * plugin side */ String result = callHostPlugin(conn, "vmops", "ovs_set_tag_and_flow", "bridge", bridge, - "vmName", cmd.getVmName(), "vlans", cmd.getVlans()); + "vmName", cmd.getVmName(), "vlans", cmd.getVlans(), "seqno", cmd.getSeqNo()); s_logger.debug("set flow for " + cmd.getVmName() + " " + result); if (result.equalsIgnoreCase("SUCCESS")) { - return new Answer(cmd, true, "Set flow for " + cmd.getVmName() - + " success, vlans:" + cmd.getVlans()); + return new OvsSetTagAndFlowAnswer(cmd, true, result); } else { - return new Answer(cmd, false, "Set flow for " + cmd.getVmName() - + " failed, vlans:" + cmd.getVlans()); + return new OvsSetTagAndFlowAnswer(cmd, false, result); } } catch (Exception e) { e.printStackTrace(); } - return new Answer(cmd, false, "Set flow for " + cmd.getVmName() - + " failed, vlans:" + cmd.getVlans()); + return new OvsSetTagAndFlowAnswer(cmd, false, "EXCEPTION"); } - private Answer execute(OvsCreateGreTunnelCommand cmd) { + + private OvsCreateGreTunnelAnswer execute(OvsCreateGreTunnelCommand cmd) { + _isOvs = true; + Connection conn = getConnection(); + String bridge = "unkonwn"; try { - String nwName = Networks.BroadcastScheme.VSwitch.toString(); - Network nw = getNetworkByName(conn, nwName); - if (nw == null) { - nw = setupvSwitchNetwork(conn); - } + //TODO: we may store vswtich network to _host + Network nw = setupvSwitchNetwork(conn); + bridge = nw.getBridge(conn); String result = callHostPlugin(conn, "vmops", "vlanRemapUtils", - "op", "createGRE", "bridge", nw.getBridge(conn), + "op", "createGRE", "bridge", bridge, "remoteIP", cmd.getRemoteIp(), "greKey", cmd.getKey()); - if (result.equalsIgnoreCase("SUCCESS")) { - return new Answer(cmd, true, "create gre tunnel to " - + cmd.getRemoteIp() + " success"); + if (result.equalsIgnoreCase("SUCCESS") || result.equalsIgnoreCase("TUNNEL_EXISTED")) { + return new OvsCreateGreTunnelAnswer(cmd, true, result); } else { - return new Answer(cmd, false, "create gre tunnel to " - + cmd.getRemoteIp() + " failed"); + return new OvsCreateGreTunnelAnswer(cmd, false, result, + _host.ip, cmd.getRemoteIp(), bridge, cmd.getKey()); } } catch (Exception e) { e.printStackTrace(); } - return new Answer(cmd, false, "create gre tunnel to " + cmd.getRemoteIp() + " failed"); + return new OvsCreateGreTunnelAnswer(cmd, false, "EXCEPTION", _host.ip, + cmd.getRemoteIp(), bridge, cmd.getKey()); } private Answer execute(SecurityIngressRulesCmd cmd) { diff --git a/scripts/vm/hypervisor/xenserver/vlanRemapUtils.py b/scripts/vm/hypervisor/xenserver/vlanRemapUtils.py index 4be4b8d8a99..58850aef5a4 100644 --- a/scripts/vm/hypervisor/xenserver/vlanRemapUtils.py +++ b/scripts/vm/hypervisor/xenserver/vlanRemapUtils.py @@ -29,6 +29,9 @@ errors = \ "SWITCH_NOT_RUN" : "SWITCH_NOT_RUN", \ "NO_VSCTL" : "NO_VSCTL", \ "COMMAND_FAILED" : "COMMAND_FAILED", \ + "TUNNEL_EXISTED" : "TUNNEL_EXISTED", \ + "NO_INPORT" : "NO_INPORT", \ + "NO_OFPORT" : "NO_OFPORT", \ "ERR_ARGS_NUM" : "ERR_ARGS_NUM", \ "ERROR_OP" : "ERROR_OP", \ @@ -137,24 +140,24 @@ def isUUID (uuid): return 0 -#FIXME: better check method def checkGREInterface (bridge, remoteIP, greKey): - listIfaces = [vsctlPath, "list interface"] - res = doCmd (listIfaces, True) + ports = getPortsOnBridge(bridge) + if ports == None: + return 0 - start = False - num = 0 - keyStr = "key=%s" % greKey - uuid = '' - for i in res: - if "_uuid" in i: - (x, uuid) = i.split(":") - uuid = strip(uuid) + for i in ports: + ifaces = getInterfacesOnPort(i) + if ifaces == None: + continue - if "options" in i and remoteIP in i and keyStr in i: - log("WARNING: GRE tunnel for remote_ip=%s key=%s already here" % \ - (remoteIP, greKey)) - return -1 + for j in ifaces: + if j == '[]': + continue + options = getFieldOfInterface(j, "options") + if remoteIP in options and greKey in options: + log("WARNING: GRE tunnel for remote_ip=%s key=%s already here, \ +interface(%s)" % (remoteIP, greKey, j)) + return -1 return 0 @@ -164,8 +167,17 @@ def createGRE (bridge, remoteIP, greKey): name = "%sgre" % bridge if checkGREInterface(bridge, remoteIP, greKey) < 0: + result = errors["TUNNEL_EXISTED"] return 0 + wait = [vsctlPath, "--timeout=30 wait-until bridge %s -- get bridge %s name" % \ + (bridge, bridge)] + res = doCmd(wait) + if bridge not in res: + log("WARNIING:Can't find bridge %s for creating tunnel!" % bridge) + result = errors["COMMAND_FAILED"] + return -1 + createInterface = [vsctlPath, "create interface", "name=%s" % name, \ 'type=gre options:"remote_ip=%s key=%s"' % (remoteIP, greKey)] ifaceUUID = doCmd (createInterface) @@ -180,9 +192,18 @@ def createGRE (bridge, remoteIP, greKey): result = errors["COMMAND_FAILED"]; return -1 - addBridge = [vsctlPath, "add bridge %s" % bridge, "port %s" % portUUID] + addBridge = [vsctlPath, "add bridge %s" % bridge, "ports %s" % portUUID] doCmd (addBridge) - return 0 + + wait = [vsctlPath, "--timeout=5 wait-until port %s -- get port %s name" % \ + (name, name)] + res = doCmd(wait) + if name in res: + result = errors["SUCCESS"] + return 0 + else: + result = errors["COMMAND_FAILED"] + return -1 ######################## End GRE creation utils ########################## ######################## Flow creation utils ########################## @@ -263,7 +284,7 @@ def getOfPortsByType(bridge, askGre): portUuids = getPortsOnBridge(bridge) if portUuids == None: log("WARNING:No ports on bridge %s" % bridge) - return -1 + return [] OfPorts = [] for i in portUuids: @@ -293,28 +314,14 @@ def getNoneGreOfPort(bridge): def getGreOfPorts(bridge): return getOfPortsByType(bridge, True) -def findInPort(): - listIface = [vsctlPath, "list interface"] - res = doCmd (listIface, True) - - inport = [] - port = "" - for i in res: - if "ofport" in i: - (x, port) = i.split(":") - port = port.lstrip().rstrip() - - if "type" in i: - (x, type) = i.split(":") - type = type.lstrip().rstrip() - if type == "gre": - inport.append (port) - return inport - - def formatFlow(inPort, vlan, mac, outPut): flow = "in_port=%s dl_vlan=%s dl_dst=%s idle_timeout=0 hard_timeout=0 \ - actions=strip_vlan,output:%s" % (inPort, vlan, mac, outPut) + priority=10000 actions=strip_vlan,output:%s" % (inPort, vlan, mac, outPut) + return flow + +def formatDropFlow(inPort, vlan): + flow = "in_port=%s dl_vlan=%s priority=0 idle_timeout=0 hard_timeout=0 \ + actions=drop" % (inPort, vlan) return flow def delFlow(mac): @@ -327,6 +334,21 @@ def delARPFlow(vlan): delFlow = ["ovs-ofctl del-flows %s" % bridge, '"%s"' % param] doCmd(delFlow) +def delDHCPFlow(vlan): + param = "dl_type=0x0800 nw_proto=6 tp_src=547 dl_vlan=%s" % vlan + delFlow = ["ovs-ofctl del-flows %s" % bridge, '"%s"' % param] + doCmd(delFlow) + +def formatDHCPFlow(bridge, inPort, vlan, ports): + outputs = '' + for i in ports: + str = "output:%s," % i + outputs += str + outputs = outputs[:-1] + flow = "in_port=%s dl_vlan=%s dl_type=0x0800 nw_proto=6 tp_src=547 idle_timeout=0 hard_timeout=0 \ + priority=10000 actions=strip_vlan,%s" % (inPort, vlan, outputs) + return flow + def formatARPFlow(bridge, inPort, vlan, ports): outputs = '' for i in ports: @@ -335,21 +357,25 @@ def formatARPFlow(bridge, inPort, vlan, ports): outputs = outputs[:-1] flow = "in_port=%s dl_vlan=%s dl_type=0x0806 idle_timeout=0 hard_timeout=0 \ - actions=strip_vlan,%s" % (inPort, vlan, outputs) + priority=10000 actions=strip_vlan,%s" % (inPort, vlan, outputs) return flow def createFlow (bridge, vifName, mac, remap): + global result inport = getGreOfPorts(bridge) if len(inport) == 0: log("WARNING: no inport found") + result = errors["NO_INPORT"] return -1 output = getVifPort(bridge, vifName) if output == None: log("WARNING: cannot find ofport for %s" % vifName) + result = errors["NO_OFPORT"] return -1 if output == '[]': log("WARNING: ofport is [] for %s" % vifName) + result = errors["NO_OFPORT"] return -1 #del old flow here, if any, but in normal there should be no old flow @@ -363,23 +389,36 @@ def createFlow (bridge, vifName, mac, remap): noneGreOfPorts = getNoneGreOfPort(bridge) isARP = True if len(noneGreOfPorts) == 0: - log("WARNING: no none GRE ofports found, no ARP flow will be created") + log("WARNING: no none GRE ofports found, no ARP flow and DHCP flow will be created") isARP = False for j in remap.split("/"): delARPFlow(j) + delDHCPFlow(j) for i in inport: + flow = formatDropFlow(i, j) + param = bridge + ' "%s"' % flow + dropflow = ["ovs-ofctl add-flow", param] + doCmd (dropflow) + flow = formatFlow(i, j, mac, output) param = bridge + ' "%s"' % flow addflow = ["ovs-ofctl add-flow", param] doCmd (addflow) + if isARP == True: flow = formatARPFlow(bridge, i, j, noneGreOfPorts) param = bridge + ' "%s"' % flow addflow = ["ovs-ofctl add-flow", param] doCmd (addflow) + flow = formatDHCPFlow(bridge, i, j, noneGreOfPorts) + param = bridge + ' "%s"' % flow + addflow = ["ovs-ofctl add-flow", param] + doCmd (addflow) + + result = errors["SUCCESS"] return 0 ######################## End Flow creation utils ########################## @@ -402,20 +441,24 @@ def setTag(bridge, vifName, vlan): return 0 def doCreateGRE(bridge, remoteIP, key): + global result if createGRE(bridge, remoteIP, key) < 0: - log("WARNING: create GRE tunnel on %s for %s failed" % (bridge, \ + log("create GRE tunnel on %s for %s failed" % (bridge, \ remoteIP)) else: log("WARNING: create GRE tunnel on %s for %s success" % (bridge, \ remoteIP)) + print result def doCreateFlow (bridge, vifName, mac, remap): + global result if createFlow(bridge, vifName, mac, remap) < 0: log ("Create flow failed(bridge=%s, vifName=%s, mac=%s,\ remap=%s" % (bridge, vifName, mac, remap)) else: log ("Create flow success(bridge=%s, vifName=%s, mac=%s,\ remap=%s" % (bridge, vifName, mac, remap)) + print result def doSetTag (bridge, vifName, tag): setTag(bridge, vifName, tag) @@ -469,6 +512,7 @@ if __name__ == "__main__": remoteIP = sys.argv[3] key = sys.argv[4] doCreateGRE(bridge, remoteIP, key) + sys.exit(0) elif op == "createFlow": checkArgNum(6) bridge = sys.argv[2] @@ -476,6 +520,7 @@ if __name__ == "__main__": mac = sys.argv[4] remap = sys.argv[5] doCreateFlow(bridge, vifName, mac, remap) + sys.exit(0) elif op == "deleteFlow": checkArgNum(6) bridge = sys.argv[2] diff --git a/scripts/vm/hypervisor/xenserver/vmops b/scripts/vm/hypervisor/xenserver/vmops index 1c0071f8032..fb6f9a2194b 100755 --- a/scripts/vm/hypervisor/xenserver/vmops +++ b/scripts/vm/hypervisor/xenserver/vmops @@ -553,19 +553,187 @@ def default_ebtables_rules(vm_name, vif, vm_ip, vm_mac): except: util.SMlog("Failed to program default ebtables OUT rules") return 'false' - -def ovs_set_tag_and_flow(session, args): - def get_vif_field(name, field): - return session.xenapi.VIF.get_record(name).get(field) +def format_ovs_vm_log_name(vmName): + vm_name = "ovs-%s" % vmName; + logfilename = "/var/run/cloud/" + vm_name +".log" + return logfilename + +def remove_ovs_log_for_vm(vmName): + logfilename = format_ovs_vm_log_name(vmName) + + result = True + try: + os.remove(logfilename) + except: + util.SMlog("Failed to delete ovs log file " + logfilename) + result = False + return result + +def ovs_get_info_from_log(vmName, num): + logfilename = format_ovs_vm_log_name(vmName) + try: + lines = [line.rstrip() for line in open(logfilename)] + return lines[num] + except: + util.SMlog("Failed to open ovs log %s" % logfilename); + remove_ovs_log_for_vm(vmName) + return None + +def ovs_get_common_info_from_log(vmName): + return ovs_get_info_from_log(vmName, 0) + +def ovs_get_nic_info_from_log(vmName): + return ovs_get_info_from_log(vmName, 1) + +def ovs_get_mac_info_from_log(vmName): + return ovs_get_info_from_log(vmName, 2) + +def ovs_get_vlans_info_from_log(vmName): + return ovs_get_info_from_log(vmName, 3) + +def ovs_parse_common_info_from_log(vmName, num): + info = ovs_get_common_info_from_log(vmName) + if info == None: + return None + return info.split(",")[num] + +def ovs_get_bridge_from_log(vmName): + return ovs_parse_common_info_from_log(vmName, 1) + +def ovs_get_vm_id_from_log(vmName): + return ovs_parse_common_info_from_log(vmName, 2) + +def ovs_get_seqno_from_log(vmName): + return ovs_parse_common_info_from_log(vmName, 3) + +def ovs_handle_rebooted_vm(session, vmName): + curr_domid = '-1' + + (curr_domid, vifrs, hostuuid) = ovs_get_domid_vifrs_hostuuid(session, vmName) + + old_id = ovs_get_vm_id_from_log(vmName) + if curr_domid == old_id: + util.SMlog("OvsInfo:%s is normal" % vmName) + return True + + util.SMlog("%s rebooted, reset flow for it" % vmName) + try: + vlans = ovs_get_vlans_info_from_log(vmName) + bridge = ovs_get_bridge_from_log(vmName) + except Exception, e: + util.SMlog(e.__str__()) + util.SMlog("ovs get info from %s failed" % \ + format_ovs_vm_log_name(vmName)) + return False + + i = 0 + if vlans == None: + util.SMlog("OVSErr: cannot get vlans for %s" % vmName) + return False + + tag = vlans.split('/')[0] + nics = [] + macs = [] + for vifr in vifrs: + vifName = "vif" + curr_domid + "." + vifr[0] + vlanRemapUtils(session, {"op":"setTag", "vifName":vifName, "bridge":bridge, "tag":tag}) + vlanRemapUtils(session, {"op":"createFlow", "vifName":vifName, "bridge":bridge, "mac":vifr[1], "remap":vlans}) + nics.append(vifName) + macs.append(vifr[1]) + i += 1 + seqno = ovs_get_seqno_from_log(vmName) + ovs_write_vm_log(bridge, vmName, curr_domid, seqno, nics, macs, vlans) + + #see if there is rebooted vm to handle + ovs_get_vm_log(session, {"host_uuid":hostuuid}) + return True + +@echo +def ovs_get_vm_log(session, args): + host_uuid = args.pop('host_uuid') + try: + session = get_xapi_session() + + thishost = session.xenapi.host.get_by_uuid(host_uuid) + hostrec = session.xenapi.host.get_record(thishost) + vms = hostrec.get('resident_VMs') + except: + util.SMlog("Failed to get host from uuid " + host_uuid) + return ' ' + + result = [] + try: + for name in [session.xenapi.VM.get_name_label(x) for x in vms]: + if 1 not in [ name.startswith(c) for c in ['r-', 'i-'] ]: + continue + ovs_handle_rebooted_vm(session, name) + if name.startswith('i-'): + info = ovs_get_common_info_from_log(name) + result.append(info) + except Exception, e: + util.SMlog(e.__str__()) + util.SMlog("OVs failed to get rule logs, better luck next time!") + + return ";".join(result) + +def ovs_write_vm_log(bridge, vmName, vmId, seqno, vifNames, macs, vlans): + logfilename = format_ovs_vm_log_name(vmName) + + util.SMlog("Writing ovs log to " + logfilename) + logf = open(logfilename, 'w') + output = ','.join([vmName, bridge, vmId, seqno]) + result = True + try: + logf.write(output) + logf.write('\n') + output = ','.join(vifNames) + logf.write(output) + logf.write('\n') + output = ','.join(macs) + logf.write(output) + logf.write('\n') + logf.write(vlans) + logf.write('\n') + except: + util.SMlog("Failed to write to ovs log file " + logfilename) + result = False + + logf.close() + + return result + +def ovs_delete_flow(session, args): bridge = args.pop('bridge') vm_name = args.pop('vmName') - vlanStr = args.pop('vlans') + + nicStr = ovs_get_nic_info_from_log(vm_name) + macStr = ovs_get_mac_info_from_log(vm_name) + vlanStr = ovs_get_vlans_info_from_log(vm_name) + + if nicStr == None or macStr == None or vlanStr == None: + return 'ERROR_LOG' + + nics = nicStr.split(',') + macs = macStr.split(',') + if len(nics) != len(macs): + return 'ERROR_LOG' + + i = 0 + for nic in nics: + vlanRemapUtils(session, {"op":"deleteFlow", "bridge":bridge, \ + "vifName":nic, "mac":macs[i], "remap":vlanStr}) + i += 1 + return 'SUCCESS' + +def ovs_get_domid_vifrs_hostuuid(session, vm_name): + def get_vif_field(name, field): + return session.xenapi.VIF.get_record(name).get(field) try: vm = session.xenapi.VM.get_by_name_label(vm_name) if len(vm) != 1: - return 'false' + return 'NO_VM' vm_rec = session.xenapi.VM.get_record(vm[0]) vm_vifs = vm_rec.get('VIFs') vifrs = [] @@ -573,13 +741,27 @@ def ovs_set_tag_and_flow(session, args): rec = (get_vif_field(vif, 'device'), get_vif_field(vif, 'MAC')) vifrs.append(rec) domid = vm_rec.get('domid') + host = vm_rec.get('resident_on') + host_rec = session.xenapi.host.get_record(host) + uuid = host_rec.get('uuid') + util.SMlog("OVSINFO: (domid:%s, uuid:%s)" % (domid, uuid)) + return (domid, vifrs, uuid) + except: util.SMlog("### Failed to get domid or vif list for vm ##" + vm_name) - return 'false' - + return (-1, []) + +def ovs_set_tag_and_flow(session, args): + bridge = args.pop('bridge') + vm_name = args.pop('vmName') + vlanStr = args.pop('vlans') + seqno = args.pop('seqno') + + (domid, vifrs, hostuuid) = ovs_get_domid_vifrs_hostuuid(session, vm_name) + if domid == '-1': util.SMlog("### Failed to get domid for vm (-1): " + vm_name) - return 'false' + return 'NO_DOMID' if len(vifrs) == 0: return 'SUCCESS' @@ -587,12 +769,20 @@ def ovs_set_tag_and_flow(session, args): if vlanStr.startswith("/"): vlanStr = vlanStr[1:] if vlanStr.endswith("/"): vlanStr = vlanStr[:-1] vlans = vlanStr.split("/") + vifNames = [] + macs = [] for vifr in vifrs: vifName = "vif" + domid + "." + vifr[0] + vifNames.append(vifName) mac = vifr[1] + macs.append(mac) vlanRemapUtils(session, {"op":"setTag", "vifName":vifName, "bridge":bridge, "tag":vlans[0]}) vlanRemapUtils(session, {"op":"createFlow", "vifName":vifName, "bridge":bridge, "mac":mac, "remap":vlanStr}) + res = ovs_write_vm_log(bridge, vm_name, domid, seqno, vifNames, macs, vlanStr) + if res == 'false': + return 'CREATE_LOG_FAILED' + return 'SUCCESS' @echo @@ -938,6 +1128,7 @@ def cleanup_rules(session, args): util.SMlog("Failed to cleanup rules !") return '-1'; + @echo def check_rule_log_for_vm(vmName, vmID, vmIP, domID, signature, seqno): vm_name = vmName; @@ -1107,5 +1298,5 @@ def network_rules(session, args): if __name__ == "__main__": - XenAPIPlugin.dispatch({"pingtest": pingtest, "setup_iscsi":setup_iscsi, "gethostvmstats": gethostvmstats, "getvncport": getvncport, "getgateway": getgateway, "preparemigration": preparemigration, "setIptables": setIptables, "pingdomr": pingdomr, "pingxenserver": pingxenserver, "ipassoc": ipassoc, "vm_data": vm_data, "savePassword": savePassword, "saveDhcpEntry": saveDhcpEntry, "setFirewallRule": setFirewallRule, "setLoadBalancerRule": setLoadBalancerRule, "createFile": createFile, "deleteFile": deleteFile, "networkUsage": networkUsage, "network_rules":network_rules, "can_bridge_firewall":can_bridge_firewall, "default_network_rules":default_network_rules, "destroy_network_rules_for_vm":destroy_network_rules_for_vm, "default_network_rules_systemvm":default_network_rules_systemvm, "get_rule_logs_for_vms":get_rule_logs_for_vms, "setLinkLocalIP":setLinkLocalIP, "lt2p_vpn":lt2p_vpn,"cleanup_rules":cleanup_rules,"vlanRemapUtils":vlanRemapUtils, "ovs_set_tag_and_flow":ovs_set_tag_and_flow}) + XenAPIPlugin.dispatch({"pingtest": pingtest, "setup_iscsi":setup_iscsi, "gethostvmstats": gethostvmstats, "getvncport": getvncport, "getgateway": getgateway, "preparemigration": preparemigration, "setIptables": setIptables, "pingdomr": pingdomr, "pingxenserver": pingxenserver, "ipassoc": ipassoc, "vm_data": vm_data, "savePassword": savePassword, "saveDhcpEntry": saveDhcpEntry, "setFirewallRule": setFirewallRule, "setLoadBalancerRule": setLoadBalancerRule, "createFile": createFile, "deleteFile": deleteFile, "networkUsage": networkUsage, "network_rules":network_rules, "can_bridge_firewall":can_bridge_firewall, "default_network_rules":default_network_rules, "destroy_network_rules_for_vm":destroy_network_rules_for_vm, "default_network_rules_systemvm":default_network_rules_systemvm, "get_rule_logs_for_vms":get_rule_logs_for_vms, "setLinkLocalIP":setLinkLocalIP, "lt2p_vpn":lt2p_vpn, "vlanRemapUtils":vlanRemapUtils, "ovs_set_tag_and_flow":ovs_set_tag_and_flow, "ovs_get_vm_log":ovs_get_vm_log,"ovs_delete_flow":ovs_delete_flow,"cleanup_rules":cleanup_rules}) diff --git a/server/src/com/cloud/network/element/OvsElement.java b/server/src/com/cloud/network/element/OvsElement.java index 4d660b8723c..f1e5c5683c9 100644 --- a/server/src/com/cloud/network/element/OvsElement.java +++ b/server/src/com/cloud/network/element/OvsElement.java @@ -63,7 +63,7 @@ public class OvsElement extends AdapterBase implements NetworkElement { } if (network.getTrafficType() == Networks.TrafficType.Guest) { - _ovsNetworkMgr.CheckAndUpdateDhcpFlow(network); + _ovsNetworkMgr.CheckAndUpdateDhcpFlow(network, vm.getVirtualMachine()); } return true; } diff --git a/server/src/com/cloud/network/ovs/OvsListener.java b/server/src/com/cloud/network/ovs/OvsListener.java new file mode 100644 index 00000000000..4915d4ddfc0 --- /dev/null +++ b/server/src/com/cloud/network/ovs/OvsListener.java @@ -0,0 +1,114 @@ +package com.cloud.network.ovs; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.log4j.Logger; + +import com.cloud.agent.Listener; +import com.cloud.agent.api.AgentControlAnswer; +import com.cloud.agent.api.AgentControlCommand; +import com.cloud.agent.api.Answer; +import com.cloud.agent.api.Command; +import com.cloud.agent.api.PingRoutingWithOvsCommand; +import com.cloud.agent.api.StartupCommand; +import com.cloud.exception.ConnectionException; +import com.cloud.host.HostVO; +import com.cloud.host.Status; +import com.cloud.network.ovs.dao.OvsWorkDao; +import com.cloud.network.ovs.dao.OvsWorkVO.Step; + +public class OvsListener implements Listener { + public static final Logger s_logger = Logger.getLogger(OvsListener.class.getName()); + OvsNetworkManager _ovsNetworkMgr; + OvsWorkDao _workDao; + + public OvsListener(OvsNetworkManager ovsMgr, OvsWorkDao workDao) { + this._ovsNetworkMgr = ovsMgr; + this._workDao = workDao; + } + + @Override + public boolean processAnswers(long agentId, long seq, Answer[] answers) { + Set failedFlowVms = new HashSet(); + + for (Answer ans: answers) { + if (ans instanceof OvsCreateGreTunnelAnswer) { + OvsCreateGreTunnelAnswer r = (OvsCreateGreTunnelAnswer)ans; + String s = String.format("(hostIP:%1$s, remoteIP:%2$s, bridge:%3$s, greKey:%4$s)", + r.getHostIp(), r.getRemoteIp(), + r.getBridge(), r.getKey()); + if (!r.getResult()) { + s_logger.warn("Create GRE tunnel failed due to " + r.getDetails() + s); + } else { + s_logger.info("Create GRE tunnel success" + s); + } + } else if (ans instanceof OvsSetTagAndFlowAnswer) { + OvsSetTagAndFlowAnswer r = (OvsSetTagAndFlowAnswer)ans; + if (!r.getResult()) { + s_logger.warn("Failed to set flow for VM " + r.getVmId()); + _workDao.updateStep(r.getVmId(), r.getSeqNo(), Step.Error); + failedFlowVms.add(r.getVmId()); + } else { + s_logger.info("Success to set flow for VM " + r.getVmId()); + _workDao.updateStep(r.getVmId(), r.getSeqNo(), Step.Done); + } + } + + //TODO: handle delete failure + } + + if (failedFlowVms.size() > 0) { + _ovsNetworkMgr.scheduleFlowUpdateToHosts(failedFlowVms, false, new Long(10*1000l)); + } + + return true; + } + + @Override + public boolean processCommands(long agentId, long seq, Command[] commands) { + boolean processed = false; + for (Command cmd : commands) { + if (cmd instanceof PingRoutingWithOvsCommand) { + PingRoutingWithOvsCommand ping = (PingRoutingWithOvsCommand)cmd; + if (ping !=null && ping.getStates().size() > 0) { + _ovsNetworkMgr.fullSync(ping.getStates()); + } + processed = true; + } + } + return processed; + } + + @Override + public AgentControlAnswer processControlCommand(long agentId, + AgentControlCommand cmd) { + return null; + } + + @Override + public void processConnect(HostVO host, StartupCommand cmd) + throws ConnectionException { + } + + @Override + public boolean processDisconnect(long agentId, Status state) { + return true; + } + + @Override + public boolean isRecurring() { + return false; + } + + @Override + public int getTimeout() { + return -1; + } + + @Override + public boolean processTimeout(long agentId, long seq) { + return true; + } + +} diff --git a/server/src/com/cloud/network/ovs/OvsNetworkManager.java b/server/src/com/cloud/network/ovs/OvsNetworkManager.java index 41687e92e00..02bf582143e 100644 --- a/server/src/com/cloud/network/ovs/OvsNetworkManager.java +++ b/server/src/com/cloud/network/ovs/OvsNetworkManager.java @@ -1,13 +1,20 @@ package com.cloud.network.ovs; +import java.util.List; +import java.util.Set; + import com.cloud.agent.manager.Commands; import com.cloud.deploy.DeployDestination; import com.cloud.network.Network; +import com.cloud.resource.ServerResource; import com.cloud.uservm.UserVm; +import com.cloud.utils.Pair; import com.cloud.utils.component.Manager; import com.cloud.vm.DomainRouterVO; import com.cloud.vm.State; import com.cloud.vm.UserVmVO; +import com.cloud.vm.VMInstanceVO; +import com.cloud.vm.VirtualMachine; import com.cloud.vm.VirtualMachineProfile; public interface OvsNetworkManager extends Manager { @@ -27,11 +34,15 @@ public interface OvsNetworkManager extends Manager { VirtualMachineProfile profile, DeployDestination dest); - public void CheckAndUpdateDhcpFlow(Network nw); + public void CheckAndUpdateDhcpFlow(Network nw, VirtualMachine vm); - public void handleVmStateTransition(UserVm userVm, State vmState); + public void handleVmStateTransition(VMInstanceVO userVm, State vmState); public void RouterCheckAndCreateTunnel(Commands cmds, VirtualMachineProfile profile, DeployDestination dest); + + public void fullSync(List> states); + + public void scheduleFlowUpdateToHosts(Set affectedVms, boolean updateSeqno, Long delayMs); } diff --git a/server/src/com/cloud/network/ovs/OvsNetworkManagerImpl.java b/server/src/com/cloud/network/ovs/OvsNetworkManagerImpl.java index b7a12e6436c..029a275cb51 100644 --- a/server/src/com/cloud/network/ovs/OvsNetworkManagerImpl.java +++ b/server/src/com/cloud/network/ovs/OvsNetworkManagerImpl.java @@ -37,6 +37,8 @@ import com.cloud.network.ovs.dao.VlanMappingVO; import com.cloud.network.ovs.dao.VmFlowLogDao; import com.cloud.network.ovs.dao.VmFlowLogVO; import com.cloud.server.ManagementServer; +import com.cloud.user.AccountVO; +import com.cloud.user.dao.AccountDao; import com.cloud.uservm.UserVm; import com.cloud.utils.Pair; import com.cloud.utils.component.ComponentLocator; @@ -54,6 +56,7 @@ import com.cloud.vm.VirtualMachineProfile; import com.cloud.vm.dao.DomainRouterDao; import com.cloud.vm.dao.NicDao; import com.cloud.vm.dao.UserVmDao; +import com.cloud.vm.dao.VMInstanceDao; @Local(value={OvsNetworkManager.class}) public class OvsNetworkManagerImpl implements OvsNetworkManager { @@ -70,10 +73,13 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager { @Inject OvsWorkDao _workDao; @Inject VmFlowLogDao _flowLogDao; @Inject UserVmDao _userVMDao; + @Inject VMInstanceDao _instanceDao; + @Inject AccountDao _accountDao; String _name; boolean _isEnabled; ScheduledExecutorService _executorPool; ScheduledExecutorService _cleanupExecutor; + OvsListener _ovsListener; private long _serverId; private final long _timeBetweenCleanups = 30; //seconds @@ -109,6 +115,8 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager { _serverId = ((ManagementServer)ComponentLocator.getComponent(ManagementServer.Name)).getId(); _executorPool = Executors.newScheduledThreadPool(10, new NamedThreadFactory("OVS")); _cleanupExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("OVS-Cleanup")); + _ovsListener = new OvsListener(this, _workDao); + _agentMgr.registerForHostEvents(_ovsListener, true, true, true); return true; } @@ -198,16 +206,17 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager { if (vm != null && vm.getState() == State.Running) { agentId = vm.getHostId(); if (agentId != null ) { - //TODO: set flow here - OvsSetTagAndFlowCommand cmd = new OvsSetTagAndFlowCommand(vm.getName(),vlans); + OvsSetTagAndFlowCommand cmd = new OvsSetTagAndFlowCommand( + vm.getName(), vlans, seqnum.toString(), vm.getId()); Commands cmds = new Commands(cmd); try { - //_agentMgr.send(agentId, cmds, _answerListener); - _agentMgr.send(agentId, cmds, null); - //TODO: clean dirty in answerListener + _agentMgr.send(agentId, cmds, _ovsListener); + // TODO: clean dirty in answerListener } catch (AgentUnavailableException e) { - s_logger.debug("Unable to send updates for vm: " + userVmId + "(agentid=" + agentId + ")"); - _workDao.updateStep(work.getInstanceId(), seqnum, Step.Done); + s_logger.debug("Unable to send updates for vm: " + + userVmId + "(agentid=" + agentId + ")"); + _workDao.updateStep(work.getInstanceId(), seqnum, + Step.Done); } } } @@ -283,6 +292,7 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager { vlans.add(new Long(vo.getVlan())); } + assert vlans.size() > 0 : "Vlan map can't be null"; StringBuffer buf = new StringBuffer(); for (Long i : vlans) { buf.append("/"); @@ -331,9 +341,11 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager { HostVO rHost = _hostDao.findById(i.longValue()); cmds.addCommand( 0, new OvsCreateGreTunnelCommand(rHost.getPrivateIpAddress(), "1")); - _agentMgr.send(i.longValue(), new OvsCreateGreTunnelCommand(myIp, "1")); + Commands cmd2s = new Commands( new OvsCreateGreTunnelCommand(myIp, "1")); + _agentMgr.send(i.longValue(), cmd2s , _ovsListener); s_logger.debug("Ask host " + i.longValue() + " to create gre tunnel to " + hostId); } + _vlanMappingDirtyDao.markDirty(accountId); } catch (Exception e) { e.printStackTrace(); } @@ -375,11 +387,15 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager { assert nic!=null : "Why there is no guest network nic???"; String vlans = parseVlanAndMapping(nic.getBroadcastUri().toASCIIString()); - cmds.addCommand(new OvsSetTagAndFlowCommand(instance.getName(), vlans)); + VmFlowLogVO log = _flowLogDao.findOrNewByVmId(instance.getId(), instance.getName()); + cmds.addCommand(new OvsSetTagAndFlowCommand(instance.getName(), vlans, + Long.toString(log.getLogsequence()), instance.getId())); } - + + //FIXME: if at this router is not start, this will hang 10 secs due to host + //plugin cannot found vif for router. @Override - public void CheckAndUpdateDhcpFlow(Network nw) { + public void CheckAndUpdateDhcpFlow(Network nw, VirtualMachine vm) { if (!_isEnabled) { return; } @@ -396,8 +412,9 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager { try { String vlans = getVlanMapping(accountId); + VmFlowLogVO log = _flowLogDao.findOrNewByVmId(vm.getId(), vm.getName()); _agentMgr.send(router.getHostId(), new OvsSetTagAndFlowCommand( - router.getName(), vlans)); + router.getName(), vlans, Long.toString(log.getLogsequence()), vm.getId())); s_logger.debug("ask router " + router.getName() + " on host " + router.getHostId() + " update vlan map to " + vlans); } catch (Exception e) { @@ -405,7 +422,9 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager { } } + //TODO: handle router @DB + @Override public void scheduleFlowUpdateToHosts(Set affectedVms, boolean updateSeqno, Long delayMs) { if (!_isEnabled) { return; @@ -430,11 +449,7 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager { s_logger.warn("Ovs failed to acquire lock on vm id " + vmId); continue; } - log = _flowLogDao.findByVmId(vmId); - if (log == null) { - log = new VmFlowLogVO(vmId); - log = _flowLogDao.persist(log); - } + log = _flowLogDao.findOrNewByVmId(vmId, vm.getName()); if (log != null && updateSeqno){ log.incrLogsequence(); @@ -461,8 +476,8 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager { } } - protected Set getAffectedVms(UserVm userVm) { - long accountId = userVm.getAccountId(); + protected Set getAffectedVms(VMInstanceVO instance) { + long accountId = instance.getAccountId(); if (!_vlanMappingDirtyDao.isDirty(accountId)) { return null; } @@ -475,36 +490,47 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager { return affectedVms; } - protected void handleVmStateChange(UserVm userVm) { - Set affectedVms = getAffectedVms(userVm); + protected void handleVmStateChange(VMInstanceVO instance) { + Set affectedVms = getAffectedVms(instance); scheduleFlowUpdateToHosts(affectedVms, true, null); + _vlanMappingDirtyDao.clean(instance.getAccountId()); + s_logger.debug("Clean dirty for account " + instance.getAccountId()); } //TODO: think about lock - protected void checkAndRemove(UserVm userVm) { - long accountId = userVm.getAccountId(); - long hostId = userVm.getHostId(); + @DB + protected void checkAndRemove(VMInstanceVO instance) { + long accountId = instance.getAccountId(); + long hostId = instance.getHostId(); final Transaction txn = Transaction.currentTxn(); txn.start(); VlanMappingVO vo = _vlanMappingDao.findByAccountIdAndHostId(accountId, hostId); if (vo.unref() == 0) { _vlanMappingDao.remove(vo.getId()); - s_logger.debug(userVm.getName() + " is the last one on host " + s_logger.debug(instance.getName() + " is the last one on host " + hostId + " for account " + accountId + ", remove vlan in ovs_host_vlan_alloc"); _vlanMappingDirtyDao.markDirty(accountId); } else { _vlanMappingDao.update(vo.getId(), vo); - s_logger.debug(userVm.getName() + s_logger.debug(instance.getName() + " reduces reference count of (account,host) = (" + accountId + "," + hostId + ") to " + vo.getRef()); } + _flowLogDao.deleteByVmId(instance.getId()); txn.commit(); + + try { + Commands cmds = new Commands(new OvsDeleteFlowCommand(instance.getName())); + _agentMgr.send(hostId, cmds, _ovsListener); + } catch (Exception e) { + e.printStackTrace(); + } } @Override - public void handleVmStateTransition(UserVm userVm, State vmState) { + public void handleVmStateTransition(VMInstanceVO instance, State vmState) { if (!_isEnabled) { return; } @@ -519,12 +545,12 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager { case Unknown: return; case Running: - handleVmStateChange(userVm); + handleVmStateChange(instance); break; case Stopping: case Stopped: - checkAndRemove(userVm); - handleVmStateChange(userVm); + checkAndRemove(instance); + handleVmStateChange(instance); break; } @@ -557,4 +583,53 @@ public class OvsNetworkManagerImpl implements OvsNetworkManager { applyDefaultFlow(cmds, profile.getVirtualMachine(), dest); } + @Override + public void fullSync(List> states) { + if (!_isEnabled) { + return; + } + + //TODO:debug code, remove in future + List accounts = _accountDao.listAll(); + for (AccountVO acnt : accounts) { + if (_vlanMappingDirtyDao.isDirty(acnt.getId())) { + s_logger.warn("Vlan mapping for account " + + acnt.getAccountName() + " id " + acnt.getId() + + " is dirty"); + } + } + + if (states.size() ==0) { + s_logger.info("Nothing to do, Ovs fullsync is happy"); + return; + } + + SetvmIds = new HashSet(); + for (Pairstate : states) { + if (state.second() == -1) { + s_logger.warn("Ovs fullsync get wrong seqno for " + state.first()); + continue; + } + VmFlowLogVO log = _flowLogDao.findByName(state.first()); + if (log.getLogsequence() != state.second()) { + s_logger.debug("Ovs fullsync detected unmatch seq number for " + state.first() + ", run sync"); + VMInstanceVO vo = _instanceDao.findById(log.getInstanceId()); + if (vo == null) { + s_logger.warn("Ovs can't find " + state.first() + " in vm_instance!"); + continue; + } + + if (vo.getType() != VirtualMachine.Type.User && vo.getType() != VirtualMachine.Type.DomainRouter) { + s_logger.warn("Ovs fullsync: why we sync a " + vo.getType().toString() + " VM???"); + continue; + } + vmIds.add(new Long(vo.getId())); + } + } + + if (vmIds.size() > 0) { + scheduleFlowUpdateToHosts(vmIds, true, null); + } + } + } diff --git a/server/src/com/cloud/network/ovs/dao/OvsWorkDaoImpl.java b/server/src/com/cloud/network/ovs/dao/OvsWorkDaoImpl.java index 663ff3f0b01..4c0f572cda8 100644 --- a/server/src/com/cloud/network/ovs/dao/OvsWorkDaoImpl.java +++ b/server/src/com/cloud/network/ovs/dao/OvsWorkDaoImpl.java @@ -5,6 +5,7 @@ import java.util.List; import javax.ejb.Local; import com.cloud.ha.HaWorkVO; import com.cloud.network.ovs.dao.OvsWorkVO.Step; +import com.cloud.utils.db.DB; import com.cloud.utils.db.Filter; import com.cloud.utils.db.GenericDaoBase; import com.cloud.utils.db.SearchBuilder; @@ -115,6 +116,7 @@ public class OvsWorkDaoImpl extends GenericDaoBase implements } @Override + @DB public void updateStep(Long vmId, Long logSequenceNumber, Step step) { final Transaction txn = Transaction.currentTxn(); txn.start(); @@ -183,5 +185,5 @@ public class OvsWorkDaoImpl extends GenericDaoBase implements update(work, sc); return result; - } + } } diff --git a/server/src/com/cloud/network/ovs/dao/VmFlowLogDao.java b/server/src/com/cloud/network/ovs/dao/VmFlowLogDao.java index 6742a6a9c94..a8a97e48094 100644 --- a/server/src/com/cloud/network/ovs/dao/VmFlowLogDao.java +++ b/server/src/com/cloud/network/ovs/dao/VmFlowLogDao.java @@ -4,4 +4,7 @@ import com.cloud.utils.db.GenericDao; public interface VmFlowLogDao extends GenericDao { VmFlowLogVO findByVmId(long vmId); + VmFlowLogVO findOrNewByVmId(long vmId, String name); + VmFlowLogVO findByName(String name); + void deleteByVmId(long vmId); } diff --git a/server/src/com/cloud/network/ovs/dao/VmFlowLogDaoImpl.java b/server/src/com/cloud/network/ovs/dao/VmFlowLogDaoImpl.java index a842bc3bbbc..b7d44fe01ba 100644 --- a/server/src/com/cloud/network/ovs/dao/VmFlowLogDaoImpl.java +++ b/server/src/com/cloud/network/ovs/dao/VmFlowLogDaoImpl.java @@ -9,6 +9,7 @@ import javax.ejb.Local; public class VmFlowLogDaoImpl extends GenericDaoBase implements VmFlowLogDao { private SearchBuilder VmIdSearch; + private SearchBuilder VmNameSearch; @Override public VmFlowLogVO findByVmId(long vmId) { @@ -22,6 +23,35 @@ public class VmFlowLogDaoImpl extends GenericDaoBase VmIdSearch.and("vmId", VmIdSearch.entity().getInstanceId(), SearchCriteria.Op.EQ); VmIdSearch.done(); + + VmNameSearch = createSearchBuilder(); + VmNameSearch.and("name", VmNameSearch.entity().getName(), + SearchCriteria.Op.EQ); + VmNameSearch.done(); } + + @Override + public VmFlowLogVO findOrNewByVmId(long vmId, String name) { + VmFlowLogVO log = findByVmId(vmId); + if (log == null) { + log = new VmFlowLogVO(vmId, name); + log = persist(log); + } + return log; + } + + @Override + public void deleteByVmId(long vmId) { + SearchCriteria sc = VmIdSearch.create(); + sc.setParameters("vmId", vmId); + expunge(sc); + } + + @Override + public VmFlowLogVO findByName(String name) { + SearchCriteria sc = VmNameSearch.create(); + sc.setParameters("name", name); + return findOneIncludingRemovedBy(sc); + } } diff --git a/server/src/com/cloud/network/ovs/dao/VmFlowLogVO.java b/server/src/com/cloud/network/ovs/dao/VmFlowLogVO.java index df44d6c326d..13733185d28 100644 --- a/server/src/com/cloud/network/ovs/dao/VmFlowLogVO.java +++ b/server/src/com/cloud/network/ovs/dao/VmFlowLogVO.java @@ -28,13 +28,17 @@ public class VmFlowLogVO { @Column(name="logsequence") long logsequence; + @Column(name="vm_name", updatable=false, nullable=false, length=255) + protected String name = null; + protected VmFlowLogVO() { } - public VmFlowLogVO(Long instanceId) { + public VmFlowLogVO(Long instanceId, String name) { super(); this.instanceId = instanceId; + this.name = name; } public Long getId() { @@ -56,4 +60,8 @@ public class VmFlowLogVO { public void incrLogsequence() { logsequence++; } + + public String getName() { + return name; + } } diff --git a/server/src/com/cloud/network/router/VirtualNetworkApplianceManagerImpl.java b/server/src/com/cloud/network/router/VirtualNetworkApplianceManagerImpl.java index 87d729c1f60..bc526b01746 100644 --- a/server/src/com/cloud/network/router/VirtualNetworkApplianceManagerImpl.java +++ b/server/src/com/cloud/network/router/VirtualNetworkApplianceManagerImpl.java @@ -738,11 +738,6 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian } } - @Override - public void completeStartCommand(final DomainRouterVO router) { - _itMgr.stateTransitTo(router, VirtualMachine.Event.AgentReportRunning, router.getHostId()); - } - @Override public void completeStopCommand(final DomainRouterVO router) { completeStopCommand(router, VirtualMachine.Event.AgentReportStopped); @@ -1359,6 +1354,8 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian return false; } + DomainRouterVO router = profile.getVirtualMachine(); + _ovsNetworkMgr.handleVmStateTransition(router, State.Running); return true; } @@ -1367,6 +1364,9 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian if (answer != null && answer.length > 0) { processStopOrRebootAnswer(profile.getVirtualMachine(), answer[0]); } + + DomainRouterVO router = profile.getVirtualMachine(); + _ovsNetworkMgr.handleVmStateTransition(router, State.Stopped); } @Override @@ -1870,4 +1870,8 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian return routersToStop; } + @Override + public void completeStartCommand(DomainRouterVO router) { + _itMgr.stateTransitTo(router, VirtualMachine.Event.AgentReportRunning, router.getHostId()); + } } diff --git a/server/src/com/cloud/vm/UserVmManagerImpl.java b/server/src/com/cloud/vm/UserVmManagerImpl.java index 9403e768a1c..db71b46f8a7 100755 --- a/server/src/com/cloud/vm/UserVmManagerImpl.java +++ b/server/src/com/cloud/vm/UserVmManagerImpl.java @@ -1128,14 +1128,7 @@ public class UserVmManagerImpl implements UserVmManager, UserVmService, Manager throw new CloudRuntimeException("Shouldn't even be here!"); } } - - @Override - public void completeStartCommand(UserVmVO vm) { - _itMgr.stateTransitTo(vm, VirtualMachine.Event.AgentReportRunning, vm.getHostId()); - _networkGroupMgr.handleVmStateTransition(vm, State.Running); - _ovsNetworkMgr.handleVmStateTransition(vm, State.Running); - } - + @Override public void completeStopCommand(UserVmVO instance) { completeStopCommand(1L, instance, VirtualMachine.Event.AgentReportStopped); @@ -1166,8 +1159,6 @@ public class UserVmManagerImpl implements UserVmManager, UserVmService, Manager } txn.commit(); - _networkGroupMgr.handleVmStateTransition(vm, State.Stopped); - _ovsNetworkMgr.handleVmStateTransition(vm, State.Stopped); } catch (Throwable th) { s_logger.error("Error during stop: ", th); throw new CloudRuntimeException("Error during stop: ", th); @@ -2408,7 +2399,10 @@ public class UserVmManagerImpl implements UserVmManager, UserVmService, Manager @Override public boolean finalizeStart(Commands cmds, VirtualMachineProfile profile, DeployDestination dest, ReservationContext context) { - return true; + UserVmVO vm = profile.getVirtualMachine(); + _networkGroupMgr.handleVmStateTransition(vm, State.Running); + _ovsNetworkMgr.handleVmStateTransition(vm, State.Running); + return true; } @Override @@ -2460,6 +2454,9 @@ public class UserVmManagerImpl implements UserVmManager, UserVmService, Manager @Override public void finalizeStop(VirtualMachineProfile profile, long hostId, String reservationId, Answer...answer) { + UserVmVO vm = profile.getVirtualMachine(); + _networkGroupMgr.handleVmStateTransition(vm, State.Stopped); + _ovsNetworkMgr.handleVmStateTransition(vm, State.Stopped); } public String generateRandomPassword() { @@ -2520,6 +2517,11 @@ public class UserVmManagerImpl implements UserVmManager, UserVmService, Manager throw new CloudRuntimeException("Failed to destroy vm with id " + vmId); } } + + @Override + public void completeStartCommand(UserVmVO vm) { + _itMgr.stateTransitTo(vm, VirtualMachine.Event.AgentReportRunning, vm.getHostId()); + } @Override diff --git a/server/src/com/cloud/vm/VirtualMachineManagerImpl.java b/server/src/com/cloud/vm/VirtualMachineManagerImpl.java index cda2c647325..787db4ac2c0 100644 --- a/server/src/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/server/src/com/cloud/vm/VirtualMachineManagerImpl.java @@ -31,6 +31,7 @@ import org.apache.log4j.Logger; import com.cloud.agent.AgentManager; import com.cloud.agent.AgentManager.OnError; import com.cloud.agent.api.Answer; +import com.cloud.agent.api.StartAnswer; import com.cloud.agent.api.StartCommand; import com.cloud.agent.api.StopAnswer; import com.cloud.agent.api.StopCommand; @@ -388,6 +389,17 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Cluster } } + private Answer getStartAnswer(Answer[] answers) { + for (Answer ans : answers) { + if (ans instanceof StartAnswer) { + return ans; + } + } + + assert 1 == 0 : "Why there is no Start Answer???"; + return null; + } + @Override public T advanceStart(T vm, Map params, User caller, Account account, HypervisorType hyperType) throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { State state = vm.getState(); @@ -495,7 +507,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Cluster vmGuru.finalizeDeployment(cmds, vmProfile, dest, context); try { Answer[] answers = _agentMgr.send(dest.getHost().getId(), cmds); - if (answers[0].getResult() && vmGuru.finalizeStart(cmds, vmProfile, dest, context)) { + if (getStartAnswer(answers).getResult() && vmGuru.finalizeStart(cmds, vmProfile, dest, context)) { if (!stateTransitTo(vm, Event.OperationSucceeded, dest.getHost().getId())) { throw new CloudRuntimeException("Unable to transition to a new state."); } diff --git a/setup/db/create-schema.sql b/setup/db/create-schema.sql index bd980f476a5..60a55dd2c45 100755 --- a/setup/db/create-schema.sql +++ b/setup/db/create-schema.sql @@ -1344,6 +1344,7 @@ CREATE TABLE `cloud`.`ovs_vm_flow_log` ( `instance_id` bigint unsigned NOT NULL COMMENT 'vm instance that needs flows to be synced.', `created` datetime NOT NULL COMMENT 'time the entry was requested', `logsequence` bigint unsigned COMMENT 'seq number to be sent to agent, uniquely identifies flow update', + `vm_name` varchar(255) NOT NULL COMMENT 'vm name', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;