refactor, improve startuptask

Signed-off-by: Abhishek Kumar <abhishek.mrt22@gmail.com>
This commit is contained in:
Abhishek Kumar 2024-10-17 19:30:26 +05:30
parent 290f7a944f
commit c33aa96025

View File

@ -37,6 +37,7 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.naming.ConfigurationException;
@ -51,6 +52,7 @@ import org.apache.cloudstack.managed.context.ManagedContextTimerTask;
import org.apache.cloudstack.utils.security.KeyStoreUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.apache.log4j.MDC;
@ -141,7 +143,7 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
long _pingInterval = 0;
AtomicInteger _inProgress = new AtomicInteger();
StartupTask _startup = null;
private final AtomicReference<StartupTask> startupTask = new AtomicReference<>();
long _startupWaitDefault = 180000;
long _startupWait = _startupWaitDefault;
boolean _reconnectAllowed = true;
@ -155,6 +157,18 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
private String _keystoreCertImportPath;
private String hostname;
/**
 * Builds a short description of a link for use in log messages.
 *
 * @param link the link to describe; may be {@code null}
 * @return an empty string when {@code link} is {@code null}; otherwise the link's socket
 *         address, prefixed with the link's identity hash code and a dash when trace
 *         logging is enabled
 */
private static String getLinkLog(final Link link) {
    if (link == null) {
        return "";
    }
    // The identity hash is only added at trace level, to tell apart links sharing an address.
    final String identityPrefix = s_logger.isTraceEnabled() ? System.identityHashCode(link) + "-" : "";
    return identityPrefix + link.getSocketAddress();
}
// for simulator use only
public Agent(final IAgentShell shell) {
_shell = shell;
@ -342,8 +356,8 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
_resource = null;
}
if (_startup != null) {
_startup = null;
if (startupTask.get() != null) {
startupTask.set(null);
}
if (_ugentTaskPool != null) {
@ -452,9 +466,21 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
_shell.setPersistentProperty(null, "pod", "");
}
public synchronized void lockStartupTask(final Link link) {
_startup = new StartupTask(link);
_timer.schedule(_startup, _startupWait);
/**
 * Creates a startup task for the given link and schedules it on the agent timer,
 * unless a startup task is already registered.
 *
 * <p>The task is published through {@code startupTask.compareAndSet(null, task)}, so only
 * one caller can install a task; a caller that loses the race logs a warning and does
 * not schedule anything.
 *
 * @param link the link the startup task is created for
 */
public void lockStartupTask(final Link link) {
    if (s_logger.isTraceEnabled()) {
        // Log at the same level the guard checks for (was info under a trace guard).
        s_logger.trace(String.format("Creating startup task - %s", getLinkLog(link)));
    }
    if (startupTask.get() != null) {
        s_logger.warn("A Startup task is already locked or in progress.");
        return;
    }
    final StartupTask newTask = new StartupTask(link, this);
    // Another thread may have installed a task since the check above; only schedule if we won.
    if (startupTask.compareAndSet(null, newTask)) {
        _timer.schedule(newTask, _startupWait);
        return;
    }
    s_logger.warn("Failed to lock a StartupTask!");
}
public void sendStartup(final Link link) {
@ -477,6 +503,9 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
try {
link.send(request.toBytes());
} catch (final ClosedChannelException e) {
if (s_logger.isTraceEnabled()) {
s_logger.trace(String.format("Unable to send request to %s", getLinkLog(link)));
}
s_logger.warn("Unable to send request: " + request.toString());
}
@ -522,15 +551,23 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
return new ServerHandler(type, link, data);
}
/**
 * Triggers a reconnect for the given link unless it is a different link from the
 * agent's current one.
 *
 * <p>The reconnect is skipped only when both {@code link} and the agent's {@code _link}
 * are non-null and are not the same instance; in every other case
 * {@link #reconnect(Link)} is called with {@code link}.
 *
 * @param link the link for which a reconnect was requested; may be {@code null}
 */
protected void reconnectIfNeeded(final Link link) {
    if (ObjectUtils.allNotNull(link, _link) && _link != link) {
        // Argument order matches the message: agent link first, then the given link.
        s_logger.debug(String.format("Ignoring reconnect as agent link %s is different from given link %s",
                getLinkLog(_link), getLinkLog(link)));
        return;
    }
    reconnect(link);
}
protected void reconnect(final Link link) {
if (!_reconnectAllowed) {
return;
}
synchronized (this) {
if (_startup != null) {
_startup.cancel();
_startup = null;
}
StartupTask task = startupTask.getAndSet(null);
if (task != null) {
task.cancel();
}
if (link != null) {
@ -580,13 +617,11 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
public void processStartupAnswer(final Answer answer, final Response response, final Link link) {
boolean cancelled = false;
synchronized (this) {
if (_startup != null) {
_startup.cancel();
_startup = null;
} else {
cancelled = true;
}
StartupTask task = startupTask.getAndSet(null);
if (task != null) {
task.cancel();
} else {
cancelled = true;
}
final StartupAnswer startup = (StartupAnswer)answer;
if (!startup.getResult()) {
@ -596,7 +631,7 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
s_logger.trace(String.format("%s does not allow exit on failure, reconnecting",
_resource.getClass().getSimpleName()));
}
reconnect(link);
reconnectIfNeeded(link);
return;
}
System.exit(1);
@ -665,7 +700,7 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
s_logger.trace(String.format("%s does not allow exit on failure, reconnecting",
_resource.getClass().getSimpleName()));
}
reconnect(link);
reconnectIfNeeded(link);
return;
}
System.exit(1);
@ -1084,11 +1119,13 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
public class StartupTask extends ManagedContextTimerTask {
protected Link _link;
protected Agent agent;
protected volatile boolean cancelled = false;
public StartupTask(final Link link) {
public StartupTask(final Link link, final Agent agent) {
s_logger.debug("Startup task created");
_link = link;
this.agent = agent;
}
@Override
@ -1104,15 +1141,18 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
}
@Override
protected synchronized void runInContext() {
protected void runInContext() {
if (!cancelled) {
if (s_logger.isInfoEnabled()) {
s_logger.info("The startup command is now cancelled");
s_logger.info("The startup command is now cancelled. Attempting reconnect");
}
cancelled = true;
_startup = null;
startupTask.set(null);
_startupWait = _startupWaitDefault * 2;
reconnect(_link);
if (s_logger.isTraceEnabled()) {
s_logger.info(String.format("Executing reconnect from task - %s", getLinkLog(_link)));
}
reconnectIfNeeded(_link);
}
}
}
@ -1164,8 +1204,10 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
s_logger.error("Error parsing task", e);
}
} else if (task.getType() == Task.Type.DISCONNECT) {
reconnect(task.getLink());
return;
if (s_logger.isDebugEnabled()) {
s_logger.info(String.format("Executing disconnect task - %s", getLinkLog(task.getLink())));
}
reconnectIfNeeded(task.getLink());
} else if (task.getType() == Task.Type.OTHER) {
processOtherTask(task);
}
@ -1237,32 +1279,31 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
}
final String preferredHost = msList[0];
final String connectedHost = _shell.getConnectedHost();
if (s_logger.isTraceEnabled()) {
s_logger.trace("Running preferred host checker task, connected host=" + connectedHost + ", preferred host=" + preferredHost);
if (s_logger.isDebugEnabled()) {
s_logger.debug("Running preferred host checker task, connected host=" + connectedHost + ", preferred host=" + preferredHost);
}
if (preferredHost != null && !preferredHost.equals(connectedHost) && _link != null) {
boolean isHostUp = true;
try (final Socket socket = new Socket()) {
socket.connect(new InetSocketAddress(preferredHost, _shell.getPort()), 5000);
} catch (final IOException e) {
isHostUp = false;
if (s_logger.isTraceEnabled()) {
s_logger.trace("Host: " + preferredHost + " is not reachable");
}
if (preferredHost == null || preferredHost.equals(connectedHost) || _link == null) {
return;
}
boolean isHostUp = true;
try (final Socket socket = new Socket()) {
socket.connect(new InetSocketAddress(preferredHost, _shell.getPort()), 5000);
} catch (final IOException e) {
isHostUp = false;
if (s_logger.isDebugEnabled()) {
s_logger.debug("Host: " + preferredHost + " is not reachable");
}
if (isHostUp && _link != null && _inProgress.get() == 0) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Preferred host " + preferredHost + " is found to be reachable, trying to reconnect");
}
_shell.resetHostCounter();
reconnect(_link);
}
if (isHostUp && _link != null && _inProgress.get() == 0) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Preferred host " + preferredHost + " is found to be reachable, trying to reconnect");
}
_shell.resetHostCounter();
reconnect(_link);
}
} catch (Throwable t) {
s_logger.error("Error caught while attempting to connect to preferred host", t);
}
}
}
}