more changes for refactor

This commit is contained in:
Alex Huang 2010-10-12 07:17:04 -07:00
parent 634b67ce07
commit ce091de3d2
51 changed files with 3567 additions and 474 deletions

View File

@ -49,7 +49,6 @@ import com.cloud.agent.api.UpgradeAnswer;
import com.cloud.agent.api.UpgradeCommand;
import com.cloud.agent.transport.Request;
import com.cloud.agent.transport.Response;
import com.cloud.agent.transport.UpgradeResponse;
import com.cloud.exception.AgentControlChannelException;
import com.cloud.resource.ServerResource;
import com.cloud.utils.PropertiesUtil;
@ -302,7 +301,7 @@ public class Agent implements HandlerFactory, IAgentControl {
commands[i] = startup[i];
}
final Request request = new Request(getNextSequence(), _id != null ? _id : -1, -1, commands, false);
final Request request = new Request(getNextSequence(), _id != null ? _id : -1, -1, commands, false, false, false);
if (s_logger.isDebugEnabled()) {
s_logger.debug("Sending Startup: " + request.toString());
@ -624,7 +623,7 @@ public class Agent implements HandlerFactory, IAgentControl {
@Override
public AgentControlAnswer sendRequest(AgentControlCommand cmd, int timeoutInMilliseconds) throws AgentControlChannelException {
Request request = new Request(this.getNextSequence(), this.getId(),
-1, new Command[] {cmd}, true, false);
-1, new Command[] {cmd}, true, false, false);
AgentControlListener listener = new AgentControlListener(request);
registerControlListener(listener);
@ -647,7 +646,7 @@ public class Agent implements HandlerFactory, IAgentControl {
@Override
public void postRequest(AgentControlCommand cmd) throws AgentControlChannelException {
Request request = new Request(this.getNextSequence(), this.getId(),
-1, new Command[] {cmd}, true, false);
-1, new Command[] {cmd}, true, false, false);
postRequest(request);
}
@ -676,11 +675,13 @@ public class Agent implements HandlerFactory, IAgentControl {
return _answer;
}
public Answer processControlRequest(Request request, AgentControlCommand cmd) {
@Override
public Answer processControlRequest(Request request, AgentControlCommand cmd) {
return null;
}
public void processControlResponse(Response response, AgentControlAnswer answer) {
@Override
public void processControlResponse(Response response, AgentControlAnswer answer) {
if(_request.getSequence() == response.getSequence()) {
_answer = answer;
synchronized(this) {
@ -778,9 +779,7 @@ public class Agent implements HandlerFactory, IAgentControl {
Request request;
try {
request = Request.parse(task.getData());
if (request instanceof UpgradeResponse) {
upgradeAgent(((UpgradeResponse)request).getUpgradeUrl(), null);
} else if (request instanceof Response) {
if (request instanceof Response) {
processResponse((Response)request, task.getLink());
} else {
processRequest(request, task.getLink());

View File

@ -3,11 +3,92 @@
*/
package com.cloud.deploy;
import java.util.HashSet;
import java.util.Set;
import com.cloud.exception.InsufficientServerCapacityException;
import com.cloud.host.Host;
import com.cloud.utils.component.Adapter;
import com.cloud.vm.VirtualMachineProfile;
/**
* Returns a deployment destination for the VM.
*/
public interface DeploymentPlanner extends Adapter {
DeployDestination plan(VirtualMachineProfile vm, DeploymentPlan plan, Set<DeployDestination> avoid);
/**
* plan is called to determine where a virtual machine should be running.
*
* @param vm virtual machine.
* @param plan deployment plan that tells you where it's being deployed to.
* @param avoid avoid these data centers, pods, clusters, or hosts.
* @return DeployDestination for that virtual machine.
*/
DeployDestination plan(VirtualMachineProfile vm, DeploymentPlan plan, ExcludeList avoid) throws InsufficientServerCapacityException;
/**
* check() is called right before the virtual machine starts to make sure
* the host has enough capacity.
*
* @param vm virtual machine in question.
* @param plan deployment plan used to determined the deploy destination.
* @param dest destination returned by plan.
* @param avoid what to avoid.
* @return true if it's okay to start; false if not. If false, the exclude list will include what should be excluded.
*/
boolean check(VirtualMachineProfile vm, DeploymentPlan plan, DeployDestination dest, ExcludeList exclude);
public static class ExcludeList {
Set<Long> _dcIds;
Set<Long> _podIds;
Set<Long> _clusterIds;
Set<Long> _hostIds;
public void adddDataCenter(long dataCenterId) {
if (_dcIds == null) {
_dcIds = new HashSet<Long>();
}
_dcIds.add(dataCenterId);
}
public void addPod(long podId) {
if (_podIds == null) {
_podIds = new HashSet<Long>();
}
_podIds.add(podId);
}
public void addCluster(long clusterId) {
if (_clusterIds == null) {
_clusterIds = new HashSet<Long>();
}
_clusterIds.add(clusterId);
}
public void addHost(long hostId) {
if (_hostIds == null) {
_hostIds = new HashSet<Long>();
}
_hostIds.add(hostId);
}
public boolean shouldAvoid(Host host) {
if (_dcIds != null && _dcIds.contains(host.getDataCenterId())) {
return true;
}
if (_podIds != null && _podIds.contains(host.getPodId())) {
return true;
}
if (_clusterIds != null && _clusterIds.contains(host.getClusterId())) {
return true;
}
if (_hostIds != null && _hostIds.contains(host.getId())) {
return true;
}
return false;
}
}
}

View File

@ -15,11 +15,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.host;
package com.cloud.exception;
import com.cloud.exception.InsufficientCapacityException;
import com.cloud.utils.SerialVersionUID;
import com.cloud.vm.VirtualMachine;
/**
* This exception is thrown when a server cannot be found to host the
@ -30,13 +28,7 @@ public class InsufficientServerCapacityException extends InsufficientCapacityExc
private static final long serialVersionUID = SerialVersionUID.InsufficientServerCapacityException;
VirtualMachine.Type type;
public InsufficientServerCapacityException(VirtualMachine.Type type, String msg) {
public InsufficientServerCapacityException(String msg) {
super(msg);
this.type = type;
}
VirtualMachine.Type getType() {
return type;
}
}

View File

@ -76,6 +76,9 @@ public interface StoragePool {
* @return available storage in bytes
*/
long getAvailableBytes();
Long getClusterId();
/**
* @return the fqdn or ip address of the storage host

View File

@ -1,13 +0,0 @@
/**
*
*/
package com.cloud.vm;
import com.cloud.offering.ServiceOffering;
import com.cloud.template.VirtualMachineTemplate;
import com.cloud.utils.component.Adapter;
public interface VirtualMachineProfiler extends Adapter {
VirtualMachineProfile convert(ServiceOffering offering, VirtualMachineTemplate template);
}

View File

@ -26,7 +26,7 @@ import com.cloud.host.HostVO;
import com.cloud.host.Status;
/**
* Listener is a multipurpose interface for hooking into the AgentManager.
* Listener is a multi-purpose interface for hooking into the AgentManager.
* There are several types of events that the AgentManager forwards
* to the listener.
*
@ -45,7 +45,7 @@ public interface Listener {
* @param answers answers to the commands.
* @return true if processed. false if not.
*/
boolean processAnswer(long agentId, long seq, Answer[] answers);
boolean processAnswers(long agentId, long seq, Answer[] answers);
/**
* This method is called by the AgentManager when an agent sent
@ -57,7 +57,7 @@ public interface Listener {
* @param commands commands that were sent.
* @return true if you processed the commands. false if not.
*/
boolean processCommand(long agentId, long seq, Command[] commands);
boolean processCommands(long agentId, long seq, Command[] commands);
/**
* process control command sent from agent under its management
@ -89,7 +89,7 @@ public interface Listener {
boolean processDisconnect(long agentId, Status state);
/**
* If ths Listener is passed to the send() method, this method
* If this Listener is passed to the send() method, this method
* is called by AgentManager after processing an answer
* from the agent. Returning true means you're expecting more
* answers from the agent using the same sequence number.

View File

@ -49,7 +49,6 @@ import com.google.gson.reflect.TypeToken;
* 6. AgentId - 8 bytes;
* 7. Data Package.
*
* Currently flags has only if it is a request or response.
*/
public class Request {
private static final Logger s_logger = Logger.getLogger(Request.class);
@ -72,8 +71,7 @@ public class Request {
protected static final short FLAG_REQUEST = 0x1;
protected static final short FLAG_STOP_ON_ERROR = 0x2;
protected static final short FLAG_IN_SEQUENCE = 0x4;
protected static final short FLAG_WATCH = 0x8;
protected static final short FLAG_UPDATE = 0x10;
protected static final short FLAG_REVERT_ON_ERROR = 0x8;
protected static final short FLAG_FROM_SERVER = 0x20;
protected static final short FLAG_CONTROL = 0x40;
@ -93,86 +91,88 @@ public class Request {
protected Version _ver;
protected long _seq;
protected Command[] _cmds;
protected boolean _inSequence;
protected boolean _stopOnError;
protected boolean _fromServer;
protected boolean _control;
protected short _flags;
protected long _mgmtId;
protected long _agentId;
protected Command[] _cmds;
protected String _content;
public Request(long seq, long agentId, long mgmtId, final Command command, boolean fromServer) {
this(seq, agentId, mgmtId, new Command[] {command}, true, fromServer);
}
public Request(long seq, long agentId, long mgmtId, final Command[] commands, boolean fromServer) {
this(seq, agentId, mgmtId, commands, true, fromServer);
protected Request() {
}
protected Request(Version ver, long seq, long agentId, long mgmtId, final Command[] cmds, final Boolean inSequence, final boolean stopOnError, boolean fromServer) {
protected Request(Version ver, long seq, long agentId, long mgmtId, short flags, final Command[] cmds) {
_ver = ver;
_cmds = cmds;
_stopOnError = stopOnError;
if (inSequence != null) {
_inSequence = inSequence;
} else {
for (final Command cmd : cmds) {
if (cmd.executeInSequence()) {
_inSequence = true;
break;
}
}
}
_flags = flags;
_seq = seq;
_agentId = agentId;
_mgmtId = mgmtId;
_fromServer = fromServer;
}
protected Request(Version ver, long seq, long agentId, long mgmtId, final String content, final boolean inSequence, final boolean stopOnError, final boolean fromServer, final boolean control) {
_ver = ver;
_cmds = null;
protected Request(Version ver, long seq, long agentId, long mgmtId, short flags, final String content) {
this(ver, seq, agentId, mgmtId, flags, (Command[])null);
_content = content;
_stopOnError = stopOnError;
_inSequence = inSequence;
_seq = seq;
_agentId = agentId;
_mgmtId = mgmtId;
_fromServer = fromServer;
_control = control;
}
public Request(long seq, long agentId, long mgmtId, final Command[] cmds, final boolean stopOnError, boolean fromServer) {
this(Version.v3, seq, agentId, mgmtId, cmds, null, stopOnError, fromServer);
public Request(long seq, long agentId, long mgmtId, final Command command, boolean fromServer) {
this(seq, agentId, mgmtId, new Command[] {command}, true, fromServer, true);
}
public Request(long seq, long agentId, long mgmtId, Command[] cmds, boolean stopOnError, boolean fromServer, boolean revert) {
this(Version.v3, seq, agentId, mgmtId, (short)0, cmds);
setStopOnError(stopOnError);
setFromServer(fromServer);
setRevertOnError(revert);
}
protected Request(final Request that, final Command[] cmds) {
this._ver = that._ver;
this._seq = that._seq;
setInSequence(that.executeInSequence());
setStopOnError(that.stopOnError());
this._cmds = cmds;
this._mgmtId = that._mgmtId;
this._agentId = that._agentId;
setFromServer(!that.isFromServer());
}
private final void setStopOnError(boolean stopOnError) {
_flags |= (stopOnError ? 1 : 0) << FLAG_STOP_ON_ERROR;
}
private final void setInSequence(boolean inSequence) {
_flags |= (inSequence ? 1 : 0) << FLAG_IN_SEQUENCE;
}
public boolean isControl() {
return _control;
return (_flags & FLAG_CONTROL) > 0;
}
public void setControl() {
_control = true;
public void setControl(boolean control) {
_flags |= (control ? 1 : 0) << FLAG_CONTROL;
}
public boolean revertOnError() {
return (_flags & FLAG_CONTROL) > 0;
}
private final void setRevertOnError(boolean revertOnError) {
_flags |= (revertOnError ? 1 : 0) << FLAG_REVERT_ON_ERROR;
}
private final void setFromServer(boolean fromServer) {
_flags |= (fromServer ? 1 : 0) << FLAG_FROM_SERVER;
}
public long getManagementServerId() {
return _mgmtId;
}
protected Request(final Request that, final Command[] cmds) {
this._ver = that._ver;
this._seq = that._seq;
this._inSequence = that._inSequence;
this._stopOnError = that._stopOnError;
this._cmds = cmds;
this._mgmtId = that._mgmtId;
this._agentId = that._agentId;
this._fromServer = !that._fromServer;
public boolean isFromServer() {
return (_flags & FLAG_FROM_SERVER) > 0;
}
protected Request() {
}
public Version getVersion() {
return _ver;
}
@ -182,7 +182,7 @@ public class Request {
}
public boolean executeInSequence() {
return _inSequence;
return (_flags & FLAG_IN_SEQUENCE) > 0;
}
public long getSequence() {
@ -190,7 +190,7 @@ public class Request {
}
public boolean stopOnError() {
return _stopOnError;
return (_flags & FLAG_STOP_ON_ERROR) > 0;
}
public Command getCommand() {
@ -269,25 +269,7 @@ public class Request {
}
protected short getFlags() {
short flags = 0;
if (!(this instanceof Response)) {
flags = FLAG_REQUEST;
} else {
flags = FLAG_RESPONSE;
}
if (_inSequence) {
flags = (short)(flags | FLAG_IN_SEQUENCE);
}
if (_stopOnError) {
flags = (short)(flags | FLAG_STOP_ON_ERROR);
}
if (_fromServer) {
flags = (short)(flags | FLAG_FROM_SERVER);
}
if (_control) {
flags = (short)(flags | FLAG_CONTROL);
}
return flags;
return (short)(((this instanceof Response) ? FLAG_RESPONSE : FLAG_REQUEST) | _flags);
}
/**
@ -311,12 +293,6 @@ public class Request {
final byte reserved = buff.get(); // tossed away for now.
final Short flags = buff.getShort();
final boolean isRequest = (flags & FLAG_REQUEST) > 0;
final boolean isControl = (flags & FLAG_IN_SEQUENCE) > 0;
final boolean isStopOnError = (flags & FLAG_STOP_ON_ERROR) > 0;
final boolean isWatch = (flags & FLAG_WATCH) > 0;
final boolean fromServer = (flags & FLAG_FROM_SERVER) > 0;
final boolean needsUpdate = (flags & FLAG_UPDATE) > 0;
final boolean control = (flags & FLAG_CONTROL) > 0;
final long seq = buff.getLong();
final int size = buff.getInt();
@ -335,14 +311,11 @@ public class Request {
}
final String content = new String(command, offset, command.length - offset);
if (needsUpdate && !isRequest) {
return new UpgradeResponse(Version.get(ver), seq, content);
}
if (isRequest) {
return new Request(version, seq, agentId, mgmtId, content, isControl, isStopOnError, fromServer, control);
return new Request(version, seq, agentId, mgmtId, flags, content);
} else {
return new Response(Version.get(ver), seq, agentId, mgmtId, content, isControl, isStopOnError, fromServer, control);
return new Response(Version.get(ver), seq, agentId, mgmtId, flags, content);
}
}
@ -371,8 +344,6 @@ public class Request {
}
public static boolean fromServer(final byte[] bytes) {
// int flags = NumbersUtil.bytesToShort(bytes, 2);
return (bytes[3] & FLAG_FROM_SERVER) > 0;
}
@ -385,7 +356,6 @@ public class Request {
}
public static boolean isControl(final byte[] bytes) {
// int flags = NumbersUtil.bytesToShort(bytes, 2);
return (bytes[3] & FLAG_CONTROL) > 0;
}
}

View File

@ -46,8 +46,8 @@ public class Response extends Request {
_agentId = agentId;
}
protected Response(Version ver, long seq, long agentId, long mgmtId, String ans, boolean inSequence, boolean stopOnError, boolean fromServer, boolean control) {
super(ver, seq, agentId, mgmtId, ans, inSequence, stopOnError, fromServer, control);
protected Response(Version ver, long seq, long agentId, long mgmtId, short flags, String ans) {
super(ver, seq, agentId, mgmtId, flags, ans);
}
public Answer getAnswer() {

View File

@ -1,98 +0,0 @@
/**
* Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.agent.transport;
import java.nio.ByteBuffer;
import com.cloud.agent.api.Answer;
import com.cloud.utils.NumbersUtil;
/**
* An UpgradeResponse is sent when there is a protocol version mismatched.
* It has the same header as the request but contains an url to the
* updated agent.
*/
public class UpgradeResponse extends Response {
byte[] _requestBytes;
public UpgradeResponse(Request request, String url) {
super(request, new Answer[0]);
_requestBytes = null;
}
public UpgradeResponse(byte[] request, String url) {
super(Version.v2, -1, -1, -1, url, true, true, true, false);
_requestBytes = request;
}
protected UpgradeResponse(Version ver, long seq, String url) {
super(ver, seq, -1, -1, url, true, false, true, false);
_requestBytes = null;
}
@Override
protected ByteBuffer serializeHeader(int contentSize) {
if (_requestBytes == null) {
return super.serializeHeader(contentSize);
}
byte[] responseHeader = new byte[16];
ByteBuffer buffer = ByteBuffer.wrap(responseHeader);
buffer.put(_requestBytes[0]); // version number
buffer.put((byte)0);
buffer.putShort(getFlags());
buffer.put(_requestBytes, 4, 8); // sequence number
buffer.putInt(contentSize);
buffer.flip();
return buffer;
}
@Override
public String toString() {
if (_requestBytes == null) {
return super.toString();
}
final StringBuilder buffer = new StringBuilder();
buffer.append("{ ").append(getType());
buffer.append(", Seq: ").append(NumbersUtil.bytesToLong(_requestBytes, 4)).append(", Ver: ").append(_requestBytes[0]).append(", Flags: ").append(Integer.toBinaryString(getFlags()));
buffer.append(", ").append(_content).append(" }");
return buffer.toString();
}
@Override
public ByteBuffer[] toBytes() {
ByteBuffer[] buffers = new ByteBuffer[2];
buffers[1] = ByteBuffer.wrap(_content.getBytes());
buffers[0] = serializeHeader(buffers[1].capacity());
return buffers;
}
public String getUpgradeUrl() {
return _content;
}
@Override
protected short getFlags() {
return FLAG_RESPONSE | FLAG_UPDATE;
}
}

View File

@ -35,10 +35,10 @@ import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.db.UpdateBuilder;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.vm.DomainRouter.Role;
import com.cloud.vm.DomainRouterVO;
import com.cloud.vm.State;
import com.cloud.vm.VirtualMachine;
import com.cloud.vm.DomainRouter.Role;
@Local(value = { DomainRouterDao.class })
public class DomainRouterDaoImpl extends GenericDaoBase<DomainRouterVO, Long> implements DomainRouterDao {
@ -287,13 +287,13 @@ public class DomainRouterDaoImpl extends GenericDaoBase<DomainRouterVO, Long> im
public List<DomainRouterVO> listByDomain(Long domainId) {
SearchCriteria<DomainRouterVO> sc = DomainIdSearch.create();
sc.setParameters("domainId", domainId);
return listIncludingRemovedBy(sc);
return listBy(sc);
}
@Override
public List<DomainRouterVO> listByVlanDbId(Long vlanDbId) {
SearchCriteria<DomainRouterVO> sc = VlanDbIdSearch.create();
sc.setParameters("vlanDbId", vlanDbId);
return listIncludingRemovedBy(sc);
return listBy(sc);
}
}

View File

@ -229,6 +229,7 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
return search(sc, null);
}
@Override
public Integer[] countRoutersAndProxies(Long hostId) {
Transaction txn = Transaction.currentTxn();
PreparedStatement pstmt = null;

View File

@ -23,6 +23,7 @@ import java.util.Set;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.agent.manager.Commands;
import com.cloud.dc.DataCenterVO;
import com.cloud.dc.HostPodVO;
import com.cloud.dc.PodCluster;
@ -51,6 +52,11 @@ import com.cloud.vm.VirtualMachineProfile;
* DAOs and the connections it manages.
*/
public interface AgentManager extends Manager {
public enum OnError {
Revert,
Continue,
Stop
}
/**
* easy send method that returns null if there's any errors. It handles all exceptions.
@ -81,9 +87,9 @@ public interface AgentManager extends Manager {
* @param stopOnError should the agent stop execution on the first error.
* @return an array of Answer
*/
Answer[] send(Long hostId, Command [] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException;
Answer[] send(Long hostId, Commands cmds) throws AgentUnavailableException, OperationTimedoutException;
Answer[] send(Long hostId, Command [] cmds, boolean stopOnError, int timeout) throws AgentUnavailableException, OperationTimedoutException;
Answer[] send(Long hostId, Commands cmds, int timeout) throws AgentUnavailableException, OperationTimedoutException;
/**
* Asynchronous sending of a command to the agent.
@ -102,7 +108,7 @@ public interface AgentManager extends Manager {
* @param listener the listener to process the answer.
* @return sequence number.
*/
long send(Long hostId, Command[] cmds, boolean stopOnError, Listener listener) throws AgentUnavailableException;
long send(Long hostId, Commands cmds, Listener listener) throws AgentUnavailableException;
/**
* Register to listen for host events. These are mostly connection and

View File

@ -245,7 +245,7 @@ public abstract class AgentAttache {
s_logger.debug(log(seq, "Unable to find listener."));
}
} else {
processed = monitor.processAnswer(_id, seq, answers);
processed = monitor.processAnswers(_id, seq, answers);
if (s_logger.isTraceEnabled()) {
s_logger.trace(log(seq, (processed ? "" : " did not ") + " processed "));
}

View File

@ -387,7 +387,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory {
public void handleCommands(AgentAttache attache, final long sequence, final Command[] cmds) {
for (Pair<Integer, Listener> listener : _cmdMonitors) {
boolean processed = listener.second().processCommand(attache.getId(), sequence, cmds);
boolean processed = listener.second().processCommands(attache.getId(), sequence, cmds);
if (s_logger.isTraceEnabled()) {
s_logger.trace("SeqA " + attache.getId() + "-" + sequence + ": " + (processed ? "processed" : "not processed") + " by " + listener.getClass());
}
@ -674,8 +674,11 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory {
}
@Override
public Answer send(final Long hostId, final Command cmd, final int timeout) throws AgentUnavailableException, OperationTimedoutException {
Answer[] answers = send(hostId, new Command[] { cmd }, true, timeout);
public Answer send(Long hostId, Command cmd, int timeout) throws AgentUnavailableException, OperationTimedoutException {
Commands cmds = new Commands(OnError.Revert);
cmds.addCommand(cmd);
send(hostId, cmds, timeout);
Answer[] answers = cmds.getAnswers();
if (answers != null && !(answers[0] instanceof UnsupportedAnswer)) {
return answers[0];
}
@ -689,17 +692,18 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory {
}
@Override
public Answer[] send(final Long hostId, final Command[] cmds, final boolean stopOnError, final int timeout) throws AgentUnavailableException,
OperationTimedoutException {
public Answer[] send(Long hostId, Commands commands, int timeout) throws AgentUnavailableException, OperationTimedoutException {
assert hostId != null : "Who's not checking the agent id before sending? ... (finger wagging)";
if (hostId == null) {
throw new AgentUnavailableException(-1);
}
Command[] cmds = commands.toCommands();
assert cmds.length > 0 : "Ask yourself this about a hundred times. Why am I sending zero length commands?";
if (cmds.length == 0) {
return new Answer[0];
commands.setAnswers(new Answer[0]);
}
final AgentAttache agent = getAttache(hostId);
@ -708,8 +712,10 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory {
}
long seq = _hostDao.getNextSequence(hostId);
Request req = new Request(seq, hostId, _nodeId, cmds, stopOnError, true);
return agent.send(req, timeout);
Request req = new Request(seq, hostId, _nodeId, cmds, commands.stopOnError(), true, commands.revertOnError());
Answer[] answers = agent.send(req, timeout);
commands.setAnswers(answers);
return answers;
}
protected Status investigate(AgentAttache agent) {
@ -720,7 +726,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory {
try {
long seq = _hostDao.getNextSequence(hostId);
Request req = new Request(seq, hostId, _nodeId, new Command[] { new CheckHealthCommand() }, true, true);
Request req = new Request(seq, hostId, _nodeId, new CheckHealthCommand(), true);
Answer[] answers = agent.send(req, 50 * 1000);
if (answers != null && answers[0] != null ) {
Status status = answers[0].getResult() ? Status.Up : Status.Down;
@ -754,27 +760,28 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory {
}
@Override
public long send(final Long hostId, final Command[] cmds, final boolean stopOnError, final Listener listener) throws AgentUnavailableException {
public long send(Long hostId, Commands commands, Listener listener) throws AgentUnavailableException {
final AgentAttache agent = getAttache(hostId);
if (agent.isClosed()) {
return -1;
}
Command[] cmds = commands.toCommands();
assert cmds.length > 0 : "Why are you sending zero length commands?";
if (cmds.length == 0) {
return -1;
}
}
long seq = _hostDao.getNextSequence(hostId);
Request req = new Request(seq, hostId, _nodeId, cmds, stopOnError, true);
Request req = new Request(seq, hostId, _nodeId, cmds, commands.stopOnError(), true, commands.revertOnError());
agent.send(req, listener);
return seq;
}
@Override
public long gatherStats(final Long hostId, final Command cmd, final Listener listener) {
final Command[] cmds = new Command[] { cmd };
try {
return send(hostId, cmds, true, listener);
return send(hostId, new Commands(cmd), listener);
} catch (final AgentUnavailableException e) {
return -1;
}
@ -1042,7 +1049,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory {
AgentAttache attache = null;
if (s_logger.isDebugEnabled()) {
s_logger.debug("Startup request from directly connected host: " + new Request(0, -1, -1, cmds, false).toString());
s_logger.debug("Startup request from directly connected host: " + new Request(0l, -1l, -1l, cmds, true, false, true).toString());
}
try {
attache = handleDirectConnect(resource, cmds, details, old);
@ -1231,8 +1238,8 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory {
}
@Override
public Answer[] send(final Long hostId, final Command[] cmds, final boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException {
return send(hostId, cmds, stopOnError, _wait);
public Answer[] send(final Long hostId, Commands cmds) throws AgentUnavailableException, OperationTimedoutException {
return send(hostId, cmds, _wait);
}
@Override

View File

@ -166,12 +166,12 @@ public class AgentMonitor extends Thread implements Listener {
}
@Override
public boolean processAnswer(long agentId, long seq, Answer[] answers) {
public boolean processAnswers(long agentId, long seq, Answer[] answers) {
return false;
}
@Override
public boolean processCommand(long agentId, long seq, Command[] commands) {
public boolean processCommands(long agentId, long seq, Command[] commands) {
boolean processed = false;
for (Command cmd : commands) {
if (cmd instanceof PingCommand) {

View File

@ -0,0 +1,128 @@
/**
* Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.agent.manager;
import java.util.ArrayList;
import java.util.List;
import com.cloud.agent.AgentManager.OnError;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.utils.exception.CloudRuntimeException;
public class Commands {
OnError _handler;
private ArrayList<String> _ids = new ArrayList<String>();
private ArrayList<Command> _cmds = new ArrayList<Command>();
private Answer[] _answers;
public Commands(OnError handler) {
_handler = handler;
}
public Commands(Command cmd) {
this(OnError.Revert);
addCommand(cmd);
}
public void addCommands(List<Command> cmds) {
int i = 0;
for (Command cmd : cmds) {
addCommand(Integer.toString(i++), cmd);
}
}
public int size() {
return _cmds.size();
}
public void addCommand(String id, Command cmd) {
_ids.add(id);
_cmds.add(cmd);
}
public void addCommand(Command cmd) {
addCommand(null, cmd);
}
public Answer getAnswer(String id) {
int i = _ids.indexOf(id);
return _answers[i];
}
@SuppressWarnings("unchecked")
public <T extends Answer> T getAnswer(Class<T> clazz) {
assert(clazz != Answer.class) : "How do you expect to get a unique answer in this case? huh? How? How? How?....one more time....How?";
for (Answer answer : _answers) {
if (answer.getClass() == clazz) {
return (T)answer;
}
}
throw new CloudRuntimeException("Unable to get answer that is of " + clazz);
}
public <T extends Command> Answer getAnswerFor(Class<T> clazz) {
assert (clazz != Command.class) : "You passed in a generic Command. Seriously, you think you did that?";
int i = 0;
for (Command cmd : _cmds) {
if (cmd.getClass() == clazz) {
break;
}
i++;
}
assert i < _cmds.size() : "You sure you actually sent this command " + clazz;
return _answers[i];
}
public Command[] toCommands() {
return _cmds.toArray(new Command[_cmds.size()]);
}
public void setAnswers(Answer[] answers) {
assert answers.length == _cmds.size() : "We didn't get back the same number of answers as commands sent";
_answers = answers;
}
public OnError getErrorHandling() {
return _handler;
}
public boolean stopOnError() {
return _handler == OnError.Revert || _handler == OnError.Stop;
}
public boolean revertOnError() {
return _handler == OnError.Revert;
}
public Answer[] getAnswers() {
return _answers;
}
@SuppressWarnings("unchecked")
public <T extends Command> T getCommand(Class<T> clazz) {
for (Command cmd : _cmds) {
if (cmd.getClass() == clazz) {
return (T)cmd;
}
}
return null;
}
}

View File

@ -63,7 +63,7 @@ public class SynchronousListener implements Listener {
}
@Override
public synchronized boolean processAnswer(long agentId, long seq, Answer[] resp) {
public synchronized boolean processAnswers(long agentId, long seq, Answer[] resp) {
_answers = resp;
notifyAll();
return true;
@ -85,7 +85,7 @@ public class SynchronousListener implements Listener {
}
@Override
public boolean processCommand(long agentId, long seq, Command[] req) {
public boolean processCommands(long agentId, long seq, Command[] req) {
return false;
}

View File

@ -58,7 +58,6 @@ import com.cloud.network.dao.IPAddressDao;
import com.cloud.offering.ServiceOffering;
import com.cloud.service.ServiceOfferingVO;
import com.cloud.service.dao.ServiceOfferingDao;
import com.cloud.storage.Storage.StoragePoolType;
import com.cloud.storage.StorageManager;
import com.cloud.storage.StoragePoolVO;
import com.cloud.storage.dao.StoragePoolDao;
@ -78,7 +77,6 @@ import com.cloud.vm.dao.ConsoleProxyDao;
import com.cloud.vm.dao.DomainRouterDao;
import com.cloud.vm.dao.SecondaryStorageVmDao;
import com.cloud.vm.dao.UserVmDao;
import com.cloud.vm.dao.VMInstanceDao;
import com.sun.mail.smtp.SMTPMessage;
import com.sun.mail.smtp.SMTPSSLTransport;
import com.sun.mail.smtp.SMTPTransport;
@ -94,22 +92,21 @@ public class AlertManagerImpl implements AlertManager {
private String _name = null;
private EmailAlert _emailAlert;
private AlertDao _alertDao;
private HostDao _hostDao;
@Inject private AlertDao _alertDao;
@Inject private HostDao _hostDao;
@Inject protected StorageManager _storageMgr;
private ServiceOfferingDao _offeringsDao;
private CapacityDao _capacityDao;
private VMInstanceDao _vmDao;
private DomainRouterDao _routerDao;
private ConsoleProxyDao _consoleProxyDao;
private SecondaryStorageVmDao _secStorgaeVmDao;
private UserVmDao _userVmDao;
private DataCenterDao _dcDao;
private HostPodDao _podDao;
private VolumeDao _volumeDao;
private IPAddressDao _publicIPAddressDao;
private DataCenterIpAddressDaoImpl _privateIPAddressDao;
private StoragePoolDao _storagePoolDao;
@Inject private ServiceOfferingDao _offeringsDao;
@Inject private CapacityDao _capacityDao;
@Inject private DomainRouterDao _routerDao;
@Inject private ConsoleProxyDao _consoleProxyDao;
@Inject private SecondaryStorageVmDao _secStorgaeVmDao;
@Inject private UserVmDao _userVmDao;
@Inject private DataCenterDao _dcDao;
@Inject private HostPodDao _podDao;
@Inject private VolumeDao _volumeDao;
@Inject private IPAddressDao _publicIPAddressDao;
@Inject private DataCenterIpAddressDaoImpl _privateIPAddressDao;
@Inject private StoragePoolDao _storagePoolDao;
private Timer _timer = null;
private float _cpuOverProvisioningFactor = 1;
@ -183,79 +180,6 @@ public class AlertManagerImpl implements AlertManager {
_privateIPCapacityThreshold = Double.parseDouble(privateIPCapacityThreshold);
}
_hostDao = locator.getDao(HostDao.class);
if (_hostDao == null) {
s_logger.error("Unable to get the host dao.");
return false;
}
_vmDao = locator.getDao(VMInstanceDao.class);
if (_vmDao == null) {
s_logger.error("Unable to get the VM Instance dao.");
return false;
}
_routerDao = locator.getDao(DomainRouterDao.class);
_consoleProxyDao = locator.getDao(ConsoleProxyDao.class);
_secStorgaeVmDao = locator.getDao(SecondaryStorageVmDao.class);
_userVmDao = locator.getDao(UserVmDao.class);
if (_userVmDao == null) {
s_logger.error("Unable to get the UserVm dao.");
return false;
}
_offeringsDao = locator.getDao(ServiceOfferingDao.class);
if (_offeringsDao == null) {
s_logger.error("Unable to get the ServiceOffering dao.");
return false;
}
_capacityDao = locator.getDao(CapacityDao.class);
if (_capacityDao == null) {
s_logger.error("Unable to get the capacity dao.");
return false;
}
_alertDao = locator.getDao(AlertDao.class);
if (_alertDao == null) {
s_logger.error("Unable to get the alert dao.");
return false;
}
_dcDao = locator.getDao(DataCenterDao.class);
if (_dcDao == null) {
s_logger.error("Unable to get the DataCenter dao.");
return false;
}
_podDao = locator.getDao(HostPodDao.class);
if (_podDao == null) {
s_logger.error("Unable to get the Pod dao.");
return false;
}
_volumeDao = locator.getDao(VolumeDao.class);
if (_volumeDao == null) {
s_logger.error("Unable to get the Volume dao.");
return false;
}
_publicIPAddressDao = locator.getDao(IPAddressDao.class);
if (_publicIPAddressDao == null) {
throw new ConfigurationException("Unable to get " + IPAddressDao.class.getName());
}
_privateIPAddressDao = locator.getDao(DataCenterIpAddressDaoImpl.class);
if (_privateIPAddressDao == null) {
throw new ConfigurationException("Unable to get " + DataCenterIpAddressDaoImpl.class.getName());
}
_storagePoolDao = locator.getDao(StoragePoolDao.class);
if (_storagePoolDao == null) {
throw new ConfigurationException("Unable to get " + StoragePoolDao.class.getName());
}
String capacityCheckPeriodStr = configs.get("capacity.check.period");
if (capacityCheckPeriodStr != null) {
_capacityCheckPeriod = Long.parseLong(capacityCheckPeriodStr);

View File

@ -50,7 +50,7 @@ public class VMOperationListener implements Listener {
_cookie = cookie;
}
public boolean processAnswer(long agentId, long seq, Answer[] answers) {
public boolean processAnswers(long agentId, long seq, Answer[] answers) {
Answer answer = null;
if(answers != null)
answer = answers[0];
@ -61,7 +61,7 @@ public class VMOperationListener implements Listener {
return true;
}
public boolean processCommand(long agentId, long seq, Command[] commands) {
public boolean processCommands(long agentId, long seq, Command[] commands) {
return true;
}

View File

@ -52,7 +52,7 @@ public class VolumeOperationListener implements Listener {
_cookie = cookie;
}
public boolean processAnswer(long agentId, long seq, Answer[] answers) {
public boolean processAnswers(long agentId, long seq, Answer[] answers) {
Answer answer = null;
if(answers != null)
answer = answers[0];
@ -64,7 +64,7 @@ public class VolumeOperationListener implements Listener {
return true;
}
public boolean processCommand(long agentId, long seq, Command[] commands) {
public boolean processCommands(long agentId, long seq, Command[] commands) {
return true;
}

View File

@ -0,0 +1,12 @@
package com.cloud.capacity;
import com.cloud.utils.component.Manager;
/**
* Capacity Manager manages the different capacities
* available within the Cloud Stack.
*
*/
public interface CapacityManager extends Manager {
}

View File

@ -15,15 +15,37 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.vm;
package com.cloud.capacity;
import java.util.List;
import java.util.Map;
import com.cloud.agent.api.to.VirtualMachineTO;
import com.cloud.deploy.DeployDestination;
import com.cloud.utils.Pair;
import javax.naming.ConfigurationException;
public interface VirtualMachineChecker {
boolean finalizeDeployment(VirtualMachineTO vm, VirtualMachineProfile profile, DeployDestination dest);
boolean finalizeDeployments(List<Pair<VirtualMachineProfile, DeployDestination>> deployments);
import com.cloud.capacity.dao.CapacityDao;
import com.cloud.utils.component.Inject;
public class CapacityManagerImpl implements CapacityManager {
String _name;
@Inject CapacityDao _capacityDao;
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
_name = name;
return true;
}
@Override
public boolean start() {
return true;
}
@Override
public boolean stop() {
return true;
}
@Override
public String getName() {
return _name;
}
}

View File

@ -45,12 +45,12 @@ public class ConsoleProxyListener implements Listener {
}
@Override
public boolean processAnswer(long agentId, long seq, Answer[] answers) {
public boolean processAnswers(long agentId, long seq, Answer[] answers) {
return true;
}
@Override
public boolean processCommand(long agentId, long seq, Command[] commands) {
public boolean processCommands(long agentId, long seq, Command[] commands) {
return false;
}

View File

@ -52,6 +52,7 @@ import com.cloud.agent.api.ConsoleProxyLoadReportCommand;
import com.cloud.agent.api.MigrateCommand;
import com.cloud.agent.api.PrepareForMigrationCommand;
import com.cloud.agent.api.RebootCommand;
import com.cloud.agent.api.Start2Command;
import com.cloud.agent.api.StartConsoleProxyAnswer;
import com.cloud.agent.api.StartConsoleProxyCommand;
import com.cloud.agent.api.StartupCommand;
@ -61,6 +62,7 @@ import com.cloud.agent.api.proxy.ConsoleProxyLoadAnswer;
import com.cloud.agent.api.to.NicTO;
import com.cloud.agent.api.to.VirtualMachineTO;
import com.cloud.agent.api.to.VirtualMachineTO.SshMonitor;
import com.cloud.agent.manager.Commands;
import com.cloud.async.AsyncJobExecutor;
import com.cloud.async.AsyncJobManager;
import com.cloud.async.AsyncJobVO;
@ -149,7 +151,7 @@ import com.cloud.vm.State;
import com.cloud.vm.VMInstanceVO;
import com.cloud.vm.VirtualMachine;
import com.cloud.vm.VirtualMachine.Event;
import com.cloud.vm.VirtualMachineChecker;
import com.cloud.vm.VirtualMachineGuru;
import com.cloud.vm.VirtualMachineManager;
import com.cloud.vm.VirtualMachineName;
import com.cloud.vm.VirtualMachineProfile;
@ -178,7 +180,7 @@ import com.google.gson.GsonBuilder;
// because sooner or later, it will be driven into Running state
//
@Local(value = { ConsoleProxyManager.class })
public class ConsoleProxyManagerImpl implements ConsoleProxyManager, VirtualMachineManager<ConsoleProxyVO>, AgentHook, VirtualMachineChecker {
public class ConsoleProxyManagerImpl implements ConsoleProxyManager, VirtualMachineManager<ConsoleProxyVO>, AgentHook, VirtualMachineGuru {
private static final Logger s_logger = Logger.getLogger(ConsoleProxyManagerImpl.class);
private static final int DEFAULT_FIND_HOST_RETRY_COUNT = 2;
@ -2322,7 +2324,7 @@ public class ConsoleProxyManagerImpl implements ConsoleProxyManager, VirtualMach
haMgr.registerHandler(VirtualMachine.Type.ConsoleProxy, this);
}
boolean useLocalStorage = Boolean.parseBoolean((String) params.get(Config.SystemVMUseLocalStorage.key()));
boolean useLocalStorage = Boolean.parseBoolean(configs.get(Config.SystemVMUseLocalStorage.key()));
String networkRateStr = _configDao.getValue("network.throttling.rate");
String multicastRateStr = _configDao.getValue("multicast.throttling.rate");
_networkRate = ((networkRateStr == null) ? 200 : Integer.parseInt(networkRateStr));
@ -2347,7 +2349,10 @@ public class ConsoleProxyManagerImpl implements ConsoleProxyManager, VirtualMach
}
@Override
public boolean finalizeDeployment(VirtualMachineTO vm, VirtualMachineProfile profile, DeployDestination dest) {
public boolean finalizeDeployment(Commands cmds, VirtualMachineProfile profile, DeployDestination dest) {
Start2Command cmd = cmds.getCommand(Start2Command.class);
VirtualMachineTO vm = cmd.getVirtualMachine();
StringBuilder buf = new StringBuilder();
buf.append(" template=domP type=consoleproxy");
buf.append(" host=").append(_mgmt_host);
@ -2372,7 +2377,6 @@ public class ConsoleProxyManagerImpl implements ConsoleProxyManager, VirtualMach
buf.append(" dns2=").append(nic.getDns2());
}
}
// buf.append(" bootproto=dhcp"); //FIXME: Not sure what the private ip address is suppose to be.
if (nic.getType() == TrafficType.Management) {
buf.append(" localgw=").append(dest.getPod().getGateway());
} else if (nic.getType() == TrafficType.Control) {
@ -2398,7 +2402,7 @@ public class ConsoleProxyManagerImpl implements ConsoleProxyManager, VirtualMach
}
@Override
public boolean finalizeDeployments(List<Pair<VirtualMachineProfile, DeployDestination>> deployments) {
return false;
public boolean checkDeploymentResult(Commands cmds, VirtualMachineProfile profile, DeployDestination dest) {
return true;
}
}

View File

@ -18,7 +18,6 @@
package com.cloud.deploy;
import java.util.List;
import java.util.Set;
import javax.ejb.Local;
@ -43,7 +42,7 @@ public class SimplePlanner extends PlannerBase implements DeploymentPlanner {
@Inject ClusterDao _clusterDao;
@Override
public DeployDestination plan(VirtualMachineProfile vm, DeploymentPlan plan, Set<DeployDestination> avoid) {
public DeployDestination plan(VirtualMachineProfile vm, DeploymentPlan plan, ExcludeList avoid) {
DataCenterVO dc = _dcDao.findById(plan.getDataCenterId());
List<HostVO> hosts = _hostDao.listBy(Type.Routing, plan.getDataCenterId());
@ -61,7 +60,10 @@ public class SimplePlanner extends PlannerBase implements DeploymentPlanner {
return new DeployDestination(dc, pod, cluster, host);
}
public boolean check(VirtualMachineProfile vm, DeploymentPlan plan, DeployDestination dest, ExcludeList avoid) {
return true;
}
protected SimplePlanner() {
super();

View File

@ -22,6 +22,7 @@ import java.util.List;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.AgentManager.OnError;
import com.cloud.agent.Listener;
import com.cloud.agent.api.AgentControlAnswer;
import com.cloud.agent.api.AgentControlCommand;
@ -30,6 +31,7 @@ import com.cloud.agent.api.Command;
import com.cloud.agent.api.PingRoutingCommand;
import com.cloud.agent.api.StartupCommand;
import com.cloud.agent.api.StartupRoutingCommand;
import com.cloud.agent.manager.Commands;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.host.HostVO;
import com.cloud.host.Status;
@ -59,7 +61,7 @@ public class VmSyncListener implements Listener {
}
@Override
public boolean processAnswer(long agentId, long seq, Answer[] answers) {
public boolean processAnswers(long agentId, long seq, Answer[] answers) {
for (final Answer answer : answers) {
if (!answer.getResult()) {
s_logger.warn("Cleanup failed due to " + answer.getDetails());
@ -83,7 +85,7 @@ public class VmSyncListener implements Listener {
}
@Override
public boolean processCommand(long agentId, long seq, Command[] req) {
public boolean processCommands(long agentId, long seq, Command[] req) {
boolean processed = false;
for (Command cmd : req) {
if (cmd instanceof PingRoutingCommand) {
@ -92,7 +94,9 @@ public class VmSyncListener implements Listener {
List<Command> commands = _haMgr.deltaSync(agentId, ping.getNewStates());
if (commands.size() > 0) {
try {
_agentMgr.send(agentId, commands.toArray(new Command[commands.size()]), false, this);
Commands cmds = new Commands(OnError.Continue);
cmds.addCommands(commands);
_agentMgr.send(agentId, cmds, this);
} catch (final AgentUnavailableException e) {
s_logger.warn("Agent is now unavailable", e);
}
@ -128,9 +132,10 @@ public class VmSyncListener implements Listener {
s_logger.debug("Sending clean commands to the agent");
if (commands.size() > 0) {
final Command[] cmds = commands.toArray(new Command[commands.size()]);
Commands cmds = new Commands(OnError.Continue);
cmds.addCommands(commands);
try {
_agentMgr.send(agentId, cmds, false, this);
_agentMgr.send(agentId, cmds, this);
} catch (final AgentUnavailableException e) {
s_logger.warn("Agent is unavailable now", e);
}

View File

@ -52,13 +52,13 @@ public class KvmServerDiscoverer extends DiscovererBase implements Discoverer,
@Inject ClusterDao _clusterDao;
@Override
public boolean processAnswer(long agentId, long seq, Answer[] answers) {
public boolean processAnswers(long agentId, long seq, Answer[] answers) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean processCommand(long agentId, long seq, Command[] commands) {
public boolean processCommands(long agentId, long seq, Command[] commands) {
// TODO Auto-generated method stub
return false;
}

View File

@ -486,12 +486,12 @@ public class XcpServerDiscoverer extends DiscovererBase implements Discoverer, L
}
@Override
public boolean processAnswer(long agentId, long seq, Answer[] answers) {
public boolean processAnswers(long agentId, long seq, Answer[] answers) {
return false;
}
@Override
public boolean processCommand(long agentId, long seq, Command[] commands) {
public boolean processCommands(long agentId, long seq, Command[] commands) {
return false;
}

View File

@ -34,6 +34,7 @@ import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.AgentManager.OnError;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.CheckVirtualMachineAnswer;
import com.cloud.agent.api.CheckVirtualMachineCommand;
@ -57,6 +58,7 @@ import com.cloud.agent.api.routing.SavePasswordCommand;
import com.cloud.agent.api.routing.SetFirewallRuleCommand;
import com.cloud.agent.api.routing.VmDataCommand;
import com.cloud.agent.api.to.NicTO;
import com.cloud.agent.manager.Commands;
import com.cloud.alert.AlertManager;
import com.cloud.api.BaseCmd;
import com.cloud.async.AsyncJobExecutor;
@ -216,6 +218,7 @@ public class NetworkManagerImpl implements NetworkManager, VirtualMachineManager
@Inject NetworkConfigurationDao _networkProfileDao = null;
@Inject NicDao _nicDao;
@Inject GuestOSDao _guestOSDao = null;
// @Inject DomainRouterManager _routerMgr;
@Inject(adapter=NetworkGuru.class)
Adapters<NetworkGuru> _networkGurus;
@ -1183,28 +1186,27 @@ public class NetworkManagerImpl implements NetworkManager, VirtualMachineManager
private boolean resendDhcpEntries(final DomainRouterVO router){
final List<UserVmVO> vms = _vmDao.listBy(router.getId(), State.Creating, State.Starting, State.Running, State.Stopping, State.Stopped, State.Migrating);
final List<Command> cmdList = new ArrayList<Command>();
Commands cmds = new Commands(OnError.Continue);
for (UserVmVO vm: vms) {
if (vm.getGuestIpAddress() == null || vm.getGuestMacAddress() == null || vm.getName() == null)
continue;
DhcpEntryCommand decmd = new DhcpEntryCommand(vm.getGuestMacAddress(), vm.getGuestIpAddress(), router.getPrivateIpAddress(), vm.getName());
cmdList.add(decmd);
cmds.addCommand(decmd);
}
if (cmdList.size() > 0) {
final Command [] cmds = new Command[cmdList.size()];
Answer [] answers = null;
if (cmds.size() > 0) {
try {
answers = _agentMgr.send(router.getHostId(), cmdList.toArray(cmds), false);
_agentMgr.send(router.getHostId(), cmds);
} catch (final AgentUnavailableException e) {
s_logger.warn("agent unavailable", e);
} catch (final OperationTimedoutException e) {
s_logger.warn("Timed Out", e);
}
Answer[] answers = cmds.getAnswers();
if (answers == null ){
return false;
}
int i=0;
while (i < cmdList.size()) {
while (i < cmds.size()) {
Answer ans = answers[i];
i++;
if ((ans != null) && (ans.getResult())) {
@ -1398,7 +1400,7 @@ public class NetworkManagerImpl implements NetworkManager, VirtualMachineManager
@Override
public boolean associateIP(final DomainRouterVO router, final List<String> ipAddrList, final boolean add, long vmId) {
final Command [] cmds = new Command[ipAddrList.size()];
Commands cmds = new Commands(OnError.Continue);
int i=0;
boolean sourceNat = false;
for (final String ipAddress: ipAddrList) {
@ -1423,14 +1425,14 @@ public class NetworkManagerImpl implements NetworkManager, VirtualMachineManager
vmGuestAddress = _vmDao.findById(vmId).getGuestIpAddress();
}
cmds[i++] = new IPAssocCommand(router.getInstanceName(), router.getPrivateIpAddress(), ipAddress, add, firstIP, sourceNat, vlanId, vlanGateway, vlanNetmask, vifMacAddress, vmGuestAddress);
cmds.addCommand(new IPAssocCommand(router.getInstanceName(), router.getPrivateIpAddress(), ipAddress, add, firstIP, sourceNat, vlanId, vlanGateway, vlanNetmask, vifMacAddress, vmGuestAddress));
sourceNat = false;
}
Answer[] answers = null;
try {
answers = _agentMgr.send(router.getHostId(), cmds, false);
answers = _agentMgr.send(router.getHostId(), cmds);
} catch (final AgentUnavailableException e) {
s_logger.warn("Agent unavailable", e);
return false;
@ -1500,7 +1502,7 @@ public class NetworkManagerImpl implements NetworkManager, VirtualMachineManager
return result;
}
final List<Command> cmdList = new ArrayList<Command>();
Commands cmds = new Commands(OnError.Continue);
final List<FirewallRuleVO> lbRules = new ArrayList<FirewallRuleVO>();
final List<FirewallRuleVO> fwdRules = new ArrayList<FirewallRuleVO>();
@ -1515,7 +1517,7 @@ public class NetworkManagerImpl implements NetworkManager, VirtualMachineManager
if (rule.isForwarding()) {
fwdRules.add(rule);
final SetFirewallRuleCommand cmd = new SetFirewallRuleCommand(routerName, routerIp, rule);
cmdList.add(cmd);
cmds.addCommand(cmd);
} else {
lbRules.add(rule);
}
@ -1526,12 +1528,11 @@ public class NetworkManagerImpl implements NetworkManager, VirtualMachineManager
final String [] cfg = cfgrtr.generateConfiguration(fwRules);
final String [][] addRemoveRules = cfgrtr.generateFwRules(fwRules);
final LoadBalancerCfgCommand cmd = new LoadBalancerCfgCommand(cfg, addRemoveRules, routerName, routerIp);
cmdList.add(cmd);
cmds.addCommand(cmd);
}
final Command [] cmds = new Command[cmdList.size()];
Answer [] answers = null;
try {
answers = _agentMgr.send(host.getId(), cmdList.toArray(cmds), false);
answers = _agentMgr.send(host.getId(), cmds);
} catch (final AgentUnavailableException e) {
s_logger.warn("agent unavailable", e);
} catch (final OperationTimedoutException e) {
@ -1585,7 +1586,7 @@ public class NetworkManagerImpl implements NetworkManager, VirtualMachineManager
return result;
}
final Command [] cmds = new Command[fwRules.size()];
Commands cmds = new Commands(OnError.Continue);
int i=0;
for (final FirewallRuleVO rule: fwRules) {
IPAddressVO ip = _ipAddressDao.findById(rule.getPublicIpAddress());
@ -1595,17 +1596,17 @@ public class NetworkManagerImpl implements NetworkManager, VirtualMachineManager
if (rule.isForwarding()) {
fwdRules.add(rule);
final SetFirewallRuleCommand cmd = new SetFirewallRuleCommand(router.getInstanceName(), router.getPrivateIpAddress(), rule);
cmds[i++] = cmd;
cmds.addCommand(cmd);
}
}
final Answer [] answers = null;
try {
_agentMgr.send(hostId, cmds, false);
_agentMgr.send(hostId, cmds);
} catch (final AgentUnavailableException e) {
s_logger.warn("agent unavailable", e);
} catch (final OperationTimedoutException e) {
s_logger.warn("Timed Out", e);
}
Answer[] answers = cmds.getAnswers();
if (answers == null ){
return result;
}
@ -1890,7 +1891,7 @@ public class NetworkManagerImpl implements NetworkManager, VirtualMachineManager
_agentMgr.registerForHostEvents(new SshKeysDistriMonitor(this, _hostDao, _configDao), true, false, false);
_haMgr.registerHandler(VirtualMachine.Type.DomainRouter, this);
boolean useLocalStorage = Boolean.parseBoolean((String)params.get(Config.SystemVMUseLocalStorage.key()));
boolean useLocalStorage = Boolean.parseBoolean(configs.get(Config.SystemVMUseLocalStorage.key()));
String networkRateStr = _configDao.getValue("network.throttling.rate");
String multicastRateStr = _configDao.getValue("multicast.throttling.rate");
_networkRate = ((networkRateStr == null) ? 200 : Integer.parseInt(networkRateStr));
@ -2297,14 +2298,14 @@ public class NetworkManagerImpl implements NetworkManager, VirtualMachineManager
}
String userData = vm.getUserData();
int cmdsLength = (password == null ? 0:1) + 1;
Command[] cmds = new Command[++cmdsLength];
Commands cmds = new Commands(OnError.Stop);
int cmdIndex = 0;
int passwordIndex = -1;
int vmDataIndex = -1;
cmds[cmdIndex] = new DhcpEntryCommand(vm.getGuestMacAddress(), vm.getGuestIpAddress(), router.getPrivateIpAddress(), vm.getName());
cmds.addCommand(new DhcpEntryCommand(vm.getGuestMacAddress(), vm.getGuestIpAddress(), router.getPrivateIpAddress(), vm.getName()));
if (password != null) {
final String encodedPassword = rot13(password);
cmds[++cmdIndex] = new SavePasswordCommand(encodedPassword, vm.getPrivateIpAddress(), router.getPrivateIpAddress(), vm.getName());
cmds.addCommand(new SavePasswordCommand(encodedPassword, vm.getPrivateIpAddress(), router.getPrivateIpAddress(), vm.getName()));
passwordIndex = cmdIndex;
}
@ -2313,10 +2314,10 @@ public class NetworkManagerImpl implements NetworkManager, VirtualMachineManager
String zoneName = _dcDao.findById(vm.getDataCenterId()).getName();
String routerPublicIpAddress = (router.getPublicIpAddress() != null) ? router.getPublicIpAddress() : vm.getGuestIpAddress();
cmds[++cmdIndex] = generateVmDataCommand(router.getPrivateIpAddress(), routerPublicIpAddress, vm.getPrivateIpAddress(), userData, serviceOffering, zoneName, vm.getGuestIpAddress(), vm.getName(), vm.getInstanceName(), vm.getId());
cmds.addCommand(generateVmDataCommand(router.getPrivateIpAddress(), routerPublicIpAddress, vm.getPrivateIpAddress(), userData, serviceOffering, zoneName, vm.getGuestIpAddress(), vm.getName(), vm.getInstanceName(), vm.getId()));
vmDataIndex = cmdIndex;
Answer[] answers = _agentMgr.send(router.getHostId(), cmds, true);
Answer[] answers = _agentMgr.send(router.getHostId(), cmds);
if (!answers[0].getResult()) {
s_logger.error("Unable to set dhcp entry for " + vm.getId() + " - " + vm.getName() +" on domR: " + router.getName() + " due to " + answers[0].getDetails());
return null;

View File

@ -56,7 +56,7 @@ public class SshKeysDistriMonitor implements Listener {
}
@Override
public synchronized boolean processAnswer(long agentId, long seq, Answer[] resp) {
public synchronized boolean processAnswers(long agentId, long seq, Answer[] resp) {
return true;
}
@ -96,7 +96,7 @@ public class SshKeysDistriMonitor implements Listener {
@Override
public boolean processCommand(long agentId, long seq, Command[] commands) {
public boolean processCommands(long agentId, long seq, Command[] commands) {
// TODO Auto-generated method stub
return false;
}

View File

@ -0,0 +1,59 @@
/**
* Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.network.element;
import javax.ejb.Local;
import com.cloud.network.NetworkConfiguration;
import com.cloud.offering.NetworkOffering;
import com.cloud.utils.component.AdapterBase;
import com.cloud.vm.NicProfile;
import com.cloud.vm.VirtualMachineProfile;
@Local(value=NetworkElement.class)
public class DomainRouterElement extends AdapterBase implements NetworkElement {
@Override
public boolean implement(NetworkConfiguration config, NetworkOffering offering) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean prepare(NetworkConfiguration config, NicProfile nic, VirtualMachineProfile vm, NetworkOffering offering) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean release(NetworkConfiguration config, NicProfile nic, VirtualMachineProfile vm, NetworkOffering offering) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean shutdown(NetworkConfiguration config, NetworkOffering offering) {
// TODO Auto-generated method stub
return false;
}
protected DomainRouterElement() {
super();
}
}

View File

@ -1,10 +1,243 @@
/**
* Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.network.router;
import java.util.List;
import java.util.Map;
import com.cloud.agent.api.to.NicTO;
import com.cloud.async.executor.AssignToLoadBalancerExecutor;
import com.cloud.async.executor.LoadBalancerParam;
import com.cloud.dc.DataCenterVO;
import com.cloud.dc.HostPodVO;
import com.cloud.dc.VlanVO;
import com.cloud.deploy.DeployDestination;
import com.cloud.deploy.DeploymentPlan;
import com.cloud.exception.ConcurrentOperationException;
import com.cloud.exception.InsufficientAddressCapacityException;
import com.cloud.exception.InsufficientCapacityException;
import com.cloud.exception.InsufficientVirtualNetworkCapcityException;
import com.cloud.exception.ResourceAllocationException;
import com.cloud.hypervisor.Hypervisor.HypervisorType;
import com.cloud.network.FirewallRuleVO;
import com.cloud.network.IPAddressVO;
import com.cloud.network.NetworkConfiguration;
import com.cloud.network.NetworkConfigurationVO;
import com.cloud.offerings.NetworkOfferingVO;
import com.cloud.service.ServiceOfferingVO;
import com.cloud.user.AccountVO;
import com.cloud.utils.Pair;
import com.cloud.utils.component.Manager;
import com.cloud.vm.DomainRouter;
import com.cloud.vm.DomainRouterVO;
import com.cloud.vm.NicProfile;
import com.cloud.vm.NicVO;
import com.cloud.vm.UserVmVO;
import com.cloud.vm.VMInstanceVO;
import com.cloud.vm.VirtualMachineProfile;
/**
* NetworkManager manages the network for the different end users.
*
*/
public interface DomainRouterManager extends Manager {
public static final int DEFAULT_ROUTER_VM_RAMSIZE = 128; // 128M
public static final boolean USE_POD_VLAN = false;
/**
* Assigns a router to the user.
*
* @param userId user id.
* @param dcId data center id.
* @param podId pod id.
* @param domain domain to use
* @return DomainRouter if one is assigned.
* @throws InsufficientCapacityException if assigning causes any capacity issues.
*/
DomainRouterVO assignRouter(long userId, long accountId, long dcId, long podId, String domain, String instance) throws InsufficientCapacityException;
/**
* create the router.
*
* @param accountId account Id the router belongs to.
* @param ipAddress public ip address the router should use to access the internet.
* @param dcId data center id the router should live in.
* @param domain domain name of this network.
* @param offering service offering associated with this request
* @return DomainRouterVO if created. null if not.
*/
DomainRouterVO createRouter(long accountId, String ipAddress, long dcId, String domain, ServiceOfferingVO offering, long startEventId) throws ConcurrentOperationException;
/**
* create a DHCP server/user data server for directly connected VMs
* @param userId the user id of the user creating the router.
* @param accountId the account id of the user creating the router.
* @param dcId data center id the router should live in.
* @param domain domain name of this network.
* @return DomainRouterVO if created. null if not.
*/
DomainRouterVO createDhcpServerForDirectlyAttachedGuests(long userId, long accountId, DataCenterVO dc, HostPodVO pod, Long candidateHost, VlanVO vlan) throws ConcurrentOperationException;
/**
/*
* Send ssh public/private key pair to specified host
* @param hostId
* @param pubKey
* @param prvKey
*/
boolean sendSshKeysToHost(Long hostId, String pubKey, String prvKey);
/**
* save a vm password on the router.
*
* @param routerId the ID of the router to save the password to
* @param vmIpAddress the IP address of the User VM that will use the password
* @param password the password to save to the router
*/
boolean savePasswordToRouter(long routerId, String vmIpAddress, String password);
DomainRouterVO startRouter(long routerId, long eventId);
boolean releaseRouter(long routerId);
boolean destroyRouter(long routerId);
boolean stopRouter(long routerId, long eventId);
boolean getRouterStatistics(long vmId, Map<String, long[]> netStats, Map<String, long[]> diskStats);
boolean rebootRouter(long routerId, long eventId);
/**
* @param hostId get all of the virtual machine routers on a host.
* @return collection of VirtualMachineRouter
*/
List<? extends DomainRouter> getRouters(long hostId);
/**
* @param routerId id of the router
* @return VirtualMachineRouter
*/
DomainRouterVO getRouter(long routerId);
/**
* Do all of the work of releasing public ip addresses. Note that
* if this method fails, there can be side effects.
* @param userId
* @param ipAddress
* @return true if it did; false if it didn't
*/
public boolean releasePublicIpAddress(long userId, String ipAddress);
/**
* Find or create the source nat ip address a user uses within the
* data center.
*
* @param account account
* @param dc data center
* @param domain domain used for user's network.
* @param so service offering associated with this request
* @return public ip address.
*/
public String assignSourceNatIpAddress(AccountVO account, DataCenterVO dc, String domain, ServiceOfferingVO so, long startEventId, HypervisorType hyperType) throws ResourceAllocationException;
/**
* @param fwRules list of rules to be updated
* @param router router where the rules have to be updated
* @return list of rules successfully updated
*/
public List<FirewallRuleVO> updatePortForwardingRules(List<FirewallRuleVO> fwRules, DomainRouterVO router, Long hostId);
/**
* @param fwRules list of rules to be updated
* @param router router where the rules have to be updated
* @return success
*/
public boolean updateLoadBalancerRules(List<FirewallRuleVO> fwRules, DomainRouterVO router, Long hostId);
/**
* @param publicIpAddress public ip address associated with the fwRules
* @param fwRules list of rules to be updated
* @param router router where the rules have to be updated
* @return list of rules successfully updated
*/
public List<FirewallRuleVO> updateFirewallRules(String publicIpAddress, List<FirewallRuleVO> fwRules, DomainRouterVO router);
/**
* Associates or disassociates a list of public IP address for a router.
* @param router router object to send the association to
* @param ipAddrList list of public IP addresses
* @param add true if associate, false if disassociate
* @param vmId
* @return
*/
boolean associateIP(DomainRouterVO router, List<String> ipAddrList, boolean add, long vmId) throws ResourceAllocationException;
boolean updateFirewallRule(FirewallRuleVO fwRule, String oldPrivateIP, String oldPrivatePort);
boolean executeAssignToLoadBalancer(AssignToLoadBalancerExecutor executor, LoadBalancerParam param);
/**
* Add a DHCP entry on the domr dhcp server
* @param routerHostId - the host id of the domr
* @param routerIp - the private ip address of the domr
* @param vmName - the name of the VM (e.g., i-10-TEST)
* @param vmMac - the mac address of the eth0 interface of the VM
* @param vmIp - the ip address to hand out.
* @return success or failure
*/
public boolean addDhcpEntry(long routerHostId, String routerIp, String vmName, String vmMac, String vmIp);
/**
* Adds a virtual machine into the guest network.
* 1. Starts the domR
* 2. Sets the dhcp Entry on the domR
* 3. Sets the domR
*
* @param vm user vm to add to the guest network
* @param password password for this vm. Can be null
* @return DomainRouterVO if everything is successful. null if not.
*
* @throws ConcurrentOperationException if multiple starts are being attempted.
*/
public DomainRouterVO addVirtualMachineToGuestNetwork(UserVmVO vm, String password, long startEventId) throws ConcurrentOperationException;
String createZoneVlan(DomainRouterVO router);
/**
* Lists IP addresses that belong to VirtualNetwork VLANs
* @param accountId - account that the IP address should belong to
* @param dcId - zone that the IP address should belong to
* @param sourceNat - (optional) true if the IP address should be a source NAT address
* @return - list of IP addresses
*/
List<IPAddressVO> listPublicIpAddressesInVirtualNetwork(long accountId, long dcId, Boolean sourceNat);
NetworkConfigurationVO setupNetworkConfiguration(AccountVO owner, NetworkOfferingVO offering, DeploymentPlan plan);
NetworkConfigurationVO setupNetworkConfiguration(AccountVO owner, NetworkOfferingVO offering, NetworkConfiguration predefined, DeploymentPlan plan);
List<NetworkConfigurationVO> setupNetworkConfigurations(AccountVO owner, List<NetworkOfferingVO> offerings, DeploymentPlan plan);
List<NetworkOfferingVO> getSystemAccountNetworkOfferings(String... offeringNames);
List<NicProfile> allocate(VirtualMachineProfile vm, List<Pair<NetworkConfigurationVO, NicProfile>> networks) throws InsufficientCapacityException;
NicTO[] prepare(VirtualMachineProfile profile, DeployDestination dest) throws InsufficientAddressCapacityException, InsufficientVirtualNetworkCapcityException;
void release(VirtualMachineProfile vmProfile);
<K extends VMInstanceVO> void create(K vm);
<K extends VMInstanceVO> List<NicVO> getNics(K vm);
boolean upgradeRouter(long routerId, long serviceOfferingId);
}

View File

@ -71,7 +71,7 @@ public class NetworkGroupListener implements Listener {
@Override
public boolean processAnswer(long agentId, long seq, Answer[] answers) {
public boolean processAnswers(long agentId, long seq, Answer[] answers) {
Set<Long> affectedVms = new HashSet<Long>();
int commandNum = 0;
for (Answer ans: answers) {
@ -95,7 +95,7 @@ public class NetworkGroupListener implements Listener {
}
@Override
public boolean processCommand(long agentId, long seq, Command[] commands) {
public boolean processCommands(long agentId, long seq, Command[] commands) {
boolean processed = false;
for (Command cmd : commands) {
if (cmd instanceof PingRoutingWithNwGroupsCommand) {

View File

@ -38,9 +38,9 @@ import org.apache.commons.codec.digest.DigestUtils;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.NetworkIngressRulesCmd;
import com.cloud.agent.api.NetworkIngressRulesCmd.IpPortAndProto;
import com.cloud.agent.manager.Commands;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.domain.DomainVO;
import com.cloud.domain.dao.DomainDao;
@ -662,9 +662,9 @@ public class NetworkGroupManagerImpl implements NetworkGroupManager {
if (agentId != null ) {
_rulesetLogDao.findByVmId(work.getInstanceId());
NetworkIngressRulesCmd cmd = generateRulesetCmd(vm.getInstanceName(), vm.getGuestIpAddress(), vm.getGuestMacAddress(), vm.getId(), generateRulesetSignature(rules), seqnum, rules);
Command[] cmds = new Command[]{cmd};
Commands cmds = new Commands(cmd);
try {
_agentMgr.send(agentId, cmds, false, _answerListener);
_agentMgr.send(agentId, cmds, _answerListener);
} catch (AgentUnavailableException e) {
s_logger.debug("Unable to send updates for vm: " + userVmId + "(agentid=" + agentId + ")");
_workDao.updateStep(work.getInstanceId(), seqnum, Step.Done);

View File

@ -32,12 +32,13 @@ import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.AgentManager.OnError;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.GetFileStatsCommand;
import com.cloud.agent.api.GetStorageStatsCommand;
import com.cloud.agent.api.HostStatsEntry;
import com.cloud.agent.api.VmStatsEntry;
import com.cloud.agent.manager.Commands;
import com.cloud.capacity.CapacityVO;
import com.cloud.capacity.dao.CapacityDao;
import com.cloud.exception.AgentUnavailableException;
@ -59,7 +60,6 @@ import com.cloud.storage.dao.VolumeDao;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.component.ComponentLocator;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.GlobalLock;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.Transaction;
import com.cloud.vm.UserVmManager;
@ -144,7 +144,8 @@ public class StatsCollector {
}
class HostCollector implements Runnable {
public void run() {
@Override
public void run() {
try {
s_logger.debug("HostStatsCollector is running...");
@ -184,7 +185,8 @@ public class StatsCollector {
}
class VmStatsCollector implements Runnable {
public void run() {
@Override
public void run() {
try {
s_logger.debug("VmStatsCollector is running...");
@ -252,7 +254,8 @@ public class StatsCollector {
}
class StorageCollector implements Runnable {
public void run() {
@Override
public void run() {
try {
SearchCriteria<HostVO> sc = _hostDao.createSearchCriteria();
sc.addAnd("status", SearchCriteria.Op.EQ, Status.Up.toString());
@ -371,7 +374,8 @@ public class StatsCollector {
}
class VolumeCollector implements Runnable {
public void run() {
@Override
public void run() {
try {
List<VolumeVO> volumes = _volsDao.listAll();
Map<Long, List<VolumeCommand>> commandsByPool = new HashMap<Long, List<VolumeCommand>>();
@ -393,16 +397,16 @@ public class StatsCollector {
List<VolumeCommand> commandsList = commandsByPool.get(poolId);
long[] volumeIdArray = new long[commandsList.size()];
Command[] commandsArray = new Command[commandsList.size()];
Commands commands = new Commands(OnError.Continue);
for (int i = 0; i < commandsList.size(); i++) {
VolumeCommand vCommand = commandsList.get(i);
volumeIdArray[i] = vCommand.volumeId;
commandsArray[i] = vCommand.command;
commands.addCommand(vCommand.command);
}
List<StoragePoolHostVO> poolhosts = _storagePoolHostDao.listByPoolId(poolId);
for(StoragePoolHostVO poolhost : poolhosts) {
Answer[] answers = _agentMgr.send(poolhost.getHostId(), commandsArray, false);
Answer[] answers = _agentMgr.send(poolhost.getHostId(), commands);
if (answers != null) {
long totalBytes = 0L;
for (int i = 0; i < answers.length; i++) {

View File

@ -54,12 +54,12 @@ public class LocalStoragePoolListener implements Listener {
}
@Override
public boolean processAnswer(long agentId, long seq, Answer[] answers) {
public boolean processAnswers(long agentId, long seq, Answer[] answers) {
return false;
}
@Override
public boolean processCommand(long agentId, long seq, Command[] commands) {
public boolean processCommands(long agentId, long seq, Command[] commands) {
return false;
}

View File

@ -25,6 +25,7 @@ import java.util.Map;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.to.VolumeTO;
import com.cloud.agent.manager.Commands;
import com.cloud.dc.DataCenterVO;
import com.cloud.dc.HostPodVO;
import com.cloud.deploy.DeployDestination;
@ -235,7 +236,7 @@ public interface StorageManager extends Manager {
*/
boolean volumeOnSharedStoragePool(VolumeVO volume);
Answer[] sendToPool(StoragePool pool, Command[] cmds, boolean stopOnError);
Answer[] sendToPool(StoragePool pool, Commands cmds);
Answer sendToPool(StoragePool pool, Command cmd);

View File

@ -57,6 +57,7 @@ import com.cloud.agent.api.storage.DeleteTemplateCommand;
import com.cloud.agent.api.storage.DestroyCommand;
import com.cloud.agent.api.to.StorageFilerTO;
import com.cloud.agent.api.to.VolumeTO;
import com.cloud.agent.manager.Commands;
import com.cloud.alert.AlertManager;
import com.cloud.api.BaseCmd;
import com.cloud.async.AsyncInstanceCreateStatus;
@ -427,13 +428,13 @@ public class StorageManagerImpl implements StorageManager {
}
@Override
public Answer[] sendToPool(StoragePool pool, Command[] cmds, boolean stopOnError) {
public Answer[] sendToPool(StoragePool pool, Commands cmds) {
List<StoragePoolHostVO> poolHosts = _poolHostDao.listByHostStatus(pool.getId(), Status.Up);
Collections.shuffle(poolHosts);
for (StoragePoolHostVO poolHost: poolHosts) {
try {
Answer[] answerRet = _agentMgr.send(poolHost.getHostId(), cmds, stopOnError);
Answer[] answerRet = _agentMgr.send(poolHost.getHostId(), cmds);
return answerRet;
} catch (AgentUnavailableException e) {
@ -450,8 +451,8 @@ public class StorageManagerImpl implements StorageManager {
@Override
public Answer sendToPool(StoragePool pool, Command cmd) {
Command[] cmds = new Command[]{cmd};
Answer[] answers = sendToPool(pool, cmds, true);
Commands cmds = new Commands(cmd);
Answer[] answers = sendToPool(pool, cmds);
if (answers == null) {
return null;
}

View File

@ -208,7 +208,7 @@ public class DownloadListener implements Listener {
@Override
public boolean processAnswer(long agentId, long seq, Answer[] answers) {
public boolean processAnswers(long agentId, long seq, Answer[] answers) {
boolean processed = false;
if(answers != null & answers.length > 0) {
if(answers[0] instanceof DownloadAnswer) {
@ -258,7 +258,7 @@ public class DownloadListener implements Listener {
}
@Override
public boolean processCommand(long agentId, long seq, Command[] req) {
public boolean processCommands(long agentId, long seq, Command[] req) {
return false;
}

View File

@ -56,7 +56,7 @@ public class StoragePoolMonitor implements Listener {
}
@Override
public synchronized boolean processAnswer(long agentId, long seq, Answer[] resp) {
public synchronized boolean processAnswers(long agentId, long seq, Answer[] resp) {
return true;
}
@ -87,7 +87,7 @@ public class StoragePoolMonitor implements Listener {
@Override
public boolean processCommand(long agentId, long seq, Command[] req) {
public boolean processCommands(long agentId, long seq, Command[] req) {
return false;
}

View File

@ -40,7 +40,7 @@ public class StorageSyncListener implements Listener {
}
@Override
public boolean processAnswer(long agentId, long seq, Answer[] answers) {
public boolean processAnswers(long agentId, long seq, Answer[] answers) {
for (Answer answer : answers) {
if (answer.getResult() == false) {
s_logger.warn("Unable to execute sync command: " + answer.toString());
@ -63,7 +63,7 @@ public class StorageSyncListener implements Listener {
}
@Override
public boolean processCommand(long agentId, long seq, Command[] request) {
public boolean processCommands(long agentId, long seq, Command[] request) {
return false;
}

View File

@ -46,7 +46,7 @@ public class SecondaryStorageListener implements Listener {
}
@Override
public boolean processAnswer(long agentId, long seq, Answer[] answers) {
public boolean processAnswers(long agentId, long seq, Answer[] answers) {
boolean processed = false;
if(answers != null) {
for(int i = 0; i < answers.length; i++) {
@ -61,7 +61,7 @@ public class SecondaryStorageListener implements Listener {
}
@Override
public boolean processCommand(long agentId, long seq, Command[] commands) {
public boolean processCommands(long agentId, long seq, Command[] commands) {
return false;
}

View File

@ -1385,7 +1385,7 @@ public class SecondaryStorageManagerImpl implements SecondaryStorageVmManager, V
_IpAllocator = it.nextElement();
}
boolean useLocalStorage = Boolean.parseBoolean((String)params.get(Config.SystemVMUseLocalStorage.key()));
boolean useLocalStorage = Boolean.parseBoolean(configs.get(Config.SystemVMUseLocalStorage.key()));
String networkRateStr = _configDao.getValue("network.throttling.rate");
String multicastRateStr = _configDao.getValue("multicast.throttling.rate");
_networkRate = ((networkRateStr == null) ? 200 : Integer.parseInt(networkRateStr));

View File

@ -186,7 +186,7 @@ public class UploadListener implements Listener {
}
@Override
public boolean processAnswer(long agentId, long seq, Answer[] answers) {
public boolean processAnswers(long agentId, long seq, Answer[] answers) {
boolean processed = false;
if(answers != null & answers.length > 0) {
if(answers[0] instanceof UploadAnswer) {
@ -205,7 +205,7 @@ public class UploadListener implements Listener {
@Override
public boolean processCommand(long agentId, long seq, Command[] commands) {
public boolean processCommands(long agentId, long seq, Command[] commands) {
return false;
}

View File

@ -18,10 +18,8 @@
package com.cloud.vm;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.ejb.Local;
import javax.naming.ConfigurationException;
@ -29,19 +27,23 @@ import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.api.Start2Answer;
import com.cloud.agent.AgentManager.OnError;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Start2Command;
import com.cloud.agent.api.to.NicTO;
import com.cloud.agent.api.to.VirtualMachineTO;
import com.cloud.agent.api.to.VolumeTO;
import com.cloud.agent.manager.Commands;
import com.cloud.configuration.Config;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.deploy.DeployDestination;
import com.cloud.deploy.DeploymentPlan;
import com.cloud.deploy.DeploymentPlanner;
import com.cloud.deploy.DeploymentPlanner.ExcludeList;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.exception.ConcurrentOperationException;
import com.cloud.exception.InsufficientCapacityException;
import com.cloud.exception.InsufficientServerCapacityException;
import com.cloud.exception.OperationTimedoutException;
import com.cloud.exception.StorageUnavailableException;
import com.cloud.network.NetworkConfigurationVO;
@ -209,7 +211,7 @@ public class MauriceMoss implements VmManager {
}
@Override
public <T extends VMInstanceVO> T start(T vm, DeploymentPlan plan, VirtualMachineChecker checker) throws InsufficientCapacityException, ConcurrentOperationException {
public <T extends VMInstanceVO> T start(T vm, DeploymentPlan plan, VirtualMachineGuru guru) throws InsufficientCapacityException, ConcurrentOperationException {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Creating actual resources for VM " + vm);
}
@ -230,23 +232,25 @@ public class MauriceMoss implements VmManager {
throw new CloudRuntimeException("Guest OS is not set");
}
VirtualMachineProfile vmProfile = new VirtualMachineProfile(vm, offering, guestOS.getDisplayName(), template.getHypervisorType());
_vmDao.updateIf(vm, Event.StartRequested, null);
if (!_vmDao.updateIf(vm, Event.StartRequested, null)) {
throw new ConcurrentOperationException("Unable to start vm " + vm + " due to concurrent operations");
}
Set<DeployDestination> avoids = new HashSet<DeployDestination>();
ExcludeList avoids = new ExcludeList();
int retry = _retry;
while (retry-- != 0) { // It's != so that it can match -1.
DeployDestination dest = null;
for (DeploymentPlanner dispatcher : _planners) {
dest = dispatcher.plan(vmProfile, plan, avoids);
for (DeploymentPlanner planner : _planners) {
dest = planner.plan(vmProfile, plan, avoids);
if (dest != null) {
avoids.add(dest);
avoids.addHost(dest.getHost().getId());
journal.record("Deployment found ", vmProfile, dest);
break;
}
}
if (dest == null) {
throw new CloudRuntimeException("Unable to create a deployment for " + vmProfile);
throw new InsufficientServerCapacityException("Unable to create a deployment for " + vmProfile);
}
vm.setDataCenterId(dest.getDataCenter().getId());
@ -268,20 +272,20 @@ public class MauriceMoss implements VmManager {
vmTO.setNics(nics);
vmTO.setDisks(volumes);
if (checker != null) {
checker.finalizeDeployment(vmTO, vmProfile, dest);
Commands cmds = new Commands(OnError.Revert);
cmds.addCommand(new Start2Command(vmTO));
if (guru != null) {
guru.finalizeDeployment(cmds, vmProfile, dest);
}
Start2Command cmd = new Start2Command(vmTO);
try {
Start2Answer answer = (Start2Answer)_agentMgr.send(dest.getHost().getId(), cmd);
if (answer.getResult()) {
Answer[] answers = _agentMgr.send(dest.getHost().getId(), cmds);
if (answers[0].getResult()) {
if (!_vmDao.updateIf(vm, Event.OperationSucceeded, dest.getHost().getId())) {
throw new CloudRuntimeException("Unable to transition to a new state.");
}
return vm;
}
s_logger.info("Unable to start VM on " + dest.getHost() + " due to " + answer.getDetails());
s_logger.info("Unable to start VM on " + dest.getHost() + " due to " + answers[0].getDetails());
} catch (AgentUnavailableException e) {
s_logger.debug("Unable to send the start command to host " + dest.getHost());
continue;

View File

@ -50,7 +50,6 @@ import com.cloud.agent.api.CreatePrivateTemplateFromSnapshotCommand;
import com.cloud.agent.api.CreatePrivateTemplateFromVolumeCommand;
import com.cloud.agent.api.GetVmStatsAnswer;
import com.cloud.agent.api.GetVmStatsCommand;
import com.cloud.agent.api.ManageSnapshotAnswer;
import com.cloud.agent.api.ManageSnapshotCommand;
import com.cloud.agent.api.MigrateCommand;
import com.cloud.agent.api.PrepareForMigrationCommand;
@ -60,6 +59,7 @@ import com.cloud.agent.api.StartCommand;
import com.cloud.agent.api.StopCommand;
import com.cloud.agent.api.VmStatsEntry;
import com.cloud.agent.api.storage.CreatePrivateTemplateAnswer;
import com.cloud.agent.manager.Commands;
import com.cloud.alert.AlertManager;
import com.cloud.api.BaseCmd;
import com.cloud.async.AsyncJobExecutor;
@ -105,7 +105,6 @@ import com.cloud.host.Host;
import com.cloud.host.HostVO;
import com.cloud.host.dao.DetailsDao;
import com.cloud.host.dao.HostDao;
import com.cloud.hypervisor.Hypervisor;
import com.cloud.hypervisor.Hypervisor.HypervisorType;
import com.cloud.network.FirewallRuleVO;
import com.cloud.network.IPAddressVO;
@ -128,8 +127,6 @@ import com.cloud.service.ServiceOfferingVO;
import com.cloud.service.dao.ServiceOfferingDao;
import com.cloud.storage.DiskOfferingVO;
import com.cloud.storage.GuestOSVO;
import com.cloud.storage.Snapshot;
import com.cloud.storage.Snapshot.SnapshotType;
import com.cloud.storage.SnapshotVO;
import com.cloud.storage.Storage;
import com.cloud.storage.Storage.ImageFormat;
@ -161,7 +158,6 @@ import com.cloud.user.dao.AccountDao;
import com.cloud.user.dao.UserDao;
import com.cloud.user.dao.UserStatisticsDao;
import com.cloud.uservm.UserVm;
import com.cloud.utils.DateUtil;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.Pair;
import com.cloud.utils.component.Adapters;
@ -1043,8 +1039,7 @@ public class UserVmManagerImpl implements UserVmManager {
StopCommand cmd = new StopCommand(vm, vm.getInstanceName(), vm.getVnet());
try {
long seq = _agentMgr.send(vm.getHostId(), new Command[] {cmd}, true,
new VMOperationListener(executor, param, vm, 0));
long seq = _agentMgr.send(vm.getHostId(), new Commands(cmd), new VMOperationListener(executor, param, vm, 0));
resultDescription = "Execute asynchronize stop VM command: sending command to agent, seq - " + seq;
if(s_logger.isDebugEnabled())
s_logger.debug(resultDescription);
@ -1118,8 +1113,7 @@ public class UserVmManagerImpl implements UserVmManager {
if (vm.getState() == State.Running && vm.getHostId() != null) {
RebootCommand cmd = new RebootCommand(vm.getInstanceName());
try {
long seq = _agentMgr.send(vm.getHostId(), new Command[] {cmd}, true,
new VMOperationListener(executor, param, vm, 0));
long seq = _agentMgr.send(vm.getHostId(), new Commands(cmd), new VMOperationListener(executor, param, vm, 0));
resultDescription = "Execute asynchronize Reboot VM command: sending command to agent, seq - " + seq;
if(s_logger.isDebugEnabled())
s_logger.debug(resultDescription);
@ -1707,8 +1701,7 @@ public class UserVmManagerImpl implements UserVmManager {
param.setChildEventId(childEventId);
StopCommand cmd = new StopCommand(vm, vm.getInstanceName(), vm.getVnet());
try {
long seq = _agentMgr.send(vm.getHostId(), new Command[] {cmd}, true,
new VMOperationListener(executor, param, vm, 0));
long seq = _agentMgr.send(vm.getHostId(), new Commands(cmd), new VMOperationListener(executor, param, vm, 0));
resultDescription = "Execute asynchronize destroy VM command: sending stop command to agent, seq - " + seq;
if(s_logger.isDebugEnabled())
s_logger.debug(resultDescription);

View File

@ -0,0 +1,46 @@
/**
* Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.vm;
import com.cloud.agent.manager.Commands;
import com.cloud.deploy.DeployDestination;
/**
* A VirtualMachineGuru knows how to process a certain type of virtual machine.
*
*/
public interface VirtualMachineGuru {
/**
* finalize the virtual machine deployment.
* @param cmds commands that were created.
* @param profile virtual machine profile.
* @param dest destination to send the command.
* @return true if everything checks out. false if not and we should try again.
*/
boolean finalizeDeployment(Commands cmds, VirtualMachineProfile profile, DeployDestination dest);
/**
* Check the deployment results.
* @param cmds commands and answers that were sent.
* @param profile virtual machine profile.
* @param dest destination it was sent to.
* @return true if deployment was fine; false if it didn't go well.
*/
boolean checkDeploymentResult(Commands cmds, VirtualMachineProfile profile, DeployDestination dest);
}

View File

@ -62,7 +62,7 @@ public interface VmManager extends Manager {
DeploymentPlan plan,
AccountVO owner) throws InsufficientCapacityException, StorageUnavailableException;
<T extends VMInstanceVO> T start(T vm, DeploymentPlan plan, VirtualMachineChecker checker) throws InsufficientCapacityException, StorageUnavailableException, ConcurrentOperationException;
<T extends VMInstanceVO> T start(T vm, DeploymentPlan plan, VirtualMachineGuru checker) throws InsufficientCapacityException, StorageUnavailableException, ConcurrentOperationException;
<T extends VMInstanceVO> T stop(T vm) throws AgentUnavailableException, ConcurrentOperationException;

View File

@ -28,7 +28,7 @@ public class NicDaoImpl extends GenericDaoBase<NicVO, Long> implements NicDao {
public List<NicVO> listBy(long instanceId) {
SearchCriteria<NicVO> sc = InstanceSearch.create();
sc.setParameters("instance", instanceId);
return listIncludingRemovedBy(sc);
return listBy(sc);
}
}

View File

@ -436,7 +436,6 @@ CREATE TABLE `cloud`.`host` (
`ram` bigint unsigned,
`resource` varchar(255) DEFAULT NULL COMMENT 'If it is a local resource, this is the class name',
`version` varchar(40) NOT NULL,
`sequence` bigint unsigned NOT NULL DEFAULT 1,
`parent` varchar(255) COMMENT 'parent path for the storage server',
`total_size` bigint unsigned COMMENT 'TotalSize',
`capabilities` varchar(255) COMMENT 'host capabilities in comma separated list',