From 79a3f8c5774c50dc128968c29da5096dc3dde39e Mon Sep 17 00:00:00 2001 From: wilderrodrigues Date: Tue, 8 Sep 2015 12:12:55 +0200 Subject: [PATCH] CLOUDSTACK-8822 - Replacing Runnable by Callable in the Taks and NioConnection classes - All the sub-classes were also updated according to the changes in the super-classes - There were also code formatting changes --- agent/src/com/cloud/agent/Agent.java | 125 +++--- .../cloud/agent/manager/AgentManagerImpl.java | 18 +- .../manager/ClusteredAgentManagerImpl.java | 363 +++++++++--------- .../com/cloud/utils/SerialVersionUID.java | 2 + .../exception/NioConnectionException.java | 48 +++ .../exception/TaskExecutionException.java | 48 +++ .../java/com/cloud/utils/nio/NioClient.java | 32 +- .../com/cloud/utils/nio/NioConnection.java | 314 ++++++++------- .../java/com/cloud/utils/nio/NioServer.java | 10 +- .../main/java/com/cloud/utils/nio/Task.java | 25 +- .../com/cloud/utils/testcase/NioTest.java | 35 +- 11 files changed, 587 insertions(+), 433 deletions(-) create mode 100644 utils/src/main/java/com/cloud/utils/exception/NioConnectionException.java create mode 100644 utils/src/main/java/com/cloud/utils/exception/TaskExecutionException.java diff --git a/agent/src/com/cloud/agent/Agent.java b/agent/src/com/cloud/agent/Agent.java index ac2d9ba29cc..e3510c41c32 100644 --- a/agent/src/com/cloud/agent/Agent.java +++ b/agent/src/com/cloud/agent/Agent.java @@ -35,9 +35,8 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.naming.ConfigurationException; -import org.apache.log4j.Logger; - import org.apache.cloudstack.managed.context.ManagedContextTimerTask; +import org.apache.log4j.Logger; import com.cloud.agent.api.AgentControlAnswer; import com.cloud.agent.api.AgentControlCommand; @@ -59,6 +58,8 @@ import com.cloud.utils.PropertiesUtil; import com.cloud.utils.backoff.BackoffAlgorithm; import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.exception.CloudRuntimeException; +import com.cloud.utils.exception.NioConnectionException; +import com.cloud.utils.exception.TaskExecutionException; import com.cloud.utils.nio.HandlerFactory; import com.cloud.utils.nio.Link; import com.cloud.utils.nio.NioClient; @@ -121,11 +122,11 @@ public class Agent implements HandlerFactory, IAgentControl { long _startupWait = _startupWaitDefault; boolean _reconnectAllowed = true; //For time sentitive task, e.g. PingTask - private ThreadPoolExecutor _ugentTaskPool; + private final ThreadPoolExecutor _ugentTaskPool; ExecutorService _executor; // for simulator use only - public Agent(IAgentShell shell) { + public Agent(final IAgentShell shell) { _shell = shell; _link = null; @@ -134,29 +135,29 @@ public class Agent implements HandlerFactory, IAgentControl { Runtime.getRuntime().addShutdownHook(new ShutdownThread(this)); _ugentTaskPool = - new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES, new SynchronousQueue(), new NamedThreadFactory( - "UgentTask")); + new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES, new SynchronousQueue(), new NamedThreadFactory( + "UgentTask")); _executor = - new ThreadPoolExecutor(_shell.getWorkers(), 5 * _shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue(), new NamedThreadFactory( - "agentRequest-Handler")); + new ThreadPoolExecutor(_shell.getWorkers(), 5 * _shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue(), new NamedThreadFactory( + "agentRequest-Handler")); } - public Agent(IAgentShell shell, int localAgentId, ServerResource resource) throws ConfigurationException { + public Agent(final IAgentShell shell, final int localAgentId, final ServerResource resource) throws ConfigurationException { _shell = shell; _resource = resource; _link = null; resource.setAgentControl(this); - String value = _shell.getPersistentProperty(getResourceName(), "id"); + final String value = _shell.getPersistentProperty(getResourceName(), "id"); _id = value != null ? Long.parseLong(value) : null; - s_logger.info("id is " + ((_id != null) ? _id : "")); + s_logger.info("id is " + (_id != null ? _id : "")); final Map params = PropertiesUtil.toMap(_shell.getProperties()); // merge with properties from command line to let resource access command line parameters - for (Map.Entry cmdLineProp : _shell.getCmdLineProperties().entrySet()) { + for (final Map.Entry cmdLineProp : _shell.getCmdLineProperties().entrySet()) { params.put(cmdLineProp.getKey(), cmdLineProp.getValue()); } @@ -172,15 +173,15 @@ public class Agent implements HandlerFactory, IAgentControl { Runtime.getRuntime().addShutdownHook(new ShutdownThread(this)); _ugentTaskPool = - new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES, new SynchronousQueue(), new NamedThreadFactory( - "UgentTask")); + new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES, new SynchronousQueue(), new NamedThreadFactory( + "UgentTask")); _executor = - new ThreadPoolExecutor(_shell.getWorkers(), 5 * _shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue(), new NamedThreadFactory( - "agentRequest-Handler")); + new ThreadPoolExecutor(_shell.getWorkers(), 5 * _shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue(), new NamedThreadFactory( + "agentRequest-Handler")); s_logger.info("Agent [id = " + (_id != null ? _id : "new") + " : type = " + getResourceName() + " : zone = " + _shell.getZone() + " : pod = " + _shell.getPod() + - " : workers = " + _shell.getWorkers() + " : host = " + _shell.getHost() + " : port = " + _shell.getPort()); + " : workers = " + _shell.getWorkers() + " : host = " + _shell.getHost() + " : port = " + _shell.getPort()); } public String getVersion() { @@ -188,7 +189,7 @@ public class Agent implements HandlerFactory, IAgentControl { } public String getResourceGuid() { - String guid = _shell.getGuid(); + final String guid = _shell.getGuid(); return guid + "-" + getResourceName(); } @@ -222,11 +223,19 @@ public class Agent implements HandlerFactory, IAgentControl { throw new CloudRuntimeException("Unable to start the resource: " + _resource.getName()); } - _connection.start(); + try { + _connection.start(); + } catch (final NioConnectionException e) { + throw new CloudRuntimeException("Unable to start the connection!", e); + } while (!_connection.isStartup()) { _shell.getBackoffAlgorithm().waitBeforeRetry(); _connection = new NioClient("Agent", _shell.getHost(), _shell.getPort(), _shell.getWorkers(), this); - _connection.start(); + try { + _connection.start(); + } catch (final NioConnectionException e) { + throw new CloudRuntimeException("Unable to start the connection!", e); + } } } @@ -236,12 +245,12 @@ public class Agent implements HandlerFactory, IAgentControl { final ShutdownCommand cmd = new ShutdownCommand(reason, detail); try { if (_link != null) { - Request req = new Request((_id != null ? _id : -1), -1, cmd, false); + final Request req = new Request(_id != null ? _id : -1, -1, cmd, false); _link.send(req.toBytes()); } } catch (final ClosedChannelException e) { s_logger.warn("Unable to send: " + cmd.toString()); - } catch (Exception e) { + } catch (final Exception e) { s_logger.warn("Unable to send: " + cmd.toString() + " due to exception: ", e); } s_logger.debug("Sending shutdown to management server"); @@ -294,13 +303,13 @@ public class Agent implements HandlerFactory, IAgentControl { _watchList.clear(); } } - public synchronized void lockStartupTask(Link link) + public synchronized void lockStartupTask(final Link link) { _startup = new StartupTask(link); _timer.schedule(_startup, _startupWait); } - public void sendStartup(Link link) { + public void sendStartup(final Link link) { final StartupCommand[] startup = _resource.initialize(); if (startup != null) { final Command[] commands = new Command[startup.length]; @@ -323,7 +332,7 @@ public class Agent implements HandlerFactory, IAgentControl { } } - protected void setupStartupCommand(StartupCommand startup) { + protected void setupStartupCommand(final StartupCommand startup) { InetAddress addr; try { addr = InetAddress.getLocalHost(); @@ -349,7 +358,7 @@ public class Agent implements HandlerFactory, IAgentControl { } @Override - public Task create(Task.Type type, Link link, byte[] data) { + public Task create(final Task.Type type, final Link link, final byte[] data) { return new ServerHandler(type, link, data); } @@ -391,19 +400,23 @@ public class Agent implements HandlerFactory, IAgentControl { try { _connection.cleanUp(); - } catch (IOException e) { + } catch (final IOException e) { s_logger.warn("Fail to clean up old connection. " + e); } _connection = new NioClient("Agent", _shell.getHost(), _shell.getPort(), _shell.getWorkers(), this); do { s_logger.info("Reconnecting..."); - _connection.start(); + try { + _connection.start(); + } catch (final NioConnectionException e) { + throw new CloudRuntimeException("Unable to start the connection!", e); + } _shell.getBackoffAlgorithm().waitBeforeRetry(); } while (!_connection.isStartup()); s_logger.info("Connected to the server"); } - public void processStartupAnswer(Answer answer, Response response, Link link) { + public void processStartupAnswer(final Answer answer, final Response response, final Link link) { boolean cancelled = false; synchronized (this) { if (_startup != null) { @@ -450,7 +463,7 @@ public class Agent implements HandlerFactory, IAgentControl { if (s_logger.isDebugEnabled()) { if (!requestLogged) // ensures request is logged only once per method call { - String requestMsg = request.toString(); + final String requestMsg = request.toString(); if (requestMsg != null) { s_logger.debug("Request:" + requestMsg); } @@ -464,7 +477,7 @@ public class Agent implements HandlerFactory, IAgentControl { scheduleWatch(link, request, (long)watch.getInterval() * 1000, watch.getInterval() * 1000); answer = new Answer(cmd, true, null); } else if (cmd instanceof ShutdownCommand) { - ShutdownCommand shutdown = (ShutdownCommand)cmd; + final ShutdownCommand shutdown = (ShutdownCommand)cmd; s_logger.debug("Received shutdownCommand, due to: " + shutdown.getReason()); cancelTasks(); _reconnectAllowed = false; @@ -481,7 +494,7 @@ public class Agent implements HandlerFactory, IAgentControl { } else if (cmd instanceof AgentControlCommand) { answer = null; synchronized (_controlListeners) { - for (IAgentControlListener listener : _controlListeners) { + for (final IAgentControlListener listener : _controlListeners) { answer = listener.processControlRequest(request, (AgentControlCommand)cmd); if (answer != null) { break; @@ -527,7 +540,7 @@ public class Agent implements HandlerFactory, IAgentControl { response = new Response(request, answers); } finally { if (s_logger.isDebugEnabled()) { - String responseMsg = response.toString(); + final String responseMsg = response.toString(); if (responseMsg != null) { s_logger.debug(response.toString()); } @@ -553,7 +566,7 @@ public class Agent implements HandlerFactory, IAgentControl { } else if (answer instanceof AgentControlAnswer) { // Notice, we are doing callback while holding a lock! synchronized (_controlListeners) { - for (IAgentControlListener listener : _controlListeners) { + for (final IAgentControlListener listener : _controlListeners) { listener.processControlResponse(response, (AgentControlAnswer)answer); } } @@ -562,7 +575,7 @@ public class Agent implements HandlerFactory, IAgentControl { } } - public void processReadyCommand(Command cmd) { + public void processReadyCommand(final Command cmd) { final ReadyCommand ready = (ReadyCommand)cmd; @@ -574,10 +587,10 @@ public class Agent implements HandlerFactory, IAgentControl { } - public void processOtherTask(Task task) { + public void processOtherTask(final Task task) { final Object obj = task.get(); if (obj instanceof Response) { - if ((System.currentTimeMillis() - _lastPingResponseTime) > _pingInterval * _shell.getPingRetries()) { + if (System.currentTimeMillis() - _lastPingResponseTime > _pingInterval * _shell.getPingRetries()) { s_logger.error("Ping Interval has gone past " + _pingInterval * _shell.getPingRetries() + ". Won't reconnect to mgt server, as connection is still alive"); return; } @@ -633,25 +646,25 @@ public class Agent implements HandlerFactory, IAgentControl { } @Override - public void registerControlListener(IAgentControlListener listener) { + public void registerControlListener(final IAgentControlListener listener) { synchronized (_controlListeners) { _controlListeners.add(listener); } } @Override - public void unregisterControlListener(IAgentControlListener listener) { + public void unregisterControlListener(final IAgentControlListener listener) { synchronized (_controlListeners) { _controlListeners.remove(listener); } } @Override - public AgentControlAnswer sendRequest(AgentControlCommand cmd, int timeoutInMilliseconds) throws AgentControlChannelException { - Request request = new Request(this.getId(), -1, new Command[] {cmd}, true, false); + public AgentControlAnswer sendRequest(final AgentControlCommand cmd, final int timeoutInMilliseconds) throws AgentControlChannelException { + final Request request = new Request(getId(), -1, new Command[] {cmd}, true, false); request.setSequence(getNextSequence()); - AgentControlListener listener = new AgentControlListener(request); + final AgentControlListener listener = new AgentControlListener(request); registerControlListener(listener); try { @@ -659,7 +672,7 @@ public class Agent implements HandlerFactory, IAgentControl { synchronized (listener) { try { listener.wait(timeoutInMilliseconds); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { s_logger.warn("sendRequest is interrupted, exit waiting"); } } @@ -671,13 +684,13 @@ public class Agent implements HandlerFactory, IAgentControl { } @Override - public void postRequest(AgentControlCommand cmd) throws AgentControlChannelException { - Request request = new Request(this.getId(), -1, new Command[] {cmd}, true, false); + public void postRequest(final AgentControlCommand cmd) throws AgentControlChannelException { + final Request request = new Request(getId(), -1, new Command[] {cmd}, true, false); request.setSequence(getNextSequence()); postRequest(request); } - private void postRequest(Request request) throws AgentControlChannelException { + private void postRequest(final Request request) throws AgentControlChannelException { if (_link != null) { try { _link.send(request.toBytes()); @@ -694,7 +707,7 @@ public class Agent implements HandlerFactory, IAgentControl { private AgentControlAnswer _answer; private final Request _request; - public AgentControlListener(Request request) { + public AgentControlListener(final Request request) { _request = request; } @@ -703,12 +716,12 @@ public class Agent implements HandlerFactory, IAgentControl { } @Override - public Answer processControlRequest(Request request, AgentControlCommand cmd) { + public Answer processControlRequest(final Request request, final AgentControlCommand cmd) { return null; } @Override - public void processControlResponse(Response response, AgentControlAnswer answer) { + public void processControlResponse(final Response response, final AgentControlAnswer answer) { if (_request.getSequence() == response.getSequence()) { _answer = answer; synchronized (this) { @@ -797,13 +810,13 @@ public class Agent implements HandlerFactory, IAgentControl { } public class AgentRequestHandler extends Task { - public AgentRequestHandler(Task.Type type, Link link, Request req) { + public AgentRequestHandler(final Task.Type type, final Link link, final Request req) { super(type, link, req); } @Override - protected void doTask(Task task) throws Exception { - Request req = (Request)this.get(); + protected void doTask(final Task task) throws TaskExecutionException { + final Request req = (Request)get(); if (!(req instanceof Response)) { processRequest(req, task.getLink()); } @@ -811,16 +824,16 @@ public class Agent implements HandlerFactory, IAgentControl { } public class ServerHandler extends Task { - public ServerHandler(Task.Type type, Link link, byte[] data) { + public ServerHandler(final Task.Type type, final Link link, final byte[] data) { super(type, link, data); } - public ServerHandler(Task.Type type, Link link, Request req) { + public ServerHandler(final Task.Type type, final Link link, final Request req) { super(type, link, req); } @Override - public void doTask(final Task task) { + public void doTask(final Task task) throws TaskExecutionException { if (task.getType() == Task.Type.CONNECT) { _shell.getBackoffAlgorithm().reset(); setLink(task.getLink()); @@ -835,7 +848,7 @@ public class Agent implements HandlerFactory, IAgentControl { } else { //put the requests from mgt server into another thread pool, as the request may take a longer time to finish. Don't block the NIO main thread pool //processRequest(request, task.getLink()); - _executor.execute(new AgentRequestHandler(this.getType(), this.getLink(), request)); + _executor.submit(new AgentRequestHandler(getType(), getLink(), request)); } } catch (final ClassNotFoundException e) { s_logger.error("Unable to find this request "); diff --git a/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java b/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java index a38fd0881c2..ef8a373e269 100644 --- a/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java +++ b/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java @@ -103,6 +103,8 @@ import com.cloud.utils.db.SearchCriteria.Op; import com.cloud.utils.db.TransactionLegacy; import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.exception.HypervisorVersionChangedException; +import com.cloud.utils.exception.NioConnectionException; +import com.cloud.utils.exception.TaskExecutionException; import com.cloud.utils.fsm.NoTransitionException; import com.cloud.utils.fsm.StateMachine2; import com.cloud.utils.nio.HandlerFactory; @@ -593,7 +595,11 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl startDirectlyConnectedHosts(); if (_connection != null) { - _connection.start(); + try { + _connection.start(); + } catch (final NioConnectionException e) { + s_logger.error("Error when connecting to the NioServer!", e); + } } _monitorExecutor.scheduleWithFixedDelay(new MonitorTask(), PingInterval.value(), PingInterval.value(), TimeUnit.SECONDS); @@ -827,7 +833,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl Status determinedState = investigate(attache); // if state cannot be determined do nothing and bail out if (determinedState == null) { - if (((System.currentTimeMillis() >> 10) - host.getLastPinged()) > AlertWait.value()) { + if ((System.currentTimeMillis() >> 10) - host.getLastPinged() > AlertWait.value()) { s_logger.warn("Agent " + hostId + " state cannot be determined for more than " + AlertWait + "(" + AlertWait.value() + ") seconds, will go to Alert state"); determinedState = Status.Alert; } else { @@ -840,7 +846,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl s_logger.info("The agent " + hostId + " state determined is " + determinedState); if (determinedState == Status.Down) { - String message = "Host is down: " + host.getId() + "-" + host.getName() + ". Starting HA on the VMs"; + final String message = "Host is down: " + host.getId() + "-" + host.getName() + ". Starting HA on the VMs"; s_logger.error(message); if (host.getType() != Host.Type.SecondaryStorage && host.getType() != Host.Type.ConsoleProxy) { _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_HOST, host.getDataCenterId(), host.getPodId(), "Host down, " + host.getId(), message); @@ -1299,7 +1305,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl } @Override - protected void doTask(final Task task) throws Exception { + protected void doTask(final Task task) throws TaskExecutionException { final TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB); try { final Type type = task.getType(); @@ -1315,6 +1321,10 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl } catch (final UnsupportedVersionException e) { s_logger.warn(e.getMessage()); // upgradeAgent(task.getLink(), data, e.getReason()); + } catch (final ClassNotFoundException e) { + final String message = String.format("Exception occured when executing taks! Error '%s'", e.getMessage()); + s_logger.error(message); + throw new TaskExecutionException(message, e); } } else if (type == Task.Type.CONNECT) { } else if (type == Task.Type.DISCONNECT) { diff --git a/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java index 2bc4f6859c5..04be1ab7ee4 100644 --- a/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java +++ b/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java @@ -43,10 +43,6 @@ import javax.naming.ConfigurationException; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; -import org.apache.log4j.Logger; - -import com.google.gson.Gson; - import org.apache.cloudstack.framework.config.ConfigDepot; import org.apache.cloudstack.framework.config.ConfigKey; import org.apache.cloudstack.framework.config.dao.ConfigurationDao; @@ -54,6 +50,7 @@ import org.apache.cloudstack.managed.context.ManagedContextRunnable; import org.apache.cloudstack.managed.context.ManagedContextTimerTask; import org.apache.cloudstack.utils.identity.ManagementServerNode; import org.apache.cloudstack.utils.security.SSLUtils; +import org.apache.log4j.Logger; import com.cloud.agent.AgentManager; import com.cloud.agent.api.Answer; @@ -93,8 +90,10 @@ import com.cloud.utils.db.QueryBuilder; import com.cloud.utils.db.SearchCriteria.Op; import com.cloud.utils.db.TransactionLegacy; import com.cloud.utils.exception.CloudRuntimeException; +import com.cloud.utils.exception.TaskExecutionException; import com.cloud.utils.nio.Link; import com.cloud.utils.nio.Task; +import com.google.gson.Gson; @Local(value = {AgentManager.class, ClusteredAgentRebalanceService.class}) public class ClusteredAgentManagerImpl extends AgentManagerImpl implements ClusterManagerListener, ClusteredAgentRebalanceService { @@ -139,7 +138,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust "Interval between scans to load agents", false, ConfigKey.Scope.Global, 1000); @Override - public boolean configure(String name, Map xmlParams) throws ConfigurationException { + public boolean configure(final String name, final Map xmlParams) throws ConfigurationException { _peers = new HashMap(7); _sslEngines = new HashMap(7); _nodeId = ManagementServerNode.getManagementServerId(); @@ -192,17 +191,17 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } // for agents that are self-managed, threshold to be considered as disconnected after pingtimeout - long cutSeconds = (System.currentTimeMillis() >> 10) - getTimeout(); - List hosts = _hostDao.findAndUpdateDirectAgentToLoad(cutSeconds, LoadSize.value().longValue(), _nodeId); - List appliances = _hostDao.findAndUpdateApplianceToLoad(cutSeconds, _nodeId); + final long cutSeconds = (System.currentTimeMillis() >> 10) - getTimeout(); + final List hosts = _hostDao.findAndUpdateDirectAgentToLoad(cutSeconds, LoadSize.value().longValue(), _nodeId); + final List appliances = _hostDao.findAndUpdateApplianceToLoad(cutSeconds, _nodeId); - if (hosts != null) { + if (hosts != null) { hosts.addAll(appliances); if (hosts.size() > 0) { s_logger.debug("Found " + hosts.size() + " unmanaged direct hosts, processing connect for them..."); - for (HostVO host : hosts) { + for (final HostVO host : hosts) { try { - AgentAttache agentattache = findAttache(host.getId()); + final AgentAttache agentattache = findAttache(host.getId()); if (agentattache != null) { // already loaded, skip if (agentattache.forForward()) { @@ -219,7 +218,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust s_logger.debug("Loading directly connected host " + host.getId() + "(" + host.getName() + ")"); } loadDirectlyConnectedHost(host, false); - } catch (Throwable e) { + } catch (final Throwable e) { s_logger.warn(" can not load directly connected host " + host.getId() + "(" + host.getName() + ") due to ", e); } } @@ -235,20 +234,20 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust protected void runInContext() { try { runDirectAgentScanTimerTask(); - } catch (Throwable e) { + } catch (final Throwable e) { s_logger.error("Unexpected exception " + e.getMessage(), e); } } } @Override - public Task create(Task.Type type, Link link, byte[] data) { + public Task create(final Task.Type type, final Link link, final byte[] data) { return new ClusteredAgentHandler(type, link, data); } - protected AgentAttache createAttache(long id) { + protected AgentAttache createAttache(final long id) { s_logger.debug("create forwarding ClusteredAgentAttache for " + id); - HostVO host = _hostDao.findById(id); + final HostVO host = _hostDao.findById(id); final AgentAttache attache = new ClusteredAgentAttache(this, id, host.getName()); AgentAttache old = null; synchronized (_agents) { @@ -265,7 +264,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } @Override - protected AgentAttache createAttacheForConnect(HostVO host, Link link) { + protected AgentAttache createAttacheForConnect(final HostVO host, final Link link) { s_logger.debug("create ClusteredAgentAttache for " + host.getId()); final AgentAttache attache = new ClusteredAgentAttache(this, host.getId(), host.getName(), link, host.isInMaintenanceStates()); link.attach(attache); @@ -281,7 +280,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } @Override - protected AgentAttache createAttacheForDirectConnect(Host host, ServerResource resource) { + protected AgentAttache createAttacheForDirectConnect(final Host host, final ServerResource resource) { s_logger.debug("create ClusteredDirectAgentAttache for " + host.getId()); final DirectAgentAttache attache = new ClusteredDirectAgentAttache(this, host.getId(), host.getName(), _nodeId, resource, host.isInMaintenanceStates()); AgentAttache old = null; @@ -296,16 +295,16 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } @Override - protected boolean handleDisconnectWithoutInvestigation(AgentAttache attache, Status.Event event, boolean transitState, boolean removeAgent) { + protected boolean handleDisconnectWithoutInvestigation(final AgentAttache attache, final Status.Event event, final boolean transitState, final boolean removeAgent) { return handleDisconnect(attache, event, false, true, removeAgent); } @Override - protected boolean handleDisconnectWithInvestigation(AgentAttache attache, Status.Event event) { + protected boolean handleDisconnectWithInvestigation(final AgentAttache attache, final Status.Event event) { return handleDisconnect(attache, event, true, true, true); } - protected boolean handleDisconnect(AgentAttache agent, Status.Event event, boolean investigate, boolean broadcast, boolean removeAgent) { + protected boolean handleDisconnect(final AgentAttache agent, final Status.Event event, final boolean investigate, final boolean broadcast, final boolean removeAgent) { boolean res; if (!investigate) { res = super.handleDisconnectWithoutInvestigation(agent, event, true, removeAgent); @@ -324,16 +323,16 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } @Override - public boolean executeUserRequest(long hostId, Event event) throws AgentUnavailableException { + public boolean executeUserRequest(final long hostId, final Event event) throws AgentUnavailableException { if (event == Event.AgentDisconnected) { if (s_logger.isDebugEnabled()) { s_logger.debug("Received agent disconnect event for host " + hostId); } - AgentAttache attache = findAttache(hostId); + final AgentAttache attache = findAttache(hostId); if (attache != null) { // don't process disconnect if the host is being rebalanced if (isAgentRebalanceEnabled()) { - HostTransferMapVO transferVO = _hostTransferDao.findById(hostId); + final HostTransferMapVO transferVO = _hostTransferDao.findById(hostId); if (transferVO != null) { if (transferVO.getFutureOwner() == _nodeId && transferVO.getState() == HostTransferState.TransferStarted) { s_logger.debug("Not processing " + Event.AgentDisconnected + " event for the host id=" + hostId + " as the host is being connected to " + @@ -368,7 +367,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust if (result != null) { return result; } - } catch (AgentUnavailableException e) { + } catch (final AgentUnavailableException e) { s_logger.debug("cannot propagate agent reconnect because agent is not available", e); return false; } @@ -376,9 +375,9 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust return super.reconnect(hostId); } - public void notifyNodesInCluster(AgentAttache attache) { + public void notifyNodesInCluster(final AgentAttache attache) { s_logger.debug("Notifying other nodes of to disconnect"); - Command[] cmds = new Command[] {new ChangeAgentCommand(attache.getId(), Event.AgentDisconnected)}; + final Command[] cmds = new Command[] {new ChangeAgentCommand(attache.getId(), Event.AgentDisconnected)}; _clusterMgr.broadcast(attache.getId(), _gson.toJson(cmds)); } @@ -387,26 +386,26 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust if (s_logger.isDebugEnabled()) { s_logger.debug("Notifying other MS nodes to run host scan task"); } - Command[] cmds = new Command[] {new ScheduleHostScanTaskCommand()}; + final Command[] cmds = new Command[] {new ScheduleHostScanTaskCommand()}; _clusterMgr.broadcast(0, _gson.toJson(cmds)); } - protected static void logT(byte[] bytes, final String msg) { + protected static void logT(final byte[] bytes, final String msg) { s_logger.trace("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " + (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg); } - protected static void logD(byte[] bytes, final String msg) { + protected static void logD(final byte[] bytes, final String msg) { s_logger.debug("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " + (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg); } - protected static void logI(byte[] bytes, final String msg) { + protected static void logI(final byte[] bytes, final String msg) { s_logger.info("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " + (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg); } - public boolean routeToPeer(String peer, byte[] bytes) { + public boolean routeToPeer(final String peer, final byte[] bytes) { int i = 0; SocketChannel ch = null; SSLEngine sslEngine = null; @@ -432,7 +431,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } Link.write(ch, new ByteBuffer[] {ByteBuffer.wrap(bytes)}, sslEngine); return true; - } catch (IOException e) { + } catch (final IOException e) { try { logI(bytes, "Unable to route to peer: " + Request.parse(bytes).toString() + " due to " + e.getMessage()); } catch (ClassNotFoundException | UnsupportedVersionException ex) { @@ -445,28 +444,28 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust return false; } - public String findPeer(long hostId) { + public String findPeer(final long hostId) { return getPeerName(hostId); } - public SSLEngine getSSLEngine(String peerName) { + public SSLEngine getSSLEngine(final String peerName) { return _sslEngines.get(peerName); } - public void cancel(String peerName, long hostId, long sequence, String reason) { - CancelCommand cancel = new CancelCommand(sequence, reason); - Request req = new Request(hostId, _nodeId, cancel, true); + public void cancel(final String peerName, final long hostId, final long sequence, final String reason) { + final CancelCommand cancel = new CancelCommand(sequence, reason); + final Request req = new Request(hostId, _nodeId, cancel, true); req.setControl(true); routeToPeer(peerName, req.getBytes()); } - public void closePeer(String peerName) { + public void closePeer(final String peerName) { synchronized (_peers) { - SocketChannel ch = _peers.get(peerName); + final SocketChannel ch = _peers.get(peerName); if (ch != null) { try { ch.close(); - } catch (IOException e) { + } catch (final IOException e) { s_logger.warn("Unable to close peer socket connection to " + peerName); } } @@ -475,29 +474,29 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } } - public SocketChannel connectToPeer(String peerName, SocketChannel prevCh) { + public SocketChannel connectToPeer(final String peerName, final SocketChannel prevCh) { synchronized (_peers) { - SocketChannel ch = _peers.get(peerName); + final SocketChannel ch = _peers.get(peerName); SSLEngine sslEngine = null; if (prevCh != null) { try { prevCh.close(); - } catch (Exception e) { + } catch (final Exception e) { s_logger.info("[ignored]" + "failed to get close resource for previous channel Socket: " + e.getLocalizedMessage()); } } if (ch == null || ch == prevCh) { - ManagementServerHost ms = _clusterMgr.getPeer(peerName); + final ManagementServerHost ms = _clusterMgr.getPeer(peerName); if (ms == null) { s_logger.info("Unable to find peer: " + peerName); return null; } - String ip = ms.getServiceIP(); + final String ip = ms.getServiceIP(); InetAddress addr; try { addr = InetAddress.getByName(ip); - } catch (UnknownHostException e) { + } catch (final UnknownHostException e) { throw new CloudRuntimeException("Unable to resolve " + ip); } SocketChannel ch1 = null; @@ -507,14 +506,14 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust ch1.socket().setKeepAlive(true); ch1.socket().setSoTimeout(60 * 1000); try { - SSLContext sslContext = Link.initSSLContext(true); + final SSLContext sslContext = Link.initSSLContext(true); sslEngine = sslContext.createSSLEngine(ip, Port.value()); sslEngine.setUseClientMode(true); sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols())); Link.doHandshake(ch1, sslEngine, true); s_logger.info("SSL: Handshake done"); - } catch (Exception e) { + } catch (final Exception e) { ch1.close(); throw new IOException("SSL: Fail to init SSL! " + e); } @@ -524,10 +523,10 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust _peers.put(peerName, ch1); _sslEngines.put(peerName, sslEngine); return ch1; - } catch (IOException e) { + } catch (final IOException e) { try { ch1.close(); - } catch (IOException ex) { + } catch (final IOException ex) { s_logger.error("failed to close failed peer socket: " + ex); } s_logger.warn("Unable to connect to peer management server: " + peerName + ", ip: " + ip + " due to " + e.getMessage(), e); @@ -542,8 +541,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } } - public SocketChannel connectToPeer(long hostId, SocketChannel prevCh) { - String peerName = getPeerName(hostId); + public SocketChannel connectToPeer(final long hostId, final SocketChannel prevCh) { + final String peerName = getPeerName(hostId); if (peerName == null) { return null; } @@ -553,8 +552,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust @Override protected AgentAttache getAttache(final Long hostId) throws AgentUnavailableException { - assert (hostId != null) : "Who didn't check their id value?"; - HostVO host = _hostDao.findById(hostId); + assert hostId != null : "Who didn't check their id value?"; + final HostVO host = _hostDao.findById(hostId); if (host == null) { throw new AgentUnavailableException("Can't find the host ", hostId); } @@ -569,7 +568,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } } if (agent == null) { - AgentUnavailableException ex = new AgentUnavailableException("Host with specified id is not in the right state: " + host.getStatus(), hostId); + final AgentUnavailableException ex = new AgentUnavailableException("Host with specified id is not in the right state: " + host.getStatus(), hostId); ex.addProxyObject(_entityMgr.findById(Host.class, hostId).getUuid()); throw ex; } @@ -580,11 +579,11 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust @Override public boolean stop() { if (_peers != null) { - for (SocketChannel ch : _peers.values()) { + for (final SocketChannel ch : _peers.values()) { try { s_logger.info("Closing: " + ch.toString()); ch.close(); - } catch (IOException e) { + } catch (final IOException e) { s_logger.info("[ignored] error on closing channel: " +ch.toString(), e); } } @@ -606,13 +605,13 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust public class ClusteredAgentHandler extends AgentHandler { - public ClusteredAgentHandler(Task.Type type, Link link, byte[] data) { + public ClusteredAgentHandler(final Task.Type type, final Link link, final byte[] data) { super(type, link, data); } @Override - protected void doTask(final Task task) throws Exception { - TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB); + protected void doTask(final Task task) throws TaskExecutionException { + final TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB); try { if (task.getType() != Task.Type.DATA) { super.doTask(task); @@ -620,37 +619,37 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } final byte[] data = task.getData(); - Version ver = Request.getVersion(data); + final Version ver = Request.getVersion(data); if (ver.ordinal() != Version.v1.ordinal() && ver.ordinal() != Version.v3.ordinal()) { s_logger.warn("Wrong version for clustered agent request"); super.doTask(task); return; } - long hostId = Request.getAgentId(data); - Link link = task.getLink(); + final long hostId = Request.getAgentId(data); + final Link link = task.getLink(); if (Request.fromServer(data)) { - AgentAttache agent = findAttache(hostId); + final AgentAttache agent = findAttache(hostId); if (Request.isControl(data)) { if (agent == null) { logD(data, "No attache to process cancellation"); return; } - Request req = Request.parse(data); - Command[] cmds = req.getCommands(); - CancelCommand cancel = (CancelCommand)cmds[0]; + final Request req = Request.parse(data); + final Command[] cmds = req.getCommands(); + final CancelCommand cancel = (CancelCommand)cmds[0]; if (s_logger.isDebugEnabled()) { logD(data, "Cancel request received"); } agent.cancel(cancel.getSequence()); final Long current = agent._currentSequence; // if the request is the current request, always have to trigger sending next request in -// sequence, + // sequence, // otherwise the agent queue will be blocked - if (req.executeInSequence() && (current != null && current == Request.getSequence(data))) { + if (req.executeInSequence() && current != null && current == Request.getSequence(data)) { agent.sendNext(Request.getSequence(data)); } return; @@ -665,29 +664,29 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust // route it to the agent. // But we have the serialize the control commands here so we have // to deserialize this and send it through the agent attache. - Request req = Request.parse(data); + final Request req = Request.parse(data); agent.send(req, null); return; } else { if (agent instanceof Routable) { - Routable cluster = (Routable)agent; + final Routable cluster = (Routable)agent; cluster.routeToAgent(data); } else { agent.send(Request.parse(data)); } return; } - } catch (AgentUnavailableException e) { + } catch (final AgentUnavailableException e) { logD(data, e.getMessage()); cancel(Long.toString(Request.getManagementServerId(data)), hostId, Request.getSequence(data), e.getMessage()); } } else { - long mgmtId = Request.getManagementServerId(data); + final long mgmtId = Request.getManagementServerId(data); if (mgmtId != -1 && mgmtId != _nodeId) { routeToPeer(Long.toString(mgmtId), data); if (Request.requiresSequentialExecution(data)) { - AgentAttache attache = (AgentAttache)link.attachment(); + final AgentAttache attache = (AgentAttache)link.attachment(); if (attache != null) { attache.sendNext(Request.getSequence(data)); } else if (s_logger.isDebugEnabled()) { @@ -701,7 +700,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } else { // received an answer. final Response response = Response.parse(data); - AgentAttache attache = findAttache(response.getAgentId()); + final AgentAttache attache = findAttache(response.getAgentId()); if (attache == null) { s_logger.info("SeqA " + response.getAgentId() + "-" + response.getSequence() + "Unable to find attache to forward " + response.toString()); return; @@ -713,6 +712,14 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust return; } } + } catch (final ClassNotFoundException e) { + final String message = String.format("ClassNotFoundException occured when executing taks! Error '%s'", e.getMessage()); + s_logger.error(message); + throw new TaskExecutionException(message, e); + } catch (final UnsupportedVersionException e) { + final String message = String.format("UnsupportedVersionException occured when executing taks! Error '%s'", e.getMessage()); + s_logger.error(message); + throw new TaskExecutionException(message, e); } finally { txn.close(); } @@ -720,14 +727,14 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } @Override - public void onManagementNodeJoined(List nodeList, long selfNodeId) { + public void onManagementNodeJoined(final List nodeList, final long selfNodeId) { } @Override - public void onManagementNodeLeft(List nodeList, long selfNodeId) { - for (ManagementServerHost vo : nodeList) { + public void onManagementNodeLeft(final List nodeList, final long selfNodeId) { + for (final ManagementServerHost vo : nodeList) { s_logger.info("Marking hosts as disconnected on Management server" + vo.getMsid()); - long lastPing = (System.currentTimeMillis() >> 10) - getTimeout(); + final long lastPing = (System.currentTimeMillis() >> 10) - getTimeout(); _hostDao.markHostsAsDisconnected(vo.getMsid(), lastPing); s_logger.info("Deleting entries from op_host_transfer table for Management server " + vo.getMsid()); cleanupTransferMap(vo.getMsid()); @@ -739,7 +746,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } @Override - public void removeAgent(AgentAttache attache, Status nextState) { + public void removeAgent(final AgentAttache attache, final Status nextState) { if (attache == null) { return; } @@ -748,7 +755,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } @Override - public boolean executeRebalanceRequest(long agentId, long currentOwnerId, long futureOwnerId, Event event) throws AgentUnavailableException, + public boolean executeRebalanceRequest(final long agentId, final long currentOwnerId, final long futureOwnerId, final Event event) throws AgentUnavailableException, OperationTimedoutException { boolean result = false; if (event == Event.RequestAgentRebalance) { @@ -756,7 +763,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } else if (event == Event.StartAgentRebalance) { try { result = rebalanceHost(agentId, currentOwnerId, futureOwnerId); - } catch (Exception e) { + } catch (final Exception e) { s_logger.warn("Unable to rebalance host id=" + agentId, e); } } @@ -795,7 +802,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } cancelled = true; } - } catch (Throwable e) { + } catch (final Throwable e) { s_logger.error("Unexpected exception " + e.toString(), e); } } @@ -803,11 +810,11 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust public void startRebalanceAgents() { s_logger.debug("Management server " + _nodeId + " is asking other peers to rebalance their agents"); - List allMS = _mshostDao.listBy(ManagementServerHost.State.Up); - QueryBuilder sc = QueryBuilder.create(HostVO.class); + final List allMS = _mshostDao.listBy(ManagementServerHost.State.Up); + final QueryBuilder sc = QueryBuilder.create(HostVO.class); sc.and(sc.entity().getManagementServerId(), Op.NNULL); sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing); - List allManagedAgents = sc.list(); + final List allManagedAgents = sc.list(); int avLoad = 0; @@ -828,11 +835,11 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust avLoad = 1; } - for (ManagementServerHostVO node : allMS) { + for (final ManagementServerHostVO node : allMS) { if (node.getMsid() != _nodeId) { List hostsToRebalance = new ArrayList(); - for (AgentLoadBalancerPlanner lbPlanner : _lbPlanners) { + for (final AgentLoadBalancerPlanner lbPlanner : _lbPlanners) { hostsToRebalance = lbPlanner.getHostsToRebalance(node.getMsid(), avLoad); if (hostsToRebalance != null && !hostsToRebalance.isEmpty()) { break; @@ -843,8 +850,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust if (hostsToRebalance != null && !hostsToRebalance.isEmpty()) { s_logger.debug("Found " + hostsToRebalance.size() + " hosts to rebalance from management server " + node.getMsid()); - for (HostVO host : hostsToRebalance) { - long hostId = host.getId(); + for (final HostVO host : hostsToRebalance) { + final long hostId = host.getId(); s_logger.debug("Asking management server " + node.getMsid() + " to give away host id=" + hostId); boolean result = true; @@ -856,23 +863,23 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust HostTransferMapVO transfer = null; try { transfer = _hostTransferDao.startAgentTransfering(hostId, node.getMsid(), _nodeId); - Answer[] answer = sendRebalanceCommand(node.getMsid(), hostId, node.getMsid(), _nodeId, Event.RequestAgentRebalance); + final Answer[] answer = sendRebalanceCommand(node.getMsid(), hostId, node.getMsid(), _nodeId, Event.RequestAgentRebalance); if (answer == null) { s_logger.warn("Failed to get host id=" + hostId + " from management server " + node.getMsid()); result = false; } - } catch (Exception ex) { + } catch (final Exception ex) { s_logger.warn("Failed to get host id=" + hostId + " from management server " + node.getMsid(), ex); result = false; } finally { if (transfer != null) { - HostTransferMapVO transferState = _hostTransferDao.findByIdAndFutureOwnerId(transfer.getId(), _nodeId); + final HostTransferMapVO transferState = _hostTransferDao.findByIdAndFutureOwnerId(transfer.getId(), _nodeId); if (!result && transferState != null && transferState.getState() == HostTransferState.TransferRequested) { if (s_logger.isDebugEnabled()) { s_logger.debug("Removing mapping from op_host_transfer as it failed to be set to transfer mode"); } // just remove the mapping (if exists) as nothing was done on the peer management -// server yet + // server yet _hostTransferDao.remove(transfer.getId()); } } @@ -885,31 +892,31 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } } - private Answer[] sendRebalanceCommand(long peer, long agentId, long currentOwnerId, long futureOwnerId, Event event) { - TransferAgentCommand transfer = new TransferAgentCommand(agentId, currentOwnerId, futureOwnerId, event); - Commands commands = new Commands(Command.OnError.Stop); + private Answer[] sendRebalanceCommand(final long peer, final long agentId, final long currentOwnerId, final long futureOwnerId, final Event event) { + final TransferAgentCommand transfer = new TransferAgentCommand(agentId, currentOwnerId, futureOwnerId, event); + final Commands commands = new Commands(Command.OnError.Stop); commands.addCommand(transfer); - Command[] cmds = commands.toCommands(); + final Command[] cmds = commands.toCommands(); try { if (s_logger.isDebugEnabled()) { s_logger.debug("Forwarding " + cmds[0].toString() + " to " + peer); } - String peerName = Long.toString(peer); - String cmdStr = _gson.toJson(cmds); - String ansStr = _clusterMgr.execute(peerName, agentId, cmdStr, true); - Answer[] answers = _gson.fromJson(ansStr, Answer[].class); + final String peerName = Long.toString(peer); + final String cmdStr = _gson.toJson(cmds); + final String ansStr = _clusterMgr.execute(peerName, agentId, cmdStr, true); + final Answer[] answers = _gson.fromJson(ansStr, Answer[].class); return answers; - } catch (Exception e) { + } catch (final Exception e) { s_logger.warn("Caught exception while talking to " + currentOwnerId, e); return null; } } - public String getPeerName(long agentHostId) { + public String getPeerName(final long agentHostId) { - HostVO host = _hostDao.findById(agentHostId); + final HostVO host = _hostDao.findById(agentHostId); if (host != null && host.getManagementServerId() != null) { if (_clusterMgr.getSelfPeerName().equals(Long.toString(host.getManagementServerId()))) { return null; @@ -920,7 +927,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust return null; } - public Boolean propagateAgentEvent(long agentId, Event event) throws AgentUnavailableException { + public Boolean propagateAgentEvent(final long agentId, final Event event) throws AgentUnavailableException { final String msPeer = getPeerName(agentId); if (msPeer == null) { return null; @@ -929,15 +936,15 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust if (s_logger.isDebugEnabled()) { s_logger.debug("Propagating agent change request event:" + event.toString() + " to agent:" + agentId); } - Command[] cmds = new Command[1]; + final Command[] cmds = new Command[1]; cmds[0] = new ChangeAgentCommand(agentId, event); - String ansStr = _clusterMgr.execute(msPeer, agentId, _gson.toJson(cmds), true); + final String ansStr = _clusterMgr.execute(msPeer, agentId, _gson.toJson(cmds), true); if (ansStr == null) { throw new AgentUnavailableException(agentId); } - Answer[] answers = _gson.fromJson(ansStr, Answer[].class); + final Answer[] answers = _gson.fromJson(ansStr, Answer[].class); if (s_logger.isDebugEnabled()) { s_logger.debug("Result for agent change is " + answers[0].getResult()); @@ -958,9 +965,9 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust if (_agentToTransferIds.size() > 0) { s_logger.debug("Found " + _agentToTransferIds.size() + " agents to transfer"); // for (Long hostId : _agentToTransferIds) { - for (Iterator iterator = _agentToTransferIds.iterator(); iterator.hasNext();) { - Long hostId = iterator.next(); - AgentAttache attache = findAttache(hostId); + for (final Iterator iterator = _agentToTransferIds.iterator(); iterator.hasNext();) { + final Long hostId = iterator.next(); + final AgentAttache attache = findAttache(hostId); // if the thread: // 1) timed out waiting for the host to reconnect @@ -968,8 +975,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust // 3) if the management server doesn't own the host any more // remove the host from re-balance list and delete from op_host_transfer DB // no need to do anything with the real attache as we haven't modified it yet - Date cutTime = DateUtil.currentGMTTime(); - HostTransferMapVO transferMap = + final Date cutTime = DateUtil.currentGMTTime(); + final HostTransferMapVO transferMap = _hostTransferDao.findActiveHostTransferMapByHostId(hostId, new Date(cutTime.getTime() - rebalanceTimeOut)); if (transferMap == null) { @@ -986,7 +993,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust continue; } - ManagementServerHostVO ms = _mshostDao.findByMsid(transferMap.getFutureOwner()); + final ManagementServerHostVO ms = _mshostDao.findByMsid(transferMap.getFutureOwner()); if (ms != null && ms.getState() != ManagementServerHost.State.Up) { s_logger.debug("Can't transfer host " + hostId + " as it's future owner is not in UP state: " + ms + ", skipping rebalance for the host"); @@ -999,7 +1006,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust iterator.remove(); try { _executor.execute(new RebalanceTask(hostId, transferMap.getInitialOwner(), transferMap.getFutureOwner())); - } catch (RejectedExecutionException ex) { + } catch (final RejectedExecutionException ex) { s_logger.warn("Failed to submit rebalance task for host id=" + hostId + "; postponing the execution"); continue; } @@ -1016,21 +1023,21 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } } - } catch (Throwable e) { + } catch (final Throwable e) { s_logger.error("Problem with the clustered agent transfer scan check!", e); } } }; } - private boolean setToWaitForRebalance(final long hostId, long currentOwnerId, long futureOwnerId) { + private boolean setToWaitForRebalance(final long hostId, final long currentOwnerId, final long futureOwnerId) { s_logger.debug("Adding agent " + hostId + " to the list of agents to transfer"); synchronized (_agentToTransferIds) { return _agentToTransferIds.add(hostId); } } - protected boolean rebalanceHost(final long hostId, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException { + protected boolean rebalanceHost(final long hostId, final long currentOwnerId, final long futureOwnerId) throws AgentUnavailableException { boolean result = true; if (currentOwnerId == _nodeId) { @@ -1040,12 +1047,12 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust return false; } try { - Answer[] answer = sendRebalanceCommand(futureOwnerId, hostId, currentOwnerId, futureOwnerId, Event.StartAgentRebalance); + final Answer[] answer = sendRebalanceCommand(futureOwnerId, hostId, currentOwnerId, futureOwnerId, Event.StartAgentRebalance); if (answer == null || !answer[0].getResult()) { result = false; } - } catch (Exception ex) { + } catch (final Exception ex) { s_logger.warn("Host " + hostId + " failed to connect to the management server " + futureOwnerId + " as a part of rebalance process", ex); result = false; } @@ -1059,13 +1066,13 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } } else if (futureOwnerId == _nodeId) { - HostVO host = _hostDao.findById(hostId); + final HostVO host = _hostDao.findById(hostId); try { if (s_logger.isDebugEnabled()) { s_logger.debug("Disconnecting host " + host.getId() + "(" + host.getName() + " as a part of rebalance process without notification"); } - AgentAttache attache = findAttache(hostId); + final AgentAttache attache = findAttache(hostId); if (attache != null) { result = handleDisconnect(attache, Event.AgentDisconnected, false, false, true); } @@ -1080,7 +1087,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust s_logger.warn("Failed to disconnect " + host.getId() + "(" + host.getName() + " as a part of rebalance process without notification"); } - } catch (Exception ex) { + } catch (final Exception ex) { s_logger.warn("Failed to load directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process due to:", ex); result = false; @@ -1098,21 +1105,21 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust return result; } - protected void finishRebalance(final long hostId, long futureOwnerId, Event event) { + protected void finishRebalance(final long hostId, final long futureOwnerId, final Event event) { - boolean success = (event == Event.RebalanceCompleted) ? true : false; + final boolean success = event == Event.RebalanceCompleted ? true : false; if (s_logger.isDebugEnabled()) { s_logger.debug("Finishing rebalancing for the agent " + hostId + " with event " + event); } - AgentAttache attache = findAttache(hostId); + final AgentAttache attache = findAttache(hostId); if (attache == null || !(attache instanceof ClusteredAgentAttache)) { s_logger.debug("Unable to find forward attache for the host id=" + hostId + ", assuming that the agent disconnected already"); _hostTransferDao.completeAgentTransfer(hostId); return; } - ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)attache; + final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)attache; if (success) { @@ -1124,7 +1131,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust while (requestToTransfer != null) { s_logger.debug("Forwarding request " + requestToTransfer.getSequence() + " held in transfer attache " + hostId + " from the management server " + _nodeId + " to " + futureOwnerId); - boolean routeResult = routeToPeer(Long.toString(futureOwnerId), requestToTransfer.getBytes()); + final boolean routeResult = routeToPeer(Long.toString(futureOwnerId), requestToTransfer.getBytes()); if (!routeResult) { logD(requestToTransfer.getBytes(), "Failed to route request to peer"); } @@ -1147,13 +1154,13 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust s_logger.debug("Management server " + _nodeId + " failed to rebalance agent " + hostId); _hostTransferDao.completeAgentTransfer(hostId); handleDisconnectWithoutInvestigation(findAttache(hostId), Event.RebalanceFailed, true, true); - } catch (Exception ex) { + } catch (final Exception ex) { s_logger.warn("Failed to reconnect host id=" + hostId + " as a part of failed rebalance task cleanup"); } } protected boolean startRebalance(final long hostId) { - HostVO host = _hostDao.findById(hostId); + final HostVO host = _hostDao.findById(hostId); if (host == null || host.getRemoved() != null) { s_logger.warn("Unable to find host record, fail start rebalancing process"); @@ -1161,10 +1168,10 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } synchronized (_agents) { - ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)_agents.get(hostId); + final ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)_agents.get(hostId); if (attache != null && attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) { handleDisconnectWithoutInvestigation(attache, Event.StartAgentRebalance, true, true); - ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)createAttache(hostId); + final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)createAttache(hostId); if (forwardAttache == null) { s_logger.warn("Unable to create a forward attache for the host " + hostId + " as a part of rebalance process"); return false; @@ -1186,15 +1193,15 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust return true; } - protected void cleanupTransferMap(long msId) { - List hostsJoingingCluster = _hostTransferDao.listHostsJoiningCluster(msId); + protected void cleanupTransferMap(final long msId) { + final List hostsJoingingCluster = _hostTransferDao.listHostsJoiningCluster(msId); - for (HostTransferMapVO hostJoingingCluster : hostsJoingingCluster) { + for (final HostTransferMapVO hostJoingingCluster : hostsJoingingCluster) { _hostTransferDao.remove(hostJoingingCluster.getId()); } - List hostsLeavingCluster = _hostTransferDao.listHostsLeavingCluster(msId); - for (HostTransferMapVO hostLeavingCluster : hostsLeavingCluster) { + final List hostsLeavingCluster = _hostTransferDao.listHostsLeavingCluster(msId); + for (final HostTransferMapVO hostLeavingCluster : hostsLeavingCluster) { _hostTransferDao.remove(hostLeavingCluster.getId()); } } @@ -1204,7 +1211,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust Long currentOwnerId = null; Long futureOwnerId = null; - public RebalanceTask(long hostId, long currentOwnerId, long futureOwnerId) { + public RebalanceTask(final long hostId, final long currentOwnerId, final long futureOwnerId) { this.hostId = hostId; this.currentOwnerId = currentOwnerId; this.futureOwnerId = futureOwnerId; @@ -1217,20 +1224,20 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust s_logger.debug("Rebalancing host id=" + hostId); } rebalanceHost(hostId, currentOwnerId, futureOwnerId); - } catch (Exception e) { + } catch (final Exception e) { s_logger.warn("Unable to rebalance host id=" + hostId, e); } } } - private String handleScheduleHostScanTaskCommand(ScheduleHostScanTaskCommand cmd) { + private String handleScheduleHostScanTaskCommand(final ScheduleHostScanTaskCommand cmd) { if (s_logger.isDebugEnabled()) { s_logger.debug("Intercepting resource manager command: " + _gson.toJson(cmd)); } try { scheduleHostScanTask(); - } catch (Exception e) { + } catch (final Exception e) { // Scheduling host scan task in peer MS is a best effort operation during host add, regular host scan // happens at fixed intervals anyways. So handling any exceptions that may be thrown s_logger.warn("Exception happened while trying to schedule host scan task on mgmt server " + _clusterMgr.getSelfPeerName() + @@ -1238,14 +1245,14 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust return null; } - Answer[] answers = new Answer[1]; + final Answer[] answers = new Answer[1]; answers[0] = new Answer(cmd, true, null); return _gson.toJson(answers); } - public Answer[] sendToAgent(Long hostId, Command[] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException { - Commands commands = new Commands(stopOnError ? Command.OnError.Stop : Command.OnError.Continue); - for (Command cmd : cmds) { + public Answer[] sendToAgent(final Long hostId, final Command[] cmds, final boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException { + final Commands commands = new Commands(stopOnError ? Command.OnError.Stop : Command.OnError.Continue); + for (final Command cmd : cmds) { commands.addCommand(cmd); } return send(hostId, commands); @@ -1258,7 +1265,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } @Override - public String dispatch(ClusterServicePdu pdu) { + public String dispatch(final ClusterServicePdu pdu) { if (s_logger.isDebugEnabled()) { s_logger.debug("Dispatch ->" + pdu.getAgentId() + ", json: " + pdu.getJsonPackage()); @@ -1267,13 +1274,13 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust Command[] cmds = null; try { cmds = _gson.fromJson(pdu.getJsonPackage(), Command[].class); - } catch (Throwable e) { - assert (false); + } catch (final Throwable e) { + assert false; s_logger.error("Excection in gson decoding : ", e); } if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { // intercepted - ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0]; + final ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0]; if (s_logger.isDebugEnabled()) { s_logger.debug("Intercepting command for agent change: agent " + cmd.getAgentId() + " event: " + cmd.getEvent()); @@ -1285,16 +1292,16 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust s_logger.debug("Result is " + result); } - } catch (AgentUnavailableException e) { + } catch (final AgentUnavailableException e) { s_logger.warn("Agent is unavailable", e); return null; } - Answer[] answers = new Answer[1]; + final Answer[] answers = new Answer[1]; answers[0] = new ChangeAgentAnswer(cmd, result); return _gson.toJson(answers); } else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) { - TransferAgentCommand cmd = (TransferAgentCommand)cmds[0]; + final TransferAgentCommand cmd = (TransferAgentCommand)cmds[0]; if (s_logger.isDebugEnabled()) { s_logger.debug("Intercepting command for agent rebalancing: agent " + cmd.getAgentId() + " event: " + cmd.getEvent()); @@ -1306,18 +1313,18 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust s_logger.debug("Result is " + result); } - } catch (AgentUnavailableException e) { + } catch (final AgentUnavailableException e) { s_logger.warn("Agent is unavailable", e); return null; - } catch (OperationTimedoutException e) { + } catch (final OperationTimedoutException e) { s_logger.warn("Operation timed out", e); return null; } - Answer[] answers = new Answer[1]; + final Answer[] answers = new Answer[1]; answers[0] = new Answer(cmd, result, null); return _gson.toJson(answers); } else if (cmds.length == 1 && cmds[0] instanceof PropagateResourceEventCommand) { - PropagateResourceEventCommand cmd = (PropagateResourceEventCommand)cmds[0]; + final PropagateResourceEventCommand cmd = (PropagateResourceEventCommand)cmds[0]; s_logger.debug("Intercepting command to propagate event " + cmd.getEvent().name() + " for host " + cmd.getHostId()); @@ -1325,29 +1332,29 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust try { result = _resourceMgr.executeUserRequest(cmd.getHostId(), cmd.getEvent()); s_logger.debug("Result is " + result); - } catch (AgentUnavailableException ex) { + } catch (final AgentUnavailableException ex) { s_logger.warn("Agent is unavailable", ex); return null; } - Answer[] answers = new Answer[1]; + final Answer[] answers = new Answer[1]; answers[0] = new Answer(cmd, result, null); return _gson.toJson(answers); } else if (cmds.length == 1 && cmds[0] instanceof ScheduleHostScanTaskCommand) { - ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand)cmds[0]; - String response = handleScheduleHostScanTaskCommand(cmd); + final ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand)cmds[0]; + final String response = handleScheduleHostScanTaskCommand(cmd); return response; } try { - long startTick = System.currentTimeMillis(); + final long startTick = System.currentTimeMillis(); if (s_logger.isDebugEnabled()) { s_logger.debug("Dispatch -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage()); } - Answer[] answers = sendToAgent(pdu.getAgentId(), cmds, pdu.isStopOnError()); + final Answer[] answers = sendToAgent(pdu.getAgentId(), cmds, pdu.isStopOnError()); if (answers != null) { - String jsonReturn = _gson.toJson(answers); + final String jsonReturn = _gson.toJson(answers); if (s_logger.isDebugEnabled()) { s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() + " in " + @@ -1361,9 +1368,9 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust (System.currentTimeMillis() - startTick) + " ms, return null result"); } } - } catch (AgentUnavailableException e) { + } catch (final AgentUnavailableException e) { s_logger.warn("Agent is unavailable", e); - } catch (OperationTimedoutException e) { + } catch (final OperationTimedoutException e) { s_logger.warn("Timed Out", e); } @@ -1372,11 +1379,11 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } - public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException { + public boolean executeAgentUserRequest(final long agentId, final Event event) throws AgentUnavailableException { return executeUserRequest(agentId, event); } - public boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException { + public boolean rebalanceAgent(final long agentId, final Event event, final long currentOwnerId, final long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException { return executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event); } @@ -1393,20 +1400,20 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust s_logger.trace("Agent rebalance task check, management server id:" + _nodeId); } // initiate agent lb task will be scheduled and executed only once, and only when number of agents -// loaded exceeds _connectedAgentsThreshold + // loaded exceeds _connectedAgentsThreshold if (!_agentLbHappened) { QueryBuilder sc = QueryBuilder.create(HostVO.class); sc.and(sc.entity().getManagementServerId(), Op.NNULL); sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing); - List allManagedRoutingAgents = sc.list(); + final List allManagedRoutingAgents = sc.list(); sc = QueryBuilder.create(HostVO.class); sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing); - List allAgents = sc.list(); - double allHostsCount = allAgents.size(); - double managedHostsCount = allManagedRoutingAgents.size(); + final List allAgents = sc.list(); + final double allHostsCount = allAgents.size(); + final double managedHostsCount = allManagedRoutingAgents.size(); if (allHostsCount > 0.0) { - double load = managedHostsCount / allHostsCount; + final double load = managedHostsCount / allHostsCount; if (load >= ConnectedAgentThreshold.value()) { s_logger.debug("Scheduling agent rebalancing task as the average agent load " + load + " is more than the threshold " + ConnectedAgentThreshold.value()); @@ -1418,7 +1425,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } } } - } catch (Throwable e) { + } catch (final Throwable e) { s_logger.error("Problem with the clustered agent transfer scan check!", e); } } @@ -1440,9 +1447,9 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust @Override public ConfigKey[] getConfigKeys() { - ConfigKey[] keys = super.getConfigKeys(); + final ConfigKey[] keys = super.getConfigKeys(); - List> keysLst = new ArrayList>(); + final List> keysLst = new ArrayList>(); keysLst.addAll(Arrays.asList(keys)); keysLst.add(EnableLB); keysLst.add(ConnectedAgentThreshold); diff --git a/utils/src/main/java/com/cloud/utils/SerialVersionUID.java b/utils/src/main/java/com/cloud/utils/SerialVersionUID.java index e4ea2174fbd..0683c8c1519 100644 --- a/utils/src/main/java/com/cloud/utils/SerialVersionUID.java +++ b/utils/src/main/java/com/cloud/utils/SerialVersionUID.java @@ -66,4 +66,6 @@ public interface SerialVersionUID { public static final long UnableDeleteHostException = Base | 0x29; public static final long AffinityConflictException = Base | 0x2a; public static final long JobCancellationException = Base | 0x2b; + public static final long NioConnectionException = Base | 0x2c; + public static final long TaskExecutionException = Base | 0x2d; } diff --git a/utils/src/main/java/com/cloud/utils/exception/NioConnectionException.java b/utils/src/main/java/com/cloud/utils/exception/NioConnectionException.java new file mode 100644 index 00000000000..e89b3d8b065 --- /dev/null +++ b/utils/src/main/java/com/cloud/utils/exception/NioConnectionException.java @@ -0,0 +1,48 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +package com.cloud.utils.exception; + +import com.cloud.utils.SerialVersionUID; + +/** + * Used by the NioConnection class to wrap-up its exceptions. + */ +public class NioConnectionException extends Exception { + private static final long serialVersionUID = SerialVersionUID.NioConnectionException; + + protected int csErrorCode; + + public NioConnectionException(final String msg, final Throwable cause) { + super(msg, cause); + setCSErrorCode(CSExceptionErrorCode.getCSErrCode(this.getClass().getName())); + } + + public NioConnectionException(final String msg) { + super(msg); + } + + public void setCSErrorCode(final int cserrcode) { + csErrorCode = cserrcode; + } + + public int getCSErrorCode() { + return csErrorCode; + } +} diff --git a/utils/src/main/java/com/cloud/utils/exception/TaskExecutionException.java b/utils/src/main/java/com/cloud/utils/exception/TaskExecutionException.java new file mode 100644 index 00000000000..be639baeaeb --- /dev/null +++ b/utils/src/main/java/com/cloud/utils/exception/TaskExecutionException.java @@ -0,0 +1,48 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +package com.cloud.utils.exception; + +import com.cloud.utils.SerialVersionUID; + +/** + * Used by the Task class to wrap-up its exceptions. + */ +public class TaskExecutionException extends Exception { + private static final long serialVersionUID = SerialVersionUID.NioConnectionException; + + protected int csErrorCode; + + public TaskExecutionException(final String msg, final Throwable cause) { + super(msg, cause); + setCSErrorCode(CSExceptionErrorCode.getCSErrCode(this.getClass().getName())); + } + + public TaskExecutionException(final String msg) { + super(msg); + } + + public void setCSErrorCode(final int cserrcode) { + csErrorCode = cserrcode; + } + + public int getCSErrorCode() { + return csErrorCode; + } +} diff --git a/utils/src/main/java/com/cloud/utils/nio/NioClient.java b/utils/src/main/java/com/cloud/utils/nio/NioClient.java index 2f742f99dc5..d989f306c55 100644 --- a/utils/src/main/java/com/cloud/utils/nio/NioClient.java +++ b/utils/src/main/java/com/cloud/utils/nio/NioClient.java @@ -29,9 +29,8 @@ import java.security.GeneralSecurityException; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; -import org.apache.log4j.Logger; - import org.apache.cloudstack.utils.security.SSLUtils; +import org.apache.log4j.Logger; public class NioClient extends NioConnection { private static final Logger s_logger = Logger.getLogger(NioClient.class); @@ -40,12 +39,12 @@ public class NioClient extends NioConnection { protected String _bindAddress; protected SocketChannel _clientConnection; - public NioClient(String name, String host, int port, int workers, HandlerFactory factory) { + public NioClient(final String name, final String host, final int port, final int workers, final HandlerFactory factory) { super(name, port, workers, factory); _host = host; } - public void setBindAddress(String ipAddress) { + public void setBindAddress(final String ipAddress) { _bindAddress = ipAddress; } @@ -62,18 +61,18 @@ public class NioClient extends NioConnection { if (_bindAddress != null) { s_logger.info("Binding outbound interface at " + _bindAddress); - InetSocketAddress bindAddr = new InetSocketAddress(_bindAddress, 0); + final InetSocketAddress bindAddr = new InetSocketAddress(_bindAddress, 0); _clientConnection.socket().bind(bindAddr); } - InetSocketAddress peerAddr = new InetSocketAddress(_host, _port); + final InetSocketAddress peerAddr = new InetSocketAddress(_host, _port); _clientConnection.connect(peerAddr); SSLEngine sslEngine = null; // Begin SSL handshake in BLOCKING mode _clientConnection.configureBlocking(true); - SSLContext sslContext = Link.initSSLContext(true); + final SSLContext sslContext = Link.initSSLContext(true); sslEngine = sslContext.createSSLEngine(_host, _port); sslEngine.setUseClientMode(true); sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols())); @@ -83,32 +82,31 @@ public class NioClient extends NioConnection { s_logger.info("Connected to " + _host + ":" + _port); _clientConnection.configureBlocking(false); - Link link = new Link(peerAddr, this); + final Link link = new Link(peerAddr, this); link.setSSLEngine(sslEngine); - SelectionKey key = _clientConnection.register(_selector, SelectionKey.OP_READ); + final SelectionKey key = _clientConnection.register(_selector, SelectionKey.OP_READ); link.setKey(key); key.attach(link); // Notice we've already connected due to the handshake, so let's get the // remaining task done task = _factory.create(Task.Type.CONNECT, link, null); - } catch (GeneralSecurityException e) { + } catch (final GeneralSecurityException e) { _selector.close(); throw new IOException("Failed to initialise security", e); - } catch (IOException e) { + } catch (final IOException e) { _selector.close(); throw e; } - - _executor.execute(task); + _executor.submit(task); } @Override - protected void registerLink(InetSocketAddress saddr, Link link) { + protected void registerLink(final InetSocketAddress saddr, final Link link) { // don't do anything. } @Override - protected void unregisterLink(InetSocketAddress saddr) { + protected void unregisterLink(final InetSocketAddress saddr) { // don't do anything. } @@ -119,7 +117,5 @@ public class NioClient extends NioConnection { _clientConnection.close(); } s_logger.info("NioClient connection closed"); - } - -} +} \ No newline at end of file diff --git a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java index 4c663606818..ddc84cf18c8 100644 --- a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java +++ b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java @@ -27,6 +27,7 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; +import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; @@ -35,7 +36,10 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -43,21 +47,23 @@ import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; +import org.apache.cloudstack.utils.security.SSLUtils; import org.apache.log4j.Logger; -import org.apache.cloudstack.utils.security.SSLUtils; - import com.cloud.utils.concurrency.NamedThreadFactory; +import com.cloud.utils.exception.NioConnectionException; /** * NioConnection abstracts the NIO socket operations. The Java implementation * provides that. */ -public abstract class NioConnection implements Runnable { +public abstract class NioConnection implements Callable { private static final Logger s_logger = Logger.getLogger(NioConnection.class);; protected Selector _selector; - protected Thread _thread; + protected ExecutorService _threadExecutor; + protected Future _futureTask; + protected boolean _isRunning; protected boolean _isStartup; protected int _port; @@ -66,42 +72,48 @@ public abstract class NioConnection implements Runnable { protected String _name; protected ExecutorService _executor; - public NioConnection(String name, int port, int workers, HandlerFactory factory) { + public NioConnection(final String name, final int port, final int workers, final HandlerFactory factory) { _name = name; _isRunning = false; - _thread = null; _selector = null; _port = port; _factory = factory; _executor = new ThreadPoolExecutor(workers, 5 * workers, 1, TimeUnit.DAYS, new LinkedBlockingQueue(), new NamedThreadFactory(name + "-Handler")); } - public void start() { + public void start() throws NioConnectionException { _todos = new ArrayList(); - _thread = new Thread(this, _name + "-Selector"); - _isRunning = true; - _thread.start(); - // Wait until we got init() done - synchronized (_thread) { - try { - _thread.wait(); - } catch (InterruptedException e) { - s_logger.warn("Interrupted start thread ", e); - } + try { + init(); + } catch (final ConnectException e) { + s_logger.warn("Unable to connect to remote: is there a server running on port " + _port); + } catch (final IOException e) { + s_logger.error("Unable to initialize the threads.", e); + throw new NioConnectionException(e.getMessage(), e); + } catch (final Exception e) { + s_logger.error("Unable to initialize the threads due to unknown exception.", e); + throw new NioConnectionException(e.getMessage(), e); } + _isStartup = true; + + _threadExecutor = Executors.newSingleThreadExecutor(); + _futureTask = _threadExecutor.submit(this); + + _isRunning = true; } public void stop() { _executor.shutdown(); _isRunning = false; - if (_thread != null) { - _thread.interrupt(); + if (_threadExecutor != null) { + _futureTask.cancel(false); + _threadExecutor.shutdown(); } } public boolean isRunning() { - return _thread.isAlive(); + return !_futureTask.isDone(); } public boolean isStartup() { @@ -109,45 +121,28 @@ public abstract class NioConnection implements Runnable { } @Override - public void run() { - synchronized (_thread) { - try { - init(); - } catch (ConnectException e) { - s_logger.warn("Unable to connect to remote: is there a server running on port " + _port); - return; - } catch (IOException e) { - s_logger.error("Unable to initialize the threads.", e); - return; - } catch (Exception e) { - s_logger.error("Unable to initialize the threads due to unknown exception.", e); - return; - } - _isStartup = true; - _thread.notifyAll(); - } - + public Boolean call() throws NioConnectionException { while (_isRunning) { try { _selector.select(); // Someone is ready for I/O, get the ready keys - Set readyKeys = _selector.selectedKeys(); - Iterator i = readyKeys.iterator(); + final Set readyKeys = _selector.selectedKeys(); + final Iterator i = readyKeys.iterator(); if (s_logger.isTraceEnabled()) { s_logger.trace("Keys Processing: " + readyKeys.size()); } // Walk through the ready keys collection. while (i.hasNext()) { - SelectionKey sk = i.next(); + final SelectionKey sk = i.next(); i.remove(); if (!sk.isValid()) { if (s_logger.isTraceEnabled()) { s_logger.trace("Selection Key is invalid: " + sk.toString()); } - Link link = (Link)sk.attachment(); + final Link link = (Link)sk.attachment(); if (link != null) { link.terminated(); } else { @@ -167,13 +162,18 @@ public abstract class NioConnection implements Runnable { s_logger.trace("Keys Done Processing."); processTodos(); - } catch (Throwable e) { - s_logger.warn("Caught an exception but continuing on.", e); + } catch (final ClosedSelectorException e) { + /* + * Exception occurred when calling java.nio.channels.Selector.selectedKeys() method. It means the connection has not yet been established. Let's continue trying + * We do not log it here otherwise we will fill the disk with messages. + */ + } catch (final IOException e) { + s_logger.error("Agent will die due to this IOException!", e); + throw new NioConnectionException(e.getMessage(), e); } } - synchronized (_thread) { - _isStartup = false; - } + _isStartup = false; + return true; } abstract void init() throws IOException; @@ -182,11 +182,11 @@ public abstract class NioConnection implements Runnable { abstract void unregisterLink(InetSocketAddress saddr); - protected void accept(SelectionKey key) throws IOException { - ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel(); + protected void accept(final SelectionKey key) throws IOException { + final ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel(); - SocketChannel socketChannel = serverSocketChannel.accept(); - Socket socket = socketChannel.socket(); + final SocketChannel socketChannel = serverSocketChannel.accept(); + final Socket socket = socketChannel.socket(); socket.setKeepAlive(true); if (s_logger.isTraceEnabled()) { @@ -198,7 +198,7 @@ public abstract class NioConnection implements Runnable { SSLEngine sslEngine = null; try { - SSLContext sslContext = Link.initSSLContext(false); + final SSLContext sslContext = Link.initSSLContext(false); sslEngine = sslContext.createSSLEngine(); sslEngine.setUseClientMode(false); sslEngine.setNeedClientAuth(false); @@ -206,7 +206,7 @@ public abstract class NioConnection implements Runnable { Link.doHandshake(socketChannel, sslEngine, false); - } catch (Exception e) { + } catch (final Exception e) { if (s_logger.isTraceEnabled()) { s_logger.trace("Socket " + socket + " closed on read. Probably -1 returned: " + e.getMessage()); } @@ -219,53 +219,68 @@ public abstract class NioConnection implements Runnable { s_logger.trace("SSL: Handshake done"); } socketChannel.configureBlocking(false); - InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress(); - Link link = new Link(saddr, this); + final InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress(); + final Link link = new Link(saddr, this); link.setSSLEngine(sslEngine); link.setKey(socketChannel.register(key.selector(), SelectionKey.OP_READ, link)); - Task task = _factory.create(Task.Type.CONNECT, link, null); + final Task task = _factory.create(Task.Type.CONNECT, link, null); registerLink(saddr, link); - _executor.execute(task); - } - protected void terminate(SelectionKey key) { - Link link = (Link)key.attachment(); - closeConnection(key); - if (link != null) { - link.terminated(); - Task task = _factory.create(Task.Type.DISCONNECT, link, null); - unregisterLink(link.getSocketAddress()); - _executor.execute(task); + try { + _executor.submit(task); + } catch (final Exception e) { + s_logger.warn("Exception occurred when submitting the task", e); } } - protected void read(SelectionKey key) throws IOException { - Link link = (Link)key.attachment(); + protected void terminate(final SelectionKey key) { + final Link link = (Link)key.attachment(); + closeConnection(key); + if (link != null) { + link.terminated(); + final Task task = _factory.create(Task.Type.DISCONNECT, link, null); + unregisterLink(link.getSocketAddress()); + + try { + _executor.submit(task); + } catch (final Exception e) { + s_logger.warn("Exception occurred when submitting the task", e); + } + } + } + + protected void read(final SelectionKey key) throws IOException { + final Link link = (Link)key.attachment(); try { - SocketChannel socketChannel = (SocketChannel)key.channel(); + final SocketChannel socketChannel = (SocketChannel)key.channel(); if (s_logger.isTraceEnabled()) { s_logger.trace("Reading from: " + socketChannel.socket().toString()); } - byte[] data = link.read(socketChannel); + final byte[] data = link.read(socketChannel); if (data == null) { if (s_logger.isTraceEnabled()) { s_logger.trace("Packet is incomplete. Waiting for more."); } return; } - Task task = _factory.create(Task.Type.DATA, link, data); - _executor.execute(task); - } catch (Exception e) { + final Task task = _factory.create(Task.Type.DATA, link, data); + + try { + _executor.submit(task); + } catch (final Exception e) { + s_logger.warn("Exception occurred when submitting the task", e); + } + } catch (final Exception e) { logDebug(e, key, 1); terminate(key); } } - protected void logTrace(Exception e, SelectionKey key, int loc) { + protected void logTrace(final Exception e, final SelectionKey key, final int loc) { if (s_logger.isTraceEnabled()) { Socket socket = null; if (key != null) { - SocketChannel ch = (SocketChannel)key.channel(); + final SocketChannel ch = (SocketChannel)key.channel(); if (ch != null) { socket = ch.socket(); } @@ -275,11 +290,11 @@ public abstract class NioConnection implements Runnable { } } - protected void logDebug(Exception e, SelectionKey key, int loc) { + protected void logDebug(final Exception e, final SelectionKey key, final int loc) { if (s_logger.isDebugEnabled()) { Socket socket = null; if (key != null) { - SocketChannel ch = (SocketChannel)key.channel(); + final SocketChannel ch = (SocketChannel)key.channel(); if (ch != null) { socket = ch.socket(); } @@ -304,113 +319,122 @@ public abstract class NioConnection implements Runnable { s_logger.trace("Todos Processing: " + todos.size()); } SelectionKey key; - for (ChangeRequest todo : todos) { + for (final ChangeRequest todo : todos) { switch (todo.type) { - case ChangeRequest.CHANGEOPS: - try { - key = (SelectionKey)todo.key; - if (key != null && key.isValid()) { - if (todo.att != null) { - key.attach(todo.att); - Link link = (Link)todo.att; - link.setKey(key); - } - key.interestOps(todo.ops); - } - } catch (CancelledKeyException e) { - s_logger.debug("key has been cancelled"); - } - break; - case ChangeRequest.REGISTER: - try { - key = ((SocketChannel)(todo.key)).register(_selector, todo.ops, todo.att); + case ChangeRequest.CHANGEOPS: + try { + key = (SelectionKey)todo.key; + if (key != null && key.isValid()) { if (todo.att != null) { - Link link = (Link)todo.att; + key.attach(todo.att); + final Link link = (Link)todo.att; link.setKey(key); } - } catch (ClosedChannelException e) { - s_logger.warn("Couldn't register socket: " + todo.key); - try { - ((SocketChannel)(todo.key)).close(); - } catch (IOException ignore) { - s_logger.info("[ignored] socket channel"); - } finally { - Link link = (Link)todo.att; - link.terminated(); - } + key.interestOps(todo.ops); } - break; - case ChangeRequest.CLOSE: - if (s_logger.isTraceEnabled()) { - s_logger.trace("Trying to close " + todo.key); + } catch (final CancelledKeyException e) { + s_logger.debug("key has been cancelled"); + } + break; + case ChangeRequest.REGISTER: + try { + key = ((SocketChannel)todo.key).register(_selector, todo.ops, todo.att); + if (todo.att != null) { + final Link link = (Link)todo.att; + link.setKey(key); } - key = (SelectionKey)todo.key; - closeConnection(key); - if (key != null) { - Link link = (Link)key.attachment(); - if (link != null) { - link.terminated(); - } + } catch (final ClosedChannelException e) { + s_logger.warn("Couldn't register socket: " + todo.key); + try { + ((SocketChannel)todo.key).close(); + } catch (final IOException ignore) { + s_logger.info("[ignored] socket channel"); + } finally { + final Link link = (Link)todo.att; + link.terminated(); } - break; - default: - s_logger.warn("Shouldn't be here"); - throw new RuntimeException("Shouldn't be here"); + } + break; + case ChangeRequest.CLOSE: + if (s_logger.isTraceEnabled()) { + s_logger.trace("Trying to close " + todo.key); + } + key = (SelectionKey)todo.key; + closeConnection(key); + if (key != null) { + final Link link = (Link)key.attachment(); + if (link != null) { + link.terminated(); + } + } + break; + default: + s_logger.warn("Shouldn't be here"); + throw new RuntimeException("Shouldn't be here"); } } s_logger.trace("Todos Done processing"); } - protected void connect(SelectionKey key) throws IOException { - SocketChannel socketChannel = (SocketChannel)key.channel(); + protected void connect(final SelectionKey key) throws IOException { + final SocketChannel socketChannel = (SocketChannel)key.channel(); try { socketChannel.finishConnect(); key.interestOps(SelectionKey.OP_READ); - Socket socket = socketChannel.socket(); + final Socket socket = socketChannel.socket(); if (!socket.getKeepAlive()) { socket.setKeepAlive(true); } if (s_logger.isDebugEnabled()) { s_logger.debug("Connected to " + socket); } - Link link = new Link((InetSocketAddress)socket.getRemoteSocketAddress(), this); + final Link link = new Link((InetSocketAddress)socket.getRemoteSocketAddress(), this); link.setKey(key); key.attach(link); - Task task = _factory.create(Task.Type.CONNECT, link, null); - _executor.execute(task); - } catch (IOException e) { + final Task task = _factory.create(Task.Type.CONNECT, link, null); + + try { + _executor.submit(task); + } catch (final Exception e) { + s_logger.warn("Exception occurred when submitting the task", e); + } + } catch (final IOException e) { logTrace(e, key, 2); terminate(key); } } - protected void scheduleTask(Task task) { - _executor.execute(task); + protected void scheduleTask(final Task task) { + try { + _executor.submit(task); + } catch (final Exception e) { + s_logger.warn("Exception occurred when submitting the task", e); + } } - protected void write(SelectionKey key) throws IOException { - Link link = (Link)key.attachment(); + protected void write(final SelectionKey key) throws IOException { + final Link link = (Link)key.attachment(); try { if (s_logger.isTraceEnabled()) { s_logger.trace("Writing to " + link.getSocketAddress().toString()); } - boolean close = link.write((SocketChannel)key.channel()); + final boolean close = link.write((SocketChannel)key.channel()); if (close) { closeConnection(key); link.terminated(); } else { key.interestOps(SelectionKey.OP_READ); } - } catch (Exception e) { + } catch (final Exception e) { logDebug(e, key, 3); terminate(key); } } - protected void closeConnection(SelectionKey key) { + protected void closeConnection(final SelectionKey key) { if (key != null) { - SocketChannel channel = (SocketChannel)key.channel(); + final SocketChannel channel = (SocketChannel)key.channel(); key.cancel(); try { if (channel != null) { @@ -419,30 +443,30 @@ public abstract class NioConnection implements Runnable { } channel.close(); } - } catch (IOException ignore) { + } catch (final IOException ignore) { s_logger.info("[ignored] channel"); } } } - public void register(int ops, SocketChannel key, Object att) { - ChangeRequest todo = new ChangeRequest(key, ChangeRequest.REGISTER, ops, att); + public void register(final int ops, final SocketChannel key, final Object att) { + final ChangeRequest todo = new ChangeRequest(key, ChangeRequest.REGISTER, ops, att); synchronized (this) { _todos.add(todo); } _selector.wakeup(); } - public void change(int ops, SelectionKey key, Object att) { - ChangeRequest todo = new ChangeRequest(key, ChangeRequest.CHANGEOPS, ops, att); + public void change(final int ops, final SelectionKey key, final Object att) { + final ChangeRequest todo = new ChangeRequest(key, ChangeRequest.CHANGEOPS, ops, att); synchronized (this) { _todos.add(todo); } _selector.wakeup(); } - public void close(SelectionKey key) { - ChangeRequest todo = new ChangeRequest(key, ChangeRequest.CLOSE, 0, null); + public void close(final SelectionKey key) { + final ChangeRequest todo = new ChangeRequest(key, ChangeRequest.CLOSE, 0, null); synchronized (this) { _todos.add(todo); } @@ -466,7 +490,7 @@ public abstract class NioConnection implements Runnable { public int ops; public Object att; - public ChangeRequest(Object key, int type, int ops, Object att) { + public ChangeRequest(final Object key, final int type, final int ops, final Object att) { this.key = key; this.type = type; this.ops = ops; diff --git a/utils/src/main/java/com/cloud/utils/nio/NioServer.java b/utils/src/main/java/com/cloud/utils/nio/NioServer.java index 98a4a51dbfa..539c2bb13d8 100644 --- a/utils/src/main/java/com/cloud/utils/nio/NioServer.java +++ b/utils/src/main/java/com/cloud/utils/nio/NioServer.java @@ -37,7 +37,7 @@ public class NioServer extends NioConnection { protected WeakHashMap _links; - public NioServer(String name, int port, int workers, HandlerFactory factory) { + public NioServer(final String name, final int port, final int workers, final HandlerFactory factory) { super(name, port, workers, factory); _localAddr = null; _links = new WeakHashMap(1024); @@ -68,12 +68,12 @@ public class NioServer extends NioConnection { } @Override - protected void registerLink(InetSocketAddress addr, Link link) { + protected void registerLink(final InetSocketAddress addr, final Link link) { _links.put(addr, link); } @Override - protected void unregisterLink(InetSocketAddress saddr) { + protected void unregisterLink(final InetSocketAddress saddr) { _links.remove(saddr); } @@ -86,8 +86,8 @@ public class NioServer extends NioConnection { * @param data * @return null if not sent. attach object in link if sent. */ - public Object send(InetSocketAddress saddr, byte[] data) throws ClosedChannelException { - Link link = _links.get(saddr); + public Object send(final InetSocketAddress saddr, final byte[] data) throws ClosedChannelException { + final Link link = _links.get(saddr); if (link == null) { return null; } diff --git a/utils/src/main/java/com/cloud/utils/nio/Task.java b/utils/src/main/java/com/cloud/utils/nio/Task.java index c77c7035687..60228ef9412 100644 --- a/utils/src/main/java/com/cloud/utils/nio/Task.java +++ b/utils/src/main/java/com/cloud/utils/nio/Task.java @@ -19,14 +19,14 @@ package com.cloud.utils.nio; -import org.apache.log4j.Logger; +import java.util.concurrent.Callable; + +import com.cloud.utils.exception.TaskExecutionException; /** * Task represents one todo item for the AgentManager or the AgentManager - * */ -public abstract class Task implements Runnable { - private static final Logger s_logger = Logger.getLogger(Task.class); +public abstract class Task implements Callable { public enum Type { CONNECT, // Process a new connection. @@ -40,13 +40,13 @@ public abstract class Task implements Runnable { Type _type; Link _link; - public Task(Type type, Link link, byte[] data) { + public Task(final Type type, final Link link, final byte[] data) { _data = data; _type = type; _link = link; } - public Task(Type type, Link link, Object data) { + public Task(final Type type, final Link link, final Object data) { _data = data; _type = type; _link = link; @@ -76,14 +76,11 @@ public abstract class Task implements Runnable { return _type.toString(); } - abstract protected void doTask(Task task) throws Exception; + abstract protected void doTask(Task task) throws TaskExecutionException; @Override - public final void run() { - try { - doTask(this); - } catch (Throwable e) { - s_logger.warn("Caught the following exception but pushing on", e); - } + public Boolean call() throws TaskExecutionException { + doTask(this); + return true; } -} +} \ No newline at end of file diff --git a/utils/src/test/java/com/cloud/utils/testcase/NioTest.java b/utils/src/test/java/com/cloud/utils/testcase/NioTest.java index fc166847dc2..d8510cfcac2 100644 --- a/utils/src/test/java/com/cloud/utils/testcase/NioTest.java +++ b/utils/src/test/java/com/cloud/utils/testcase/NioTest.java @@ -27,6 +27,7 @@ import junit.framework.TestCase; import org.apache.log4j.Logger; import org.junit.Assert; +import com.cloud.utils.exception.NioConnectionException; import com.cloud.utils.nio.HandlerFactory; import com.cloud.utils.nio.Link; import com.cloud.utils.nio.NioClient; @@ -56,7 +57,7 @@ public class NioTest extends TestCase { private boolean isTestsDone() { boolean result; synchronized (this) { - result = (_testCount == _completedCount); + result = _testCount == _completedCount; } return result; } @@ -81,16 +82,24 @@ public class NioTest extends TestCase { _completedCount = 0; _server = new NioServer("NioTestServer", 7777, 5, new NioTestServer()); - _server.start(); + try { + _server.start(); + } catch (final NioConnectionException e) { + fail(e.getMessage()); + } _client = new NioClient("NioTestServer", "127.0.0.1", 7777, 5, new NioTestClient()); - _client.start(); + try { + _client.start(); + } catch (final NioConnectionException e) { + fail(e.getMessage()); + } while (_clientLink == null) { try { s_logger.debug("Link is not up! Waiting ..."); Thread.sleep(1000); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } @@ -101,9 +110,9 @@ public class NioTest extends TestCase { public void tearDown() { while (!isTestsDone()) { try { - s_logger.debug(this._completedCount + "/" + this._testCount + " tests done. Waiting for completion"); + s_logger.debug(_completedCount + "/" + _testCount + " tests done. Waiting for completion"); Thread.sleep(1000); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } @@ -122,7 +131,7 @@ public class NioTest extends TestCase { s_logger.info("Server stopped."); } - protected void setClientLink(Link link) { + protected void setClientLink(final Link link) { _clientLink = link; } @@ -140,13 +149,13 @@ public class NioTest extends TestCase { getOneMoreTest(); _clientLink.send(_testBytes); s_logger.info("Client: Data sent"); - } catch (ClosedChannelException e) { + } catch (final ClosedChannelException e) { // TODO Auto-generated catch block e.printStackTrace(); } } - protected void doServerProcess(byte[] data) { + protected void doServerProcess(final byte[] data) { oneMoreTestDone(); Assert.assertArrayEquals(_testBytes, data); s_logger.info("Verify done."); @@ -155,13 +164,13 @@ public class NioTest extends TestCase { public class NioTestClient implements HandlerFactory { @Override - public Task create(Type type, Link link, byte[] data) { + public Task create(final Type type, final Link link, final byte[] data) { return new NioTestClientHandler(type, link, data); } public class NioTestClientHandler extends Task { - public NioTestClientHandler(Type type, Link link, byte[] data) { + public NioTestClientHandler(final Type type, final Link link, final byte[] data) { super(type, link, data); } @@ -186,13 +195,13 @@ public class NioTest extends TestCase { public class NioTestServer implements HandlerFactory { @Override - public Task create(Type type, Link link, byte[] data) { + public Task create(final Type type, final Link link, final byte[] data) { return new NioTestServerHandler(type, link, data); } public class NioTestServerHandler extends Task { - public NioTestServerHandler(Type type, Link link, byte[] data) { + public NioTestServerHandler(final Type type, final Link link, final byte[] data) { super(type, link, data); }