Refactor KubernetesClusterResourceModifierActionWorker (#8801)

Co-authored-by: dahn <daan.hoogland@gmail.com>
This commit is contained in:
Felipe 2024-07-14 11:02:04 -03:00 committed by GitHub
parent 77cc75ab02
commit a87778be9a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 158 additions and 112 deletions

View File

@ -94,11 +94,35 @@ public class CreateFirewallRuleCmd extends BaseAsyncCreateCmd implements Firewal
return ipAddressId;
}
public void setIpAddressId(Long ipAddressId) {
this.ipAddressId = ipAddressId;
}
@Override
public String getProtocol() {
return protocol.trim();
}
public void setProtocol(String protocol) {
this.protocol = protocol;
}
public Integer getPublicStartPort() {
return publicStartPort;
}
public void setPublicStartPort(Integer publicStartPort) {
this.publicStartPort = publicStartPort;
}
public Integer getPublicEndPort() {
return publicEndPort;
}
public void setPublicEndPort(Integer publicEndPort) {
this.publicEndPort = publicEndPort;
}
@Override
public List<String> getSourceCidrList() {
if (cidrlist != null) {

View File

@ -113,6 +113,10 @@ public class CreateNetworkACLCmd extends BaseAsyncCreateCmd {
return p;
}
public void setProtocol(String protocol) {
this.protocol = protocol;
}
public List<String> getSourceCidrList() {
if (cidrlist != null) {
return cidrlist;
@ -136,6 +140,9 @@ public class CreateNetworkACLCmd extends BaseAsyncCreateCmd {
throw new InvalidParameterValueException("Invalid traffic type " + trafficType);
}
public void setTrafficType(String trafficType) {
this.trafficType = trafficType;
}
// ///////////////////////////////////////////////////
// ///////////// API Implementation///////////////////
// ///////////////////////////////////////////////////
@ -144,15 +151,23 @@ public class CreateNetworkACLCmd extends BaseAsyncCreateCmd {
return action;
}
public void setAction(String action) {
this.action = action;
}
public Integer getNumber() {
return number;
}
public Integer getSourcePortStart() {
public Integer getPublicStartPort() {
return publicStartPort;
}
public Integer getSourcePortEnd() {
public void setPublicStartPort(Integer publicStartPort) {
this.publicStartPort = publicStartPort;
}
public Integer getPublicEndPort() {
if (publicEndPort == null) {
if (publicStartPort != null) {
return publicStartPort;
@ -164,10 +179,18 @@ public class CreateNetworkACLCmd extends BaseAsyncCreateCmd {
return null;
}
public void setPublicEndPort(Integer publicEndPort) {
this.publicEndPort = publicEndPort;
}
public Long getNetworkId() {
return networkId;
}
public void setNetworkId(Long networkId) {
this.networkId = networkId;
}
@Override
public long getEntityOwnerId() {
Account caller = CallContext.current().getCallingAccount();
@ -207,6 +230,10 @@ public class CreateNetworkACLCmd extends BaseAsyncCreateCmd {
return aclId;
}
public void setAclId(Long aclId) {
this.aclId = aclId;
}
public String getReason() {
return reason;
}

View File

@ -102,6 +102,10 @@ public class StartVMCmd extends BaseAsyncCmd implements UserCmd {
return id;
}
public void setId(Long id) {
this.id = id;
}
public Long getHostId() {
return hostId;
}

View File

@ -101,6 +101,10 @@ public class ResizeVolumeCmd extends BaseAsyncCmd implements UserCmd {
return getEntityId();
}
public void setId(Long id) {
this.id = id;
}
public Long getMinIops() {
return minIops;
}
@ -113,6 +117,10 @@ public class ResizeVolumeCmd extends BaseAsyncCmd implements UserCmd {
return size;
}
public void setSize(Long size) {
this.size = size;
}
public boolean isShrinkOk() {
return shrinkOk;
}

View File

@ -17,31 +17,6 @@
package com.cloud.kubernetes.cluster.actionworkers;
import static com.cloud.utils.NumbersUtil.toHumanReadableSize;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.cloudstack.api.ApiCommandResourceType;
import org.apache.cloudstack.api.ApiConstants;
import org.apache.cloudstack.api.BaseCmd;
import org.apache.cloudstack.api.command.user.firewall.CreateFirewallRuleCmd;
import org.apache.cloudstack.api.command.user.network.CreateNetworkACLCmd;
import org.apache.cloudstack.api.command.user.volume.ResizeVolumeCmd;
import org.apache.cloudstack.context.CallContext;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import com.cloud.capacity.CapacityManager;
import com.cloud.dc.ClusterDetailsDao;
import com.cloud.dc.ClusterDetailsVO;
@ -57,6 +32,7 @@ import com.cloud.exception.ManagementServerException;
import com.cloud.exception.NetworkRuleConflictException;
import com.cloud.exception.OperationTimedoutException;
import com.cloud.exception.PermissionDeniedException;
import com.cloud.exception.ResourceAllocationException;
import com.cloud.exception.ResourceUnavailableException;
import com.cloud.host.Host;
import com.cloud.host.HostVO;
@ -102,7 +78,6 @@ import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.db.TransactionCallback;
import com.cloud.utils.db.TransactionCallbackWithException;
import com.cloud.utils.db.TransactionStatus;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.net.Ip;
import com.cloud.utils.net.NetUtils;
@ -112,8 +87,30 @@ import com.cloud.vm.UserVmManager;
import com.cloud.vm.VirtualMachine;
import com.cloud.vm.VmDetailConstants;
import com.cloud.vm.dao.VMInstanceDao;
import org.apache.cloudstack.api.ApiCommandResourceType;
import org.apache.cloudstack.api.ApiConstants;
import org.apache.cloudstack.api.BaseCmd;
import org.apache.cloudstack.api.command.user.firewall.CreateFirewallRuleCmd;
import org.apache.cloudstack.api.command.user.network.CreateNetworkACLCmd;
import org.apache.cloudstack.api.command.user.volume.ResizeVolumeCmd;
import org.apache.cloudstack.context.CallContext;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Level;
import javax.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static com.cloud.utils.NumbersUtil.toHumanReadableSize;
public class KubernetesClusterResourceModifierActionWorker extends KubernetesClusterActionWorker {
@Inject
@ -308,17 +305,14 @@ public class KubernetesClusterResourceModifierActionWorker extends KubernetesClu
if (volumeVO.getVolumeType() == Volume.Type.ROOT) {
ResizeVolumeCmd resizeVolumeCmd = new ResizeVolumeCmd();
resizeVolumeCmd = ComponentContext.inject(resizeVolumeCmd);
Field f = resizeVolumeCmd.getClass().getDeclaredField("size");
Field f1 = resizeVolumeCmd.getClass().getDeclaredField("id");
f.setAccessible(true);
f1.setAccessible(true);
f1.set(resizeVolumeCmd, volumeVO.getId());
f.set(resizeVolumeCmd, kubernetesCluster.getNodeRootDiskSize());
resizeVolumeCmd.setSize(kubernetesCluster.getNodeRootDiskSize());
resizeVolumeCmd.setId(volumeVO.getId());
volumeService.resizeVolume(resizeVolumeCmd);
}
}
}
} catch (IllegalAccessException | NoSuchFieldException e) {
} catch (ResourceAllocationException e) {
throw new ManagementServerException(String.format("Failed to resize volume of VM in the Kubernetes cluster : %s", kubernetesCluster.getName()), e);
}
}
@ -431,30 +425,20 @@ public class KubernetesClusterResourceModifierActionWorker extends KubernetesClu
List<String> sourceCidrList = new ArrayList<String>();
sourceCidrList.add("0.0.0.0/0");
CreateFirewallRuleCmd rule = new CreateFirewallRuleCmd();
rule = ComponentContext.inject(rule);
CreateFirewallRuleCmd firewallRule = new CreateFirewallRuleCmd();
firewallRule = ComponentContext.inject(firewallRule);
Field addressField = rule.getClass().getDeclaredField("ipAddressId");
addressField.setAccessible(true);
addressField.set(rule, publicIp.getId());
firewallRule.setIpAddressId(publicIp.getId());
Field protocolField = rule.getClass().getDeclaredField("protocol");
protocolField.setAccessible(true);
protocolField.set(rule, "TCP");
firewallRule.setProtocol("TCP");
Field startPortField = rule.getClass().getDeclaredField("publicStartPort");
startPortField.setAccessible(true);
startPortField.set(rule, startPort);
firewallRule.setPublicStartPort(startPort);
Field endPortField = rule.getClass().getDeclaredField("publicEndPort");
endPortField.setAccessible(true);
endPortField.set(rule, endPort);
firewallRule.setPublicEndPort(endPort);
Field cidrField = rule.getClass().getDeclaredField("cidrlist");
cidrField.setAccessible(true);
cidrField.set(rule, sourceCidrList);
firewallRule.setSourceCidrList(sourceCidrList);
firewallService.createIngressFirewallRule(rule);
firewallService.createIngressFirewallRule(firewallRule);
firewallService.applyIngressFwRules(publicIp.getId(), account);
}
@ -515,6 +499,7 @@ public class KubernetesClusterResourceModifierActionWorker extends KubernetesClu
firewallRule.getSourcePortEnd() == CLUSTER_API_PORT) {
rule = firewallRule;
firewallService.revokeIngressFwRule(firewallRule.getId(), true);
logger.debug("The API firewall rule [%s] with the id [%s] was revoked",firewallRule.getName(),firewallRule.getId());
break;
}
}
@ -528,6 +513,7 @@ public class KubernetesClusterResourceModifierActionWorker extends KubernetesClu
if (firewallRule.getSourcePortStart() == CLUSTER_NODES_DEFAULT_START_SSH_PORT) {
rule = firewallRule;
firewallService.revokeIngressFwRule(firewallRule.getId(), true);
logger.debug("The SSH firewall rule [%s] with the id [%s] was revoked",firewallRule.getName(),firewallRule.getId());
break;
}
}
@ -541,6 +527,7 @@ public class KubernetesClusterResourceModifierActionWorker extends KubernetesClu
for (PortForwardingRuleVO pfRule : pfRules) {
if (pfRule.getVirtualMachineId() == vmId) {
portForwardingRulesDao.remove(pfRule.getId());
logger.debug("The Port forwarding rule [%s] with the id [%s] was removed.", pfRule.getName(), pfRule.getId());
break;
}
}
@ -555,6 +542,7 @@ public class KubernetesClusterResourceModifierActionWorker extends KubernetesClu
for (PortForwardingRuleVO pfRule : pfRules) {
if (startPort <= pfRule.getSourcePortStart() && pfRule.getSourcePortStart() <= endPort) {
portForwardingRulesDao.remove(pfRule.getId());
logger.debug("The Port forwarding rule [{}] with the id [{}] was removed.", pfRule.getName(), pfRule.getId());
}
}
rulesService.applyPortForwardingRules(publicIp.getId(), account);
@ -562,39 +550,36 @@ public class KubernetesClusterResourceModifierActionWorker extends KubernetesClu
protected void removeLoadBalancingRule(final IpAddress publicIp, final Network network,
final Account account) throws ResourceUnavailableException {
List<LoadBalancerVO> rules = loadBalancerDao.listByIpAddress(publicIp.getId());
for (LoadBalancerVO rule : rules) {
if (rule.getNetworkId() == network.getId() &&
rule.getAccountId() == account.getId() &&
rule.getSourcePortStart() == CLUSTER_API_PORT &&
rule.getSourcePortEnd() == CLUSTER_API_PORT) {
lbService.deleteLoadBalancerRule(rule.getId(), true);
break;
}
}
List<LoadBalancerVO> loadBalancerRules = loadBalancerDao.listByIpAddress(publicIp.getId());
loadBalancerRules.stream().filter(lbRules -> lbRules.getNetworkId() == network.getId() && lbRules.getAccountId() == account.getId() && lbRules.getSourcePortStart() == CLUSTER_API_PORT
&& lbRules.getSourcePortEnd() == CLUSTER_API_PORT).forEach(lbRule -> {
lbService.deleteLoadBalancerRule(lbRule.getId(), true);
logger.debug("The load balancing rule with the Id: {} was removed",lbRule.getId());
});
}
protected void provisionVpcTierAllowPortACLRule(final Network network, int startPort, int endPorts) throws NoSuchFieldException,
IllegalAccessException, ResourceUnavailableException {
List<NetworkACLItemVO> aclItems = networkACLItemDao.listByACL(network.getNetworkACLId());
aclItems = aclItems.stream().filter(x -> !NetworkACLItem.State.Revoke.equals(x.getState())).collect(Collectors.toList());
CreateNetworkACLCmd rule = new CreateNetworkACLCmd();
rule = ComponentContext.inject(rule);
Map<String, Object> fieldValues = Map.of(
"protocol", "TCP",
"publicStartPort", startPort,
"publicEndPort", endPorts,
"trafficType", NetworkACLItem.TrafficType.Ingress.toString(),
"networkId", network.getId(),
"aclId", network.getNetworkACLId(),
"action", NetworkACLItem.Action.Allow.toString()
);
for (Map.Entry<String, Object> entry : fieldValues.entrySet()) {
Field field = rule.getClass().getDeclaredField(entry.getKey());
field.setAccessible(true);
field.set(rule, entry.getValue());
}
NetworkACLItem aclRule = networkACLService.createNetworkACLItem(rule);
aclItems = aclItems.stream().filter(networkACLItem -> !NetworkACLItem.State.Revoke.equals(networkACLItem.getState())).collect(Collectors.toList());
CreateNetworkACLCmd networkACLRule = new CreateNetworkACLCmd();
networkACLRule = ComponentContext.inject(networkACLRule);
networkACLRule.setProtocol("TCP");
networkACLRule.setPublicStartPort(startPort);
networkACLRule.setPublicEndPort(endPorts);
networkACLRule.setTrafficType(NetworkACLItem.TrafficType.Ingress.toString());
networkACLRule.setNetworkId(network.getId());
networkACLRule.setAclId(network.getNetworkACLId());
networkACLRule.setAction(NetworkACLItem.Action.Allow.toString());
NetworkACLItem aclRule = networkACLService.createNetworkACLItem(networkACLRule);
networkACLService.moveRuleToTheTopInACLList(aclRule);
networkACLService.applyNetworkACL(aclRule.getAclId());
}
@ -602,13 +587,13 @@ public class KubernetesClusterResourceModifierActionWorker extends KubernetesClu
protected void removeVpcTierAllowPortACLRule(final Network network, int startPort, int endPort) throws NoSuchFieldException,
IllegalAccessException, ResourceUnavailableException {
List<NetworkACLItemVO> aclItems = networkACLItemDao.listByACL(network.getNetworkACLId());
aclItems = aclItems.stream().filter(x -> (x.getProtocol() != null &&
x.getProtocol().equals("TCP") &&
x.getSourcePortStart() != null &&
x.getSourcePortStart().equals(startPort) &&
x.getSourcePortEnd() != null &&
x.getSourcePortEnd().equals(endPort) &&
x.getAction().equals(NetworkACLItem.Action.Allow)))
aclItems = aclItems.stream().filter(networkACLItem -> (networkACLItem.getProtocol() != null &&
networkACLItem.getProtocol().equals("TCP") &&
networkACLItem.getSourcePortStart() != null &&
networkACLItem.getSourcePortStart().equals(startPort) &&
networkACLItem.getSourcePortEnd() != null &&
networkACLItem.getSourcePortEnd().equals(endPort) &&
networkACLItem.getAction().equals(NetworkACLItem.Action.Allow)))
.collect(Collectors.toList());
for (NetworkACLItemVO aclItem : aclItems) {
@ -801,29 +786,27 @@ public class KubernetesClusterResourceModifierActionWorker extends KubernetesClu
protected KubernetesClusterVO updateKubernetesClusterEntry(final Long cores, final Long memory, final Long size,
final Long serviceOfferingId, final Boolean autoscaleEnabled, final Long minSize, final Long maxSize) {
return Transaction.execute(new TransactionCallback<KubernetesClusterVO>() {
@Override
public KubernetesClusterVO doInTransaction(TransactionStatus status) {
KubernetesClusterVO updatedCluster = kubernetesClusterDao.findById(kubernetesCluster.getId());
if (cores != null) {
updatedCluster.setCores(cores);
}
if (memory != null) {
updatedCluster.setMemory(memory);
}
if (size != null) {
updatedCluster.setNodeCount(size);
}
if (serviceOfferingId != null) {
updatedCluster.setServiceOfferingId(serviceOfferingId);
}
if (autoscaleEnabled != null) {
updatedCluster.setAutoscalingEnabled(autoscaleEnabled.booleanValue());
}
updatedCluster.setMinSize(minSize);
updatedCluster.setMaxSize(maxSize);
return kubernetesClusterDao.persist(updatedCluster);
return Transaction.execute((TransactionCallback<KubernetesClusterVO>) status -> {
KubernetesClusterVO updatedCluster = kubernetesClusterDao.createForUpdate(kubernetesCluster.getId());
if (cores != null) {
updatedCluster.setCores(cores);
}
if (memory != null) {
updatedCluster.setMemory(memory);
}
if (size != null) {
updatedCluster.setNodeCount(size);
}
if (serviceOfferingId != null) {
updatedCluster.setServiceOfferingId(serviceOfferingId);
}
if (autoscaleEnabled != null) {
updatedCluster.setAutoscalingEnabled(autoscaleEnabled.booleanValue());
}
updatedCluster.setMinSize(minSize);
updatedCluster.setMaxSize(maxSize);
return kubernetesClusterDao.persist(updatedCluster);
});
}

View File

@ -325,8 +325,8 @@ public class NetworkACLServiceImpl extends ManagerBase implements NetworkACLServ
public NetworkACLItem createNetworkACLItem(CreateNetworkACLCmd createNetworkACLCmd) {
Long aclId = createAclListIfNeeded(createNetworkACLCmd);
Integer sourcePortStart = createNetworkACLCmd.getSourcePortStart();
Integer sourcePortEnd = createNetworkACLCmd.getSourcePortEnd();
Integer sourcePortStart = createNetworkACLCmd.getPublicStartPort();
Integer sourcePortEnd = createNetworkACLCmd.getPublicEndPort();
String protocol = createNetworkACLCmd.getProtocol();
List<String> sourceCidrList = createNetworkACLCmd.getSourceCidrList();
Integer icmpCode = createNetworkACLCmd.getIcmpCode();