CS-14948: Fixing an issue with parsing the xml-rpc response to a command

from vsm. This was throwing false exceptions when infact the command
execution was a success. Also adding retry logic for create port profile
request.
This commit is contained in:
Devdeep Singh 2012-05-17 18:33:54 +05:30 committed by Vijayendra Bhamidipati
parent a20aace44e
commit 3006bed6df
5 changed files with 249 additions and 68 deletions

View File

@ -12,7 +12,6 @@ import com.cloud.utils.cisco.n1kv.vsm.VsmCommand.PortProfileType;
import com.cloud.utils.cisco.n1kv.vsm.VsmCommand.SwitchPortMode; import com.cloud.utils.cisco.n1kv.vsm.VsmCommand.SwitchPortMode;
import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.ssh.SSHCmdHelper; import com.cloud.utils.ssh.SSHCmdHelper;
import com.trilead.ssh2.ChannelCondition;
import com.trilead.ssh2.Connection; import com.trilead.ssh2.Connection;
import com.trilead.ssh2.Session; import com.trilead.ssh2.Session;
@ -21,6 +20,9 @@ public class NetconfHelper {
private static final String SSH_NETCONF_TERMINATOR = "]]>]]>"; private static final String SSH_NETCONF_TERMINATOR = "]]>]]>";
// Number of times to retry the command on failure.
private static final int s_retryCount = 3;
private Connection _connection; private Connection _connection;
private Session _session; private Session _session;
@ -61,7 +63,7 @@ public class NetconfHelper {
+ "</nc:rpc>" + SSH_NETCONF_TERMINATOR; + "</nc:rpc>" + SSH_NETCONF_TERMINATOR;
send(status); send(status);
// parse the rpc reply. // parse the rpc reply.
parseReply(receive()); parseOkReply(receive());
} }
public void addPortProfile(String name, PortProfileType type, BindingType binding, public void addPortProfile(String name, PortProfileType type, BindingType binding,
@ -69,9 +71,28 @@ public class NetconfHelper {
String command = VsmCommand.getAddPortProfile(name, type, binding, mode, vlanid); String command = VsmCommand.getAddPortProfile(name, type, binding, mode, vlanid);
if (command != null) { if (command != null) {
command = command.concat(SSH_NETCONF_TERMINATOR); command = command.concat(SSH_NETCONF_TERMINATOR);
send(command);
// parse the rpc reply. // This command occasionally fails. On retry it succeeds. Putting in
parseReply(receive()); // retry to handle failures.
for (int i = 0; i < s_retryCount; ++i) {
send(command);
// parse the rpc reply.
// parseOkReply(receive());
VsmOkResponse response = new VsmOkResponse(receive().trim());
if (!response.isResponseOk()) {
if (i >= s_retryCount) {
throw new CloudRuntimeException(response.toString());
}
try {
Thread.sleep(1000);
} catch (final InterruptedException e) {
s_logger.debug("Got interrupted while waiting.");
}
} else {
break;
}
}
} else { } else {
throw new CloudRuntimeException("Error generating rpc request for adding port profile."); throw new CloudRuntimeException("Error generating rpc request for adding port profile.");
} }
@ -84,7 +105,7 @@ public class NetconfHelper {
command = command.concat(SSH_NETCONF_TERMINATOR); command = command.concat(SSH_NETCONF_TERMINATOR);
send(command); send(command);
// parse the rpc reply. // parse the rpc reply.
parseReply(receive()); parseOkReply(receive());
} else { } else {
throw new CloudRuntimeException("Error generating rpc request for updating port profile."); throw new CloudRuntimeException("Error generating rpc request for updating port profile.");
} }
@ -96,7 +117,7 @@ public class NetconfHelper {
command = command.concat(SSH_NETCONF_TERMINATOR); command = command.concat(SSH_NETCONF_TERMINATOR);
send(command); send(command);
// parse the rpc reply. // parse the rpc reply.
parseReply(receive()); parseOkReply(receive());
} else { } else {
throw new CloudRuntimeException("Error generating rpc request for deleting port profile."); throw new CloudRuntimeException("Error generating rpc request for deleting port profile.");
} }
@ -109,7 +130,7 @@ public class NetconfHelper {
command = command.concat(SSH_NETCONF_TERMINATOR); command = command.concat(SSH_NETCONF_TERMINATOR);
send(command); send(command);
// parse the rpc reply. // parse the rpc reply.
parseReply(receive()); parseOkReply(receive());
} else { } else {
throw new CloudRuntimeException("Error generating rpc request for adding/updating policy map."); throw new CloudRuntimeException("Error generating rpc request for adding/updating policy map.");
} }
@ -121,7 +142,7 @@ public class NetconfHelper {
command = command.concat(SSH_NETCONF_TERMINATOR); command = command.concat(SSH_NETCONF_TERMINATOR);
send(command); send(command);
// parse the rpc reply. // parse the rpc reply.
parseReply(receive()); parseOkReply(receive());
} else { } else {
throw new CloudRuntimeException("Error generating rpc request for deleting policy map."); throw new CloudRuntimeException("Error generating rpc request for deleting policy map.");
} }
@ -140,7 +161,7 @@ public class NetconfHelper {
command = command.concat(SSH_NETCONF_TERMINATOR); command = command.concat(SSH_NETCONF_TERMINATOR);
send(command); send(command);
// parse the rpc reply. // parse the rpc reply.
parseReply(receive()); parseOkReply(receive());
} else { } else {
throw new CloudRuntimeException("Error generating rpc request for adding policy map."); throw new CloudRuntimeException("Error generating rpc request for adding policy map.");
} }
@ -153,7 +174,22 @@ public class NetconfHelper {
command = command.concat(SSH_NETCONF_TERMINATOR); command = command.concat(SSH_NETCONF_TERMINATOR);
send(command); send(command);
// parse the rpc reply. // parse the rpc reply.
parseReply(receive()); parseOkReply(receive());
} else {
throw new CloudRuntimeException("Error generating rpc request for removing policy map.");
}
}
public void getPortProfileByName(String name) throws CloudRuntimeException {
String command = VsmCommand.getPortProfile(name);
if (command != null) {
command = command.concat(SSH_NETCONF_TERMINATOR);
send(command);
// parse the rpc reply.
VsmPortProfileResponse response = new VsmPortProfileResponse(receive().trim());
if (!response.isResponseOk()) {
throw new CloudRuntimeException("Error response while getting the port profile details.");
}
} else { } else {
throw new CloudRuntimeException("Error generating rpc request for removing policy map."); throw new CloudRuntimeException("Error generating rpc request for removing policy map.");
} }
@ -177,49 +213,122 @@ public class NetconfHelper {
} }
private String receive() { private String receive() {
byte[] buffer = new byte[8192]; String response = new String("");
InputStream inputStream = _session.getStdout(); InputStream inputStream = _session.getStdout();
try { try {
while (true) { Delimiter delimiter = new Delimiter();
if (inputStream.available() == 0) { byte[] buffer = new byte[1024];
int conditions = _session.waitForCondition(ChannelCondition.STDOUT_DATA int count = 0;
| ChannelCondition.STDERR_DATA | ChannelCondition.EOF, 3000);
if ((conditions & ChannelCondition.TIMEOUT) != 0) { // Read the input stream till we find the end sequence ']]>]]>'.
break; while (true) {
int data = inputStream.read();
if (data != -1) {
byte[] dataStream = delimiter.parse(data);
if (delimiter.endReached()) {
response += new String(buffer, 0, count);
break;
}
if (dataStream != null) {
for (int i = 0; i < dataStream.length; i++) {
buffer[count] = dataStream[i];
count++;
if (count == 1024) {
response += new String(buffer, 0, count);
count = 0;
}
} }
}
if ((conditions & ChannelCondition.EOF) != 0) { } else {
if ((conditions & (ChannelCondition.STDOUT_DATA | ChannelCondition.STDERR_DATA)) == 0) { break;
break; }
} }
} } catch (final Exception e) {
} throw new CloudRuntimeException("Error occured while reading from the stream: " + e.getMessage());
while (inputStream.available() > 0) {
inputStream.read(buffer);
}
}
} catch (Exception e) {
s_logger.error("Failed to receive message: " + e.getMessage());
throw new CloudRuntimeException("Failed to receives message: " + e.getMessage());
} }
return new String(buffer); return response;
} }
private void parseReply(String reply) throws CloudRuntimeException { private void parseOkReply(String reply) throws CloudRuntimeException {
reply = reply.trim(); VsmOkResponse response = new VsmOkResponse(reply.trim());
if (reply.endsWith(SSH_NETCONF_TERMINATOR)) {
reply = reply.substring(0, reply.length() - (new String(SSH_NETCONF_TERMINATOR).length()));
}
else {
throw new CloudRuntimeException("Malformed response from vsm" + reply);
}
VsmResponse response = new VsmResponse(reply);
if (!response.isResponseOk()) { if (!response.isResponseOk()) {
throw new CloudRuntimeException(response.toString()); throw new CloudRuntimeException(response.toString());
} }
} }
private static class Delimiter {
private boolean _endReached = false;
// Used to accumulate response read while searching for end of response.
private byte[] _gatherResponse = new byte[6];
// Index into number of bytes read.
private int _offset = 0;
// True if ']]>]]>' detected.
boolean endReached() {
return _endReached;
}
// Parses the input stream and checks if end sequence is reached.
byte[] parse(int input) throws RuntimeException {
boolean collect = false;
byte[] streamRead = null;
// Check if end sequence matched.
switch (_offset) {
case 0:
if (input == ']') {
collect = true;
}
break;
case 1:
if (input == ']') {
collect = true;
}
break;
case 2:
if (input == '>') {
collect = true;
}
break;
case 3:
if (input == ']') {
collect = true;
}
break;
case 4:
if (input == ']') {
collect = true;
}
break;
case 5:
if (input == '>') {
collect = true;
_endReached = true;
}
break;
default:
throw new RuntimeException("Invalid index value: " + _offset);
}
if (collect) {
_gatherResponse[_offset++] = (byte)input;
} else {
// End sequence not yet reached. Return the stream of bytes collected so far.
streamRead = new byte[_offset+1];
for (int index = 0; index < _offset; ++index) {
streamRead[index] = _gatherResponse[index];
}
streamRead[_offset] = (byte) input;
_offset = 0;
}
return streamRead;
}
}
} }

View File

@ -253,6 +253,43 @@ public class VsmCommand {
} }
} }
public static String getPortProfile(String name) {
try {
DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
DOMImplementation domImpl = docBuilder.getDOMImplementation();
Document doc = createDocument(domImpl);
Element get = doc.createElement("nf:get");
doc.getDocumentElement().appendChild(get);
Element filter = doc.createElement("nf:filter");
filter.setAttribute("type", "subtree");
get.appendChild(filter);
// Create the show port-profile name <profile-name> command.
Element show = doc.createElement("show");
filter.appendChild(show);
Element portProfile = doc.createElement("port-profile");
show.appendChild(portProfile);
Element nameNode = doc.createElement("name");
portProfile.appendChild(nameNode);
// Profile name
Element profileName = doc.createElement("profile_name");
profileName.setTextContent(name);
nameNode.appendChild(profileName);
return serialize(domImpl, doc);
} catch (ParserConfigurationException e) {
s_logger.error("Error while creating delete message : " + e.getMessage());
return null;
} catch (DOMException e) {
s_logger.error("Error while creating delete message : " + e.getMessage());
return null;
}
}
public static String getHello() { public static String getHello() {
try { try {
DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();

View File

@ -0,0 +1,23 @@
package com.cloud.utils.cisco.n1kv.vsm;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
public class VsmOkResponse extends VsmResponse {
VsmOkResponse(String response) {
super(response);
}
protected void parse(Element root) {
NodeList list = root.getElementsByTagName("nf:rpc-error");
if (list.getLength() == 0) {
// No rpc-error tag; means response was ok.
assert(root.getElementsByTagName("nf:ok").getLength() > 0);
_responseOk = true;
} else {
parseError(list.item(0));
_responseOk = false;
}
}
}

View File

@ -0,0 +1,22 @@
package com.cloud.utils.cisco.n1kv.vsm;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
public class VsmPortProfileResponse extends VsmResponse {
VsmPortProfileResponse(String response) {
super(response);
}
protected void parse(Element root) {
NodeList list = root.getElementsByTagName("nf:rpc-error");
if (list.getLength() == 0) {
// No rpc-error tag; means response was ok.
assert(root.getElementsByTagName("nf:ok").getLength() > 0);
_responseOk = true;
} else {
super.parseError(list.item(0));
_responseOk = false;
}
}
}

View File

@ -17,7 +17,7 @@ import org.xml.sax.InputSource;
import java.io.StringReader; import java.io.StringReader;
import java.io.IOException; import java.io.IOException;
public class VsmResponse { public abstract class VsmResponse {
// Following error tags, error types and severity have been taken from RFC 4741. // Following error tags, error types and severity have been taken from RFC 4741.
public enum ErrorTag { public enum ErrorTag {
@ -56,16 +56,16 @@ public class VsmResponse {
private static final Logger s_logger = Logger.getLogger(VsmResponse.class); private static final Logger s_logger = Logger.getLogger(VsmResponse.class);
private String _xmlResponse; protected String _xmlResponse;
private Document _docResponse; protected Document _docResponse;
private boolean _responseOk; protected boolean _responseOk;
private ErrorTag _tag; protected ErrorTag _tag;
private ErrorType _type; protected ErrorType _type;
private ErrorSeverity _severity; protected ErrorSeverity _severity;
private String _path; protected String _path;
private String _message; protected String _message;
private String _info; protected String _info;
VsmResponse(String response) { VsmResponse(String response) {
_xmlResponse = response; _xmlResponse = response;
@ -117,19 +117,9 @@ public class VsmResponse {
return error.toString(); return error.toString();
} }
private void parse(Element root) { protected abstract void parse(Element root);
NodeList list = root.getElementsByTagName("nf:rpc-error");
if (list.getLength() == 0) {
// No rpc-error tag; means response was ok.
assert(root.getElementsByTagName("nf:ok").getLength() > 0);
_responseOk = true;
} else {
parseError(list.item(0));
_responseOk = false;
}
}
private void parseError(Node element) { protected void parseError(Node element) {
Element rpcError = (Element) element; Element rpcError = (Element) element;
try { try {
@ -155,7 +145,7 @@ public class VsmResponse {
} }
} }
private ErrorTag getErrorTag(String tagText) { protected ErrorTag getErrorTag(String tagText) {
ErrorTag tag = ErrorTag.InUse; ErrorTag tag = ErrorTag.InUse;
if (tagText.equals("in-use")) { if (tagText.equals("in-use")) {
@ -202,7 +192,7 @@ public class VsmResponse {
} }
// Helper routine to check for the response received. // Helper routine to check for the response received.
private void printResponse() { protected void printResponse() {
try { try {
DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder docBuilder = docFactory.newDocumentBuilder(); DocumentBuilder docBuilder = docFactory.newDocumentBuilder();