CLOUDSTACK-8822 - Replacing Runnable by Callable in the Taks and NioConnection classes

- All the sub-classes were also updated according to the changes in the super-classes
   - There were also code formatting changes
This commit is contained in:
wilderrodrigues 2015-09-08 12:12:55 +02:00
parent 2d90f18b82
commit 79a3f8c577
11 changed files with 587 additions and 433 deletions

View File

@ -35,9 +35,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.ConfigurationException; import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.cloudstack.managed.context.ManagedContextTimerTask; import org.apache.cloudstack.managed.context.ManagedContextTimerTask;
import org.apache.log4j.Logger;
import com.cloud.agent.api.AgentControlAnswer; import com.cloud.agent.api.AgentControlAnswer;
import com.cloud.agent.api.AgentControlCommand; import com.cloud.agent.api.AgentControlCommand;
@ -59,6 +58,8 @@ import com.cloud.utils.PropertiesUtil;
import com.cloud.utils.backoff.BackoffAlgorithm; import com.cloud.utils.backoff.BackoffAlgorithm;
import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.exception.NioConnectionException;
import com.cloud.utils.exception.TaskExecutionException;
import com.cloud.utils.nio.HandlerFactory; import com.cloud.utils.nio.HandlerFactory;
import com.cloud.utils.nio.Link; import com.cloud.utils.nio.Link;
import com.cloud.utils.nio.NioClient; import com.cloud.utils.nio.NioClient;
@ -121,11 +122,11 @@ public class Agent implements HandlerFactory, IAgentControl {
long _startupWait = _startupWaitDefault; long _startupWait = _startupWaitDefault;
boolean _reconnectAllowed = true; boolean _reconnectAllowed = true;
//For time sentitive task, e.g. PingTask //For time sentitive task, e.g. PingTask
private ThreadPoolExecutor _ugentTaskPool; private final ThreadPoolExecutor _ugentTaskPool;
ExecutorService _executor; ExecutorService _executor;
// for simulator use only // for simulator use only
public Agent(IAgentShell shell) { public Agent(final IAgentShell shell) {
_shell = shell; _shell = shell;
_link = null; _link = null;
@ -134,29 +135,29 @@ public class Agent implements HandlerFactory, IAgentControl {
Runtime.getRuntime().addShutdownHook(new ShutdownThread(this)); Runtime.getRuntime().addShutdownHook(new ShutdownThread(this));
_ugentTaskPool = _ugentTaskPool =
new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), new NamedThreadFactory( new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), new NamedThreadFactory(
"UgentTask")); "UgentTask"));
_executor = _executor =
new ThreadPoolExecutor(_shell.getWorkers(), 5 * _shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory( new ThreadPoolExecutor(_shell.getWorkers(), 5 * _shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(
"agentRequest-Handler")); "agentRequest-Handler"));
} }
public Agent(IAgentShell shell, int localAgentId, ServerResource resource) throws ConfigurationException { public Agent(final IAgentShell shell, final int localAgentId, final ServerResource resource) throws ConfigurationException {
_shell = shell; _shell = shell;
_resource = resource; _resource = resource;
_link = null; _link = null;
resource.setAgentControl(this); resource.setAgentControl(this);
String value = _shell.getPersistentProperty(getResourceName(), "id"); final String value = _shell.getPersistentProperty(getResourceName(), "id");
_id = value != null ? Long.parseLong(value) : null; _id = value != null ? Long.parseLong(value) : null;
s_logger.info("id is " + ((_id != null) ? _id : "")); s_logger.info("id is " + (_id != null ? _id : ""));
final Map<String, Object> params = PropertiesUtil.toMap(_shell.getProperties()); final Map<String, Object> params = PropertiesUtil.toMap(_shell.getProperties());
// merge with properties from command line to let resource access command line parameters // merge with properties from command line to let resource access command line parameters
for (Map.Entry<String, Object> cmdLineProp : _shell.getCmdLineProperties().entrySet()) { for (final Map.Entry<String, Object> cmdLineProp : _shell.getCmdLineProperties().entrySet()) {
params.put(cmdLineProp.getKey(), cmdLineProp.getValue()); params.put(cmdLineProp.getKey(), cmdLineProp.getValue());
} }
@ -172,15 +173,15 @@ public class Agent implements HandlerFactory, IAgentControl {
Runtime.getRuntime().addShutdownHook(new ShutdownThread(this)); Runtime.getRuntime().addShutdownHook(new ShutdownThread(this));
_ugentTaskPool = _ugentTaskPool =
new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), new NamedThreadFactory( new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), new NamedThreadFactory(
"UgentTask")); "UgentTask"));
_executor = _executor =
new ThreadPoolExecutor(_shell.getWorkers(), 5 * _shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory( new ThreadPoolExecutor(_shell.getWorkers(), 5 * _shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(
"agentRequest-Handler")); "agentRequest-Handler"));
s_logger.info("Agent [id = " + (_id != null ? _id : "new") + " : type = " + getResourceName() + " : zone = " + _shell.getZone() + " : pod = " + _shell.getPod() + s_logger.info("Agent [id = " + (_id != null ? _id : "new") + " : type = " + getResourceName() + " : zone = " + _shell.getZone() + " : pod = " + _shell.getPod() +
" : workers = " + _shell.getWorkers() + " : host = " + _shell.getHost() + " : port = " + _shell.getPort()); " : workers = " + _shell.getWorkers() + " : host = " + _shell.getHost() + " : port = " + _shell.getPort());
} }
public String getVersion() { public String getVersion() {
@ -188,7 +189,7 @@ public class Agent implements HandlerFactory, IAgentControl {
} }
public String getResourceGuid() { public String getResourceGuid() {
String guid = _shell.getGuid(); final String guid = _shell.getGuid();
return guid + "-" + getResourceName(); return guid + "-" + getResourceName();
} }
@ -222,11 +223,19 @@ public class Agent implements HandlerFactory, IAgentControl {
throw new CloudRuntimeException("Unable to start the resource: " + _resource.getName()); throw new CloudRuntimeException("Unable to start the resource: " + _resource.getName());
} }
_connection.start(); try {
_connection.start();
} catch (final NioConnectionException e) {
throw new CloudRuntimeException("Unable to start the connection!", e);
}
while (!_connection.isStartup()) { while (!_connection.isStartup()) {
_shell.getBackoffAlgorithm().waitBeforeRetry(); _shell.getBackoffAlgorithm().waitBeforeRetry();
_connection = new NioClient("Agent", _shell.getHost(), _shell.getPort(), _shell.getWorkers(), this); _connection = new NioClient("Agent", _shell.getHost(), _shell.getPort(), _shell.getWorkers(), this);
_connection.start(); try {
_connection.start();
} catch (final NioConnectionException e) {
throw new CloudRuntimeException("Unable to start the connection!", e);
}
} }
} }
@ -236,12 +245,12 @@ public class Agent implements HandlerFactory, IAgentControl {
final ShutdownCommand cmd = new ShutdownCommand(reason, detail); final ShutdownCommand cmd = new ShutdownCommand(reason, detail);
try { try {
if (_link != null) { if (_link != null) {
Request req = new Request((_id != null ? _id : -1), -1, cmd, false); final Request req = new Request(_id != null ? _id : -1, -1, cmd, false);
_link.send(req.toBytes()); _link.send(req.toBytes());
} }
} catch (final ClosedChannelException e) { } catch (final ClosedChannelException e) {
s_logger.warn("Unable to send: " + cmd.toString()); s_logger.warn("Unable to send: " + cmd.toString());
} catch (Exception e) { } catch (final Exception e) {
s_logger.warn("Unable to send: " + cmd.toString() + " due to exception: ", e); s_logger.warn("Unable to send: " + cmd.toString() + " due to exception: ", e);
} }
s_logger.debug("Sending shutdown to management server"); s_logger.debug("Sending shutdown to management server");
@ -294,13 +303,13 @@ public class Agent implements HandlerFactory, IAgentControl {
_watchList.clear(); _watchList.clear();
} }
} }
public synchronized void lockStartupTask(Link link) public synchronized void lockStartupTask(final Link link)
{ {
_startup = new StartupTask(link); _startup = new StartupTask(link);
_timer.schedule(_startup, _startupWait); _timer.schedule(_startup, _startupWait);
} }
public void sendStartup(Link link) { public void sendStartup(final Link link) {
final StartupCommand[] startup = _resource.initialize(); final StartupCommand[] startup = _resource.initialize();
if (startup != null) { if (startup != null) {
final Command[] commands = new Command[startup.length]; final Command[] commands = new Command[startup.length];
@ -323,7 +332,7 @@ public class Agent implements HandlerFactory, IAgentControl {
} }
} }
protected void setupStartupCommand(StartupCommand startup) { protected void setupStartupCommand(final StartupCommand startup) {
InetAddress addr; InetAddress addr;
try { try {
addr = InetAddress.getLocalHost(); addr = InetAddress.getLocalHost();
@ -349,7 +358,7 @@ public class Agent implements HandlerFactory, IAgentControl {
} }
@Override @Override
public Task create(Task.Type type, Link link, byte[] data) { public Task create(final Task.Type type, final Link link, final byte[] data) {
return new ServerHandler(type, link, data); return new ServerHandler(type, link, data);
} }
@ -391,19 +400,23 @@ public class Agent implements HandlerFactory, IAgentControl {
try { try {
_connection.cleanUp(); _connection.cleanUp();
} catch (IOException e) { } catch (final IOException e) {
s_logger.warn("Fail to clean up old connection. " + e); s_logger.warn("Fail to clean up old connection. " + e);
} }
_connection = new NioClient("Agent", _shell.getHost(), _shell.getPort(), _shell.getWorkers(), this); _connection = new NioClient("Agent", _shell.getHost(), _shell.getPort(), _shell.getWorkers(), this);
do { do {
s_logger.info("Reconnecting..."); s_logger.info("Reconnecting...");
_connection.start(); try {
_connection.start();
} catch (final NioConnectionException e) {
throw new CloudRuntimeException("Unable to start the connection!", e);
}
_shell.getBackoffAlgorithm().waitBeforeRetry(); _shell.getBackoffAlgorithm().waitBeforeRetry();
} while (!_connection.isStartup()); } while (!_connection.isStartup());
s_logger.info("Connected to the server"); s_logger.info("Connected to the server");
} }
public void processStartupAnswer(Answer answer, Response response, Link link) { public void processStartupAnswer(final Answer answer, final Response response, final Link link) {
boolean cancelled = false; boolean cancelled = false;
synchronized (this) { synchronized (this) {
if (_startup != null) { if (_startup != null) {
@ -450,7 +463,7 @@ public class Agent implements HandlerFactory, IAgentControl {
if (s_logger.isDebugEnabled()) { if (s_logger.isDebugEnabled()) {
if (!requestLogged) // ensures request is logged only once per method call if (!requestLogged) // ensures request is logged only once per method call
{ {
String requestMsg = request.toString(); final String requestMsg = request.toString();
if (requestMsg != null) { if (requestMsg != null) {
s_logger.debug("Request:" + requestMsg); s_logger.debug("Request:" + requestMsg);
} }
@ -464,7 +477,7 @@ public class Agent implements HandlerFactory, IAgentControl {
scheduleWatch(link, request, (long)watch.getInterval() * 1000, watch.getInterval() * 1000); scheduleWatch(link, request, (long)watch.getInterval() * 1000, watch.getInterval() * 1000);
answer = new Answer(cmd, true, null); answer = new Answer(cmd, true, null);
} else if (cmd instanceof ShutdownCommand) { } else if (cmd instanceof ShutdownCommand) {
ShutdownCommand shutdown = (ShutdownCommand)cmd; final ShutdownCommand shutdown = (ShutdownCommand)cmd;
s_logger.debug("Received shutdownCommand, due to: " + shutdown.getReason()); s_logger.debug("Received shutdownCommand, due to: " + shutdown.getReason());
cancelTasks(); cancelTasks();
_reconnectAllowed = false; _reconnectAllowed = false;
@ -481,7 +494,7 @@ public class Agent implements HandlerFactory, IAgentControl {
} else if (cmd instanceof AgentControlCommand) { } else if (cmd instanceof AgentControlCommand) {
answer = null; answer = null;
synchronized (_controlListeners) { synchronized (_controlListeners) {
for (IAgentControlListener listener : _controlListeners) { for (final IAgentControlListener listener : _controlListeners) {
answer = listener.processControlRequest(request, (AgentControlCommand)cmd); answer = listener.processControlRequest(request, (AgentControlCommand)cmd);
if (answer != null) { if (answer != null) {
break; break;
@ -527,7 +540,7 @@ public class Agent implements HandlerFactory, IAgentControl {
response = new Response(request, answers); response = new Response(request, answers);
} finally { } finally {
if (s_logger.isDebugEnabled()) { if (s_logger.isDebugEnabled()) {
String responseMsg = response.toString(); final String responseMsg = response.toString();
if (responseMsg != null) { if (responseMsg != null) {
s_logger.debug(response.toString()); s_logger.debug(response.toString());
} }
@ -553,7 +566,7 @@ public class Agent implements HandlerFactory, IAgentControl {
} else if (answer instanceof AgentControlAnswer) { } else if (answer instanceof AgentControlAnswer) {
// Notice, we are doing callback while holding a lock! // Notice, we are doing callback while holding a lock!
synchronized (_controlListeners) { synchronized (_controlListeners) {
for (IAgentControlListener listener : _controlListeners) { for (final IAgentControlListener listener : _controlListeners) {
listener.processControlResponse(response, (AgentControlAnswer)answer); listener.processControlResponse(response, (AgentControlAnswer)answer);
} }
} }
@ -562,7 +575,7 @@ public class Agent implements HandlerFactory, IAgentControl {
} }
} }
public void processReadyCommand(Command cmd) { public void processReadyCommand(final Command cmd) {
final ReadyCommand ready = (ReadyCommand)cmd; final ReadyCommand ready = (ReadyCommand)cmd;
@ -574,10 +587,10 @@ public class Agent implements HandlerFactory, IAgentControl {
} }
public void processOtherTask(Task task) { public void processOtherTask(final Task task) {
final Object obj = task.get(); final Object obj = task.get();
if (obj instanceof Response) { if (obj instanceof Response) {
if ((System.currentTimeMillis() - _lastPingResponseTime) > _pingInterval * _shell.getPingRetries()) { if (System.currentTimeMillis() - _lastPingResponseTime > _pingInterval * _shell.getPingRetries()) {
s_logger.error("Ping Interval has gone past " + _pingInterval * _shell.getPingRetries() + ". Won't reconnect to mgt server, as connection is still alive"); s_logger.error("Ping Interval has gone past " + _pingInterval * _shell.getPingRetries() + ". Won't reconnect to mgt server, as connection is still alive");
return; return;
} }
@ -633,25 +646,25 @@ public class Agent implements HandlerFactory, IAgentControl {
} }
@Override @Override
public void registerControlListener(IAgentControlListener listener) { public void registerControlListener(final IAgentControlListener listener) {
synchronized (_controlListeners) { synchronized (_controlListeners) {
_controlListeners.add(listener); _controlListeners.add(listener);
} }
} }
@Override @Override
public void unregisterControlListener(IAgentControlListener listener) { public void unregisterControlListener(final IAgentControlListener listener) {
synchronized (_controlListeners) { synchronized (_controlListeners) {
_controlListeners.remove(listener); _controlListeners.remove(listener);
} }
} }
@Override @Override
public AgentControlAnswer sendRequest(AgentControlCommand cmd, int timeoutInMilliseconds) throws AgentControlChannelException { public AgentControlAnswer sendRequest(final AgentControlCommand cmd, final int timeoutInMilliseconds) throws AgentControlChannelException {
Request request = new Request(this.getId(), -1, new Command[] {cmd}, true, false); final Request request = new Request(getId(), -1, new Command[] {cmd}, true, false);
request.setSequence(getNextSequence()); request.setSequence(getNextSequence());
AgentControlListener listener = new AgentControlListener(request); final AgentControlListener listener = new AgentControlListener(request);
registerControlListener(listener); registerControlListener(listener);
try { try {
@ -659,7 +672,7 @@ public class Agent implements HandlerFactory, IAgentControl {
synchronized (listener) { synchronized (listener) {
try { try {
listener.wait(timeoutInMilliseconds); listener.wait(timeoutInMilliseconds);
} catch (InterruptedException e) { } catch (final InterruptedException e) {
s_logger.warn("sendRequest is interrupted, exit waiting"); s_logger.warn("sendRequest is interrupted, exit waiting");
} }
} }
@ -671,13 +684,13 @@ public class Agent implements HandlerFactory, IAgentControl {
} }
@Override @Override
public void postRequest(AgentControlCommand cmd) throws AgentControlChannelException { public void postRequest(final AgentControlCommand cmd) throws AgentControlChannelException {
Request request = new Request(this.getId(), -1, new Command[] {cmd}, true, false); final Request request = new Request(getId(), -1, new Command[] {cmd}, true, false);
request.setSequence(getNextSequence()); request.setSequence(getNextSequence());
postRequest(request); postRequest(request);
} }
private void postRequest(Request request) throws AgentControlChannelException { private void postRequest(final Request request) throws AgentControlChannelException {
if (_link != null) { if (_link != null) {
try { try {
_link.send(request.toBytes()); _link.send(request.toBytes());
@ -694,7 +707,7 @@ public class Agent implements HandlerFactory, IAgentControl {
private AgentControlAnswer _answer; private AgentControlAnswer _answer;
private final Request _request; private final Request _request;
public AgentControlListener(Request request) { public AgentControlListener(final Request request) {
_request = request; _request = request;
} }
@ -703,12 +716,12 @@ public class Agent implements HandlerFactory, IAgentControl {
} }
@Override @Override
public Answer processControlRequest(Request request, AgentControlCommand cmd) { public Answer processControlRequest(final Request request, final AgentControlCommand cmd) {
return null; return null;
} }
@Override @Override
public void processControlResponse(Response response, AgentControlAnswer answer) { public void processControlResponse(final Response response, final AgentControlAnswer answer) {
if (_request.getSequence() == response.getSequence()) { if (_request.getSequence() == response.getSequence()) {
_answer = answer; _answer = answer;
synchronized (this) { synchronized (this) {
@ -797,13 +810,13 @@ public class Agent implements HandlerFactory, IAgentControl {
} }
public class AgentRequestHandler extends Task { public class AgentRequestHandler extends Task {
public AgentRequestHandler(Task.Type type, Link link, Request req) { public AgentRequestHandler(final Task.Type type, final Link link, final Request req) {
super(type, link, req); super(type, link, req);
} }
@Override @Override
protected void doTask(Task task) throws Exception { protected void doTask(final Task task) throws TaskExecutionException {
Request req = (Request)this.get(); final Request req = (Request)get();
if (!(req instanceof Response)) { if (!(req instanceof Response)) {
processRequest(req, task.getLink()); processRequest(req, task.getLink());
} }
@ -811,16 +824,16 @@ public class Agent implements HandlerFactory, IAgentControl {
} }
public class ServerHandler extends Task { public class ServerHandler extends Task {
public ServerHandler(Task.Type type, Link link, byte[] data) { public ServerHandler(final Task.Type type, final Link link, final byte[] data) {
super(type, link, data); super(type, link, data);
} }
public ServerHandler(Task.Type type, Link link, Request req) { public ServerHandler(final Task.Type type, final Link link, final Request req) {
super(type, link, req); super(type, link, req);
} }
@Override @Override
public void doTask(final Task task) { public void doTask(final Task task) throws TaskExecutionException {
if (task.getType() == Task.Type.CONNECT) { if (task.getType() == Task.Type.CONNECT) {
_shell.getBackoffAlgorithm().reset(); _shell.getBackoffAlgorithm().reset();
setLink(task.getLink()); setLink(task.getLink());
@ -835,7 +848,7 @@ public class Agent implements HandlerFactory, IAgentControl {
} else { } else {
//put the requests from mgt server into another thread pool, as the request may take a longer time to finish. Don't block the NIO main thread pool //put the requests from mgt server into another thread pool, as the request may take a longer time to finish. Don't block the NIO main thread pool
//processRequest(request, task.getLink()); //processRequest(request, task.getLink());
_executor.execute(new AgentRequestHandler(this.getType(), this.getLink(), request)); _executor.submit(new AgentRequestHandler(getType(), getLink(), request));
} }
} catch (final ClassNotFoundException e) { } catch (final ClassNotFoundException e) {
s_logger.error("Unable to find this request "); s_logger.error("Unable to find this request ");

View File

@ -103,6 +103,8 @@ import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.TransactionLegacy; import com.cloud.utils.db.TransactionLegacy;
import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.exception.HypervisorVersionChangedException; import com.cloud.utils.exception.HypervisorVersionChangedException;
import com.cloud.utils.exception.NioConnectionException;
import com.cloud.utils.exception.TaskExecutionException;
import com.cloud.utils.fsm.NoTransitionException; import com.cloud.utils.fsm.NoTransitionException;
import com.cloud.utils.fsm.StateMachine2; import com.cloud.utils.fsm.StateMachine2;
import com.cloud.utils.nio.HandlerFactory; import com.cloud.utils.nio.HandlerFactory;
@ -593,7 +595,11 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
startDirectlyConnectedHosts(); startDirectlyConnectedHosts();
if (_connection != null) { if (_connection != null) {
_connection.start(); try {
_connection.start();
} catch (final NioConnectionException e) {
s_logger.error("Error when connecting to the NioServer!", e);
}
} }
_monitorExecutor.scheduleWithFixedDelay(new MonitorTask(), PingInterval.value(), PingInterval.value(), TimeUnit.SECONDS); _monitorExecutor.scheduleWithFixedDelay(new MonitorTask(), PingInterval.value(), PingInterval.value(), TimeUnit.SECONDS);
@ -827,7 +833,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
Status determinedState = investigate(attache); Status determinedState = investigate(attache);
// if state cannot be determined do nothing and bail out // if state cannot be determined do nothing and bail out
if (determinedState == null) { if (determinedState == null) {
if (((System.currentTimeMillis() >> 10) - host.getLastPinged()) > AlertWait.value()) { if ((System.currentTimeMillis() >> 10) - host.getLastPinged() > AlertWait.value()) {
s_logger.warn("Agent " + hostId + " state cannot be determined for more than " + AlertWait + "(" + AlertWait.value() + ") seconds, will go to Alert state"); s_logger.warn("Agent " + hostId + " state cannot be determined for more than " + AlertWait + "(" + AlertWait.value() + ") seconds, will go to Alert state");
determinedState = Status.Alert; determinedState = Status.Alert;
} else { } else {
@ -840,7 +846,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
s_logger.info("The agent " + hostId + " state determined is " + determinedState); s_logger.info("The agent " + hostId + " state determined is " + determinedState);
if (determinedState == Status.Down) { if (determinedState == Status.Down) {
String message = "Host is down: " + host.getId() + "-" + host.getName() + ". Starting HA on the VMs"; final String message = "Host is down: " + host.getId() + "-" + host.getName() + ". Starting HA on the VMs";
s_logger.error(message); s_logger.error(message);
if (host.getType() != Host.Type.SecondaryStorage && host.getType() != Host.Type.ConsoleProxy) { if (host.getType() != Host.Type.SecondaryStorage && host.getType() != Host.Type.ConsoleProxy) {
_alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_HOST, host.getDataCenterId(), host.getPodId(), "Host down, " + host.getId(), message); _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_HOST, host.getDataCenterId(), host.getPodId(), "Host down, " + host.getId(), message);
@ -1299,7 +1305,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
} }
@Override @Override
protected void doTask(final Task task) throws Exception { protected void doTask(final Task task) throws TaskExecutionException {
final TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB); final TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB);
try { try {
final Type type = task.getType(); final Type type = task.getType();
@ -1315,6 +1321,10 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
} catch (final UnsupportedVersionException e) { } catch (final UnsupportedVersionException e) {
s_logger.warn(e.getMessage()); s_logger.warn(e.getMessage());
// upgradeAgent(task.getLink(), data, e.getReason()); // upgradeAgent(task.getLink(), data, e.getReason());
} catch (final ClassNotFoundException e) {
final String message = String.format("Exception occured when executing taks! Error '%s'", e.getMessage());
s_logger.error(message);
throw new TaskExecutionException(message, e);
} }
} else if (type == Task.Type.CONNECT) { } else if (type == Task.Type.CONNECT) {
} else if (type == Task.Type.DISCONNECT) { } else if (type == Task.Type.DISCONNECT) {

View File

@ -66,4 +66,6 @@ public interface SerialVersionUID {
public static final long UnableDeleteHostException = Base | 0x29; public static final long UnableDeleteHostException = Base | 0x29;
public static final long AffinityConflictException = Base | 0x2a; public static final long AffinityConflictException = Base | 0x2a;
public static final long JobCancellationException = Base | 0x2b; public static final long JobCancellationException = Base | 0x2b;
public static final long NioConnectionException = Base | 0x2c;
public static final long TaskExecutionException = Base | 0x2d;
} }

View File

@ -0,0 +1,48 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package com.cloud.utils.exception;
import com.cloud.utils.SerialVersionUID;
/**
* Used by the NioConnection class to wrap-up its exceptions.
*/
public class NioConnectionException extends Exception {
private static final long serialVersionUID = SerialVersionUID.NioConnectionException;
protected int csErrorCode;
public NioConnectionException(final String msg, final Throwable cause) {
super(msg, cause);
setCSErrorCode(CSExceptionErrorCode.getCSErrCode(this.getClass().getName()));
}
public NioConnectionException(final String msg) {
super(msg);
}
public void setCSErrorCode(final int cserrcode) {
csErrorCode = cserrcode;
}
public int getCSErrorCode() {
return csErrorCode;
}
}

View File

@ -0,0 +1,48 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package com.cloud.utils.exception;
import com.cloud.utils.SerialVersionUID;
/**
* Used by the Task class to wrap-up its exceptions.
*/
public class TaskExecutionException extends Exception {
private static final long serialVersionUID = SerialVersionUID.NioConnectionException;
protected int csErrorCode;
public TaskExecutionException(final String msg, final Throwable cause) {
super(msg, cause);
setCSErrorCode(CSExceptionErrorCode.getCSErrCode(this.getClass().getName()));
}
public TaskExecutionException(final String msg) {
super(msg);
}
public void setCSErrorCode(final int cserrcode) {
csErrorCode = cserrcode;
}
public int getCSErrorCode() {
return csErrorCode;
}
}

View File

@ -29,9 +29,8 @@ import java.security.GeneralSecurityException;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngine;
import org.apache.log4j.Logger;
import org.apache.cloudstack.utils.security.SSLUtils; import org.apache.cloudstack.utils.security.SSLUtils;
import org.apache.log4j.Logger;
public class NioClient extends NioConnection { public class NioClient extends NioConnection {
private static final Logger s_logger = Logger.getLogger(NioClient.class); private static final Logger s_logger = Logger.getLogger(NioClient.class);
@ -40,12 +39,12 @@ public class NioClient extends NioConnection {
protected String _bindAddress; protected String _bindAddress;
protected SocketChannel _clientConnection; protected SocketChannel _clientConnection;
public NioClient(String name, String host, int port, int workers, HandlerFactory factory) { public NioClient(final String name, final String host, final int port, final int workers, final HandlerFactory factory) {
super(name, port, workers, factory); super(name, port, workers, factory);
_host = host; _host = host;
} }
public void setBindAddress(String ipAddress) { public void setBindAddress(final String ipAddress) {
_bindAddress = ipAddress; _bindAddress = ipAddress;
} }
@ -62,18 +61,18 @@ public class NioClient extends NioConnection {
if (_bindAddress != null) { if (_bindAddress != null) {
s_logger.info("Binding outbound interface at " + _bindAddress); s_logger.info("Binding outbound interface at " + _bindAddress);
InetSocketAddress bindAddr = new InetSocketAddress(_bindAddress, 0); final InetSocketAddress bindAddr = new InetSocketAddress(_bindAddress, 0);
_clientConnection.socket().bind(bindAddr); _clientConnection.socket().bind(bindAddr);
} }
InetSocketAddress peerAddr = new InetSocketAddress(_host, _port); final InetSocketAddress peerAddr = new InetSocketAddress(_host, _port);
_clientConnection.connect(peerAddr); _clientConnection.connect(peerAddr);
SSLEngine sslEngine = null; SSLEngine sslEngine = null;
// Begin SSL handshake in BLOCKING mode // Begin SSL handshake in BLOCKING mode
_clientConnection.configureBlocking(true); _clientConnection.configureBlocking(true);
SSLContext sslContext = Link.initSSLContext(true); final SSLContext sslContext = Link.initSSLContext(true);
sslEngine = sslContext.createSSLEngine(_host, _port); sslEngine = sslContext.createSSLEngine(_host, _port);
sslEngine.setUseClientMode(true); sslEngine.setUseClientMode(true);
sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols())); sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
@ -83,32 +82,31 @@ public class NioClient extends NioConnection {
s_logger.info("Connected to " + _host + ":" + _port); s_logger.info("Connected to " + _host + ":" + _port);
_clientConnection.configureBlocking(false); _clientConnection.configureBlocking(false);
Link link = new Link(peerAddr, this); final Link link = new Link(peerAddr, this);
link.setSSLEngine(sslEngine); link.setSSLEngine(sslEngine);
SelectionKey key = _clientConnection.register(_selector, SelectionKey.OP_READ); final SelectionKey key = _clientConnection.register(_selector, SelectionKey.OP_READ);
link.setKey(key); link.setKey(key);
key.attach(link); key.attach(link);
// Notice we've already connected due to the handshake, so let's get the // Notice we've already connected due to the handshake, so let's get the
// remaining task done // remaining task done
task = _factory.create(Task.Type.CONNECT, link, null); task = _factory.create(Task.Type.CONNECT, link, null);
} catch (GeneralSecurityException e) { } catch (final GeneralSecurityException e) {
_selector.close(); _selector.close();
throw new IOException("Failed to initialise security", e); throw new IOException("Failed to initialise security", e);
} catch (IOException e) { } catch (final IOException e) {
_selector.close(); _selector.close();
throw e; throw e;
} }
_executor.submit(task);
_executor.execute(task);
} }
@Override @Override
protected void registerLink(InetSocketAddress saddr, Link link) { protected void registerLink(final InetSocketAddress saddr, final Link link) {
// don't do anything. // don't do anything.
} }
@Override @Override
protected void unregisterLink(InetSocketAddress saddr) { protected void unregisterLink(final InetSocketAddress saddr) {
// don't do anything. // don't do anything.
} }
@ -119,7 +117,5 @@ public class NioClient extends NioConnection {
_clientConnection.close(); _clientConnection.close();
} }
s_logger.info("NioClient connection closed"); s_logger.info("NioClient connection closed");
} }
}
}

View File

@ -27,6 +27,7 @@ import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.nio.channels.CancelledKeyException; import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
@ -35,7 +36,10 @@ import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -43,21 +47,23 @@ import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngine;
import org.apache.cloudstack.utils.security.SSLUtils;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.cloudstack.utils.security.SSLUtils;
import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.exception.NioConnectionException;
/** /**
* NioConnection abstracts the NIO socket operations. The Java implementation * NioConnection abstracts the NIO socket operations. The Java implementation
* provides that. * provides that.
*/ */
public abstract class NioConnection implements Runnable { public abstract class NioConnection implements Callable<Boolean> {
private static final Logger s_logger = Logger.getLogger(NioConnection.class);; private static final Logger s_logger = Logger.getLogger(NioConnection.class);;
protected Selector _selector; protected Selector _selector;
protected Thread _thread; protected ExecutorService _threadExecutor;
protected Future<Boolean> _futureTask;
protected boolean _isRunning; protected boolean _isRunning;
protected boolean _isStartup; protected boolean _isStartup;
protected int _port; protected int _port;
@ -66,42 +72,48 @@ public abstract class NioConnection implements Runnable {
protected String _name; protected String _name;
protected ExecutorService _executor; protected ExecutorService _executor;
public NioConnection(String name, int port, int workers, HandlerFactory factory) { public NioConnection(final String name, final int port, final int workers, final HandlerFactory factory) {
_name = name; _name = name;
_isRunning = false; _isRunning = false;
_thread = null;
_selector = null; _selector = null;
_port = port; _port = port;
_factory = factory; _factory = factory;
_executor = new ThreadPoolExecutor(workers, 5 * workers, 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(name + "-Handler")); _executor = new ThreadPoolExecutor(workers, 5 * workers, 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(name + "-Handler"));
} }
public void start() { public void start() throws NioConnectionException {
_todos = new ArrayList<ChangeRequest>(); _todos = new ArrayList<ChangeRequest>();
_thread = new Thread(this, _name + "-Selector"); try {
_isRunning = true; init();
_thread.start(); } catch (final ConnectException e) {
// Wait until we got init() done s_logger.warn("Unable to connect to remote: is there a server running on port " + _port);
synchronized (_thread) { } catch (final IOException e) {
try { s_logger.error("Unable to initialize the threads.", e);
_thread.wait(); throw new NioConnectionException(e.getMessage(), e);
} catch (InterruptedException e) { } catch (final Exception e) {
s_logger.warn("Interrupted start thread ", e); s_logger.error("Unable to initialize the threads due to unknown exception.", e);
} throw new NioConnectionException(e.getMessage(), e);
} }
_isStartup = true;
_threadExecutor = Executors.newSingleThreadExecutor();
_futureTask = _threadExecutor.submit(this);
_isRunning = true;
} }
public void stop() { public void stop() {
_executor.shutdown(); _executor.shutdown();
_isRunning = false; _isRunning = false;
if (_thread != null) { if (_threadExecutor != null) {
_thread.interrupt(); _futureTask.cancel(false);
_threadExecutor.shutdown();
} }
} }
public boolean isRunning() { public boolean isRunning() {
return _thread.isAlive(); return !_futureTask.isDone();
} }
public boolean isStartup() { public boolean isStartup() {
@ -109,45 +121,28 @@ public abstract class NioConnection implements Runnable {
} }
@Override @Override
public void run() { public Boolean call() throws NioConnectionException {
synchronized (_thread) {
try {
init();
} catch (ConnectException e) {
s_logger.warn("Unable to connect to remote: is there a server running on port " + _port);
return;
} catch (IOException e) {
s_logger.error("Unable to initialize the threads.", e);
return;
} catch (Exception e) {
s_logger.error("Unable to initialize the threads due to unknown exception.", e);
return;
}
_isStartup = true;
_thread.notifyAll();
}
while (_isRunning) { while (_isRunning) {
try { try {
_selector.select(); _selector.select();
// Someone is ready for I/O, get the ready keys // Someone is ready for I/O, get the ready keys
Set<SelectionKey> readyKeys = _selector.selectedKeys(); final Set<SelectionKey> readyKeys = _selector.selectedKeys();
Iterator<SelectionKey> i = readyKeys.iterator(); final Iterator<SelectionKey> i = readyKeys.iterator();
if (s_logger.isTraceEnabled()) { if (s_logger.isTraceEnabled()) {
s_logger.trace("Keys Processing: " + readyKeys.size()); s_logger.trace("Keys Processing: " + readyKeys.size());
} }
// Walk through the ready keys collection. // Walk through the ready keys collection.
while (i.hasNext()) { while (i.hasNext()) {
SelectionKey sk = i.next(); final SelectionKey sk = i.next();
i.remove(); i.remove();
if (!sk.isValid()) { if (!sk.isValid()) {
if (s_logger.isTraceEnabled()) { if (s_logger.isTraceEnabled()) {
s_logger.trace("Selection Key is invalid: " + sk.toString()); s_logger.trace("Selection Key is invalid: " + sk.toString());
} }
Link link = (Link)sk.attachment(); final Link link = (Link)sk.attachment();
if (link != null) { if (link != null) {
link.terminated(); link.terminated();
} else { } else {
@ -167,13 +162,18 @@ public abstract class NioConnection implements Runnable {
s_logger.trace("Keys Done Processing."); s_logger.trace("Keys Done Processing.");
processTodos(); processTodos();
} catch (Throwable e) { } catch (final ClosedSelectorException e) {
s_logger.warn("Caught an exception but continuing on.", e); /*
* Exception occurred when calling java.nio.channels.Selector.selectedKeys() method. It means the connection has not yet been established. Let's continue trying
* We do not log it here otherwise we will fill the disk with messages.
*/
} catch (final IOException e) {
s_logger.error("Agent will die due to this IOException!", e);
throw new NioConnectionException(e.getMessage(), e);
} }
} }
synchronized (_thread) { _isStartup = false;
_isStartup = false; return true;
}
} }
abstract void init() throws IOException; abstract void init() throws IOException;
@ -182,11 +182,11 @@ public abstract class NioConnection implements Runnable {
abstract void unregisterLink(InetSocketAddress saddr); abstract void unregisterLink(InetSocketAddress saddr);
protected void accept(SelectionKey key) throws IOException { protected void accept(final SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel(); final ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
SocketChannel socketChannel = serverSocketChannel.accept(); final SocketChannel socketChannel = serverSocketChannel.accept();
Socket socket = socketChannel.socket(); final Socket socket = socketChannel.socket();
socket.setKeepAlive(true); socket.setKeepAlive(true);
if (s_logger.isTraceEnabled()) { if (s_logger.isTraceEnabled()) {
@ -198,7 +198,7 @@ public abstract class NioConnection implements Runnable {
SSLEngine sslEngine = null; SSLEngine sslEngine = null;
try { try {
SSLContext sslContext = Link.initSSLContext(false); final SSLContext sslContext = Link.initSSLContext(false);
sslEngine = sslContext.createSSLEngine(); sslEngine = sslContext.createSSLEngine();
sslEngine.setUseClientMode(false); sslEngine.setUseClientMode(false);
sslEngine.setNeedClientAuth(false); sslEngine.setNeedClientAuth(false);
@ -206,7 +206,7 @@ public abstract class NioConnection implements Runnable {
Link.doHandshake(socketChannel, sslEngine, false); Link.doHandshake(socketChannel, sslEngine, false);
} catch (Exception e) { } catch (final Exception e) {
if (s_logger.isTraceEnabled()) { if (s_logger.isTraceEnabled()) {
s_logger.trace("Socket " + socket + " closed on read. Probably -1 returned: " + e.getMessage()); s_logger.trace("Socket " + socket + " closed on read. Probably -1 returned: " + e.getMessage());
} }
@ -219,53 +219,68 @@ public abstract class NioConnection implements Runnable {
s_logger.trace("SSL: Handshake done"); s_logger.trace("SSL: Handshake done");
} }
socketChannel.configureBlocking(false); socketChannel.configureBlocking(false);
InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress(); final InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress();
Link link = new Link(saddr, this); final Link link = new Link(saddr, this);
link.setSSLEngine(sslEngine); link.setSSLEngine(sslEngine);
link.setKey(socketChannel.register(key.selector(), SelectionKey.OP_READ, link)); link.setKey(socketChannel.register(key.selector(), SelectionKey.OP_READ, link));
Task task = _factory.create(Task.Type.CONNECT, link, null); final Task task = _factory.create(Task.Type.CONNECT, link, null);
registerLink(saddr, link); registerLink(saddr, link);
_executor.execute(task);
}
protected void terminate(SelectionKey key) { try {
Link link = (Link)key.attachment(); _executor.submit(task);
closeConnection(key); } catch (final Exception e) {
if (link != null) { s_logger.warn("Exception occurred when submitting the task", e);
link.terminated();
Task task = _factory.create(Task.Type.DISCONNECT, link, null);
unregisterLink(link.getSocketAddress());
_executor.execute(task);
} }
} }
protected void read(SelectionKey key) throws IOException { protected void terminate(final SelectionKey key) {
Link link = (Link)key.attachment(); final Link link = (Link)key.attachment();
closeConnection(key);
if (link != null) {
link.terminated();
final Task task = _factory.create(Task.Type.DISCONNECT, link, null);
unregisterLink(link.getSocketAddress());
try {
_executor.submit(task);
} catch (final Exception e) {
s_logger.warn("Exception occurred when submitting the task", e);
}
}
}
protected void read(final SelectionKey key) throws IOException {
final Link link = (Link)key.attachment();
try { try {
SocketChannel socketChannel = (SocketChannel)key.channel(); final SocketChannel socketChannel = (SocketChannel)key.channel();
if (s_logger.isTraceEnabled()) { if (s_logger.isTraceEnabled()) {
s_logger.trace("Reading from: " + socketChannel.socket().toString()); s_logger.trace("Reading from: " + socketChannel.socket().toString());
} }
byte[] data = link.read(socketChannel); final byte[] data = link.read(socketChannel);
if (data == null) { if (data == null) {
if (s_logger.isTraceEnabled()) { if (s_logger.isTraceEnabled()) {
s_logger.trace("Packet is incomplete. Waiting for more."); s_logger.trace("Packet is incomplete. Waiting for more.");
} }
return; return;
} }
Task task = _factory.create(Task.Type.DATA, link, data); final Task task = _factory.create(Task.Type.DATA, link, data);
_executor.execute(task);
} catch (Exception e) { try {
_executor.submit(task);
} catch (final Exception e) {
s_logger.warn("Exception occurred when submitting the task", e);
}
} catch (final Exception e) {
logDebug(e, key, 1); logDebug(e, key, 1);
terminate(key); terminate(key);
} }
} }
protected void logTrace(Exception e, SelectionKey key, int loc) { protected void logTrace(final Exception e, final SelectionKey key, final int loc) {
if (s_logger.isTraceEnabled()) { if (s_logger.isTraceEnabled()) {
Socket socket = null; Socket socket = null;
if (key != null) { if (key != null) {
SocketChannel ch = (SocketChannel)key.channel(); final SocketChannel ch = (SocketChannel)key.channel();
if (ch != null) { if (ch != null) {
socket = ch.socket(); socket = ch.socket();
} }
@ -275,11 +290,11 @@ public abstract class NioConnection implements Runnable {
} }
} }
protected void logDebug(Exception e, SelectionKey key, int loc) { protected void logDebug(final Exception e, final SelectionKey key, final int loc) {
if (s_logger.isDebugEnabled()) { if (s_logger.isDebugEnabled()) {
Socket socket = null; Socket socket = null;
if (key != null) { if (key != null) {
SocketChannel ch = (SocketChannel)key.channel(); final SocketChannel ch = (SocketChannel)key.channel();
if (ch != null) { if (ch != null) {
socket = ch.socket(); socket = ch.socket();
} }
@ -304,113 +319,122 @@ public abstract class NioConnection implements Runnable {
s_logger.trace("Todos Processing: " + todos.size()); s_logger.trace("Todos Processing: " + todos.size());
} }
SelectionKey key; SelectionKey key;
for (ChangeRequest todo : todos) { for (final ChangeRequest todo : todos) {
switch (todo.type) { switch (todo.type) {
case ChangeRequest.CHANGEOPS: case ChangeRequest.CHANGEOPS:
try { try {
key = (SelectionKey)todo.key; key = (SelectionKey)todo.key;
if (key != null && key.isValid()) { if (key != null && key.isValid()) {
if (todo.att != null) {
key.attach(todo.att);
Link link = (Link)todo.att;
link.setKey(key);
}
key.interestOps(todo.ops);
}
} catch (CancelledKeyException e) {
s_logger.debug("key has been cancelled");
}
break;
case ChangeRequest.REGISTER:
try {
key = ((SocketChannel)(todo.key)).register(_selector, todo.ops, todo.att);
if (todo.att != null) { if (todo.att != null) {
Link link = (Link)todo.att; key.attach(todo.att);
final Link link = (Link)todo.att;
link.setKey(key); link.setKey(key);
} }
} catch (ClosedChannelException e) { key.interestOps(todo.ops);
s_logger.warn("Couldn't register socket: " + todo.key);
try {
((SocketChannel)(todo.key)).close();
} catch (IOException ignore) {
s_logger.info("[ignored] socket channel");
} finally {
Link link = (Link)todo.att;
link.terminated();
}
} }
break; } catch (final CancelledKeyException e) {
case ChangeRequest.CLOSE: s_logger.debug("key has been cancelled");
if (s_logger.isTraceEnabled()) { }
s_logger.trace("Trying to close " + todo.key); break;
case ChangeRequest.REGISTER:
try {
key = ((SocketChannel)todo.key).register(_selector, todo.ops, todo.att);
if (todo.att != null) {
final Link link = (Link)todo.att;
link.setKey(key);
} }
key = (SelectionKey)todo.key; } catch (final ClosedChannelException e) {
closeConnection(key); s_logger.warn("Couldn't register socket: " + todo.key);
if (key != null) { try {
Link link = (Link)key.attachment(); ((SocketChannel)todo.key).close();
if (link != null) { } catch (final IOException ignore) {
link.terminated(); s_logger.info("[ignored] socket channel");
} } finally {
final Link link = (Link)todo.att;
link.terminated();
} }
break; }
default: break;
s_logger.warn("Shouldn't be here"); case ChangeRequest.CLOSE:
throw new RuntimeException("Shouldn't be here"); if (s_logger.isTraceEnabled()) {
s_logger.trace("Trying to close " + todo.key);
}
key = (SelectionKey)todo.key;
closeConnection(key);
if (key != null) {
final Link link = (Link)key.attachment();
if (link != null) {
link.terminated();
}
}
break;
default:
s_logger.warn("Shouldn't be here");
throw new RuntimeException("Shouldn't be here");
} }
} }
s_logger.trace("Todos Done processing"); s_logger.trace("Todos Done processing");
} }
protected void connect(SelectionKey key) throws IOException { protected void connect(final SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel)key.channel(); final SocketChannel socketChannel = (SocketChannel)key.channel();
try { try {
socketChannel.finishConnect(); socketChannel.finishConnect();
key.interestOps(SelectionKey.OP_READ); key.interestOps(SelectionKey.OP_READ);
Socket socket = socketChannel.socket(); final Socket socket = socketChannel.socket();
if (!socket.getKeepAlive()) { if (!socket.getKeepAlive()) {
socket.setKeepAlive(true); socket.setKeepAlive(true);
} }
if (s_logger.isDebugEnabled()) { if (s_logger.isDebugEnabled()) {
s_logger.debug("Connected to " + socket); s_logger.debug("Connected to " + socket);
} }
Link link = new Link((InetSocketAddress)socket.getRemoteSocketAddress(), this); final Link link = new Link((InetSocketAddress)socket.getRemoteSocketAddress(), this);
link.setKey(key); link.setKey(key);
key.attach(link); key.attach(link);
Task task = _factory.create(Task.Type.CONNECT, link, null); final Task task = _factory.create(Task.Type.CONNECT, link, null);
_executor.execute(task);
} catch (IOException e) { try {
_executor.submit(task);
} catch (final Exception e) {
s_logger.warn("Exception occurred when submitting the task", e);
}
} catch (final IOException e) {
logTrace(e, key, 2); logTrace(e, key, 2);
terminate(key); terminate(key);
} }
} }
protected void scheduleTask(Task task) { protected void scheduleTask(final Task task) {
_executor.execute(task); try {
_executor.submit(task);
} catch (final Exception e) {
s_logger.warn("Exception occurred when submitting the task", e);
}
} }
protected void write(SelectionKey key) throws IOException { protected void write(final SelectionKey key) throws IOException {
Link link = (Link)key.attachment(); final Link link = (Link)key.attachment();
try { try {
if (s_logger.isTraceEnabled()) { if (s_logger.isTraceEnabled()) {
s_logger.trace("Writing to " + link.getSocketAddress().toString()); s_logger.trace("Writing to " + link.getSocketAddress().toString());
} }
boolean close = link.write((SocketChannel)key.channel()); final boolean close = link.write((SocketChannel)key.channel());
if (close) { if (close) {
closeConnection(key); closeConnection(key);
link.terminated(); link.terminated();
} else { } else {
key.interestOps(SelectionKey.OP_READ); key.interestOps(SelectionKey.OP_READ);
} }
} catch (Exception e) { } catch (final Exception e) {
logDebug(e, key, 3); logDebug(e, key, 3);
terminate(key); terminate(key);
} }
} }
protected void closeConnection(SelectionKey key) { protected void closeConnection(final SelectionKey key) {
if (key != null) { if (key != null) {
SocketChannel channel = (SocketChannel)key.channel(); final SocketChannel channel = (SocketChannel)key.channel();
key.cancel(); key.cancel();
try { try {
if (channel != null) { if (channel != null) {
@ -419,30 +443,30 @@ public abstract class NioConnection implements Runnable {
} }
channel.close(); channel.close();
} }
} catch (IOException ignore) { } catch (final IOException ignore) {
s_logger.info("[ignored] channel"); s_logger.info("[ignored] channel");
} }
} }
} }
public void register(int ops, SocketChannel key, Object att) { public void register(final int ops, final SocketChannel key, final Object att) {
ChangeRequest todo = new ChangeRequest(key, ChangeRequest.REGISTER, ops, att); final ChangeRequest todo = new ChangeRequest(key, ChangeRequest.REGISTER, ops, att);
synchronized (this) { synchronized (this) {
_todos.add(todo); _todos.add(todo);
} }
_selector.wakeup(); _selector.wakeup();
} }
public void change(int ops, SelectionKey key, Object att) { public void change(final int ops, final SelectionKey key, final Object att) {
ChangeRequest todo = new ChangeRequest(key, ChangeRequest.CHANGEOPS, ops, att); final ChangeRequest todo = new ChangeRequest(key, ChangeRequest.CHANGEOPS, ops, att);
synchronized (this) { synchronized (this) {
_todos.add(todo); _todos.add(todo);
} }
_selector.wakeup(); _selector.wakeup();
} }
public void close(SelectionKey key) { public void close(final SelectionKey key) {
ChangeRequest todo = new ChangeRequest(key, ChangeRequest.CLOSE, 0, null); final ChangeRequest todo = new ChangeRequest(key, ChangeRequest.CLOSE, 0, null);
synchronized (this) { synchronized (this) {
_todos.add(todo); _todos.add(todo);
} }
@ -466,7 +490,7 @@ public abstract class NioConnection implements Runnable {
public int ops; public int ops;
public Object att; public Object att;
public ChangeRequest(Object key, int type, int ops, Object att) { public ChangeRequest(final Object key, final int type, final int ops, final Object att) {
this.key = key; this.key = key;
this.type = type; this.type = type;
this.ops = ops; this.ops = ops;

View File

@ -37,7 +37,7 @@ public class NioServer extends NioConnection {
protected WeakHashMap<InetSocketAddress, Link> _links; protected WeakHashMap<InetSocketAddress, Link> _links;
public NioServer(String name, int port, int workers, HandlerFactory factory) { public NioServer(final String name, final int port, final int workers, final HandlerFactory factory) {
super(name, port, workers, factory); super(name, port, workers, factory);
_localAddr = null; _localAddr = null;
_links = new WeakHashMap<InetSocketAddress, Link>(1024); _links = new WeakHashMap<InetSocketAddress, Link>(1024);
@ -68,12 +68,12 @@ public class NioServer extends NioConnection {
} }
@Override @Override
protected void registerLink(InetSocketAddress addr, Link link) { protected void registerLink(final InetSocketAddress addr, final Link link) {
_links.put(addr, link); _links.put(addr, link);
} }
@Override @Override
protected void unregisterLink(InetSocketAddress saddr) { protected void unregisterLink(final InetSocketAddress saddr) {
_links.remove(saddr); _links.remove(saddr);
} }
@ -86,8 +86,8 @@ public class NioServer extends NioConnection {
* @param data * @param data
* @return null if not sent. attach object in link if sent. * @return null if not sent. attach object in link if sent.
*/ */
public Object send(InetSocketAddress saddr, byte[] data) throws ClosedChannelException { public Object send(final InetSocketAddress saddr, final byte[] data) throws ClosedChannelException {
Link link = _links.get(saddr); final Link link = _links.get(saddr);
if (link == null) { if (link == null) {
return null; return null;
} }

View File

@ -19,14 +19,14 @@
package com.cloud.utils.nio; package com.cloud.utils.nio;
import org.apache.log4j.Logger; import java.util.concurrent.Callable;
import com.cloud.utils.exception.TaskExecutionException;
/** /**
* Task represents one todo item for the AgentManager or the AgentManager * Task represents one todo item for the AgentManager or the AgentManager
*
*/ */
public abstract class Task implements Runnable { public abstract class Task implements Callable<Boolean> {
private static final Logger s_logger = Logger.getLogger(Task.class);
public enum Type { public enum Type {
CONNECT, // Process a new connection. CONNECT, // Process a new connection.
@ -40,13 +40,13 @@ public abstract class Task implements Runnable {
Type _type; Type _type;
Link _link; Link _link;
public Task(Type type, Link link, byte[] data) { public Task(final Type type, final Link link, final byte[] data) {
_data = data; _data = data;
_type = type; _type = type;
_link = link; _link = link;
} }
public Task(Type type, Link link, Object data) { public Task(final Type type, final Link link, final Object data) {
_data = data; _data = data;
_type = type; _type = type;
_link = link; _link = link;
@ -76,14 +76,11 @@ public abstract class Task implements Runnable {
return _type.toString(); return _type.toString();
} }
abstract protected void doTask(Task task) throws Exception; abstract protected void doTask(Task task) throws TaskExecutionException;
@Override @Override
public final void run() { public Boolean call() throws TaskExecutionException {
try { doTask(this);
doTask(this); return true;
} catch (Throwable e) {
s_logger.warn("Caught the following exception but pushing on", e);
}
} }
} }

View File

@ -27,6 +27,7 @@ import junit.framework.TestCase;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.junit.Assert; import org.junit.Assert;
import com.cloud.utils.exception.NioConnectionException;
import com.cloud.utils.nio.HandlerFactory; import com.cloud.utils.nio.HandlerFactory;
import com.cloud.utils.nio.Link; import com.cloud.utils.nio.Link;
import com.cloud.utils.nio.NioClient; import com.cloud.utils.nio.NioClient;
@ -56,7 +57,7 @@ public class NioTest extends TestCase {
private boolean isTestsDone() { private boolean isTestsDone() {
boolean result; boolean result;
synchronized (this) { synchronized (this) {
result = (_testCount == _completedCount); result = _testCount == _completedCount;
} }
return result; return result;
} }
@ -81,16 +82,24 @@ public class NioTest extends TestCase {
_completedCount = 0; _completedCount = 0;
_server = new NioServer("NioTestServer", 7777, 5, new NioTestServer()); _server = new NioServer("NioTestServer", 7777, 5, new NioTestServer());
_server.start(); try {
_server.start();
} catch (final NioConnectionException e) {
fail(e.getMessage());
}
_client = new NioClient("NioTestServer", "127.0.0.1", 7777, 5, new NioTestClient()); _client = new NioClient("NioTestServer", "127.0.0.1", 7777, 5, new NioTestClient());
_client.start(); try {
_client.start();
} catch (final NioConnectionException e) {
fail(e.getMessage());
}
while (_clientLink == null) { while (_clientLink == null) {
try { try {
s_logger.debug("Link is not up! Waiting ..."); s_logger.debug("Link is not up! Waiting ...");
Thread.sleep(1000); Thread.sleep(1000);
} catch (InterruptedException e) { } catch (final InterruptedException e) {
// TODO Auto-generated catch block // TODO Auto-generated catch block
e.printStackTrace(); e.printStackTrace();
} }
@ -101,9 +110,9 @@ public class NioTest extends TestCase {
public void tearDown() { public void tearDown() {
while (!isTestsDone()) { while (!isTestsDone()) {
try { try {
s_logger.debug(this._completedCount + "/" + this._testCount + " tests done. Waiting for completion"); s_logger.debug(_completedCount + "/" + _testCount + " tests done. Waiting for completion");
Thread.sleep(1000); Thread.sleep(1000);
} catch (InterruptedException e) { } catch (final InterruptedException e) {
// TODO Auto-generated catch block // TODO Auto-generated catch block
e.printStackTrace(); e.printStackTrace();
} }
@ -122,7 +131,7 @@ public class NioTest extends TestCase {
s_logger.info("Server stopped."); s_logger.info("Server stopped.");
} }
protected void setClientLink(Link link) { protected void setClientLink(final Link link) {
_clientLink = link; _clientLink = link;
} }
@ -140,13 +149,13 @@ public class NioTest extends TestCase {
getOneMoreTest(); getOneMoreTest();
_clientLink.send(_testBytes); _clientLink.send(_testBytes);
s_logger.info("Client: Data sent"); s_logger.info("Client: Data sent");
} catch (ClosedChannelException e) { } catch (final ClosedChannelException e) {
// TODO Auto-generated catch block // TODO Auto-generated catch block
e.printStackTrace(); e.printStackTrace();
} }
} }
protected void doServerProcess(byte[] data) { protected void doServerProcess(final byte[] data) {
oneMoreTestDone(); oneMoreTestDone();
Assert.assertArrayEquals(_testBytes, data); Assert.assertArrayEquals(_testBytes, data);
s_logger.info("Verify done."); s_logger.info("Verify done.");
@ -155,13 +164,13 @@ public class NioTest extends TestCase {
public class NioTestClient implements HandlerFactory { public class NioTestClient implements HandlerFactory {
@Override @Override
public Task create(Type type, Link link, byte[] data) { public Task create(final Type type, final Link link, final byte[] data) {
return new NioTestClientHandler(type, link, data); return new NioTestClientHandler(type, link, data);
} }
public class NioTestClientHandler extends Task { public class NioTestClientHandler extends Task {
public NioTestClientHandler(Type type, Link link, byte[] data) { public NioTestClientHandler(final Type type, final Link link, final byte[] data) {
super(type, link, data); super(type, link, data);
} }
@ -186,13 +195,13 @@ public class NioTest extends TestCase {
public class NioTestServer implements HandlerFactory { public class NioTestServer implements HandlerFactory {
@Override @Override
public Task create(Type type, Link link, byte[] data) { public Task create(final Type type, final Link link, final byte[] data) {
return new NioTestServerHandler(type, link, data); return new NioTestServerHandler(type, link, data);
} }
public class NioTestServerHandler extends Task { public class NioTestServerHandler extends Task {
public NioTestServerHandler(Type type, Link link, byte[] data) { public NioTestServerHandler(final Type type, final Link link, final byte[] data) {
super(type, link, data); super(type, link, data);
} }