New feature: Reconcile commands (CopyCommand, MigrateCommand, MigrateVolumeCommand) (#10514)

This commit is contained in:
Wei Zhou 2025-05-02 09:15:03 +02:00 committed by GitHub
parent d7d9d131b2
commit fd74895ad0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
70 changed files with 3769 additions and 90 deletions

View File

@ -800,6 +800,9 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
}
commandsInProgress.incrementAndGet();
try {
if (cmd.isReconcile()) {
cmd.setRequestSequence(request.getSequence());
}
answer = serverResource.executeRequest(cmd);
} finally {
commandsInProgress.decrementAndGet();
@ -1021,6 +1024,8 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
if ((answer.isSendStartup()) && reconnectAllowed) {
logger.info("Management server requested startup command to reinitialize the agent");
sendStartup(link);
} else {
serverResource.processPingAnswer((PingAnswer) answer);
}
shell.setAvoidHosts(answer.getAvoidMsList());
}
@ -1087,6 +1092,9 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
Answer answer = null;
commandsInProgress.incrementAndGet();
try {
if (command.isReconcile()) {
command.setRequestSequence(req.getSequence());
}
answer = serverResource.executeRequest(command);
} finally {
commandsInProgress.decrementAndGet();

View File

@ -35,6 +35,23 @@ public abstract class Command {
Continue, Stop
}
public enum State {
CREATED, // Command is created by management server
STARTED, // Command is started by agent
PROCESSING, // Processing by agent
PROCESSING_IN_BACKEND, // Processing in backend by agent
COMPLETED, // Operation succeeds by agent or management server
FAILED, // Operation fails by agent
RECONCILE_RETRY, // Ready for retry of reconciliation
RECONCILING, // Being reconciled by management server
RECONCILED, // Reconciled by management server
RECONCILE_SKIPPED, // Skip the reconciliation as the resource state is inconsistent with the command
RECONCILE_FAILED, // Fail to reconcile by management server
TIMED_OUT, // Timed out on management server or agent
INTERRUPTED, // Interrupted by management server or agent (for example agent is restarted),
DANGLED_IN_BACKEND // Backend process which cannot be processed normally (for example agent is restarted)
}
public static final String HYPERVISOR_TYPE = "hypervisorType";
// allow command to carry over hypervisor or other environment related context info
@ -42,6 +59,7 @@ public abstract class Command {
protected Map<String, String> contextMap = new HashMap<String, String>();
private int wait; //in second
private boolean bypassHostMaintenance = false;
private transient long requestSequence = 0L;
protected Command() {
this.wait = 0;
@ -82,6 +100,10 @@ public abstract class Command {
return contextMap.get(name);
}
public Map<String, String> getContextMap() {
return contextMap;
}
public boolean allowCaching() {
return true;
}
@ -94,6 +116,18 @@ public abstract class Command {
this.bypassHostMaintenance = bypassHostMaintenance;
}
public boolean isReconcile() {
return false;
}
public long getRequestSequence() {
return requestSequence;
}
public void setRequestSequence(long requestSequence) {
this.requestSequence = requestSequence;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -46,7 +46,7 @@ public class DiskTO {
private Long diskSeq;
private String path;
private Volume.Type type;
private Map<String, String> _details;
private Map<String, String> details;
public DiskTO() {
@ -92,10 +92,10 @@ public class DiskTO {
}
public void setDetails(Map<String, String> details) {
_details = details;
this.details = details;
}
public Map<String, String> getDetails() {
return _details;
return details;
}
}

View File

@ -36,7 +36,7 @@ public class NetworkTO {
protected TrafficType type;
protected URI broadcastUri;
protected URI isolationUri;
protected boolean isSecurityGroupEnabled;
protected boolean securityGroupEnabled;
protected String name;
protected String ip6address;
protected String ip6gateway;
@ -112,7 +112,7 @@ public class NetworkTO {
}
public void setSecurityGroupEnabled(boolean enabled) {
this.isSecurityGroupEnabled = enabled;
this.securityGroupEnabled = enabled;
}
/**
@ -221,7 +221,7 @@ public class NetworkTO {
}
public boolean isSecurityGroupEnabled() {
return this.isSecurityGroupEnabled;
return this.securityGroupEnabled;
}
public void setIp6Dns1(String ip6Dns1) {

View File

@ -86,6 +86,14 @@ public class NicTO extends NetworkTO {
this.nicUuid = uuid;
}
public String getNicUuid() {
return nicUuid;
}
public void setNicUuid(String nicUuid) {
this.nicUuid = nicUuid;
}
@Override
public String toString() {
return new StringBuilder("[Nic:").append(type).append("-").append(ip).append("-").append(broadcastUri).append("]").toString();

View File

@ -61,7 +61,7 @@ public class VirtualMachineTO {
@LogLevel(LogLevel.Log4jLevel.Off)
String vncPassword;
String vncAddr;
Map<String, String> params;
Map<String, String> details;
String uuid;
String bootType;
String bootMode;
@ -191,7 +191,11 @@ public class VirtualMachineTO {
return maxSpeed;
}
public boolean getLimitCpuUse() {
public boolean isEnableHA() {
return enableHA;
}
public boolean isLimitCpuUse() {
return limitCpuUse;
}
@ -289,11 +293,11 @@ public class VirtualMachineTO {
}
public Map<String, String> getDetails() {
return params;
return details;
}
public void setDetails(Map<String, String> params) {
this.params = params;
this.details = params;
}
public String getUuid() {

View File

@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.command;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.hypervisor.Hypervisor;
import org.apache.cloudstack.api.ApiCommandResourceType;
import org.apache.cloudstack.framework.config.ConfigKey;
import java.util.Arrays;
import java.util.List;
/**
 * Service that reconciles commands (currently CopyCommand, MigrateCommand and
 * MigrateVolumeCommand) whose execution may have been interrupted, e.g. by a
 * management server or agent restart. See {@code Command.State} for the lifecycle
 * the reconciliation moves commands through.
 */
public interface ReconcileCommandService {

    /** Global switch for the background reconciliation task. */
    ConfigKey<Boolean> ReconcileCommandsEnabled = new ConfigKey<>("Advanced", Boolean.class,
            "reconcile.commands.enabled", "false",
            "Indicates whether the background task to reconcile the commands is enabled or not",
            false);

    /** How often (seconds) the background task runs. */
    ConfigKey<Integer> ReconcileCommandsInterval = new ConfigKey<>("Advanced", Integer.class,
            "reconcile.commands.interval", "60",
            "Interval (in seconds) for the background task to reconcile the commands",
            false);

    /** Upper bound on reconciliation retries per command. */
    ConfigKey<Integer> ReconcileCommandsMaxAttempts = new ConfigKey<>("Advanced", Integer.class,
            "reconcile.commands.max.attempts", "30",
            "The maximum number of attempts to reconcile the commands",
            true);

    /** Size of the worker pool used by the reconciliation task. */
    ConfigKey<Integer> ReconcileCommandsWorkers = new ConfigKey<>("Advanced", Integer.class,
            "reconcile.commands.workers", "100",
            "The number of worker threads to reconcile the commands",
            false);

    // Reconciliation is currently implemented for KVM hosts only.
    List<Hypervisor.HypervisorType> SupportedHypervisorTypes = Arrays.asList(Hypervisor.HypervisorType.KVM);

    /** Persists reconcile records for the commands sent to a host under the given request sequence. */
    void persistReconcileCommands(Long hostId, Long requestSequence, Command[] cmd);

    /**
     * Updates the persisted record of a reconcile command with new state and/or answer.
     *
     * @return true if a matching record was updated
     */
    boolean updateReconcileCommand(long requestSeq, Command command, Answer answer, Command.State newStateByManagement, Command.State newStateByAgent);

    /** Processes a command and its answer as reported by the agent (e.g. via ping). */
    void processCommand(Command pingCommand, Answer pingAnswer);

    /** Processes the answers the agent returned for the given request sequence. */
    void processAnswers(long requestSeq, Command[] commands, Answer[] answers);

    /** Marks reconcile commands owned by the given management server as interrupted. */
    void updateReconcileCommandToInterruptedByManagementServerId(long managementServerId);

    /** Marks reconcile commands targeted at the given host as interrupted. */
    void updateReconcileCommandToInterruptedByHostId(long hostId);

    /** @return true if the given resource still has reconciliation work pending */
    boolean isReconcileResourceNeeded(long resourceId, ApiCommandResourceType resourceType);
}

View File

@ -29,14 +29,14 @@ import com.cloud.agent.api.to.VirtualMachineTO;
public class MigrateCommand extends Command {
private String vmName;
private String destIp;
private String destinationIp;
private Map<String, MigrateDiskInfo> migrateStorage;
private boolean migrateStorageManaged;
private boolean migrateNonSharedInc;
private boolean autoConvergence;
private String hostGuid;
private boolean isWindows;
private VirtualMachineTO vmTO;
private boolean windows;
private VirtualMachineTO virtualMachine;
private boolean executeInSequence = false;
private List<MigrateDiskInfo> migrateDiskInfoList = new ArrayList<>();
private Map<String, DpdkTO> dpdkInterfaceMapping = new HashMap<>();
@ -64,11 +64,11 @@ public class MigrateCommand extends Command {
protected MigrateCommand() {
}
public MigrateCommand(String vmName, String destIp, boolean isWindows, VirtualMachineTO vmTO, boolean executeInSequence) {
public MigrateCommand(String vmName, String destinationIp, boolean windows, VirtualMachineTO virtualMachine, boolean executeInSequence) {
this.vmName = vmName;
this.destIp = destIp;
this.isWindows = isWindows;
this.vmTO = vmTO;
this.destinationIp = destinationIp;
this.windows = windows;
this.virtualMachine = virtualMachine;
this.executeInSequence = executeInSequence;
}
@ -105,15 +105,15 @@ public class MigrateCommand extends Command {
}
public boolean isWindows() {
return isWindows;
return windows;
}
public VirtualMachineTO getVirtualMachine() {
return vmTO;
return virtualMachine;
}
public String getDestinationIp() {
return destIp;
return destinationIp;
}
public String getVmName() {
@ -233,4 +233,9 @@ public class MigrateCommand extends Command {
this.isSourceDiskOnStorageFileSystem = isDiskOnFileSystemStorage;
}
}
@Override
public boolean isReconcile() {
return true;
}
}

View File

@ -19,6 +19,7 @@
package com.cloud.agent.api;
import java.util.ArrayList;
import java.util.List;
public class PingAnswer extends Answer {
@ -27,6 +28,8 @@ public class PingAnswer extends Answer {
private boolean sendStartup = false;
private List<String> avoidMsList;
private List<String> reconcileCommands = new ArrayList<>();
protected PingAnswer() {
}
@ -49,6 +52,18 @@ public class PingAnswer extends Answer {
this.sendStartup = sendStartup;
}
public List<String> getReconcileCommands() {
return reconcileCommands;
}
public void setReconcileCommands(List<String> reconcileCommands) {
this.reconcileCommands = reconcileCommands;
}
public void addReconcileCommand(String reconcileCommand) {
this.reconcileCommands.add(reconcileCommand);
}
public List<String> getAvoidMsList() {
return avoidMsList;
}

View File

@ -20,11 +20,14 @@
package com.cloud.agent.api;
import com.cloud.host.Host;
import org.apache.cloudstack.command.CommandInfo;
public class PingCommand extends Command {
Host.Type hostType;
long hostId;
boolean outOfBand;
@LogLevel(LogLevel.Log4jLevel.Trace)
private CommandInfo[] commandInfos = new CommandInfo[] {};
protected PingCommand() {
}
@ -78,4 +81,12 @@ public class PingCommand extends Command {
result = 31 * result + (int) (hostId ^ (hostId >>> 32));
return result;
}
public CommandInfo[] getCommandInfos() {
return commandInfos;
}
public void setCommandInfos(CommandInfo[] commandInfos) {
this.commandInfos = commandInfos;
}
}

View File

@ -145,4 +145,9 @@ public class MigrateVolumeCommand extends Command {
}
public String getChainInfo() { return chainInfo; }
@Override
public boolean isReconcile() {
return true;
}
}

View File

@ -22,6 +22,7 @@ package com.cloud.resource;
import com.cloud.agent.IAgentControl;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.PingAnswer;
import com.cloud.agent.api.PingCommand;
import com.cloud.agent.api.StartupCommand;
import com.cloud.host.Host;
@ -90,4 +91,5 @@ public interface ServerResource extends Manager {
return false;
}
default void processPingAnswer(PingAnswer answer) {};
}

View File

@ -62,7 +62,7 @@ public class GsonHelper {
LOGGER.info("Default Builder inited.");
}
static Gson setDefaultGsonConfig(GsonBuilder builder) {
public static Gson setDefaultGsonConfig(GsonBuilder builder) {
builder.setVersion(1.5);
InterfaceTypeAdaptor<DataStoreTO> dsAdaptor = new InterfaceTypeAdaptor<DataStoreTO>();
builder.registerTypeAdapter(DataStoreTO.class, dsAdaptor);

View File

@ -0,0 +1,124 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.command;
import com.cloud.agent.api.Command;
import com.cloud.serializer.GsonHelper;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.util.Date;
/**
 * Serializable record of a reconcilable command and (once available) its answer.
 * The agent persists instances as JSON log files so that in-flight operations can
 * be reconciled after a restart (see ReconcileCommandUtils).
 */
public class CommandInfo {

    // Shared by the Gson writer here and the Jackson reader in ReconcileCommandUtils;
    // both sides must use the same pattern or timestamps will not round-trip.
    public static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSSZ";
    public static final Gson GSON = GsonHelper.setDefaultGsonConfig(new GsonBuilder().setDateFormat(DATE_FORMAT));

    long requestSeq;       // sequence number of the originating agent request
    Command.State state;   // current lifecycle state of the command
    Date startTime;        // when this record was first created
    Date updateTime;       // last time the state was changed
    String commandName;    // fully-qualified command class name, used for Class.forName()
    String command;        // the command itself, serialized as JSON
    int timeout;           // the command's wait timeout, in seconds
    String answerName;     // class name of the recorded answer, if any
    String answer;         // the answer serialized as JSON, if any

    /** No-arg constructor for deserialization frameworks. */
    public CommandInfo() {
    }

    /**
     * Creates a fresh record for a command; start and update timestamps are
     * both initialized to "now".
     */
    public CommandInfo(long requestSeq, Command command, Command.State state) {
        final Date now = new Date();
        this.requestSeq = requestSeq;
        this.state = state;
        this.startTime = now;
        this.updateTime = now;
        this.commandName = command.getClass().getName();
        this.command = GSON.toJson(command);
        this.timeout = command.getWait();
    }

    public long getRequestSeq() {
        return requestSeq;
    }

    public void setRequestSeq(long requestSeq) {
        this.requestSeq = requestSeq;
    }

    public Command.State getState() {
        return state;
    }

    /** Sets the state and refreshes the last-update timestamp as a side effect. */
    public void setState(Command.State state) {
        this.state = state;
        this.updateTime = new Date();
    }

    public Date getStartTime() {
        return startTime;
    }

    public void setStartTime(Date startTime) {
        this.startTime = startTime;
    }

    public Date getUpdateTime() {
        return updateTime;
    }

    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }

    public String getCommandName() {
        return commandName;
    }

    public void setCommandName(String commandName) {
        this.commandName = commandName;
    }

    public String getCommand() {
        return command;
    }

    public void setCommand(String command) {
        this.command = command;
    }

    public int getTimeout() {
        return timeout;
    }

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    public String getAnswerName() {
        return answerName;
    }

    public void setAnswerName(String answerName) {
        this.answerName = answerName;
    }

    public String getAnswer() {
        return answer;
    }

    public void setAnswer(String answer) {
        this.answer = answer;
    }
}

View File

@ -0,0 +1,45 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package org.apache.cloudstack.command;
import com.cloud.agent.api.Answer;
import org.apache.cloudstack.api.ApiCommandResourceType;
/**
 * Base answer for reconcile commands. Identifies the resource the reconciliation
 * refers to; concrete subclasses add operation-specific state.
 */
public class ReconcileAnswer extends Answer {

    // Type of the resource being reconciled (e.g. volume, VM).
    ApiCommandResourceType resourceType;
    // Id of the resource being reconciled.
    Long resourceId;

    public ApiCommandResourceType getResourceType() {
        return resourceType;
    }

    public void setResourceType(ApiCommandResourceType resourceType) {
        this.resourceType = resourceType;
    }

    public Long getResourceId() {
        return resourceId;
    }

    public void setResourceId(Long resourceId) {
        this.resourceId = resourceId;
    }
}

View File

@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.command;
import com.cloud.agent.api.Command;
/**
 * Base class for the commands the management server sends to an agent to reconcile
 * the state of an operation that may have been interrupted.
 */
public class ReconcileCommand extends Command {

    /** Agent-side wait for a reconcile request, in seconds. */
    private static final int RECONCILE_TIMEOUT_SECONDS = 30;

    /** Reconcile requests are independent of each other and need not be serialized. */
    @Override
    public boolean executeInSequence() {
        return false;
    }

    @Override
    public int getWait() {
        return RECONCILE_TIMEOUT_SECONDS;
    }
}

View File

@ -0,0 +1,192 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.command;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.JsonSyntaxException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
/**
 * Utilities used by the agent to persist, update and parse the per-command JSON log
 * files that back command reconciliation. Log files are written with Gson
 * (CommandInfo.GSON) and read back with Jackson using the same date format.
 */
public class ReconcileCommandUtils {

    protected static final Logger LOGGER = LogManager.getLogger(ReconcileCommandUtils.class.getName());

    /** Creates the log file for a freshly issued reconcile command, in state CREATED. */
    public static void createLogFileForCommand(final String logPath, final Command cmd) {
        updateLogFileForCommand(logPath, cmd, Command.State.CREATED);
    }

    /**
     * Writes (or rewrites) the log file for a reconcile command with the given state.
     * Non-reconcile commands are ignored. An existing record is reused so its start
     * time and any recorded answer survive the state change.
     */
    public static void updateLogFileForCommand(final String logPath, final Command cmd, final Command.State state) {
        if (!cmd.isReconcile()) {
            return;
        }
        final String logFileName = getLogFileNameForCommand(logPath, cmd);
        LOGGER.debug("Updating log file {} with {} state", logFileName, state);
        final File logFile = new File(logFileName);
        CommandInfo commandInfo = null;
        if (logFile.exists()) {
            commandInfo = readLogFileForCommand(logFileName);
            logFile.delete();
        }
        if (commandInfo == null) {
            commandInfo = new CommandInfo(cmd.getRequestSequence(), cmd, state);
        } else {
            commandInfo.setState(state);
        }
        writeCommandInfo(logFile, commandInfo);
    }

    /**
     * Updates the state recorded in an existing log file identified by its full path.
     * No-op when the file does not exist; an unparsable file is removed rather than
     * overwritten with a bogus payload.
     */
    public static void updateLogFileForCommand(final String logFullPath, final Command.State state) {
        final File logFile = new File(logFullPath);
        LOGGER.debug("Updating log file {} with {} state", logFile.getName(), state);
        if (!logFile.exists()) {
            return;
        }
        final CommandInfo commandInfo = readLogFileForCommand(logFullPath);
        logFile.delete();
        if (commandInfo == null) {
            // Previously this wrote the literal string "null" into the file; skip instead.
            LOGGER.warn("Removed unparsable log file {}", logFullPath);
            return;
        }
        commandInfo.setState(state);
        writeCommandInfo(logFile, commandInfo);
    }

    /** Removes the log file of a reconcile command, if present. Non-reconcile commands are ignored. */
    public static void deleteLogFileForCommand(final String logPath, final Command cmd) {
        if (cmd.isReconcile()) {
            final File logFile = new File(getLogFileNameForCommand(logPath, cmd));
            LOGGER.debug("Removing log file {}", logFile.getName());
            if (logFile.exists()) {
                logFile.delete();
            }
        }
    }

    /** Removes a log file identified by its full path, if present. */
    public static void deleteLogFile(final String logFullPath) {
        final File logFile = new File(logFullPath);
        LOGGER.debug("Removing log file {} ", logFile.getName());
        if (logFile.exists()) {
            logFile.delete();
        }
    }

    /**
     * Builds the log file name for a command.
     * NOTE(review): this embeds cmd.toString() in the file name — presumably Command
     * subclasses produce short, path-safe strings; confirm none contain '/' or other
     * characters illegal in file names.
     */
    public static String getLogFileNameForCommand(final String logPath, final Command cmd) {
        return String.format("%s/%s-%s.json", logPath, cmd.getRequestSequence(), cmd);
    }

    /** Reads a CommandInfo back from disk; returns null (and logs) on any I/O or parse error. */
    public static CommandInfo readLogFileForCommand(final String logFullPath) {
        try {
            final ObjectMapper objectMapper = new ObjectMapper();
            // Must match the format CommandInfo.GSON writes, or dates will not round-trip.
            objectMapper.setDateFormat(new SimpleDateFormat(CommandInfo.DATE_FORMAT));
            return objectMapper.readValue(new File(logFullPath), CommandInfo.class);
        } catch (IOException e) {
            LOGGER.error(String.format("Failed to read log file %s: %s", logFullPath, e.getMessage()));
            return null;
        }
    }

    /** Reconstructs the Command recorded in a CommandInfo; null if nothing was recorded or parsing fails. */
    public static Command parseCommandInfo(final CommandInfo commandInfo) {
        if (commandInfo.getCommandName() == null || commandInfo.getCommand() == null) {
            return null;
        }
        return parseCommandInfo(commandInfo.getCommandName(), commandInfo.getCommand());
    }

    /** Deserializes a command from its class name and JSON form; null on failure. */
    public static Command parseCommandInfo(final String commandName, final String commandInfo) {
        try {
            final Class<?> commandClazz = Class.forName(commandName);
            return (Command) CommandInfo.GSON.fromJson(commandInfo, commandClazz);
        } catch (ClassNotFoundException | JsonSyntaxException e) {
            LOGGER.error(String.format("Failed to parse command from CommandInfo %s due to %s", commandInfo, e.getMessage()));
            return null;
        }
    }

    /** Reconstructs the Answer recorded in a CommandInfo; null if nothing was recorded or parsing fails. */
    public static Answer parseAnswerFromCommandInfo(final CommandInfo commandInfo) {
        if (commandInfo.getAnswerName() == null || commandInfo.getAnswer() == null) {
            return null;
        }
        return parseAnswerFromAnswerInfo(commandInfo.getAnswerName(), commandInfo.getAnswer());
    }

    /** Deserializes an answer from its class name and JSON form; null on failure. */
    public static Answer parseAnswerFromAnswerInfo(final String answerName, final String answerInfo) {
        try {
            final Class<?> answerClazz = Class.forName(answerName);
            return (Answer) CommandInfo.GSON.fromJson(answerInfo, answerClazz);
        } catch (ClassNotFoundException | JsonSyntaxException e) {
            LOGGER.error(String.format("Failed to parse answer from answerInfo %s due to %s", answerInfo, e.getMessage()));
            return null;
        }
    }

    /**
     * Records the answer of a reconcile command in its log file, promoting a STARTED
     * record to COMPLETED or FAILED based on the answer's result.
     */
    public static void updateLogFileWithAnswerForCommand(final String logPath, final Command cmd, final Answer answer) {
        if (!cmd.isReconcile()) {
            return;
        }
        final String logFileName = getLogFileNameForCommand(logPath, cmd);
        LOGGER.debug("Updating log file {} with answer {}", logFileName, answer);
        final File logFile = new File(logFileName);
        if (!logFile.exists()) {
            return;
        }
        final CommandInfo commandInfo = readLogFileForCommand(logFile.getAbsolutePath());
        if (commandInfo == null) {
            return;
        }
        if (Command.State.STARTED.equals(commandInfo.getState())) {
            commandInfo.setState(answer.getResult() ? Command.State.COMPLETED : Command.State.FAILED);
        }
        // Must be the concrete class name: parseAnswerFromAnswerInfo() feeds it to
        // Class.forName(). The previous answer.toString() could never be resolved back.
        commandInfo.setAnswerName(answer.getClass().getName());
        commandInfo.setAnswer(CommandInfo.GSON.toJson(answer));
        writeCommandInfo(logFile, commandInfo);
    }

    /** Serializes a CommandInfo to its log file; the writer is always closed (try-with-resources). */
    private static void writeCommandInfo(final File logFile, final CommandInfo commandInfo) {
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(logFile))) {
            writer.write(CommandInfo.GSON.toJson(commandInfo));
        } catch (IOException e) {
            LOGGER.error(String.format("Failed to write log file %s", logFile), e);
        }
    }
}

View File

@ -0,0 +1,56 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package org.apache.cloudstack.command;
import org.apache.cloudstack.storage.volume.VolumeOnStorageTO;
/**
 * Answer to a reconcile request for a copy operation. Either reports the observed
 * state of the source and destination volumes, or indicates that reconciliation
 * was skipped, with a reason.
 * NOTE(review): the inherited fields assigned below (result, volumeOnSource,
 * volumeOnDestination) are declared in ReconcileVolumeAnswer/Answer, not visible here.
 */
public class ReconcileCopyAnswer extends ReconcileVolumeAnswer {

    // True when reconciliation was skipped (e.g. resource state inconsistent with the command).
    boolean isSkipped = false;
    // Explanation for a skipped or unsuccessful reconciliation.
    String reason;

    // Skipped/terminal answer; the inherited result flag keeps its default.
    public ReconcileCopyAnswer(boolean isSkipped, String reason) {
        super();
        this.isSkipped = isSkipped;
        this.reason = reason;
    }

    // Answer with an explicit success/failure result.
    public ReconcileCopyAnswer(boolean isSkipped, boolean result, String reason) {
        super();
        this.isSkipped = isSkipped;
        this.result = result;
        this.reason = reason;
    }

    // Successful answer carrying the observed state of both volumes.
    public ReconcileCopyAnswer(VolumeOnStorageTO volumeOnSource, VolumeOnStorageTO volumeOnDestination) {
        this.isSkipped = false;
        this.result = true;
        this.volumeOnSource = volumeOnSource;
        this.volumeOnDestination = volumeOnDestination;
    }

    public boolean isSkipped() {
        return isSkipped;
    }

    public String getReason() {
        return reason;
    }
}

View File

@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.command;
import com.cloud.agent.api.to.DataTO;
import java.util.Map;
/**
 * Reconcile request for a copy operation: carries the source and destination data
 * objects plus their detail maps so the agent can report the state of both sides.
 */
public class ReconcileCopyCommand extends ReconcileCommand {

    DataTO srcData;
    DataTO destData;
    // NOTE(review): field names appear to mirror the originating command's option maps;
    // confirm serialization compatibility before renaming them.
    Map<String, String> option;  // details of source volume
    Map<String, String> option2; // details of destination volume

    public ReconcileCopyCommand(DataTO sourceData, DataTO destinationData, Map<String, String> sourceDetails, Map<String, String> destinationDetails) {
        this.srcData = sourceData;
        this.destData = destinationData;
        this.option = sourceDetails;
        this.option2 = destinationDetails;
    }

    public DataTO getSrcData() {
        return srcData;
    }

    public DataTO getDestData() {
        return destData;
    }

    /** @return details of the source volume */
    public Map<String, String> getOption() {
        return option;
    }

    /** @return details of the destination volume */
    public Map<String, String> getOption2() {
        return option2;
    }
}

View File

@ -0,0 +1,68 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package org.apache.cloudstack.command;
import com.cloud.vm.VirtualMachine;
import java.util.List;
/**
 * Answer to a reconcile request for a VM migration. Reports the VM's name, the
 * state the agent observed, the host it was found on, and its disks.
 */
public class ReconcileMigrateAnswer extends ReconcileAnswer {

    Long hostId;                  // host the VM was observed on
    String vmName;                // name of the migrated VM
    VirtualMachine.State vmState; // VM state as seen by the agent
    List<String> vmDisks;         // disks attached to the VM

    /** No-arg constructor for deserialization. */
    public ReconcileMigrateAnswer() {
    }

    public ReconcileMigrateAnswer(String migratedVmName, VirtualMachine.State observedState) {
        this.vmName = migratedVmName;
        this.vmState = observedState;
    }

    public Long getHostId() {
        return hostId;
    }

    public void setHostId(Long hostId) {
        this.hostId = hostId;
    }

    public String getVmName() {
        return vmName;
    }

    public VirtualMachine.State getVmState() {
        return vmState;
    }

    public void setVmState(VirtualMachine.State vmState) {
        this.vmState = vmState;
    }

    public List<String> getVmDisks() {
        return vmDisks;
    }

    public void setVmDisks(List<String> vmDisks) {
        this.vmDisks = vmDisks;
    }
}

View File

@ -0,0 +1,31 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.command;
/**
 * Reconcile request for a VM migration: carries the name of the VM whose
 * interrupted migration is to be reconciled.
 */
public class ReconcileMigrateCommand extends ReconcileCommand {

    // Name of the VM being reconciled.
    String vmName;

    public ReconcileMigrateCommand(String vmName) {
        this.vmName = vmName;
    }

    public String getVmName() {
        return vmName;
    }
}

View File

@ -0,0 +1,50 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package org.apache.cloudstack.command;
import org.apache.cloudstack.storage.volume.VolumeOnStorageTO;
import java.util.List;
/**
 * Answer to a reconcile request for a volume migration. In addition to the
 * source/destination volume state carried by the superclass, reports the owning
 * VM name and its disk paths, when known.
 */
public class ReconcileMigrateVolumeAnswer extends ReconcileVolumeAnswer {

    String vmName;            // VM the volume belongs to, if any
    List<String> vmDiskPaths; // disk paths of that VM

    public ReconcileMigrateVolumeAnswer(VolumeOnStorageTO sourceVolume, VolumeOnStorageTO destinationVolume) {
        super(sourceVolume, destinationVolume);
    }

    public String getVmName() {
        return vmName;
    }

    public void setVmName(String vmName) {
        this.vmName = vmName;
    }

    public List<String> getVmDiskPaths() {
        return vmDiskPaths;
    }

    public void setVmDiskPaths(List<String> vmDiskPaths) {
        this.vmDiskPaths = vmDiskPaths;
    }
}

View File

@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.command;
import com.cloud.agent.api.to.DataTO;
/**
 * Reconcile command for a volume migration: carries the source and
 * destination data objects of the volume being migrated, plus (optionally)
 * the internal name of the VM the volume is attached to.
 */
public class ReconcileMigrateVolumeCommand extends ReconcileCommand {

    // Source side of the volume migration.
    DataTO srcData;

    // Destination side of the volume migration.
    DataTO destData;

    // Internal name of the attached VM, if any (set via setter, not the constructor).
    String vmName;

    public ReconcileMigrateVolumeCommand(DataTO srcData, DataTO destData) {
        this.srcData = srcData;
        this.destData = destData;
    }

    public DataTO getSrcData() {
        return srcData;
    }

    public DataTO getDestData() {
        return destData;
    }

    public String getVmName() {
        return vmName;
    }

    public void setVmName(String vmName) {
        this.vmName = vmName;
    }
}

View File

@ -0,0 +1,46 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package org.apache.cloudstack.command;
import org.apache.cloudstack.storage.volume.VolumeOnStorageTO;
/**
 * Base answer for volume reconcile commands: describes the volume as seen on
 * the source and destination storage.
 */
public class ReconcileVolumeAnswer extends ReconcileAnswer {

    // Interpretation of each field:
    // (1) null object: volume is not available. For example the source is secondary storage.
    // (2) otherwise, if volumeOnSource.getPath() is null, the volume cannot be found on the primary storage pool.
    VolumeOnStorageTO volumeOnSource;
    VolumeOnStorageTO volumeOnDestination;

    // No-arg constructor; fields are left null until populated.
    public ReconcileVolumeAnswer() {
    }

    public ReconcileVolumeAnswer(VolumeOnStorageTO volumeOnSource, VolumeOnStorageTO volumeOnDestination) {
        this.volumeOnSource = volumeOnSource;
        this.volumeOnDestination = volumeOnDestination;
    }

    public VolumeOnStorageTO getVolumeOnSource() {
        return volumeOnSource;
    }

    public VolumeOnStorageTO getVolumeOnDestination() {
        return volumeOnDestination;
    }
}

View File

@ -93,4 +93,9 @@ public class CopyCommand extends StorageSubSystemCommand {
// Sets the execute-in-sequence flag for this command.
public void setExecuteInSequence(final boolean inSeq) {
    executeInSequence = inSeq;
}
/**
 * CopyCommand is reconcile-capable: its outcome can be tracked and
 * reconciled by the management server's reconciliation framework
 * (see Command.State).
 */
@Override
public boolean isReconcile() {
    return true;
}
}

View File

@ -25,8 +25,6 @@ import static org.junit.Assert.assertFalse;
import com.cloud.agent.api.CheckGuestOsMappingCommand;
import org.junit.Test;
import com.cloud.agent.api.AgentControlCommand;
public class CheckGuestOsMappingCommandTest {
@Test

View File

@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.command;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.MigrateCommand;
import com.cloud.agent.api.to.VirtualMachineTO;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.File;
/**
 * Tests the command-log lifecycle of ReconcileCommandUtils: create the log
 * file for a command, read it back and round-trip the serialized command,
 * update its state, and finally delete it.
 */
public class ReconcileCommandUtilsTest {

    // Use the platform temp directory instead of a hard-coded "/tmp" so the
    // test also runs on non-Unix systems.
    final static String COMMANDS_LOG_PATH = System.getProperty("java.io.tmpdir");

    @Test
    public void createAndReadAndDeleteLogFilesForCommand() {
        final String vmName = "TestVM";
        final String destIp = "DestinationHost";
        final VirtualMachineTO vmTO = Mockito.mock(VirtualMachineTO.class);
        final MigrateCommand command = new MigrateCommand(vmName, destIp, false, vmTO, false);
        command.setRequestSequence(10000L);

        final String logFile = ReconcileCommandUtils.getLogFileNameForCommand(COMMANDS_LOG_PATH, command);
        try {
            // Create the log file and verify its initial contents.
            ReconcileCommandUtils.createLogFileForCommand(COMMANDS_LOG_PATH, command);
            Assert.assertTrue(new File(logFile).exists());

            final CommandInfo commandInfo = ReconcileCommandUtils.readLogFileForCommand(logFile);
            Assert.assertNotNull(commandInfo);
            Assert.assertEquals(command.getClass().getName(), commandInfo.getCommandName());
            Assert.assertEquals(Command.State.CREATED, commandInfo.getState());

            // The serialized command must round-trip back to an equivalent MigrateCommand.
            final Command parsedCommand = ReconcileCommandUtils.parseCommandInfo(commandInfo);
            Assert.assertNotNull(parsedCommand);
            Assert.assertTrue(parsedCommand instanceof MigrateCommand);
            Assert.assertEquals(vmName, ((MigrateCommand) parsedCommand).getVmName());
            Assert.assertEquals(destIp, ((MigrateCommand) parsedCommand).getDestinationIp());

            // Updating the log must persist the new state.
            ReconcileCommandUtils.updateLogFileForCommand(COMMANDS_LOG_PATH, command, Command.State.PROCESSING);
            final CommandInfo updatedInfo = ReconcileCommandUtils.readLogFileForCommand(logFile);
            Assert.assertNotNull(updatedInfo);
            Assert.assertEquals(command.getClass().getName(), updatedInfo.getCommandName());
            Assert.assertEquals(Command.State.PROCESSING, updatedInfo.getState());
        } finally {
            // Always remove the log file, so a failed assertion does not leak
            // state into subsequent test runs.
            ReconcileCommandUtils.deleteLogFileForCommand(COMMANDS_LOG_PATH, command);
        }
        Assert.assertFalse(new File(logFile).exists());
    }
}

View File

@ -32,7 +32,11 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.cloud.agent.api.CleanupPersistentNetworkResourceCommand;
import com.cloud.hypervisor.Hypervisor.HypervisorType;
import com.cloud.utils.Pair;
import com.cloud.utils.exception.CloudRuntimeException;
import org.apache.cloudstack.agent.lb.SetupMSListCommand;
import org.apache.cloudstack.command.ReconcileAnswer;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.cloudstack.utils.reflectiontostringbuilderutils.ReflectionToStringBuilderUtils;
import org.apache.logging.log4j.Logger;
@ -114,6 +118,7 @@ public abstract class AgentAttache {
protected final long _id;
protected String _uuid;
protected String _name = null;
protected HypervisorType _hypervisorType;
protected final ConcurrentHashMap<Long, Listener> _waitForList;
protected final LinkedList<Request> _requests;
protected Long _currentSequence;
@ -135,10 +140,11 @@ public abstract class AgentAttache {
Arrays.sort(s_commandsNotAllowedInConnectingMode);
}
protected AgentAttache(final AgentManagerImpl agentMgr, final long id, final String uuid, final String name, final boolean maintenance) {
protected AgentAttache(final AgentManagerImpl agentMgr, final long id, final String uuid, final String name, final HypervisorType hypervisorType, final boolean maintenance) {
_id = id;
_uuid = uuid;
_name = name;
_hypervisorType = hypervisorType;
_waitForList = new ConcurrentHashMap<Long, Listener>();
_currentSequence = null;
_maintenance = maintenance;
@ -261,6 +267,10 @@ public abstract class AgentAttache {
return _name;
}
public HypervisorType get_hypervisorType() {
return _hypervisorType;
}
public int getQueueSize() {
return _requests.size();
}
@ -406,11 +416,18 @@ public abstract class AgentAttache {
try {
for (int i = 0; i < 2; i++) {
Answer[] answers = null;
Command[] cmds = req.getCommands();
if (cmds != null && cmds.length == 1 && (cmds[0] != null) && cmds[0].isReconcile()
&& !sl.isDisconnected() && _agentMgr.isReconcileCommandsEnabled(_hypervisorType)) {
// only available if (1) the only command is a Reconcile command (2) agent is connected; (3) reconciliation is enabled; (4) hypervisor is KVM;
answers = waitForAnswerOfReconcileCommand(sl, seq, cmds[0], wait);
} else {
try {
answers = sl.waitFor(wait);
} catch (final InterruptedException e) {
logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Interrupted");
}
}
if (answers != null) {
new Response(req, answers).logD("Received: ", false);
return answers;
@ -428,11 +445,13 @@ public abstract class AgentAttache {
if (current != null && seq != current) {
logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Waited too long.");
_agentMgr.updateReconcileCommandsIfNeeded(req.getSequence(), req.getCommands(), Command.State.TIMED_OUT);
throw new OperationTimedoutException(req.getCommands(), _id, seq, wait, false);
}
logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Waiting some more time because this is the current command");
}
_agentMgr.updateReconcileCommandsIfNeeded(req.getSequence(), req.getCommands(), Command.State.TIMED_OUT);
throw new OperationTimedoutException(req.getCommands(), _id, seq, wait * 2, true);
} catch (OperationTimedoutException e) {
logger.warn(LOG_SEQ_FORMATTED_STRING, seq, "Timed out on " + req.toString());
@ -449,12 +468,57 @@ public abstract class AgentAttache {
if (req.executeInSequence() && (current != null && current == seq)) {
sendNext(seq);
}
_agentMgr.updateReconcileCommandsIfNeeded(req.getSequence(), req.getCommands(), Command.State.TIMED_OUT);
throw new OperationTimedoutException(req.getCommands(), _id, seq, wait, false);
} finally {
unregisterListener(seq);
}
}
/**
 * Waits up to {@code wait} seconds for the answer of a reconcile command,
 * polling in slices of the configured reconcile interval. Between slices it
 * falls back to the management-server database: if the persisted record shows
 * the command was interrupted or left dangling on the agent, a
 * CloudRuntimeException is thrown so the caller can reconcile it; if the
 * record already holds a concrete (non-Reconcile) answer, that answer is
 * returned.
 *
 * Bug fix: the original decremented {@code waitTimeLeft} only at the bottom
 * of the loop, so the {@code continue} taken when no database record exists
 * skipped the decrement and the loop could wait past the overall timeout
 * indefinitely. The decrement now happens on every iteration, right after
 * the listener wait.
 *
 * @return the answers received, or null when the timeout elapsed or the
 *         listener disconnected without an answer
 */
private Answer[] waitForAnswerOfReconcileCommand(SynchronousListener sl, final long seq, final Command command, final int wait) {
    Answer[] answers = null;
    int waitTimeLeft = wait;
    while (waitTimeLeft > 0) {
        final int waitTime = Math.min(waitTimeLeft, _agentMgr.getReconcileInterval());
        logger.debug(String.format("Waiting %s seconds for the answer of reconcile command %s-%s", waitTime, seq, command));
        if (sl.isDisconnected()) {
            logger.debug(String.format("Disconnected while waiting for the answer of reconcile command %s-%s", seq, command));
            break;
        }
        try {
            answers = sl.waitFor(waitTime);
        } catch (final InterruptedException e) {
            logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Interrupted");
        }
        if (answers != null) {
            break;
        }
        // Count this slice against the overall budget before any 'continue',
        // so a missing database record cannot make the loop wait forever.
        waitTimeLeft -= waitTime;
        logger.debug(String.format("Getting the answer of reconcile command from cloudstack database for %s-%s", seq, command));
        Pair<Command.State, Answer> commandInfo = _agentMgr.getStateAndAnswerOfReconcileCommand(seq, command);
        if (commandInfo == null) {
            logger.debug(String.format("Cannot get the answer of reconcile command from cloudstack database for %s-%s", seq, command));
            continue;
        }
        Command.State state = commandInfo.first();
        if (Command.State.INTERRUPTED.equals(state)) {
            logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Interrupted by agent, will reconcile it");
            throw new CloudRuntimeException("Interrupted by agent");
        } else if (Command.State.DANGLED_IN_BACKEND.equals(state)) {
            logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Dangling in backend, it seems the agent was restarted, will reconcile it");
            throw new CloudRuntimeException("It is not being processed by agent");
        }
        Answer answer = commandInfo.second();
        logger.debug(String.format("Got the answer of reconcile command from cloudstack database for %s-%s: %s", seq, command, answer));
        // A concrete (non-Reconcile) answer persisted by another path is the
        // real result of the request; ReconcileAnswers are only bookkeeping.
        if (answer != null && !(answer instanceof ReconcileAnswer)) {
            answers = new Answer[] { answer };
            break;
        }
    }
    return answers;
}
protected synchronized void sendNext(final long seq) {
_currentSequence = null;
if (_requests.isEmpty()) {

View File

@ -43,6 +43,10 @@ import javax.naming.ConfigurationException;
import org.apache.cloudstack.agent.lb.IndirectAgentLB;
import org.apache.cloudstack.ca.CAManager;
import org.apache.cloudstack.command.ReconcileCommandService;
import org.apache.cloudstack.command.ReconcileCommandUtils;
import org.apache.cloudstack.command.ReconcileCommandVO;
import org.apache.cloudstack.command.dao.ReconcileCommandDao;
import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.Configurable;
@ -177,6 +181,10 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
protected HighAvailabilityManager _haMgr = null;
@Inject
protected AlertManager _alertMgr = null;
@Inject
protected ReconcileCommandService reconcileCommandService;
@Inject
ReconcileCommandDao reconcileCommandDao;
@Inject
protected HypervisorGuruManager _hvGuruMgr;
@ -207,6 +215,9 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
private final ConcurrentHashMap<String, Long> newAgentConnections = new ConcurrentHashMap<>();
protected ScheduledExecutorService newAgentConnectionsMonitor;
private boolean _reconcileCommandsEnabled = false;
private Integer _reconcileCommandInterval;
@Inject
ResourceManager _resourceMgr;
@Inject
@ -275,6 +286,9 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
initializeCommandTimeouts();
_reconcileCommandsEnabled = ReconcileCommandService.ReconcileCommandsEnabled.value();
_reconcileCommandInterval = ReconcileCommandService.ReconcileCommandsInterval.value();
return true;
}
@ -643,7 +657,13 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
final Request req = new Request(hostId, agent.getName(), _nodeId, cmds, commands.stopOnError(), true);
req.setSequence(agent.getNextSequence());
reconcileCommandService.persistReconcileCommands(hostId, req.getSequence(), cmds);
final Answer[] answers = agent.send(req, wait);
reconcileCommandService.processAnswers(req.getSequence(), cmds, answers);
notifyAnswersToMonitors(hostId, req.getSequence(), answers);
commands.setAnswers(answers);
return answers;
@ -940,7 +960,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
protected AgentAttache createAttacheForDirectConnect(final Host host, final ServerResource resource) {
logger.debug("create DirectAgentAttache for {}", host);
final DirectAgentAttache attache = new DirectAgentAttache(this, host.getId(), host.getUuid(), host.getName(), resource, host.isInMaintenanceStates());
final DirectAgentAttache attache = new DirectAgentAttache(this, host.getId(), host.getUuid(), host.getName(), host.getHypervisorType(), resource, host.isInMaintenanceStates());
AgentAttache old;
synchronized (_agents) {
@ -1123,6 +1143,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
host = _hostDao.findById(hostId); // Maybe the host magically reappeared?
if (host != null && host.getStatus() == Status.Down) {
_haMgr.scheduleRestartForVmsOnHost(host, true, HighAvailabilityManager.ReasonType.HostDown);
reconcileCommandService.updateReconcileCommandToInterruptedByHostId(hostId);
}
return true;
}
@ -1285,7 +1306,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
protected AgentAttache createAttacheForConnect(final HostVO host, final Link link) {
logger.debug("create ConnectedAgentAttache for {}", host);
final AgentAttache attache = new ConnectedAgentAttache(this, host.getId(), host.getUuid(), host.getName(), link, host.isInMaintenanceStates());
final AgentAttache attache = new ConnectedAgentAttache(this, host.getId(), host.getUuid(), host.getName(), host.getHypervisorType(), link, host.isInMaintenanceStates());
link.attach(attache);
AgentAttache old;
@ -1629,6 +1650,9 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
}
final List<String> avoidMsList = _mshostDao.listNonUpStateMsIPs();
answer = new PingAnswer((PingCommand)cmd, avoidMsList, requestStartupCommand);
// Add or update reconcile tasks
reconcileCommandService.processCommand(cmd, answer);
} else if (cmd instanceof ReadyAnswer) {
final HostVO host = _hostDao.findById(attache.getId());
if (host == null) {
@ -2082,6 +2106,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
params.put(Config.RouterAggregationCommandEachTimeout.toString(), _configDao.getValue(Config.RouterAggregationCommandEachTimeout.toString()));
params.put(Config.MigrateWait.toString(), _configDao.getValue(Config.MigrateWait.toString()));
params.put(NetworkOrchestrationService.TUNGSTEN_ENABLED.key(), String.valueOf(NetworkOrchestrationService.TUNGSTEN_ENABLED.valueIn(host.getDataCenterId())));
params.put(ReconcileCommandService.ReconcileCommandsEnabled.key(), String.valueOf(_reconcileCommandsEnabled));
try {
SetHostParamsCommand cmds = new SetHostParamsCommand(params);
@ -2168,4 +2193,36 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
private GlobalLock getHostJoinLock(Long hostId) {
return GlobalLock.getInternLock(String.format("%s-%s", "Host-Join", hostId));
}
/**
 * Reconciliation applies only when globally enabled and the hypervisor type
 * is listed in ReconcileCommandService.SupportedHypervisorTypes.
 */
public boolean isReconcileCommandsEnabled(HypervisorType hypervisorType) {
    return _reconcileCommandsEnabled && ReconcileCommandService.SupportedHypervisorTypes.contains(hypervisorType);
}
/**
 * Updates the persisted state of every reconcile-capable command in the
 * given request. No-op when reconciliation is globally disabled.
 *
 * @param requestSeq sequence number of the request the commands belong to
 * @param commands   commands of the request; non-reconcile commands are skipped
 * @param state      new state to record for each reconcile command
 */
public void updateReconcileCommandsIfNeeded(long requestSeq, Command[] commands, Command.State state) {
    if (!_reconcileCommandsEnabled) {
        return;
    }
    for (final Command cmd : commands) {
        if (!cmd.isReconcile()) {
            continue;
        }
        reconcileCommandService.updateReconcileCommand(requestSeq, cmd, null, state, null);
    }
}
/**
 * Looks up the persisted reconcile record for the given request sequence and
 * command.
 *
 * @return null when no record exists; otherwise a pair of the agent-reported
 *         state and the parsed answer (the answer is null when the record
 *         holds no answer name/info yet)
 */
public Pair<Command.State, Answer> getStateAndAnswerOfReconcileCommand(long requestSeq, Command command) {
    final ReconcileCommandVO record = reconcileCommandDao.findCommand(requestSeq, command.toString());
    if (record == null) {
        return null;
    }
    final Command.State agentState = record.getStateByAgent();
    Answer answer = null;
    if (record.getAnswerName() != null && record.getAnswerInfo() != null) {
        answer = ReconcileCommandUtils.parseAnswerFromAnswerInfo(record.getAnswerName(), record.getAnswerInfo());
    }
    return new Pair<>(agentState, answer);
}
// Returns the configured reconcile polling interval (seconds), read once
// from ReconcileCommandService.ReconcileCommandsInterval at configure time.
public Integer getReconcileInterval() {
    return _reconcileCommandInterval;
}
}

View File

@ -31,6 +31,7 @@ import com.cloud.agent.api.Command;
import com.cloud.agent.transport.Request;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.host.Status;
import com.cloud.hypervisor.Hypervisor;
import com.cloud.utils.nio.Link;
public class ClusteredAgentAttache extends ConnectedAgentAttache implements Routable {
@ -44,14 +45,14 @@ public class ClusteredAgentAttache extends ConnectedAgentAttache implements Rout
s_clusteredAgentMgr = agentMgr;
}
public ClusteredAgentAttache(final AgentManagerImpl agentMgr, final long id, final String uuid, final String name) {
super(agentMgr, id, uuid, name, null, false);
public ClusteredAgentAttache(final AgentManagerImpl agentMgr, final long id, final String uuid, final String name, final Hypervisor.HypervisorType hypervisorType) {
super(agentMgr, id, uuid, name, hypervisorType, null, false);
_forward = true;
_transferRequests = new LinkedList<Request>();
}
public ClusteredAgentAttache(final AgentManagerImpl agentMgr, final long id, final String uuid, final String name, final Link link, final boolean maintenance) {
super(agentMgr, id, uuid, name, link, maintenance);
public ClusteredAgentAttache(final AgentManagerImpl agentMgr, final long id, final String uuid, final String name, final Hypervisor.HypervisorType hypervisorType, final Link link, final boolean maintenance) {
super(agentMgr, id, uuid, name, hypervisorType, link, maintenance);
_forward = link == null;
_transferRequests = new LinkedList<Request>();
}

View File

@ -264,7 +264,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
protected AgentAttache createAttache(final HostVO host) {
logger.debug("create forwarding ClusteredAgentAttache for {}", host);
long id = host.getId();
final AgentAttache attache = new ClusteredAgentAttache(this, id, host.getUuid(), host.getName());
final AgentAttache attache = new ClusteredAgentAttache(this, id, host.getUuid(), host.getName(), host.getHypervisorType());
AgentAttache old;
synchronized (_agents) {
old = _agents.get(host.getId());
@ -280,7 +280,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
@Override
protected AgentAttache createAttacheForConnect(final HostVO host, final Link link) {
logger.debug("create ClusteredAgentAttache for {}", host);
final AgentAttache attache = new ClusteredAgentAttache(this, host.getId(), host.getUuid(), host.getName(), link, host.isInMaintenanceStates());
final AgentAttache attache = new ClusteredAgentAttache(this, host.getId(), host.getUuid(), host.getName(), host.getHypervisorType(), link, host.isInMaintenanceStates());
link.attach(attache);
AgentAttache old;
synchronized (_agents) {
@ -296,7 +296,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
@Override
protected AgentAttache createAttacheForDirectConnect(final Host host, final ServerResource resource) {
logger.debug("Create ClusteredDirectAgentAttache for {}.", host);
final DirectAgentAttache attache = new ClusteredDirectAgentAttache(this, host.getId(), host.getUuid(), host.getName(), _nodeId, resource, host.isInMaintenanceStates());
final DirectAgentAttache attache = new ClusteredDirectAgentAttache(this, host.getId(), host.getUuid(), host.getName(), host.getHypervisorType(), _nodeId, resource, host.isInMaintenanceStates());
AgentAttache old;
synchronized (_agents) {
old = _agents.get(host.getId());

View File

@ -20,14 +20,15 @@ import com.cloud.agent.transport.Request;
import com.cloud.agent.transport.Response;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.exception.UnsupportedVersionException;
import com.cloud.hypervisor.Hypervisor;
import com.cloud.resource.ServerResource;
import com.cloud.utils.exception.CloudRuntimeException;
public class ClusteredDirectAgentAttache extends DirectAgentAttache implements Routable {
private final long _nodeId;
public ClusteredDirectAgentAttache(ClusteredAgentManagerImpl agentMgr, long id, String uuid, String name, long mgmtId, ServerResource resource, boolean maintenance) {
super(agentMgr, id, uuid, name, resource, maintenance);
public ClusteredDirectAgentAttache(ClusteredAgentManagerImpl agentMgr, long id, String uuid, String name, final Hypervisor.HypervisorType hypervisorType, long mgmtId, ServerResource resource, boolean maintenance) {
super(agentMgr, id, uuid, name, hypervisorType, resource, maintenance);
_nodeId = mgmtId;
}

View File

@ -22,6 +22,7 @@ import java.nio.channels.ClosedChannelException;
import com.cloud.agent.transport.Request;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.host.Status;
import com.cloud.hypervisor.Hypervisor;
import com.cloud.utils.nio.Link;
/**
@ -31,8 +32,8 @@ public class ConnectedAgentAttache extends AgentAttache {
protected Link _link;
public ConnectedAgentAttache(final AgentManagerImpl agentMgr, final long id, final String uuid, final String name, final Link link, final boolean maintenance) {
super(agentMgr, id, uuid, name, maintenance);
public ConnectedAgentAttache(final AgentManagerImpl agentMgr, final long id, final String uuid, final String name, final Hypervisor.HypervisorType hypervisorType, final Link link, final boolean maintenance) {
super(agentMgr, id, uuid, name, hypervisorType, maintenance);
_link = link;
}

View File

@ -35,6 +35,7 @@ import com.cloud.agent.transport.Request;
import com.cloud.agent.transport.Response;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.host.Status;
import com.cloud.hypervisor.Hypervisor;
import com.cloud.resource.ServerResource;
import org.apache.logging.log4j.ThreadContext;
@ -51,8 +52,8 @@ public class DirectAgentAttache extends AgentAttache {
AtomicInteger _outstandingTaskCount;
AtomicInteger _outstandingCronTaskCount;
public DirectAgentAttache(AgentManagerImpl agentMgr, long id, String uuid,String name, ServerResource resource, boolean maintenance) {
super(agentMgr, id, uuid, name, maintenance);
public DirectAgentAttache(AgentManagerImpl agentMgr, long id, String uuid, String name, final Hypervisor.HypervisorType hypervisorType, ServerResource resource, boolean maintenance) {
super(agentMgr, id, uuid, name, hypervisorType, maintenance);
_resource = resource;
_outstandingTaskCount = new AtomicInteger(0);
_outstandingCronTaskCount = new AtomicInteger(0);

View File

@ -19,11 +19,12 @@ package com.cloud.agent.manager;
import com.cloud.agent.transport.Request;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.host.Status;
import com.cloud.hypervisor.Hypervisor;
public class DummyAttache extends AgentAttache {
public DummyAttache(AgentManagerImpl agentMgr, long id, String uuid, String name, boolean maintenance) {
super(agentMgr, id, uuid, name, maintenance);
public DummyAttache(AgentManagerImpl agentMgr, long id, String uuid, String name, final Hypervisor.HypervisorType hypervisorType, boolean maintenance) {
super(agentMgr, id, uuid, name, hypervisorType, maintenance);
}
@Override

View File

@ -2843,12 +2843,30 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
throw new CloudRuntimeException(details);
}
} catch (final OperationTimedoutException e) {
boolean success = false;
if (HypervisorType.KVM.equals(vm.getHypervisorType())) {
try {
final Answer answer = _agentMgr.send(vm.getHostId(), new CheckVirtualMachineCommand(vm.getInstanceName()));
if (answer != null && answer.getResult() && answer instanceof CheckVirtualMachineAnswer) {
final CheckVirtualMachineAnswer vmAnswer = (CheckVirtualMachineAnswer) answer;
if (VirtualMachine.PowerState.PowerOn.equals(vmAnswer.getState())) {
logger.info(String.format("Vm %s is found on destination host %s. Migration is successful", vm, vm.getHostId()));
success = true;
}
}
} catch (Exception ex) {
logger.error(String.format("Failed to get state of VM %s on destination host %s: %s", vm, vm.getHostId(), ex.getMessage()));
}
}
if (!success) {
if (e.isActive()) {
logger.warn("Active migration command so scheduling a restart for {}", vm, e);
_haMgr.scheduleRestart(vm, true);
}
throw new AgentUnavailableException("Operation timed out on migrating " + vm, dstHostId);
}
}
}
try {
if (!changeState(vm, VirtualMachine.Event.OperationSucceeded, dstHostId, work, Step.Started)) {

View File

@ -25,6 +25,7 @@ import com.cloud.exception.ConnectionException;
import com.cloud.host.HostVO;
import com.cloud.host.Status;
import com.cloud.host.dao.HostDao;
import com.cloud.hypervisor.Hypervisor;
import com.cloud.utils.Pair;
import org.junit.Assert;
import org.junit.Before;
@ -47,7 +48,7 @@ public class AgentManagerImplTest {
host = new HostVO("some-Uuid");
host.setDataCenterId(1L);
cmds = new StartupCommand[]{new StartupRoutingCommand()};
attache = new ConnectedAgentAttache(null, 1L, "uuid", "kvm-attache", null, false);
attache = new ConnectedAgentAttache(null, 1L, "uuid", "kvm-attache", Hypervisor.HypervisorType.KVM, null, false);
hostDao = Mockito.mock(HostDao.class);
storagePoolMonitor = Mockito.mock(Listener.class);

View File

@ -22,6 +22,7 @@ import static org.mockito.Mockito.mock;
import org.junit.Test;
import com.cloud.hypervisor.Hypervisor;
import com.cloud.utils.nio.Link;
public class ConnectedAgentAttacheTest {
@ -31,8 +32,8 @@ public class ConnectedAgentAttacheTest {
Link link = mock(Link.class);
ConnectedAgentAttache agentAttache1 = new ConnectedAgentAttache(null, 0, "uuid", null, link, false);
ConnectedAgentAttache agentAttache2 = new ConnectedAgentAttache(null, 0, "uuid", null, link, false);
ConnectedAgentAttache agentAttache1 = new ConnectedAgentAttache(null, 0, "uuid", null, Hypervisor.HypervisorType.KVM, link, false);
ConnectedAgentAttache agentAttache2 = new ConnectedAgentAttache(null, 0, "uuid", null, Hypervisor.HypervisorType.KVM,link, false);
assertTrue(agentAttache1.equals(agentAttache2));
}
@ -42,7 +43,7 @@ public class ConnectedAgentAttacheTest {
Link link = mock(Link.class);
ConnectedAgentAttache agentAttache1 = new ConnectedAgentAttache(null, 0, "uuid", null, link, false);
ConnectedAgentAttache agentAttache1 = new ConnectedAgentAttache(null, 0, "uuid", null, Hypervisor.HypervisorType.KVM, link, false);
assertFalse(agentAttache1.equals(null));
}
@ -53,8 +54,8 @@ public class ConnectedAgentAttacheTest {
Link link1 = mock(Link.class);
Link link2 = mock(Link.class);
ConnectedAgentAttache agentAttache1 = new ConnectedAgentAttache(null, 0, "uuid", null, link1, false);
ConnectedAgentAttache agentAttache2 = new ConnectedAgentAttache(null, 0, "uuid", null, link2, false);
ConnectedAgentAttache agentAttache1 = new ConnectedAgentAttache(null, 0, "uuid", null, Hypervisor.HypervisorType.KVM, link1, false);
ConnectedAgentAttache agentAttache2 = new ConnectedAgentAttache(null, 0, "uuid", null, Hypervisor.HypervisorType.KVM, link2, false);
assertFalse(agentAttache1.equals(agentAttache2));
}
@ -64,8 +65,8 @@ public class ConnectedAgentAttacheTest {
Link link1 = mock(Link.class);
ConnectedAgentAttache agentAttache1 = new ConnectedAgentAttache(null, 1, "uuid", null, link1, false);
ConnectedAgentAttache agentAttache2 = new ConnectedAgentAttache(null, 2, "uuid", null, link1, false);
ConnectedAgentAttache agentAttache1 = new ConnectedAgentAttache(null, 1, "uuid", null, Hypervisor.HypervisorType.KVM, link1, false);
ConnectedAgentAttache agentAttache2 = new ConnectedAgentAttache(null, 2, "uuid", null, Hypervisor.HypervisorType.KVM, link1, false);
assertFalse(agentAttache1.equals(agentAttache2));
}
@ -75,7 +76,7 @@ public class ConnectedAgentAttacheTest {
Link link1 = mock(Link.class);
ConnectedAgentAttache agentAttache1 = new ConnectedAgentAttache(null, 1, "uuid", null, link1, false);
ConnectedAgentAttache agentAttache1 = new ConnectedAgentAttache(null, 1, "uuid", null, Hypervisor.HypervisorType.KVM, link1, false);
assertFalse(agentAttache1.equals("abc"));
}

View File

@ -24,6 +24,7 @@ import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.MockitoJUnitRunner;
import com.cloud.hypervisor.Hypervisor;
import com.cloud.resource.ServerResource;
import java.util.UUID;
@ -42,7 +43,7 @@ public class DirectAgentAttacheTest {
@Before
public void setup() {
directAgentAttache = new DirectAgentAttache(_agentMgr, _id, _uuid, "myDirectAgentAttache", _resource, false);
directAgentAttache = new DirectAgentAttache(_agentMgr, _id, _uuid, "myDirectAgentAttache", Hypervisor.HypervisorType.KVM, _resource, false);
MockitoAnnotations.initMocks(directAgentAttache);
}

View File

@ -162,4 +162,6 @@ public interface VolumeDao extends GenericDao<VolumeVO, Long>, StateDao<Volume.S
List<VolumeVO> searchRemovedByVms(List<Long> vmIds, Long batchSize);
VolumeVO findOneByIScsiName(String iScsiName);
VolumeVO findByLastIdAndState(long lastVolumeId, Volume.State...states);
}

View File

@ -49,6 +49,7 @@ import com.cloud.utils.db.DB;
import com.cloud.utils.db.Filter;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.GenericSearchBuilder;
import com.cloud.utils.db.QueryBuilder;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.SearchCriteria.Func;
@ -397,6 +398,7 @@ public class VolumeDaoImpl extends GenericDaoBase<VolumeVO, Long> implements Vol
AllFieldsSearch.and("name", AllFieldsSearch.entity().getName(), Op.EQ);
AllFieldsSearch.and("passphraseId", AllFieldsSearch.entity().getPassphraseId(), Op.EQ);
AllFieldsSearch.and("iScsiName", AllFieldsSearch.entity().get_iScsiName(), Op.EQ);
AllFieldsSearch.and("path", AllFieldsSearch.entity().getPath(), Op.EQ);
AllFieldsSearch.done();
RootDiskStateSearch = createSearchBuilder();
@ -904,9 +906,18 @@ public class VolumeDaoImpl extends GenericDaoBase<VolumeVO, Long> implements Vol
return searchIncludingRemoved(sc, filter, null, false);
}
@Override
public VolumeVO findOneByIScsiName(String iScsiName) {
    // NOTE: uses findOneIncludingRemovedBy, so rows already marked removed
    // are matched as well.
    final SearchCriteria<VolumeVO> criteria = AllFieldsSearch.create();
    criteria.setParameters("iScsiName", iScsiName);
    return findOneIncludingRemovedBy(criteria);
}
@Override
public VolumeVO findByLastIdAndState(long lastVolumeId, State ...states) {
    // Find a single volume whose last_id points at the given volume and whose
    // state is one of the supplied values.
    final QueryBuilder<VolumeVO> query = QueryBuilder.create(VolumeVO.class);
    query.and(query.entity().getLastId(), SearchCriteria.Op.EQ, lastVolumeId);
    query.and(query.entity().getState(), SearchCriteria.Op.IN, (Object[]) states);
    return query.find();
}
}

View File

@ -0,0 +1,216 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.command;
import java.util.Date;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;
import com.cloud.agent.api.Command;
import com.cloud.utils.db.GenericDao;
import org.apache.cloudstack.acl.InfrastructureEntity;
import org.apache.cloudstack.api.ApiCommandResourceType;
import org.apache.cloudstack.api.InternalIdentity;
@Entity
@Table(name = "reconcile_commands")
/**
 * Persistent record of a command that is subject to reconciliation between a
 * management server and an agent. Tracks the command's state as seen by both
 * sides, plus the serialized command/answer payloads used to replay or verify
 * the operation after a disconnect or failover.
 *
 * Maps to the {@code cloud.reconcile_commands} table.
 */
public class ReconcileCommandVO implements InfrastructureEntity, InternalIdentity {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "id")
    private long id;

    // Node id (msid) of the management server that issued the command.
    @Column(name = "management_server_id")
    private long managementServerId;

    // Host the command was sent to.
    @Column(name = "host_id")
    private long hostId;

    // Sequence number of the agent request; together with the command name it
    // identifies one command instance.
    @Column(name = "request_sequence")
    private long requestSequence;

    // Id of the resource the command operates on (interpreted via resourceType).
    @Column(name = "resource_id")
    private long resourceId;

    // NOTE(review): enum fields below map to varchar columns in the schema;
    // string persistence is assumed to be handled by the DAO layer — confirm.
    @Column(name = "resource_type")
    private ApiCommandResourceType resourceType;

    // Command state as last updated by the management server.
    @Column(name = "state_by_management")
    private Command.State stateByManagement;

    // Command state as last reported by the agent.
    @Column(name = "state_by_agent")
    private Command.State stateByAgent;

    @Column(name = "command_name")
    private String commandName;

    // Serialized command payload (MEDIUMTEXT in the schema).
    @Column(name = "command_info", length = 65535)
    private String commandInfo;

    @Column(name = "answer_name")
    private String answerName;

    // Serialized answer payload (MEDIUMTEXT in the schema).
    @Column(name = "answer_info", length = 65535)
    private String answerInfo;

    @Column(name = GenericDao.CREATED_COLUMN)
    private Date created;

    // Soft-delete marker; non-null means the record is removed.
    @Column(name= GenericDao.REMOVED_COLUMN)
    private Date removed;

    @Temporal(TemporalType.TIMESTAMP)
    @Column(name= "updated")
    private Date updated;

    // Number of reconciliation attempts performed so far.
    @Column(name= "retry_count")
    private Long retryCount = 0L;

    @Override
    public long getId() {
        return id;
    }

    public long getManagementServerId() {
        return managementServerId;
    }

    public void setManagementServerId(long managementServerId) {
        this.managementServerId = managementServerId;
    }

    public long getHostId() {
        return hostId;
    }

    public void setHostId(long hostId) {
        this.hostId = hostId;
    }

    public long getRequestSequence() {
        return requestSequence;
    }

    public void setRequestSequence(long requestSequence) {
        this.requestSequence = requestSequence;
    }

    public long getResourceId() {
        return resourceId;
    }

    public void setResourceId(long resourceId) {
        this.resourceId = resourceId;
    }

    public ApiCommandResourceType getResourceType() {
        return resourceType;
    }

    public void setResourceType(ApiCommandResourceType resourceType) {
        this.resourceType = resourceType;
    }

    public Command.State getStateByManagement() {
        return stateByManagement;
    }

    public void setStateByManagement(Command.State stateByManagement) {
        this.stateByManagement = stateByManagement;
    }

    public Command.State getStateByAgent() {
        return stateByAgent;
    }

    public void setStateByAgent(Command.State stateByAgent) {
        this.stateByAgent = stateByAgent;
    }

    public String getCommandName() {
        return commandName;
    }

    public void setCommandName(String commandName) {
        this.commandName = commandName;
    }

    public String getCommandInfo() {
        return commandInfo;
    }

    public void setCommandInfo(String commandInfo) {
        this.commandInfo = commandInfo;
    }

    public Date getCreated() {
        return created;
    }

    public void setCreated(Date created) {
        this.created = created;
    }

    public String getAnswerName() {
        return answerName;
    }

    public void setAnswerName(String answerName) {
        this.answerName = answerName;
    }

    public String getAnswerInfo() {
        return answerInfo;
    }

    public void setAnswerInfo(String answerInfo) {
        this.answerInfo = answerInfo;
    }

    public Date getRemoved() {
        return removed;
    }

    public void setRemoved(Date removed) {
        this.removed = removed;
    }

    public Date getUpdated() {
        return updated;
    }

    public void setUpdated(Date updated) {
        this.updated = updated;
    }

    public Long getRetryCount() {
        return retryCount;
    }

    public void setRetryCount(Long retryCount) {
        this.retryCount = retryCount;
    }
}

View File

@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.command.dao;
import com.cloud.agent.api.Command;
import com.cloud.utils.db.GenericDao;
import org.apache.cloudstack.api.ApiCommandResourceType;
import org.apache.cloudstack.command.ReconcileCommandVO;
import java.util.List;
/**
 * DAO for {@link ReconcileCommandVO} rows backing the command-reconciliation
 * feature.
 */
public interface ReconcileCommandDao extends GenericDao<ReconcileCommandVO, Long> {

    /** Lists all reconcile command records owned by the given management server (msid). */
    List<ReconcileCommandVO> listByManagementServerId(long managementServerId);

    /** Lists all reconcile command records targeting the given host. */
    List<ReconcileCommandVO> listByHostId(long hostId);

    /** Lists records whose management-side state is one of the given states. */
    List<ReconcileCommandVO> listByState(Command.State... states);

    /**
     * Soft-removes the record identified by request sequence and command name,
     * optionally recording a final management-side state.
     * NOTE(review): the first parameter is the request sequence, despite the
     * name {@code commandId} — confirm and consider renaming.
     */
    void removeCommand(long commandId, String commandName, Command.State state);

    /** Finds the record identified by request sequence and command name, if any. */
    ReconcileCommandVO findCommand(long reqSequence, String commandName);

    /** Marks in-flight commands of a dead management server as interrupted. */
    void updateCommandsToInterruptedByManagementServerId(long managementServerId);

    /** Marks in-flight commands of a disconnected host as interrupted. */
    void updateCommandsToInterruptedByHostId(long hostId);

    /** Lists records for a resource (by id and type) in the given management-side states. */
    List<ReconcileCommandVO> listByResourceIdAndTypeAndStates(long resourceId, ApiCommandResourceType resourceType, Command.State... states);
}

View File

@ -0,0 +1,134 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.command.dao;
import java.util.Date;
import java.util.List;
import org.apache.cloudstack.api.ApiCommandResourceType;
import org.apache.cloudstack.command.ReconcileCommandVO;
import org.springframework.stereotype.Component;
import com.cloud.agent.api.Command;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.QueryBuilder;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
@Component
@DB
/**
 * Default DAO implementation for reconcile command records.
 *
 * Fix: {@code resourceSearch} was built in the constructor but never used —
 * {@code listByResourceIdAndTypeAndStates} constructed an ad-hoc QueryBuilder
 * instead. It now uses the prebuilt search, matching the pattern of the other
 * SearchBuilder-based methods.
 */
public class ReconcileCommandDaoImpl extends GenericDaoBase<ReconcileCommandVO, Long> implements ReconcileCommandDao {

    // Shared search for lookups/updates keyed by (request sequence, command name)
    // and for bulk state transitions scoped by management server or host. Only
    // the parameters actually set on a created SearchCriteria take effect.
    final SearchBuilder<ReconcileCommandVO> updateCommandSearch;
    // Search for commands acting on a specific resource, filtered by management-side state.
    final SearchBuilder<ReconcileCommandVO> resourceSearch;

    public ReconcileCommandDaoImpl() {
        updateCommandSearch = createSearchBuilder();
        updateCommandSearch.and("managementServerId", updateCommandSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ);
        updateCommandSearch.and("stateByManagement", updateCommandSearch.entity().getStateByManagement(), SearchCriteria.Op.IN);
        updateCommandSearch.and("hostId", updateCommandSearch.entity().getHostId(), SearchCriteria.Op.EQ);
        updateCommandSearch.and("stateByAgent", updateCommandSearch.entity().getStateByAgent(), SearchCriteria.Op.IN);
        updateCommandSearch.and("reqSequence", updateCommandSearch.entity().getRequestSequence(), SearchCriteria.Op.EQ);
        updateCommandSearch.and("commandName", updateCommandSearch.entity().getCommandName(), SearchCriteria.Op.EQ);
        updateCommandSearch.done();

        resourceSearch = createSearchBuilder();
        resourceSearch.and("resourceId", resourceSearch.entity().getResourceId(), SearchCriteria.Op.EQ);
        resourceSearch.and("resourceType", resourceSearch.entity().getResourceType(), SearchCriteria.Op.EQ);
        resourceSearch.and("stateByManagement", resourceSearch.entity().getStateByManagement(), SearchCriteria.Op.IN);
        resourceSearch.done();
    }

    /** Lists all records owned by the given management server. */
    @Override
    public List<ReconcileCommandVO> listByManagementServerId(long managementServerId) {
        QueryBuilder<ReconcileCommandVO> sc = QueryBuilder.create(ReconcileCommandVO.class);
        sc.and(sc.entity().getManagementServerId(), SearchCriteria.Op.EQ, managementServerId);
        return sc.list();
    }

    /** Lists all records targeting the given host. */
    @Override
    public List<ReconcileCommandVO> listByHostId(long hostId) {
        QueryBuilder<ReconcileCommandVO> sc = QueryBuilder.create(ReconcileCommandVO.class);
        sc.and(sc.entity().getHostId(), SearchCriteria.Op.EQ, hostId);
        return sc.list();
    }

    /** Lists records whose management-side state is one of the given states. */
    @Override
    public List<ReconcileCommandVO> listByState(Command.State... states) {
        QueryBuilder<ReconcileCommandVO> sc = QueryBuilder.create(ReconcileCommandVO.class);
        sc.and(sc.entity().getStateByManagement(), SearchCriteria.Op.IN, (Object[]) states);
        return sc.list();
    }

    /**
     * Soft-removes (sets the removed timestamp on) the record identified by
     * request sequence and command name, optionally recording a final
     * management-side state.
     */
    @Override
    public void removeCommand(long reqSequence, String commandName, Command.State state) {
        SearchCriteria<ReconcileCommandVO> sc = updateCommandSearch.create();
        sc.setParameters("reqSequence", reqSequence);
        sc.setParameters("commandName", commandName);
        ReconcileCommandVO vo = createForUpdate();
        if (state != null) {
            vo.setStateByManagement(state);
        }
        vo.setRemoved(new Date());
        update(vo, sc);
    }

    /** Finds the record identified by request sequence and command name, if any. */
    @Override
    public ReconcileCommandVO findCommand(long reqSequence, String commandName) {
        SearchCriteria<ReconcileCommandVO> sc = updateCommandSearch.create();
        sc.setParameters("reqSequence", reqSequence);
        sc.setParameters("commandName", commandName);
        return findOneBy(sc);
    }

    /** Marks CREATED/RECONCILING commands of a dead management server as INTERRUPTED. */
    @Override
    public void updateCommandsToInterruptedByManagementServerId(long managementServerId) {
        SearchCriteria<ReconcileCommandVO> sc = updateCommandSearch.create();
        sc.setParameters("managementServerId", managementServerId);
        sc.setParameters("stateByManagement", Command.State.CREATED, Command.State.RECONCILING);
        ReconcileCommandVO vo = createForUpdate();
        vo.setStateByManagement(Command.State.INTERRUPTED);
        update(vo, sc);
    }

    /** Marks commands in flight on a disconnected host as INTERRUPTED on the agent side. */
    @Override
    public void updateCommandsToInterruptedByHostId(long hostId) {
        SearchCriteria<ReconcileCommandVO> sc = updateCommandSearch.create();
        sc.setParameters("hostId", hostId);
        sc.setParameters("stateByAgent", Command.State.STARTED, Command.State.PROCESSING, Command.State.PROCESSING_IN_BACKEND);
        ReconcileCommandVO vo = createForUpdate();
        vo.setStateByAgent(Command.State.INTERRUPTED);
        update(vo, sc);
    }

    /** Lists records for a resource (by id and type) in the given management-side states. */
    @Override
    public List<ReconcileCommandVO> listByResourceIdAndTypeAndStates(long resourceId, ApiCommandResourceType resourceType, Command.State... states) {
        // Use the prebuilt resourceSearch (previously dead code) instead of
        // constructing a new QueryBuilder on every call.
        SearchCriteria<ReconcileCommandVO> sc = resourceSearch.create();
        sc.setParameters("resourceId", resourceId);
        sc.setParameters("resourceType", resourceType);
        sc.setParameters("stateByManagement", (Object[]) states);
        return listBy(sc);
    }
}

View File

@ -300,4 +300,5 @@
<bean id="BgpPeerNetworkMapDaoImpl" class="org.apache.cloudstack.network.dao.BgpPeerNetworkMapDaoImpl" />
<bean id="SharedFSDaoImpl" class="org.apache.cloudstack.storage.sharedfs.dao.SharedFSDaoImpl" />
<bean id="SharedFSJoinDaoImpl" class="org.apache.cloudstack.storage.sharedfs.query.dao.SharedFSJoinDaoImpl" />
<bean id="ReconcileCommandDaoImpl" class="org.apache.cloudstack.command.dao.ReconcileCommandDaoImpl" />
</beans>

View File

@ -37,3 +37,26 @@ WHERE rp.rule = 'quotaStatement'
AND NOT EXISTS(SELECT 1 FROM cloud.role_permissions rp_ WHERE rp.role_id = rp_.role_id AND rp_.rule = 'quotaCreditsList');
CALL `cloud`.`IDEMPOTENT_ADD_COLUMN`('cloud.host', 'last_mgmt_server_id', 'bigint unsigned DEFAULT NULL COMMENT "last management server this host is connected to" AFTER `mgmt_server_id`');
-- Add table for reconcile commands
CREATE TABLE IF NOT EXISTS `cloud`.`reconcile_commands` (
    `id` bigint unsigned NOT NULL UNIQUE AUTO_INCREMENT,
    `management_server_id` bigint unsigned NOT NULL COMMENT 'node id of the management server',
    `host_id` bigint unsigned NOT NULL COMMENT 'id of the host',
    `request_sequence` bigint unsigned NOT NULL COMMENT 'sequence of the request',
    `resource_id` bigint unsigned DEFAULT NULL COMMENT 'id of the resource',
    `resource_type` varchar(255) COMMENT 'type of the resource',
    `state_by_management` varchar(255) COMMENT 'state of the command updated by management server',
    `state_by_agent` varchar(255) COMMENT 'state of the command updated by cloudstack agent',
    `command_name` varchar(255) COMMENT 'name of the command',
    `command_info` MEDIUMTEXT COMMENT 'info of the command',
    `answer_name` varchar(255) COMMENT 'name of the answer',
    `answer_info` MEDIUMTEXT COMMENT 'info of the answer',
    `created` datetime COMMENT 'date the reconcile command was created',
    `removed` datetime COMMENT 'date the reconcile command was removed',
    `updated` datetime COMMENT 'date the reconcile command was updated',
    `retry_count` bigint unsigned DEFAULT 0 COMMENT 'The retry count of reconciliation',
    PRIMARY KEY(`id`),
    INDEX `i_reconcile_command__host_id`(`host_id`),
    CONSTRAINT `fk_reconcile_command__host_id` FOREIGN KEY (`host_id`) REFERENCES `host`(`id`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

View File

@ -32,6 +32,8 @@ import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import com.cloud.agent.api.CheckVirtualMachineAnswer;
import com.cloud.agent.api.CheckVirtualMachineCommand;
import com.cloud.agent.api.PrepareForMigrationAnswer;
import org.apache.cloudstack.engine.subsystem.api.storage.ChapInfo;
import org.apache.cloudstack.engine.subsystem.api.storage.ClusterScope;
@ -2011,6 +2013,8 @@ public class StorageSystemDataMotionStrategy implements DataMotionStrategy {
@Override
public void copyAsync(Map<VolumeInfo, DataStore> volumeDataStoreMap, VirtualMachineTO vmTO, Host srcHost, Host destHost, AsyncCompletionCallback<CopyCommandResult> callback) {
String errMsg = null;
boolean success = false;
Map<VolumeInfo, VolumeInfo> srcVolumeInfoToDestVolumeInfo = new HashMap<>();
try {
if (srcHost.getHypervisorType() != HypervisorType.KVM) {
@ -2024,7 +2028,6 @@ public class StorageSystemDataMotionStrategy implements DataMotionStrategy {
List<MigrateDiskInfo> migrateDiskInfoList = new ArrayList<MigrateDiskInfo>();
Map<String, MigrateCommand.MigrateDiskInfo> migrateStorage = new HashMap<>();
Map<VolumeInfo, VolumeInfo> srcVolumeInfoToDestVolumeInfo = new HashMap<>();
boolean managedStorageDestination = false;
boolean migrateNonSharedInc = false;
@ -2140,12 +2143,29 @@ public class StorageSystemDataMotionStrategy implements DataMotionStrategy {
boolean kvmAutoConvergence = StorageManager.KvmAutoConvergence.value();
migrateCommand.setAutoConvergence(kvmAutoConvergence);
MigrateAnswer migrateAnswer = (MigrateAnswer)agentManager.send(srcHost.getId(), migrateCommand);
boolean success = migrateAnswer != null && migrateAnswer.getResult();
MigrateAnswer migrateAnswer = null;
try {
migrateAnswer = (MigrateAnswer)agentManager.send(srcHost.getId(), migrateCommand);
success = migrateAnswer != null && migrateAnswer.getResult();
} catch (OperationTimedoutException ex) {
if (HypervisorType.KVM.equals(vm.getHypervisorType())) {
final Answer answer = agentManager.send(destHost.getId(), new CheckVirtualMachineCommand(vm.getInstanceName()));
if (answer != null && answer.getResult() && answer instanceof CheckVirtualMachineAnswer) {
final CheckVirtualMachineAnswer vmAnswer = (CheckVirtualMachineAnswer)answer;
if (VirtualMachine.PowerState.PowerOn.equals(vmAnswer.getState())) {
logger.info(String.format("Vm %s is found on destination host %s. Migration is successful", vm, destHost));
success = true;
}
}
}
if (!success) {
throw ex;
}
}
handlePostMigration(success, srcVolumeInfoToDestVolumeInfo, vmTO, destHost);
if (!success) {
if (migrateAnswer == null) {
throw new CloudRuntimeException("Unable to get an answer to the migrate command");
}
@ -2155,6 +2175,7 @@ public class StorageSystemDataMotionStrategy implements DataMotionStrategy {
throw new CloudRuntimeException(errMsg);
}
}
} catch (AgentUnavailableException | OperationTimedoutException | CloudRuntimeException ex) {
String volumesAndStorages = volumeDataStoreMap.entrySet().stream().map(entry -> formatEntryOfVolumesAndStoragesAsJsonToDisplayOnLog(entry)).collect(Collectors.joining(","));
@ -2163,6 +2184,15 @@ public class StorageSystemDataMotionStrategy implements DataMotionStrategy {
throw new CloudRuntimeException(errMsg);
} finally {
if (!success && !srcVolumeInfoToDestVolumeInfo.isEmpty()) {
for (VolumeInfo destVolumeInfo : srcVolumeInfoToDestVolumeInfo.values()) {
logger.info(String.format("Expunging dest volume [id: %s, state: %s] as part of failed VM migration with volumes command for VM [%s].", destVolumeInfo.getId(), destVolumeInfo.getState(), vmTO.getId()));
destVolumeInfo.processEvent(Event.OperationFailed);
destVolumeInfo.processEvent(Event.DestroyRequested);
_volumeService.expungeVolumeAsync(destVolumeInfo);
}
}
CopyCmdAnswer copyCmdAnswer = new CopyCmdAnswer(errMsg);
CopyCommandResult result = new CopyCommandResult(null, copyCmdAnswer);
@ -2372,6 +2402,7 @@ public class StorageSystemDataMotionStrategy implements DataMotionStrategy {
newVol.setPodId(storagePoolVO.getPodId());
newVol.setPoolId(storagePoolVO.getId());
newVol.setLastPoolId(lastPoolId);
newVol.setLastId(volume.getId());
if (volume.getPassphraseId() != null) {
newVol.setPassphraseId(volume.getPassphraseId());

View File

@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.naming.ConfigurationException;
import org.apache.cloudstack.command.ReconcileCommandService;
import org.apache.cloudstack.framework.config.ConfigDepot;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.Configurable;
@ -104,6 +105,9 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
private StatusAdministrator statusAdministrator;
@Inject
protected ReconcileCommandService reconcileCommandService;
//
// pay attention to _mshostId and _msid
// _mshostId is the primary key of management host table
@ -1013,6 +1017,8 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
logger.warn("Management node " + host.getId() + " is detected inactive by timestamp and did not send node status to all other nodes");
host.setState(ManagementServerHost.State.Down);
_mshostDao.update(host.getId(), host);
reconcileCommandService.updateReconcileCommandToInterruptedByManagementServerId(host.getMsid());
}
}
} else {

View File

@ -37,6 +37,7 @@ import javax.naming.ConfigurationException;
import org.apache.cloudstack.api.ApiCommandResourceType;
import org.apache.cloudstack.api.ApiErrorCode;
import org.apache.cloudstack.command.ReconcileCommandService;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService;
import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
@ -71,6 +72,7 @@ import com.cloud.network.dao.NetworkDao;
import com.cloud.network.dao.NetworkVO;
import com.cloud.storage.Snapshot;
import com.cloud.storage.Volume;
import com.cloud.storage.VolumeVO;
import com.cloud.storage.VolumeDetailVO;
import com.cloud.storage.dao.SnapshotDao;
import com.cloud.storage.dao.SnapshotDetailsDao;
@ -167,6 +169,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
private NetworkDao networkDao;
@Inject
private NetworkOrchestrationService networkOrchestrationService;
@Inject
private ReconcileCommandService reconcileCommandService;
private volatile long _executionRunNumber = 1;
@ -1197,6 +1201,23 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
return true;
}
if (vol.getState().isTransitional()) {
if (Volume.State.Migrating.equals(vol.getState())) {
if (ReconcileCommandService.ReconcileCommandsEnabled.value()) {
if (reconcileCommandService.isReconcileResourceNeeded(volumeId, ApiCommandResourceType.Volume)) {
logger.debug(String.format("Skipping cleaning up Migrating volume: %s, it will be reconciled", vol));
return true;
}
if (vol.getInstanceId() != null && reconcileCommandService.isReconcileResourceNeeded(vol.getInstanceId(), ApiCommandResourceType.VirtualMachine)) {
logger.debug(String.format("Skipping cleaning up Migrating volume: %s, the vm %s will be reconciled", vol, _vmInstanceDao.findById(vol.getInstanceId())));
return true;
}
}
VolumeVO destVolume = _volsDao.findByLastIdAndState(vol.getId(), Volume.State.Migrating, Volume.State.Creating);
if (destVolume != null) {
logger.debug(String.format("Found destination volume of Migrating volume %s: %s", vol, destVolume));
cleanupVolume(destVolume.getId());
}
}
logger.debug("Cleaning up volume with Id: " + volumeId);
boolean status = vol.stateTransit(Volume.Event.OperationFailed);
cleanupFailedVolumesCreatedFromSnapshots(volumeId);
@ -1213,6 +1234,18 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
return true;
}
if (vmInstanceVO.getState().isTransitional()) {
if (VirtualMachine.State.Migrating.equals(vmInstanceVO.getState())) {
if (ReconcileCommandService.ReconcileCommandsEnabled.value()
&& reconcileCommandService.isReconcileResourceNeeded(vmId, ApiCommandResourceType.VirtualMachine)) {
logger.debug(String.format("Skipping cleaning up Instance %s, it will be reconciled", vmInstanceVO));
return true;
}
logger.debug("Cleaning up volumes with instance Id: " + vmId);
List<VolumeVO> volumes = _volsDao.findByInstance(vmInstanceVO.getId());
for (VolumeVO volume : volumes) {
cleanupVolume(volume.getId());
}
}
logger.debug("Cleaning up Instance with Id: " + vmId);
return virtualMachineManager.stateTransitTo(vmInstanceVO, VirtualMachine.Event.OperationFailed, vmInstanceVO.getHostId());
}

View File

@ -43,6 +43,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -53,12 +54,16 @@ import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.cloudstack.api.ApiConstants.IoDriverPolicy;
import org.apache.cloudstack.command.CommandInfo;
import org.apache.cloudstack.command.ReconcileCommandService;
import org.apache.cloudstack.command.ReconcileCommandUtils;
import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService;
import org.apache.cloudstack.storage.command.browser.ListDataStoreObjectsCommand;
import org.apache.cloudstack.storage.configdrive.ConfigDrive;
import org.apache.cloudstack.storage.to.PrimaryDataStoreTO;
import org.apache.cloudstack.storage.to.TemplateObjectTO;
import org.apache.cloudstack.storage.to.VolumeObjectTO;
import org.apache.cloudstack.storage.volume.VolumeOnStorageTO;
import org.apache.cloudstack.utils.bytescale.ByteScaleUtils;
import org.apache.cloudstack.utils.cryptsetup.CryptSetup;
import org.apache.cloudstack.utils.hypervisor.HypervisorUtils;
@ -110,6 +115,7 @@ import org.xml.sax.SAXException;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.HostVmStateReportEntry;
import com.cloud.agent.api.PingAnswer;
import com.cloud.agent.api.PingCommand;
import com.cloud.agent.api.PingRoutingCommand;
import com.cloud.agent.api.PingRoutingWithNwGroupsCommand;
@ -144,6 +150,7 @@ import com.cloud.exception.InternalErrorException;
import com.cloud.host.Host.Type;
import com.cloud.hypervisor.Hypervisor.HypervisorType;
import com.cloud.hypervisor.kvm.dpdk.DpdkHelper;
import com.cloud.hypervisor.kvm.resource.disconnecthook.DisconnectHook;
import com.cloud.hypervisor.kvm.resource.LibvirtVMDef.ChannelDef;
import com.cloud.hypervisor.kvm.resource.LibvirtVMDef.ClockDef;
import com.cloud.hypervisor.kvm.resource.LibvirtVMDef.ConsoleDef;
@ -213,6 +220,7 @@ import com.cloud.utils.ssh.SshHelper;
import com.cloud.vm.VirtualMachine;
import com.cloud.vm.VirtualMachine.PowerState;
import com.cloud.vm.VmDetailConstants;
import com.google.gson.Gson;
/**
@ -342,6 +350,8 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
public static final int LIBVIRT_CGROUPV2_WEIGHT_MIN = 2;
public static final int LIBVIRT_CGROUPV2_WEIGHT_MAX = 10000;
public static final String COMMANDS_LOG_PATH = "/usr/share/cloudstack-agent/tmp/commands";
private String modifyVlanPath;
private String versionStringPath;
private String patchScriptPath;
@ -512,6 +522,8 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
private boolean isTungstenEnabled = false;
private boolean isReconcileCommandsEnabled = false;
private static Gson gson = new Gson();
/**
@ -555,6 +567,8 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
}
}
protected List<DisconnectHook> _disconnectHooks = new CopyOnWriteArrayList<>();
@Override
public ExecutionResult executeInVR(final String routerIp, final String script, final String args) {
return executeInVR(routerIp, script, args, timeout);
@ -774,6 +788,7 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
public StorageSubsystemCommandHandler getStorageHandler() {
return storageHandler;
}
private static final class KeyValueInterpreter extends OutputInterpreter {
private final Map<String, String> map = new HashMap<String, String>();
@ -1517,6 +1532,18 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
isTungstenEnabled = Boolean.parseBoolean(params.get(NetworkOrchestrationService.TUNGSTEN_ENABLED.key()));
}
if (params.get(ReconcileCommandService.ReconcileCommandsEnabled.key()) != null) {
isReconcileCommandsEnabled = Boolean.parseBoolean(params.get(ReconcileCommandService.ReconcileCommandsEnabled.key()));
}
if (isReconcileCommandsEnabled) {
File commandsLogPath = new File(COMMANDS_LOG_PATH);
if (!commandsLogPath.exists()) {
commandsLogPath.mkdirs();
}
// Update state of reconcile commands
getCommandInfosFromLogFiles(true);
}
return true;
}
@ -1963,6 +1990,9 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
*/
@Override
public Answer executeRequest(final Command cmd) {
if (isReconcileCommandsEnabled) {
ReconcileCommandUtils.updateLogFileForCommand(COMMANDS_LOG_PATH, cmd, Command.State.STARTED);
}
final LibvirtRequestWrapper wrapper = LibvirtRequestWrapper.getInstance();
try {
@ -1972,6 +2002,67 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
}
}
/**
 * Scans the reconcile command log directory and returns the parsed
 * {@link CommandInfo} entries; unparsable files are skipped.
 *
 * Fix: the previous implementation preallocated an array of {@code files.length}
 * and skipped unparsable entries, leaving trailing {@code null} slots that callers
 * iterating the array (e.g. {@code processPingAnswer}) would NPE on. The result
 * is now trimmed to the number of entries actually parsed.
 *
 * @param update if true, rewrite stale states left over from a previous agent run:
 *               PROCESSING becomes INTERRUPTED, PROCESSING_IN_BACKEND becomes
 *               DANGLED_IN_BACKEND. NOTE(review): the in-memory CommandInfo keeps
 *               its pre-update state; only the log file is rewritten — confirm
 *               this is intended.
 * @return the parsed command infos; never null and never containing null elements.
 */
public CommandInfo[] getCommandInfosFromLogFiles(boolean update) {
    File commandsLogPath = new File(COMMANDS_LOG_PATH);
    File[] files = commandsLogPath.listFiles();
    if (files == null) {
        // Directory does not exist or is not listable.
        return new CommandInfo[0];
    }
    CommandInfo[] commandInfos = new CommandInfo[files.length];
    int count = 0;
    for (File file : files) {
        CommandInfo commandInfo = ReconcileCommandUtils.readLogFileForCommand(file.getAbsolutePath());
        if (commandInfo == null) {
            continue;
        }
        if (update) {
            if (Command.State.PROCESSING.equals(commandInfo.getState())) {
                ReconcileCommandUtils.updateLogFileForCommand(file.getAbsolutePath(), Command.State.INTERRUPTED);
            } else if (Command.State.PROCESSING_IN_BACKEND.equals(commandInfo.getState())) {
                ReconcileCommandUtils.updateLogFileForCommand(file.getAbsolutePath(), Command.State.DANGLED_IN_BACKEND);
            }
        }
        logger.debug(String.format("Adding reconcile command with seq: %s, command: %s, answer: %s", commandInfo.getRequestSeq(), commandInfo.getCommandName(), commandInfo.getAnswer()));
        commandInfos[count++] = commandInfo;
    }
    // Trim the array so callers never observe null entries for skipped files.
    return count == files.length ? commandInfos : Arrays.copyOf(commandInfos, count);
}
/**
 * Records the given state for a command in the reconcile log directory.
 * No-op unless reconcile command support is enabled on this agent.
 */
public void createOrUpdateLogFileForCommand(Command command, Command.State state) {
    if (!isReconcileCommandsEnabled) {
        return;
    }
    ReconcileCommandUtils.updateLogFileForCommand(COMMANDS_LOG_PATH, command, state);
}
/**
 * Records the answer for a command in the reconcile log directory.
 * No-op unless reconcile command support is enabled on this agent.
 *
 * Consistency fix: refer to COMMANDS_LOG_PATH unqualified, matching the
 * sibling overload in this class.
 */
public void createOrUpdateLogFileForCommand(Command command, Answer answer) {
    if (isReconcileCommandsEnabled) {
        ReconcileCommandUtils.updateLogFileWithAnswerForCommand(COMMANDS_LOG_PATH, command, answer);
    }
}
@Override
/**
 * Reacts to the management server's ping answer: deletes the local reconcile
 * log file for every command that has either reached a final state already
 * received by the server, or that the server no longer knows about.
 *
 * Fix: the two branches duplicated the log-file name construction and delete
 * call; merged into one path. Also guards against null array elements so a
 * sparse CommandInfo[] cannot cause an NPE.
 */
public void processPingAnswer(PingAnswer answer) {
    PingCommand pingCommand = answer.getCommand();
    List<String> reconcileCommands = answer.getReconcileCommands();
    CommandInfo[] commandInfos = pingCommand.getCommandInfos();
    for (CommandInfo commandInfo : commandInfos) {
        if (commandInfo == null) {
            continue; // defensive: skip unset slots in the array
        }
        String commandKey = getCommandKey(commandInfo.getRequestSeq(), commandInfo.getCommandName());
        boolean isFinalState = Arrays.asList(Command.State.COMPLETED, Command.State.FAILED, Command.State.INTERRUPTED, Command.State.TIMED_OUT).contains(commandInfo.getState());
        boolean unknownToServer = !reconcileCommands.contains(commandKey);
        if (isFinalState || unknownToServer) {
            String reason = isFinalState ? "it has been received by the management server" : "it cannot be found by the management server";
            logger.debug(String.format("Removing command %s in %s state as %s", commandKey, commandInfo.getState(), reason));
            String fileName = String.format("%s/%s-%s.json", COMMANDS_LOG_PATH, commandInfo.getRequestSeq(), commandInfo.getCommandName());
            ReconcileCommandUtils.deleteLogFile(fileName);
        }
    }
}
/** Builds the "&lt;requestSeq&gt;-&lt;commandName&gt;" key identifying one command instance. */
private String getCommandKey(long requestSeq, String commandName) {
    return String.format("%s-%s", requestSeq, commandName);
}
public synchronized boolean destroyTunnelNetwork(final String bridge) {
findOrCreateTunnelNetwork(bridge);
@ -2534,7 +2625,7 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
* Set quota and period tags on 'ctd' when CPU limit use is set
*/
protected void setQuotaAndPeriod(VirtualMachineTO vmTO, CpuTuneDef ctd) {
if (vmTO.getLimitCpuUse() && vmTO.getCpuQuotaPercentage() != null) {
if (vmTO.isLimitCpuUse() && vmTO.getCpuQuotaPercentage() != null) {
Double cpuQuotaPercentage = vmTO.getCpuQuotaPercentage();
int period = CpuTuneDef.DEFAULT_PERIOD;
int quota = (int) (period * cpuQuotaPercentage);
@ -3712,6 +3803,9 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
if (healthCheckResult != HealthCheckResult.IGNORE) {
pingRoutingCommand.setHostHealthCheckResult(healthCheckResult == HealthCheckResult.SUCCESS);
}
if (isReconcileCommandsEnabled) {
pingRoutingCommand.setCommandInfos(getCommandInfosFromLogFiles(false));
}
return pingRoutingCommand;
}
@ -5656,4 +5750,64 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
throw new RuntimeException(e);
}
}
/**
 * Runs every registered {@link DisconnectHook} when the agent loses its connection
 * to the management server. All hooks are started first (so they run concurrently),
 * then each is awaited against a shared time budget based on its own timeout.
 * Hooks that outlive their budget are interrupted rather than waited on further.
 */
@Override
public void disconnected() {
    LOGGER.info("Detected agent disconnect event, running through " + _disconnectHooks.size() + " disconnect hooks");
    for (DisconnectHook hook : _disconnectHooks) {
        hook.start();
    }
    final long start = System.currentTimeMillis();
    for (DisconnectHook hook : _disconnectHooks) {
        try {
            // Budget is measured from the common start time, so earlier waits
            // eat into the time available for later hooks.
            final long elapsed = System.currentTimeMillis() - start;
            final long remaining = hook.getTimeoutMs() - elapsed;
            final long joinWait = remaining > 0 ? remaining : 1;
            hook.join(joinWait);
            // Interrupt unconditionally; this is a no-op for hooks that already finished.
            hook.interrupt();
        } catch (InterruptedException ex) {
            LOGGER.warn("Interrupted disconnect hook: " + ex.getMessage());
            // Fix: restore the interrupt status so callers of this thread can observe it.
            Thread.currentThread().interrupt();
        }
    }
    _disconnectHooks.clear();
}
/**
 * Registers a hook to be run by {@link #disconnected()} if the connection to the
 * management server is lost. Callers should deregister the hook when their
 * command completes.
 */
public void addDisconnectHook(DisconnectHook hook) {
    LOGGER.debug("Adding disconnect hook " + hook);
    _disconnectHooks.add(hook);
}
/**
 * Deregisters a previously added disconnect hook. Safe to call for a hook that
 * was never registered (or was already removed); that case is only logged.
 */
public void removeDisconnectHook(DisconnectHook hook) {
    // Use remove()'s return value instead of contains()+remove(): this avoids the
    // check-then-act race and the duplicated "Removing disconnect hook" debug line
    // the original emitted both before and inside the contains() branch.
    if (_disconnectHooks.remove(hook)) {
        LOGGER.debug("Removing disconnect hook " + hook);
    } else {
        LOGGER.debug("Requested removal of disconnect hook, but hook not found: " + hook);
    }
}
/**
 * Looks up a volume on a primary storage pool and returns a lightweight TO
 * describing it (name, path, format, size, virtual size), for use by the
 * reconcile handlers.
 *
 * @param primaryStore the primary data store the volume is expected to live on
 * @param volumePath   path (or identifier) of the volume within that store
 * @return a populated {@link VolumeOnStorageTO}, or an EMPTY TO when the lookup
 *         fails with a {@link CloudRuntimeException} (lookup failure is treated
 *         as non-fatal for reconciliation)
 */
public VolumeOnStorageTO getVolumeOnStorage(PrimaryDataStoreTO primaryStore, String volumePath) {
    try {
        if (primaryStore.isManaged()) {
            // Managed storage must be attached to this host before the disk can be inspected.
            // A failed connect is only warned about; the subsequent lookup decides the outcome.
            if (!storagePoolManager.connectPhysicalDisk(primaryStore.getPoolType(), primaryStore.getUuid(), volumePath, primaryStore.getDetails())) {
                logger.warn(String.format("Failed to connect src volume %s, in storage pool %s", volumePath, primaryStore));
            }
        }
        final KVMPhysicalDisk srcVolume = storagePoolManager.getPhysicalDisk(primaryStore.getPoolType(), primaryStore.getUuid(), volumePath);
        if (srcVolume == null) {
            logger.debug("Failed to get physical disk for volume: " + volumePath);
            throw new CloudRuntimeException("Failed to get physical disk for volume at path: " + volumePath);
        }
        // NOTE(review): srcVolume.getName() is passed for both the second and third
        // arguments — presumably (name, fullPath) in the TO's constructor; confirm
        // against VolumeOnStorageTO that the duplication is intentional.
        return new VolumeOnStorageTO(HypervisorType.KVM, srcVolume.getName(), srcVolume.getName(), srcVolume.getPath(),
                srcVolume.getFormat().toString(), srcVolume.getSize(), srcVolume.getVirtualSize());
    } catch (final CloudRuntimeException e) {
        // Non-fatal for reconciliation: report "nothing found" via an empty TO.
        logger.debug(String.format("Failed to get volume %s on storage %s: %s", volumePath, primaryStore, e));
        return new VolumeOnStorageTO();
    } finally {
        if (primaryStore.isManaged()) {
            // Always detach managed storage again, even on failure paths.
            storagePoolManager.disconnectPhysicalDisk(primaryStore.getPoolType(), primaryStore.getUuid(), volumePath);
        }
    }
}
}

View File

@ -0,0 +1,59 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package com.cloud.hypervisor.kvm.resource.disconnecthook;
/**
DisconnectHooks can be used to clean up/cancel long-running commands when the
connection to the management server is interrupted (which results in job
failure). Agent CommandWrappers can register a hook with the
LibvirtComputingResource at the beginning of processing, and
LibvirtComputingResource will call it upon disconnect. The CommandWrapper can
also remove the hook upon completion of the command.
DisconnectHooks should implement a run() method that is safe to call and will
fail cleanly if there is no cleanup to do. Otherwise the CommandWrapper
registering/deregistering the hook should account for any race conditions
introduced by the ordering of when the command is processed and when the hook
is registered/deregistered.
If a timeout is set, the hook's run() will be interrupted. It is up to
run() to decide what to do with the InterruptedException, but the hook
processing will not wait any longer for the hook to complete.
Avoid doing anything time-intensive, as DisconnectHooks will delay agent
shutdown.
*/
public abstract class DisconnectHook extends Thread {
    /** How long disconnect processing will wait for this hook before interrupting it. Defaults to 10 seconds. */
    long timeoutMs = 10000;

    /**
     * Creates a hook with the default timeout. The thread name combines the
     * concrete hook class and the supplied name for easier log correlation.
     */
    public DisconnectHook(String name) {
        super();
        setName(getClass().getName() + "-" + name);
    }

    /**
     * Creates a hook with an explicit timeout in milliseconds.
     */
    public DisconnectHook(String name, long timeout) {
        this(name);
        timeoutMs = timeout;
    }

    /** @return the maximum time, in milliseconds, disconnect processing will wait for this hook */
    public long getTimeoutMs() {
        return timeoutMs;
    }
}

View File

@ -0,0 +1,50 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package com.cloud.hypervisor.kvm.resource.disconnecthook;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.libvirt.Domain;
import org.libvirt.LibvirtException;
/**
 * Disconnect hook that aborts an in-flight libvirt VM migration job when the
 * agent loses its connection to the management server.
 */
public class MigrationCancelHook extends DisconnectHook {
    private static final Logger LOGGER = LogManager.getLogger(MigrationCancelHook.class);
    Domain _migratingDomain;
    String _vmName;

    public MigrationCancelHook(Domain migratingDomain) throws LibvirtException {
        super(migratingDomain.getName());
        _migratingDomain = migratingDomain;
        _vmName = migratingDomain.getName();
    }

    @Override
    public void run() {
        LOGGER.info("Interrupted migration of " + _vmName);
        try {
            final int abortResult = _migratingDomain.abortJob();
            // abortJob() returns 0 on success.
            if (abortResult == 0) {
                LOGGER.warn("Aborted migration job for " + _vmName);
            }
        } catch (LibvirtException ex) {
            LOGGER.warn("Failed to abort migration job for " + _vmName, ex);
        }
    }
}

View File

@ -0,0 +1,53 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package com.cloud.hypervisor.kvm.resource.disconnecthook;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.libvirt.Domain;
import org.libvirt.LibvirtException;
/**
 * Disconnect hook that aborts an in-flight libvirt block-copy job (live volume
 * migration) when the agent loses its connection to the management server.
 */
public class VolumeMigrationCancelHook extends DisconnectHook {
    private static final Logger LOGGER = LogManager.getLogger(VolumeMigrationCancelHook.class);
    Domain _migratingDomain;
    String _vmName;
    String _destDiskLabel;

    public VolumeMigrationCancelHook(Domain migratingDomain, String destDiskLabel) throws LibvirtException {
        super(migratingDomain.getName());
        _migratingDomain = migratingDomain;
        _vmName = migratingDomain.getName();
        _destDiskLabel = destDiskLabel;
    }

    @Override
    public void run() {
        LOGGER.info("Interrupted volume migration of " + _vmName);
        // Nothing to abort unless both the domain and the destination disk label are known.
        if (_migratingDomain == null || _destDiskLabel == null) {
            return;
        }
        try {
            _migratingDomain.blockJobAbort(_destDiskLabel, Domain.BlockJobAbortFlags.ASYNC);
            LOGGER.warn(String.format("Aborted block job for vm %s and volume: %s", _vmName, _destDiskLabel));
        } catch (LibvirtException ex) {
            LOGGER.error(String.format("Failed to abort block job for vm %s and volume: %s due to %s", _vmName, _destDiskLabel, ex.getMessage()));
        }
    }
}

View File

@ -20,6 +20,8 @@
package com.cloud.hypervisor.kvm.resource.wrapper;
import org.libvirt.Connect;
import org.libvirt.Domain;
import org.libvirt.DomainInfo;
import org.libvirt.LibvirtException;
import com.cloud.agent.api.Answer;
@ -45,6 +47,11 @@ public final class LibvirtCheckVirtualMachineCommandWrapper extends CommandWrapp
vncPort = libvirtComputingResource.getVncPort(conn, command.getVmName());
}
Domain vm = conn.domainLookupByName(command.getVmName());
if (state == PowerState.PowerOn && DomainInfo.DomainState.VIR_DOMAIN_PAUSED.equals(vm.getInfo().state)) {
return new CheckVirtualMachineAnswer(command, PowerState.PowerUnknown, vncPort);
}
return new CheckVirtualMachineAnswer(command, state, vncPort);
} catch (final LibvirtException e) {
return new CheckVirtualMachineAnswer(command, e.getMessage());

View File

@ -23,6 +23,7 @@ import java.io.File;
import java.util.Map;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.storage.CopyVolumeAnswer;
import com.cloud.agent.api.storage.CopyVolumeCommand;
import com.cloud.agent.api.to.DiskTO;
@ -92,7 +93,10 @@ public final class LibvirtCopyVolumeCommandWrapper extends CommandWrapper<CopyVo
secondaryStoragePool.createFolder(volumeDestPath);
storagePoolMgr.deleteStoragePool(secondaryStoragePool.getType(), secondaryStoragePool.getUuid());
secondaryStoragePool = storagePoolMgr.getStoragePoolByURI(secondaryStorageUrl + volumeDestPath);
libvirtComputingResource.createOrUpdateLogFileForCommand(command, Command.State.PROCESSING_IN_BACKEND);
storagePoolMgr.copyPhysicalDisk(volume, destVolumeName, secondaryStoragePool, 0);
libvirtComputingResource.createOrUpdateLogFileForCommand(command, Command.State.COMPLETED);
return new CopyVolumeAnswer(command, true, null, null, volumeName);
} else {
@ -101,11 +105,14 @@ public final class LibvirtCopyVolumeCommandWrapper extends CommandWrapper<CopyVo
final KVMPhysicalDisk volume = secondaryStoragePool.getPhysicalDisk(command.getVolumePath() + ".qcow2");
libvirtComputingResource.createOrUpdateLogFileForCommand(command, Command.State.PROCESSING_IN_BACKEND);
storagePoolMgr.copyPhysicalDisk(volume, volumeName, primaryPool, 0);
libvirtComputingResource.createOrUpdateLogFileForCommand(command, Command.State.COMPLETED);
return new CopyVolumeAnswer(command, true, null, null, volumeName);
}
} catch (final CloudRuntimeException e) {
libvirtComputingResource.createOrUpdateLogFileForCommand(command, Command.State.FAILED);
return new CopyVolumeAnswer(command, false, e.toString(), null, null);
} finally {
if (secondaryStoragePool != null) {
@ -150,10 +157,13 @@ public final class LibvirtCopyVolumeCommandWrapper extends CommandWrapper<CopyVo
KVMPhysicalDisk srcPhysicalDisk = storagePoolMgr.getPhysicalDisk(srcPrimaryDataStore.getPoolType(), srcPrimaryDataStore.getUuid(), srcPath);
libvirtComputingResource.createOrUpdateLogFileForCommand(command, Command.State.PROCESSING_IN_BACKEND);
storagePoolMgr.copyPhysicalDisk(srcPhysicalDisk, destVolumeName, secondaryStoragePool, command.getWait() * 1000);
libvirtComputingResource.createOrUpdateLogFileForCommand(command, Command.State.COMPLETED);
return new CopyVolumeAnswer(command, true, null, null, destVolumePath + destVolumeName);
} catch (final CloudRuntimeException e) {
libvirtComputingResource.createOrUpdateLogFileForCommand(command, Command.State.FAILED);
return new CopyVolumeAnswer(command, false, e.toString(), null, null);
} finally {
try {

View File

@ -64,6 +64,7 @@ import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.MigrateAnswer;
import com.cloud.agent.api.MigrateCommand;
import com.cloud.agent.api.MigrateCommand.MigrateDiskInfo;
@ -73,6 +74,7 @@ import com.cloud.agent.api.to.DpdkTO;
import com.cloud.agent.api.to.VirtualMachineTO;
import com.cloud.agent.properties.AgentProperties;
import com.cloud.agent.properties.AgentPropertiesFileHandler;
import com.cloud.hypervisor.kvm.resource.disconnecthook.MigrationCancelHook;
import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
import com.cloud.hypervisor.kvm.resource.LibvirtConnection;
import com.cloud.hypervisor.kvm.resource.LibvirtVMDef.DiskDef;
@ -111,6 +113,7 @@ public final class LibvirtMigrateCommandWrapper extends CommandWrapper<MigrateCo
}
String result = null;
Command.State commandState = null;
List<InterfaceDef> ifaces = null;
List<DiskDef> disks;
@ -121,6 +124,7 @@ public final class LibvirtMigrateCommandWrapper extends CommandWrapper<MigrateCo
Connect conn = null;
String xmlDesc = null;
List<Ternary<String, Boolean, String>> vmsnapshots = null;
MigrationCancelHook cancelHook = null;
try {
final LibvirtUtilitiesHelper libvirtUtilitiesHelper = libvirtComputingResource.getLibvirtUtilitiesHelper();
@ -237,6 +241,12 @@ public final class LibvirtMigrateCommandWrapper extends CommandWrapper<MigrateCo
final ExecutorService executor = Executors.newFixedThreadPool(1);
boolean migrateNonSharedInc = command.isMigrateNonSharedInc() && !migrateStorageManaged;
// add cancel hook before we start. If migration fails to start and hook is called, it's non-fatal
cancelHook = new MigrationCancelHook(dm);
libvirtComputingResource.addDisconnectHook(cancelHook);
libvirtComputingResource.createOrUpdateLogFileForCommand(command, Command.State.PROCESSING);
final Callable<Domain> worker = new MigrateKVMAsync(libvirtComputingResource, dm, dconn, xmlDesc,
migrateStorage, migrateNonSharedInc,
command.isAutoConvergence(), vmName, command.getDestinationIp(), migrateDiskLabels);
@ -278,6 +288,8 @@ public final class LibvirtMigrateCommandWrapper extends CommandWrapper<MigrateCo
logger.info(String.format("Aborting migration of VM [%s] with domain job [%s] due to time out after %d seconds.", vmName, job, migrateWait));
dm.abortJob();
result = String.format("Migration of VM [%s] was cancelled by CloudStack due to time out after %d seconds.", vmName, migrateWait);
commandState = Command.State.FAILED;
libvirtComputingResource.createOrUpdateLogFileForCommand(command, commandState);
logger.debug(result);
break;
} catch (final LibvirtException e) {
@ -338,6 +350,9 @@ public final class LibvirtMigrateCommandWrapper extends CommandWrapper<MigrateCo
result = "Exception during migrate: " + e.getMessage();
}
} finally {
if (cancelHook != null) {
libvirtComputingResource.removeDisconnectHook(cancelHook);
}
try {
if (dm != null && result != null) {
// restore vm snapshots in case of failed migration
@ -373,6 +388,11 @@ public final class LibvirtMigrateCommandWrapper extends CommandWrapper<MigrateCo
vifDriver.unplug(iface, libvirtComputingResource.shouldDeleteBridge(vlanToPersistenceMap, vlanId));
}
}
commandState = Command.State.COMPLETED;
libvirtComputingResource.createOrUpdateLogFileForCommand(command, commandState);
} else if (commandState == null) {
commandState = Command.State.FAILED;
libvirtComputingResource.createOrUpdateLogFileForCommand(command, commandState);
}
return new MigrateAnswer(command, result == null, result, null);

View File

@ -20,10 +20,12 @@
package com.cloud.hypervisor.kvm.resource.wrapper;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.storage.MigrateVolumeAnswer;
import com.cloud.agent.api.storage.MigrateVolumeCommand;
import com.cloud.agent.api.to.DiskTO;
import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
import com.cloud.hypervisor.kvm.resource.disconnecthook.VolumeMigrationCancelHook;
import com.cloud.hypervisor.kvm.storage.KVMPhysicalDisk;
import com.cloud.hypervisor.kvm.storage.KVMStoragePool;
import com.cloud.hypervisor.kvm.storage.KVMStoragePoolManager;
@ -103,6 +105,8 @@ public class LibvirtMigrateVolumeCommandWrapper extends CommandWrapper<MigrateVo
final String destDiskFileName = ScaleIOUtil.DISK_NAME_PREFIX + destSystemId + "-" + destVolumeId;
final String diskFilePath = ScaleIOUtil.DISK_PATH + File.separator + destDiskFileName;
VolumeMigrationCancelHook cancelHook = null;
Domain dm = null;
try {
final LibvirtUtilitiesHelper libvirtUtilitiesHelper = libvirtComputingResource.getLibvirtUtilitiesHelper();
@ -136,10 +140,23 @@ public class LibvirtMigrateVolumeCommandWrapper extends CommandWrapper<MigrateVo
TypedParameter[] parameters = new TypedParameter[1];
parameters[0] = parameter;
cancelHook = new VolumeMigrationCancelHook(dm, destDiskLabel);
libvirtComputingResource.addDisconnectHook(cancelHook);
libvirtComputingResource.createOrUpdateLogFileForCommand(command, Command.State.PROCESSING_IN_BACKEND);
dm.blockCopy(destDiskLabel, diskdef, parameters, Domain.BlockCopyFlags.REUSE_EXT);
logger.info(String.format("Block copy has started for the volume %s : %s ", destDiskLabel, srcPath));
return checkBlockJobStatus(command, dm, destDiskLabel, srcPath, destPath, libvirtComputingResource, conn, srcSecretUUID);
MigrateVolumeAnswer answer = checkBlockJobStatus(command, dm, destDiskLabel, srcPath, destPath, libvirtComputingResource, conn, srcSecretUUID);
if (answer != null) {
if (answer.getResult()) {
libvirtComputingResource.createOrUpdateLogFileForCommand(command, Command.State.COMPLETED);
} else {
libvirtComputingResource.createOrUpdateLogFileForCommand(command, Command.State.FAILED);
}
}
return answer;
} catch (Exception e) {
String msg = "Migrate volume failed due to " + e.toString();
logger.warn(msg, e);
@ -150,8 +167,12 @@ public class LibvirtMigrateVolumeCommandWrapper extends CommandWrapper<MigrateVo
logger.error("Migrate volume failed while aborting the block job due to " + ex.getMessage());
}
}
libvirtComputingResource.createOrUpdateLogFileForCommand(command, Command.State.FAILED);
return new MigrateVolumeAnswer(command, false, msg, null);
} finally {
if (cancelHook != null) {
libvirtComputingResource.removeDisconnectHook(cancelHook);
}
if (dm != null) {
try {
dm.free();
@ -299,6 +320,11 @@ public class LibvirtMigrateVolumeCommandWrapper extends CommandWrapper<MigrateVo
String destPath = destDetails != null && destDetails.get(DiskTO.IQN) != null ? destDetails.get(DiskTO.IQN) :
(destVolumeObjectTO.getPath() != null ? destVolumeObjectTO.getPath() : UUID.randomUUID().toString());
// Update path in the command for reconciliation
if (destVolumeObjectTO.getPath() == null) {
destVolumeObjectTO.setPath(destPath);
}
try {
KVMStoragePool sourceStoragePool = storagePoolManager.getStoragePool(srcPrimaryDataStore.getPoolType(), srcPrimaryDataStore.getUuid());
@ -317,12 +343,16 @@ public class LibvirtMigrateVolumeCommandWrapper extends CommandWrapper<MigrateVo
return new MigrateVolumeAnswer(command, false, "Unable to connect destination volume on hypervisor", srcPath);
}
libvirtComputingResource.createOrUpdateLogFileForCommand(command, Command.State.PROCESSING_IN_BACKEND);
KVMPhysicalDisk newDiskCopy = storagePoolManager.copyPhysicalDisk(srcPhysicalDisk, destPath, destPrimaryStorage, command.getWaitInMillSeconds());
if (newDiskCopy == null) {
libvirtComputingResource.createOrUpdateLogFileForCommand(command, Command.State.FAILED);
return new MigrateVolumeAnswer(command, false, "Copy command failed to return handle to copied physical disk", destPath);
}
libvirtComputingResource.createOrUpdateLogFileForCommand(command, Command.State.COMPLETED);
}
catch (Exception ex) {
libvirtComputingResource.createOrUpdateLogFileForCommand(command, Command.State.FAILED);
return new MigrateVolumeAnswer(command, false, ex.getMessage(), null);
}
finally {

View File

@ -0,0 +1,258 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package com.cloud.hypervisor.kvm.resource.wrapper;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.to.DataObjectType;
import com.cloud.agent.api.to.DataStoreTO;
import com.cloud.agent.api.to.DataTO;
import com.cloud.agent.api.to.DiskTO;
import com.cloud.agent.api.to.NfsTO;
import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
import com.cloud.hypervisor.kvm.resource.LibvirtVMDef;
import com.cloud.resource.CommandWrapper;
import com.cloud.resource.ResourceWrapper;
import com.cloud.storage.DataStoreRole;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.vm.VirtualMachine;
import org.apache.cloudstack.command.ReconcileAnswer;
import org.apache.cloudstack.command.ReconcileCommand;
import org.apache.cloudstack.command.ReconcileCopyAnswer;
import org.apache.cloudstack.command.ReconcileCopyCommand;
import org.apache.cloudstack.command.ReconcileMigrateAnswer;
import org.apache.cloudstack.command.ReconcileMigrateCommand;
import org.apache.cloudstack.command.ReconcileMigrateVolumeAnswer;
import org.apache.cloudstack.command.ReconcileMigrateVolumeCommand;
import org.apache.cloudstack.storage.to.PrimaryDataStoreTO;
import org.apache.cloudstack.storage.to.VolumeObjectTO;
import org.apache.cloudstack.storage.volume.VolumeOnStorageTO;
import org.libvirt.Connect;
import org.libvirt.Domain;
import org.libvirt.DomainInfo.DomainState;
import org.libvirt.LibvirtException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@ResourceWrapper(handles = ReconcileCommand.class)
public final class LibvirtReconcileCommandWrapper extends CommandWrapper<ReconcileCommand, Answer, LibvirtComputingResource> {

    /**
     * Entry point: dispatches the reconcile request to the handler matching the
     * concrete command subtype. Subtypes without a dedicated handler get an
     * empty {@link ReconcileAnswer}.
     */
    @Override
    public Answer execute(final ReconcileCommand command, final LibvirtComputingResource libvirtComputingResource) {
        if (command instanceof ReconcileMigrateCommand) {
            return handle((ReconcileMigrateCommand) command, libvirtComputingResource);
        } else if (command instanceof ReconcileCopyCommand) {
            return handle((ReconcileCopyCommand) command, libvirtComputingResource);
        } else if (command instanceof ReconcileMigrateVolumeCommand) {
            return handle((ReconcileMigrateVolumeCommand) command, libvirtComputingResource);
        }
        return new ReconcileAnswer();
    }

    /**
     * Reconciles a VM migration: reports the VM's power state as seen by libvirt
     * on this host and, when the VM is running here, the disk paths it is using.
     * A libvirt lookup failure is taken to mean the domain no longer exists on
     * this host, so the VM is reported as Stopped.
     */
    private ReconcileAnswer handle(final ReconcileMigrateCommand reconcileCommand, final LibvirtComputingResource libvirtComputingResource) {
        String vmName = reconcileCommand.getVmName();
        final LibvirtUtilitiesHelper libvirtUtilitiesHelper = libvirtComputingResource.getLibvirtUtilitiesHelper();
        ReconcileMigrateAnswer answer;
        try {
            Connect conn = libvirtUtilitiesHelper.getConnectionByVmName(vmName);
            Domain vm = conn.domainLookupByName(vmName);
            DomainState domainState = vm.getInfo().state;
            logger.debug(String.format("Found VM %s with domain state %s", vmName, domainState));
            VirtualMachine.State state = getState(domainState);
            List<String> disks = null;
            if (VirtualMachine.State.Running.equals(state)) {
                // Disk paths are only collected while the domain is actually running here.
                disks = getVmDiskPaths(libvirtComputingResource.getDisks(conn, vmName));
            }
            answer = new ReconcileMigrateAnswer(vmName, state);
            answer.setVmDisks(disks);
        } catch (LibvirtException e) {
            logger.debug(String.format("Failed to get state of VM %s, assume it is Stopped", vmName));
            VirtualMachine.State state = VirtualMachine.State.Stopped;
            answer = new ReconcileMigrateAnswer(vmName, state);
        }
        return answer;
    }

    /**
     * Maps a libvirt domain state to a CloudStack VM state:
     * RUNNING -> Running; SHUTDOWN/SHUTOFF/CRASHED -> Stopped; anything else -> Unknown.
     */
    static VirtualMachine.State getState(DomainState domainState) {
        VirtualMachine.State state;
        if (domainState == DomainState.VIR_DOMAIN_RUNNING) {
            state = VirtualMachine.State.Running;
        } else if (Arrays.asList(DomainState.VIR_DOMAIN_SHUTDOWN, DomainState.VIR_DOMAIN_SHUTOFF, DomainState.VIR_DOMAIN_CRASHED).contains(domainState)) {
            state = VirtualMachine.State.Stopped;
        } else if (domainState == DomainState.VIR_DOMAIN_PAUSED) {
            // NOTE(review): this branch is redundant — PAUSED would map to Unknown
            // via the final else as well. Kept verbatim; presumably retained to make
            // the PAUSED case explicit.
            state = VirtualMachine.State.Unknown;
        } else {
            state = VirtualMachine.State.Unknown;
        }
        return state;
    }

    /**
     * Extracts the non-null disk paths from a list of libvirt disk definitions.
     */
    private List<String> getVmDiskPaths(List<LibvirtVMDef.DiskDef> diskDefs) {
        List<String> diskPaths = new ArrayList<String>();
        for (LibvirtVMDef.DiskDef diskDef : diskDefs) {
            if (diskDef.getDiskPath() != null) {
                diskPaths.add(diskDef.getDiskPath());
            }
        }
        return diskPaths;
    }

    /**
     * Reconciles a copy operation. The branch order deliberately mirrors
     * StorageSubsystemCommandHandlerBase.execute(CopyCommand) on the management
     * server so both sides classify the same src/dest combination identically.
     * Combinations that cannot (or need not) be reconciled from the agent return
     * a ReconcileCopyAnswer whose first argument is true with a reason string —
     * presumably marking the reconciliation as skipped (TODO confirm against
     * ReconcileCopyAnswer's constructor semantics).
     */
    private ReconcileAnswer handle(final ReconcileCopyCommand reconcileCommand, final LibvirtComputingResource libvirtComputingResource) {
        DataTO srcData = reconcileCommand.getSrcData();
        DataTO destData = reconcileCommand.getDestData();
        DataStoreTO srcDataStore = srcData.getDataStore();
        DataStoreTO destDataStore = destData.getDataStore();
        // consistent with StorageSubsystemCommandHandlerBase.execute(CopyCommand cmd)
        if (srcData.getObjectType() == DataObjectType.TEMPLATE &&
                (srcData.getDataStore().getRole() == DataStoreRole.Image || srcData.getDataStore().getRole() == DataStoreRole.ImageCache) &&
                destData.getDataStore().getRole() == DataStoreRole.Primary) {
            String reason = "copy template to primary storage";
            return new ReconcileCopyAnswer(true, reason);
        } else if (srcData.getObjectType() == DataObjectType.TEMPLATE && srcDataStore.getRole() == DataStoreRole.Primary &&
                destDataStore.getRole() == DataStoreRole.Primary) {
            String reason = "clone template to a volume";
            return new ReconcileCopyAnswer(true, reason);
        } else if (srcData.getObjectType() == DataObjectType.VOLUME &&
                (srcData.getDataStore().getRole() == DataStoreRole.ImageCache || srcDataStore.getRole() == DataStoreRole.Image)) {
            logger.debug("Reconciling: copy volume from image cache to primary");
            return reconcileCopyVolumeFromImageCacheToPrimary(srcData, destData, reconcileCommand.getOption2(), libvirtComputingResource);
        } else if (srcData.getObjectType() == DataObjectType.VOLUME && srcData.getDataStore().getRole() == DataStoreRole.Primary) {
            if (destData.getObjectType() == DataObjectType.VOLUME) {
                if ((srcData instanceof VolumeObjectTO && ((VolumeObjectTO)srcData).isDirectDownload()) ||
                        destData.getDataStore().getRole() == DataStoreRole.Primary) {
                    logger.debug("Reconciling: copy volume from primary to primary");
                    return reconcileCopyVolumeFromPrimaryToPrimary(srcData, destData, libvirtComputingResource);
                } else {
                    logger.debug("Reconciling: copy volume from primary to secondary");
                    return reconcileCopyVolumeFromPrimaryToSecondary(srcData, destData, reconcileCommand.getOption(), libvirtComputingResource);
                }
            } else if (destData.getObjectType() == DataObjectType.TEMPLATE) {
                String reason = "create volume from template";
                return new ReconcileCopyAnswer(true, reason);
            }
        } else if (srcData.getObjectType() == DataObjectType.SNAPSHOT && destData.getObjectType() == DataObjectType.SNAPSHOT &&
                srcData.getDataStore().getRole() == DataStoreRole.Primary) {
            String reason = "backup snapshot from primary";
            return new ReconcileCopyAnswer(true, reason);
        } else if (srcData.getObjectType() == DataObjectType.SNAPSHOT && destData.getObjectType() == DataObjectType.VOLUME) {
            String reason = "create volume from snapshot";
            return new ReconcileCopyAnswer(true, reason);
        } else if (srcData.getObjectType() == DataObjectType.SNAPSHOT && destData.getObjectType() == DataObjectType.TEMPLATE) {
            String reason = "create template from snapshot";
            return new ReconcileCopyAnswer(true, reason);
        }
        // Fallback for combinations not covered above.
        return new ReconcileCopyAnswer(true, "not implemented yet");
    }

    /**
     * Reconciles a volume copy from an NFS image cache to primary storage by
     * checking what exists at the destination path. Only the destination can be
     * inspected here (the source is on NFS secondary storage), hence the null
     * source argument in the answer. The destination path is resolved from the
     * dest data TO, then the PATH detail, then the IQN detail, in that order.
     * Kept consistent with KVMStorageProcessor.copyVolumeFromImageCacheToPrimary.
     */
    private ReconcileCopyAnswer reconcileCopyVolumeFromImageCacheToPrimary(DataTO srcData, DataTO destData, Map<String, String> details, LibvirtComputingResource libvirtComputingResource) {
        // consistent with KVMStorageProcessor.copyVolumeFromImageCacheToPrimary
        final DataStoreTO srcStore = srcData.getDataStore();
        if (!(srcStore instanceof NfsTO)) {
            return new ReconcileCopyAnswer(true, "can only handle nfs storage as source");
        }
        final DataStoreTO destStore = destData.getDataStore();
        final PrimaryDataStoreTO primaryStore = (PrimaryDataStoreTO)destStore;
        String path = destData.getPath();
        if (path == null) {
            path = details != null ? details.get(DiskTO.PATH) : null;
        }
        if (path == null) {
            path = details != null ? details.get(DiskTO.IQN) : null;
        }
        if (path == null) {
            return new ReconcileCopyAnswer(true, "path and iqn on destination storage are null");
        }
        try {
            VolumeOnStorageTO volumeOnDestination = libvirtComputingResource.getVolumeOnStorage(primaryStore, path);
            return new ReconcileCopyAnswer(null, volumeOnDestination);
        } catch (final CloudRuntimeException e) {
            logger.debug("Failed to reconcile CopyVolumeFromImageCacheToPrimary: ", e);
            return new ReconcileCopyAnswer(false, false, e.toString());
        }
    }

    /**
     * Reconciles a primary-to-primary volume copy by inspecting the volume on
     * both pools. The destination is only inspected when it is managed storage
     * or a destination path is already known.
     * Kept consistent with KVMStorageProcessor.copyVolumeFromPrimaryToPrimary.
     */
    private ReconcileCopyAnswer reconcileCopyVolumeFromPrimaryToPrimary(DataTO srcData, DataTO destData, LibvirtComputingResource libvirtComputingResource) {
        // consistent with KVMStorageProcessor.copyVolumeFromPrimaryToPrimary
        final String srcVolumePath = srcData.getPath();
        final String destVolumePath = destData.getPath();
        final DataStoreTO srcStore = srcData.getDataStore();
        final DataStoreTO destStore = destData.getDataStore();
        final PrimaryDataStoreTO srcPrimaryStore = (PrimaryDataStoreTO)srcStore;
        final PrimaryDataStoreTO destPrimaryStore = (PrimaryDataStoreTO)destStore;
        VolumeOnStorageTO volumeOnSource = null;
        VolumeOnStorageTO volumeOnDestination = null;
        try {
            volumeOnSource = libvirtComputingResource.getVolumeOnStorage(srcPrimaryStore, srcVolumePath);
            if (destPrimaryStore.isManaged() || destVolumePath != null) {
                volumeOnDestination = libvirtComputingResource.getVolumeOnStorage(destPrimaryStore, destVolumePath);
            }
            return new ReconcileCopyAnswer(volumeOnSource, volumeOnDestination);
        } catch (final CloudRuntimeException e) {
            logger.debug("Failed to reconcile CopyVolumeFromPrimaryToPrimary: ", e);
            return new ReconcileCopyAnswer(false, false, e.toString());
        }
    }

    /**
     * Reconciles a primary-to-secondary volume copy. Only the source (primary)
     * side can be inspected from the agent; the NFS destination is not checked.
     * The {@code details} parameter is currently unused but kept for signature
     * parity with the other reconcile helpers.
     * Kept consistent with KVMStorageProcessor.copyVolumeFromPrimaryToSecondary.
     */
    private ReconcileCopyAnswer reconcileCopyVolumeFromPrimaryToSecondary(DataTO srcData, DataTO destData, Map<String, String> details, LibvirtComputingResource libvirtComputingResource) {
        // consistent with KVMStorageProcessor.copyVolumeFromPrimaryToSecondary
        final String srcVolumePath = srcData.getPath();
        final DataStoreTO srcStore = srcData.getDataStore();
        final DataStoreTO destStore = destData.getDataStore();
        final PrimaryDataStoreTO primaryStore = (PrimaryDataStoreTO)srcStore;
        if (!(destStore instanceof NfsTO)) {
            return new ReconcileCopyAnswer(true, "can only handle nfs storage as destination");
        }
        VolumeOnStorageTO volumeOnSource = libvirtComputingResource.getVolumeOnStorage(primaryStore, srcVolumePath);
        return new ReconcileCopyAnswer(volumeOnSource, null);
    }

    /**
     * Reconciles a live volume migration: inspects the volume on both the source
     * and destination primary pools and, when a VM name is supplied, also reports
     * the disk paths currently attached to that VM so the management server can
     * tell which copy the VM is actually using. A libvirt failure while listing
     * disks is logged and the answer is returned without the VM disk information.
     * Kept consistent with LibvirtMigrateVolumeCommandWrapper.execute.
     */
    private ReconcileAnswer handle(final ReconcileMigrateVolumeCommand reconcileCommand, final LibvirtComputingResource libvirtComputingResource) {
        // consistent with LibvirtMigrateVolumeCommandWrapper.execute
        DataTO srcData = reconcileCommand.getSrcData();
        DataTO destData = reconcileCommand.getDestData();
        PrimaryDataStoreTO srcDataStore = (PrimaryDataStoreTO) srcData.getDataStore();
        PrimaryDataStoreTO destDataStore = (PrimaryDataStoreTO) destData.getDataStore();
        VolumeOnStorageTO volumeOnSource = libvirtComputingResource.getVolumeOnStorage(srcDataStore, srcData.getPath());
        VolumeOnStorageTO volumeOnDestination = libvirtComputingResource.getVolumeOnStorage(destDataStore, destData.getPath());
        ReconcileMigrateVolumeAnswer answer = new ReconcileMigrateVolumeAnswer(volumeOnSource, volumeOnDestination);
        String vmName = reconcileCommand.getVmName();
        if (vmName != null) {
            try {
                LibvirtUtilitiesHelper libvirtUtilitiesHelper = libvirtComputingResource.getLibvirtUtilitiesHelper();
                Connect conn = libvirtUtilitiesHelper.getConnectionByVmName(vmName);
                List<String> disks = getVmDiskPaths(libvirtComputingResource.getDisks(conn, vmName));
                answer.setVmName(vmName);
                answer.setVmDiskPaths(disks);
            } catch (LibvirtException e) {
                logger.error(String.format("Unable to get disks for %s due to %s", vmName, e.getMessage()));
            }
        }
        return answer;
    }
}

View File

@ -75,6 +75,13 @@ public class LibvirtRequestWrapper extends RequestWrapper {
if (commandWrapper == null) {
throw new CommandNotSupported("No way to handle " + command.getClass());
}
return commandWrapper.execute(command, serverResource);
Answer answer = commandWrapper.execute(command, serverResource);
if (answer != null && command.isReconcile() && serverResource instanceof LibvirtComputingResource) {
LibvirtComputingResource libvirtComputingResource = (LibvirtComputingResource) serverResource;
libvirtComputingResource.createOrUpdateLogFileForCommand(command, answer);
}
return answer;
}
}

View File

@ -43,6 +43,7 @@ import java.util.stream.Collectors;
import javax.naming.ConfigurationException;
import com.cloud.agent.api.Command;
import org.apache.cloudstack.agent.directdownload.DirectDownloadAnswer;
import org.apache.cloudstack.agent.directdownload.DirectDownloadCommand;
import org.apache.cloudstack.direct.download.DirectDownloadHelper;
@ -341,8 +342,8 @@ public class KVMStorageProcessor implements StorageProcessor {
}
}
private String derivePath(PrimaryDataStoreTO primaryStore, DataTO destData, Map<String, String> details) {
String path;
public static String derivePath(PrimaryDataStoreTO primaryStore, DataTO destData, Map<String, String> details) {
String path = null;
if (primaryStore.getPoolType() == StoragePoolType.FiberChannel) {
path = destData.getPath();
} else {
@ -527,6 +528,11 @@ public class KVMStorageProcessor implements StorageProcessor {
final String volumeName = UUID.randomUUID().toString();
// Update path in the command for reconciliation
if (destData.getPath() == null) {
((VolumeObjectTO) destData).setPath(volumeName);
}
final int index = srcVolumePath.lastIndexOf(File.separator);
final String volumeDir = srcVolumePath.substring(0, index);
String srcVolumeName = srcVolumePath.substring(index + 1);
@ -543,7 +549,9 @@ public class KVMStorageProcessor implements StorageProcessor {
volume.setDispName(srcVol.getName());
volume.setVmName(srcVol.getVmName());
resource.createOrUpdateLogFileForCommand(cmd, Command.State.PROCESSING_IN_BACKEND);
final KVMPhysicalDisk newDisk = storagePoolMgr.copyPhysicalDisk(volume, path != null ? path : volumeName, primaryPool, cmd.getWaitInMillSeconds());
resource.createOrUpdateLogFileForCommand(cmd, Command.State.COMPLETED);
storagePoolMgr.disconnectPhysicalDisk(primaryStore.getPoolType(), primaryStore.getUuid(), path);
@ -2556,22 +2564,30 @@ public class KVMStorageProcessor implements StorageProcessor {
} else {
final String volumeName = UUID.randomUUID().toString();
destVolumeName = volumeName + "." + destFormat.getFileExtension();
// Update path in the command for reconciliation
if (destData.getPath() == null) {
((VolumeObjectTO) destData).setPath(destVolumeName);
}
}
destPool = storagePoolMgr.getStoragePool(destPrimaryStore.getPoolType(), destPrimaryStore.getUuid());
try {
Volume.Type volumeType = srcVol.getVolumeType();
resource.createOrUpdateLogFileForCommand(cmd, Command.State.PROCESSING_IN_BACKEND);
if (srcVol.getPassphrase() != null && (Volume.Type.ROOT.equals(volumeType) || Volume.Type.DATADISK.equals(volumeType))) {
volume.setQemuEncryptFormat(QemuObject.EncryptFormat.LUKS);
storagePoolMgr.copyPhysicalDisk(volume, destVolumeName, destPool, cmd.getWaitInMillSeconds(), srcVol.getPassphrase(), destVol.getPassphrase(), srcVol.getProvisioningType());
} else {
storagePoolMgr.copyPhysicalDisk(volume, destVolumeName, destPool, cmd.getWaitInMillSeconds());
}
resource.createOrUpdateLogFileForCommand(cmd, Command.State.COMPLETED);
} catch (Exception e) { // Any exceptions while copying the disk, should send failed answer with the error message
String errMsg = String.format("Failed to copy volume [uuid: %s, name: %s] to dest storage [id: %s, name: %s], due to %s",
srcVol.getUuid(), srcVol.getName(), destPrimaryStore.getUuid(), destPrimaryStore.getName(), e.toString());
logger.debug(errMsg, e);
resource.createOrUpdateLogFileForCommand(cmd, Command.State.FAILED);
throw new CloudRuntimeException(errMsg);
} finally {
if (srcPrimaryStore.isManaged()) {

View File

@ -0,0 +1,191 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package com.cloud.hypervisor.kvm.resource;
import com.cloud.hypervisor.kvm.resource.disconnecthook.DisconnectHook;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
 * Tests for LibvirtComputingResource disconnect-hook handling: hooks are registered,
 * executed (on separate threads) on disconnect, removable, and bounded by a timeout.
 */
public class DisconnectHooksTest {

    /**
     * Observable DisconnectHook. It runs on a thread started by
     * LibvirtComputingResource.disconnected(), while the test asserts from the main
     * thread, so the lifecycle flags are volatile to guarantee cross-thread visibility.
     * Declared static: it does not use the enclosing test instance.
     */
    static class TestHook extends DisconnectHook {
        private volatile boolean _started = false;
        private volatile boolean _executed = false;
        private boolean _withTimeout = false;
        private long _runtime;

        public TestHook() { super("foo"); }

        public TestHook(long timeout, long runtime) {
            super("foo", timeout);
            _withTimeout = true;
            _runtime = runtime;
        }

        @Override
        public void run() {
            _started = true;
            if (this._withTimeout) {
                try {
                    // Simulate a slow hook; interruption (timeout exceeded) aborts before
                    // _executed is set, which the timeout tests rely on.
                    Thread.sleep(this._runtime);
                } catch (InterruptedException e) {
                    throw new RuntimeException("TestHook interrupted while sleeping");
                }
            }
            this._executed = true;
        }

        protected boolean hasRun() {
            return _executed;
        }

        protected boolean hasStarted() {
            return _started;
        }
    }

    LibvirtComputingResource libvirtComputingResource;

    @Before
    public void setup() {
        libvirtComputingResource = new LibvirtComputingResource();
    }

    @Test
    public void addHookWithoutRun() {
        TestHook hook = new TestHook();
        libvirtComputingResource.addDisconnectHook(hook);
        // test that we added hook but did not run it
        Assert.assertEquals(1, libvirtComputingResource._disconnectHooks.size());
        Assert.assertFalse(hook.hasRun());
    }

    @Test
    public void addHookWithRun() {
        TestHook hook = new TestHook();
        libvirtComputingResource.addDisconnectHook(hook);
        // test that we added hook but did not run it
        Assert.assertEquals(1, libvirtComputingResource._disconnectHooks.size());
        Assert.assertFalse(hook.hasRun());
        // test that we run and remove hook on disconnect
        libvirtComputingResource.disconnected();
        Assert.assertTrue(hook.hasRun());
        Assert.assertEquals(0, libvirtComputingResource._disconnectHooks.size());
    }

    @Test
    public void addAndRemoveHooksWithAndWithoutRun() {
        TestHook hook1 = new TestHook();
        TestHook hook2 = new TestHook();
        libvirtComputingResource.addDisconnectHook(hook1);
        libvirtComputingResource.addDisconnectHook(hook2);
        Assert.assertEquals(2, libvirtComputingResource._disconnectHooks.size());
        Assert.assertFalse(hook1.hasRun());
        Assert.assertFalse(hook2.hasRun());
        // remove first hook but leave second hook
        libvirtComputingResource.removeDisconnectHook(hook1);
        libvirtComputingResource.disconnected();
        // ensure removed hook did not run
        Assert.assertFalse(hook1.hasRun());
        // ensure remaining hook did run
        Assert.assertTrue(hook2.hasRun());
        Assert.assertEquals(0, libvirtComputingResource._disconnectHooks.size());
    }

    @Test
    public void addAndRunHooksOneWithTimeout() {
        // test that hook stops running when we exceed timeout
        long timeout = 500;
        TestHook hook1 = new TestHook(timeout, timeout + 100);
        TestHook hook2 = new TestHook();
        libvirtComputingResource.addDisconnectHook(hook1);
        libvirtComputingResource.addDisconnectHook(hook2);
        libvirtComputingResource.disconnected();
        Assert.assertTrue(hook2.hasRun());
        try {
            Thread.sleep(timeout);
        } catch (Exception ignored){}
        Assert.assertTrue(hook1.hasStarted());
        Assert.assertFalse(hook1.isAlive());
        Assert.assertFalse(hook1.hasRun());
        Assert.assertEquals(0, libvirtComputingResource._disconnectHooks.size());
    }

    @Test
    public void addAndRunTwoHooksWithTimeout() {
        // test that hooks stop running when we exceed timeout
        // test for parallel timeout rather than additive
        long timeout = 500;
        TestHook hook1 = new TestHook(timeout, timeout + 100);
        TestHook hook2 = new TestHook(timeout, timeout + 100);
        libvirtComputingResource.addDisconnectHook(hook1);
        libvirtComputingResource.addDisconnectHook(hook2);
        libvirtComputingResource.disconnected();
        // if the timeouts were additive (e.g. if we were sequentially looping through join(timeout)), the second Hook
        // would get enough time to complete (500 for first Hook and 500 for itself) and not be interrupted.
        try {
            Thread.sleep(timeout*2);
        } catch (Exception ignored){}
        Assert.assertTrue(hook1.hasStarted());
        Assert.assertFalse(hook1.isAlive());
        Assert.assertFalse(hook1.hasRun());
        Assert.assertTrue(hook2.hasStarted());
        Assert.assertFalse(hook2.isAlive());
        Assert.assertFalse(hook2.hasRun());
        Assert.assertEquals(0, libvirtComputingResource._disconnectHooks.size());
    }

    @Test
    public void addAndRunTimeoutHooksToCompletion() {
        // test we can run to completion if we don't take as long as timeout, and they run parallel
        long timeout = 500;
        TestHook hook1 = new TestHook(timeout, timeout - 100);
        TestHook hook2 = new TestHook(timeout, timeout - 100);
        libvirtComputingResource.addDisconnectHook(hook1);
        libvirtComputingResource.addDisconnectHook(hook2);
        libvirtComputingResource.disconnected();
        try {
            Thread.sleep(timeout);
        } catch (Exception ignored){}
        Assert.assertTrue(hook1.hasStarted());
        Assert.assertTrue(hook1.hasRun());
        Assert.assertTrue(hook2.hasStarted());
        Assert.assertTrue(hook2.hasRun());
        Assert.assertEquals(0, libvirtComputingResource._disconnectHooks.size());
    }
}

View File

@ -1760,6 +1760,8 @@ public class LibvirtComputingResourceTest {
@Test
public void testCheckVirtualMachineCommand() {
final Connect conn = Mockito.mock(Connect.class);
final Domain vm = Mockito.mock(Domain.class);
final DomainInfo domainInfo = Mockito.mock(DomainInfo.class);
final LibvirtUtilitiesHelper libvirtUtilitiesHelper = Mockito.mock(LibvirtUtilitiesHelper.class);
final String vmName = "Test";
@ -1768,6 +1770,8 @@ public class LibvirtComputingResourceTest {
when(libvirtComputingResourceMock.getLibvirtUtilitiesHelper()).thenReturn(libvirtUtilitiesHelper);
try {
when(libvirtUtilitiesHelper.getConnectionByVmName(vmName)).thenReturn(conn);
when(conn.domainLookupByName(vmName)).thenReturn(vm);
when(vm.getInfo()).thenReturn(domainInfo);
} catch (final LibvirtException e) {
fail(e.getMessage());
}
@ -5521,7 +5525,7 @@ public class LibvirtComputingResourceTest {
@Test
public void testSetQuotaAndPeriod() {
double pct = 0.33d;
Mockito.when(vmTO.getLimitCpuUse()).thenReturn(true);
Mockito.when(vmTO.isLimitCpuUse()).thenReturn(true);
Mockito.when(vmTO.getCpuQuotaPercentage()).thenReturn(pct);
CpuTuneDef cpuTuneDef = new CpuTuneDef();
final LibvirtComputingResource lcr = new LibvirtComputingResource();
@ -5532,7 +5536,7 @@ public class LibvirtComputingResourceTest {
@Test
public void testSetQuotaAndPeriodNoCpuLimitUse() {
Mockito.when(vmTO.getLimitCpuUse()).thenReturn(false);
Mockito.when(vmTO.isLimitCpuUse()).thenReturn(false);
CpuTuneDef cpuTuneDef = new CpuTuneDef();
final LibvirtComputingResource lcr = new LibvirtComputingResource();
lcr.setQuotaAndPeriod(vmTO, cpuTuneDef);
@ -5543,7 +5547,7 @@ public class LibvirtComputingResourceTest {
@Test
public void testSetQuotaAndPeriodMinQuota() {
double pct = 0.01d;
Mockito.when(vmTO.getLimitCpuUse()).thenReturn(true);
Mockito.when(vmTO.isLimitCpuUse()).thenReturn(true);
Mockito.when(vmTO.getCpuQuotaPercentage()).thenReturn(pct);
CpuTuneDef cpuTuneDef = new CpuTuneDef();
final LibvirtComputingResource lcr = new LibvirtComputingResource();

View File

@ -1961,7 +1961,7 @@ public class VmwareResource extends ServerResourceBase implements StoragePoolRes
// Check if license supports the feature
VmwareHelper.isFeatureLicensed(hyperHost, FeatureKeyConstants.HOTPLUG);
VmwareHelper.setVmScaleUpConfig(vmConfigSpec, vmSpec.getCpus(), vmSpec.getMaxSpeed(), getReservedCpuMHZ(vmSpec), (int) requestedMaxMemoryInMb, ramMb,
vmSpec.getLimitCpuUse());
vmSpec.isLimitCpuUse());
if (!vmMo.configureVm(vmConfigSpec)) {
throw new Exception("Unable to execute ScaleVmCommand");
@ -2188,7 +2188,7 @@ public class VmwareResource extends ServerResourceBase implements StoragePoolRes
}
tearDownVm(vmMo);
} else if (!hyperHost.createBlankVm(vmNameOnVcenter, vmInternalCSName, vmSpec.getCpus(), vmSpec.getMaxSpeed().intValue(), getReservedCpuMHZ(vmSpec),
vmSpec.getLimitCpuUse(), (int) (vmSpec.getMaxRam() / ResourceType.bytesToMiB), getReservedMemoryMb(vmSpec), guestOsId, rootDiskDataStoreDetails.first(), false,
vmSpec.isLimitCpuUse(), (int) (vmSpec.getMaxRam() / ResourceType.bytesToMiB), getReservedMemoryMb(vmSpec), guestOsId, rootDiskDataStoreDetails.first(), false,
controllerInfo, systemVm)) {
throw new Exception("Failed to create VM. vmName: " + vmInternalCSName);
}
@ -2232,7 +2232,7 @@ public class VmwareResource extends ServerResourceBase implements StoragePoolRes
VirtualDeviceConfigSpec[] deviceConfigSpecArray = new VirtualDeviceConfigSpec[totalChangeDevices];
DiskTO[] sortedDisks = sortVolumesByDeviceId(disks);
VmwareHelper.setBasicVmConfig(vmConfigSpec, vmSpec.getCpus(), vmSpec.getMaxSpeed(), getReservedCpuMHZ(vmSpec), (int) (vmSpec.getMaxRam() / (1024 * 1024)),
getReservedMemoryMb(vmSpec), guestOsId, vmSpec.getLimitCpuUse(), deployAsIs);
getReservedMemoryMb(vmSpec), guestOsId, vmSpec.isLimitCpuUse(), deployAsIs);
// Check for multi-cores per socket settings
int numCoresPerSocket = 1;

View File

@ -1387,7 +1387,7 @@ public abstract class CitrixResourceBase extends ServerResourceBase implements S
cpuWeight = _maxWeight;
}
if (vmSpec.getLimitCpuUse()) {
if (vmSpec.isLimitCpuUse()) {
// CPU cap is per VM, so need to assign cap based on the number
// of vcpus
utilization = (int)(vmSpec.getMaxSpeed() * 0.99 * vmSpec.getCpus() / _host.getSpeed() * 100);
@ -4709,7 +4709,7 @@ public abstract class CitrixResourceBase extends ServerResourceBase implements S
cpuWeight = _maxWeight;
}
if (vmSpec.getLimitCpuUse()) {
if (vmSpec.isLimitCpuUse()) {
long utilization; // max CPU cap, default is unlimited
utilization = (int)(vmSpec.getMaxSpeed() * 0.99 * vmSpec.getCpus() / _host.getSpeed() * 100);
// vm.addToVCPUsParamsLive(conn, "cap",

View File

@ -62,6 +62,7 @@ import org.apache.cloudstack.storage.datastore.util.ScaleIOUtil;
import org.apache.cloudstack.storage.to.SnapshotObjectTO;
import org.apache.cloudstack.storage.to.VolumeObjectTO;
import org.apache.cloudstack.storage.volume.VolumeObject;
import org.apache.cloudstack.utils.reflectiontostringbuilderutils.ReflectionToStringBuilderUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
@ -79,6 +80,7 @@ import com.cloud.agent.api.to.DiskTO;
import com.cloud.agent.api.to.StorageFilerTO;
import com.cloud.alert.AlertManager;
import com.cloud.configuration.Config;
import com.cloud.exception.OperationTimedoutException;
import com.cloud.host.Host;
import com.cloud.host.HostVO;
import com.cloud.host.dao.HostDao;
@ -893,9 +895,7 @@ public class ScaleIOPrimaryDataStoreDriver implements PrimaryDataStoreDriver {
boolean migrateStatus = answer.getResult();
if (migrateStatus) {
updateVolumeAfterCopyVolume(srcData, destData);
updateSnapshotsAfterCopyVolume(srcData, destData);
deleteSourceVolumeAfterSuccessfulBlockCopy(srcData, host);
updateAfterSuccessfulVolumeMigration(srcData, destData, host);
logger.debug("Successfully migrated migrate PowerFlex volume {} to storage pool {}", srcData, destStore);
answer = new Answer(null, true, null);
} else {
@ -906,6 +906,17 @@ public class ScaleIOPrimaryDataStoreDriver implements PrimaryDataStoreDriver {
} catch (Exception e) {
logger.error("Failed to migrate PowerFlex volume: {} due to: {}", srcData, e.getMessage());
answer = new Answer(null, false, e.getMessage());
if (e.getMessage().contains(OperationTimedoutException.class.getName())) {
logger.error(String.format("The PowerFlex volume %s might have been migrated because the exception is %s, checking the volume on destination pool", srcData, OperationTimedoutException.class.getName()));
Boolean volumeOnDestination = getVolumeStateOnPool(destStore, destVolumePath);
if (volumeOnDestination) {
logger.error(String.format("The PowerFlex volume %s has been migrated to destination pool %s", srcData, destStore.getName()));
updateAfterSuccessfulVolumeMigration(srcData, destData, host);
answer = new Answer(null, true, null);
} else {
logger.error(String.format("The PowerFlex volume %s has not been migrated completely to destination pool %s", srcData, destStore.getName()));
}
}
}
if (destVolumePath != null && !answer.getResult()) {
@ -915,6 +926,40 @@ public class ScaleIOPrimaryDataStoreDriver implements PrimaryDataStoreDriver {
return answer;
}
/**
 * Best-effort bookkeeping after a PowerFlex volume migration has succeeded:
 * update the destination volume record, re-point its snapshots, and delete the
 * source volume. Failures here are logged but deliberately not rethrown, since
 * the migration itself already completed.
 *
 * @param srcData  migrated source volume
 * @param destData destination volume
 * @param host     host used for the block copy; needed to clean up the source volume
 */
private void updateAfterSuccessfulVolumeMigration(DataObject srcData, DataObject destData, Host host) {
    try {
        updateVolumeAfterCopyVolume(srcData, destData);
        updateSnapshotsAfterCopyVolume(srcData, destData);
        deleteSourceVolumeAfterSuccessfulBlockCopy(srcData, host);
    } catch (Exception ex) {
        // Parameterized log with the exception attached so the stack trace is preserved
        // (previously only ex.getMessage() was formatted in, losing the cause).
        logger.error("Error while updating PowerFlex volume: {} after successful migration", srcData, ex);
    }
}
/**
 * Checks whether a PowerFlex volume exists and is fully provisioned on the given pool.
 *
 * @param srcStore      pool to query via the ScaleIO gateway
 * @param srcVolumePath CloudStack volume path from which the ScaleIO volume id is derived
 * @return true when the volume exists and its provisioned size equals its allocated size,
 *         false when it is missing/incomplete, or null when the check itself failed.
 *         NOTE(review): callers that unbox this Boolean directly (e.g. {@code if (result)})
 *         will NPE on the null "unknown" result — confirm call sites null-check.
 */
public Boolean getVolumeStateOnPool(DataStore srcStore, String srcVolumePath) {
    try {
        // check the state of volume on pool via ScaleIO gateway
        final ScaleIOGatewayClient client = getScaleIOClient(srcStore);
        final String sourceScaleIOVolumeId = ScaleIOUtil.getVolumePath(srcVolumePath);
        final org.apache.cloudstack.storage.datastore.api.Volume sourceScaleIOVolume = client.getVolume(sourceScaleIOVolumeId);
        // Null-check before logging the volume's fields (the previous order reflected on a
        // possibly-null volume before checking it).
        if (sourceScaleIOVolume == null || StringUtils.isEmpty(sourceScaleIOVolume.getVtreeId())) {
            logger.debug(String.format("The PowerFlex volume %s on pool %s is missing or has no VTree", srcVolumePath, srcStore.getName()));
            return false;
        }
        logger.debug(String.format("The PowerFlex volume %s on pool %s is: %s", srcVolumePath, srcStore.getName(),
                ReflectionToStringBuilderUtils.reflectOnlySelectedFields(sourceScaleIOVolume, "id", "name", "vtreeId", "sizeInGB", "volumeSizeInGb")));
        Pair<Long, Long> volumeStats = getVolumeStats(storagePoolDao.findById(srcStore.getId()), srcVolumePath);
        if (volumeStats == null) {
            logger.debug(String.format("Unable to find volume stats for %s on pool %s", srcVolumePath, srcStore.getName()));
            return false;
        }
        logger.debug(String.format("Found volume stats for %s: provisionedSizeInBytes = %s, allocatedSizeInBytes = %s on pool %s", srcVolumePath, volumeStats.first(), volumeStats.second(), srcStore.getName()));
        // Fully migrated/provisioned when the provisioned size matches the allocated size.
        return volumeStats.first().equals(volumeStats.second());
    } catch (Exception ex) {
        // Attach the exception so the failure cause is not silently dropped.
        logger.error(String.format("Failed to check if PowerFlex volume %s exists on source pool %s", srcVolumePath, srcStore.getName()), ex);
    }
    return null;
}
protected void updateVolumeAfterCopyVolume(DataObject srcData, DataObject destData) {
// destination volume is already created and volume path is set in database by this time at "CreateObjectAnswer createAnswer = createVolume((VolumeInfo) destData, destStore.getId());"
final long srcVolumeId = srcData.getId();

View File

@ -379,7 +379,7 @@ public class HighAvailabilityManagerImpl extends ManagerBase implements Configur
protected void wakeupWorkers() {
logger.debug("Wakeup workers HA");
for (WorkerThread worker : _workers) {
worker.wakup();
worker.wakeup();
}
}
@ -589,6 +589,10 @@ public class HighAvailabilityManagerImpl extends ManagerBase implements Configur
vm.getUpdated() + " previous updated = " + work.getUpdateTime());
return null;
}
if (vm.getHostId() != null && !vm.getHostId().equals(work.getHostId())) {
logger.info("VM " + vm + " has been changed. Current host id = " + vm.getHostId() + " Previous host id = " + work.getHostId());
return null;
}
AlertManager.AlertType alertType = AlertManager.AlertType.ALERT_TYPE_USERVM;
if (VirtualMachine.Type.DomainRouter.equals(vm.getType())) {
@ -1209,7 +1213,7 @@ public class HighAvailabilityManagerImpl extends ManagerBase implements Configur
}
}
public synchronized void wakup() {
public synchronized void wakeup() {
notifyAll();
}
}

View File

@ -128,7 +128,7 @@ public class KVMGuru extends HypervisorGuruBase implements HypervisorGuru {
* @param vmProfile vm profile
*/
protected void setVmQuotaPercentage(VirtualMachineTO to, VirtualMachineProfile vmProfile) {
if (to.getLimitCpuUse()) {
if (to.isLimitCpuUse()) {
VirtualMachine vm = vmProfile.getVirtualMachine();
HostVO host = hostDao.findById(vm.getHostId());
if (host == null) {

View File

@ -382,4 +382,7 @@
<bean id="sharedFSServiceImpl" class="org.apache.cloudstack.storage.sharedfs.SharedFSServiceImpl">
<property name="sharedFSProviders" value="#{sharedFSProvidersRegistry.registered}" />
</bean>
<bean id="reconcileCommandServiceImpl" class="org.apache.cloudstack.command.ReconcileCommandServiceImpl">
</bean>
</beans>

View File

@ -114,7 +114,7 @@ public class KVMGuruTest {
@Before
public void setup() throws UnsupportedEncodingException {
Mockito.when(vmTO.getLimitCpuUse()).thenReturn(true);
Mockito.when(vmTO.isLimitCpuUse()).thenReturn(true);
Mockito.when(vmProfile.getVirtualMachine()).thenReturn(vm);
Mockito.when(vm.getHostId()).thenReturn(hostId);
Mockito.when(hostDao.findById(hostId)).thenReturn(host);
@ -156,7 +156,7 @@ public class KVMGuruTest {
@Test
public void testSetVmQuotaPercentageNotCPULimit() {
Mockito.when(vmTO.getLimitCpuUse()).thenReturn(false);
Mockito.when(vmTO.isLimitCpuUse()).thenReturn(false);
guru.setVmQuotaPercentage(vmTO, vmProfile);
Mockito.verify(vmProfile, Mockito.never()).getVirtualMachine();
Mockito.verify(vmTO, Mockito.never()).setCpuQuotaPercentage(Mockito.anyDouble());