Introduce scheduled executor wrapper with dynamic interval (#8916)

* Introduce scheduled executor wrapper with dynamic interval

* Add validation for configkey
This commit is contained in:
Vishesh 2024-04-17 15:15:37 +05:30 committed by GitHub
parent ebaf5a47b9
commit 63a0797b18
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 297 additions and 6 deletions

View File

@ -38,7 +38,7 @@ public interface ResourceLimitService {
static final ConfigKey<Long> MaxProjectSecondaryStorage = new ConfigKey<>("Project Defaults", Long.class, "max.project.secondary.storage", "400",
"The default maximum secondary storage space (in GiB) that can be used for a project", false);
static final ConfigKey<Long> ResourceCountCheckInterval = new ConfigKey<>("Advanced", Long.class, "resourcecount.check.interval", "300",
"Time (in seconds) to wait before running resource recalculation and fixing task. Default is 300 seconds, Setting this to 0 disables execution of the task", false);
"Time (in seconds) to wait before running resource recalculation and fixing task. Default is 300 seconds, Setting this to 0 disables execution of the task", true);
static final ConfigKey<String> ResourceLimitHostTags = new ConfigKey<>("Advanced", String.class, "resource.limit.host.tags", "",
"A comma-separated list of tags for host resource limits", true);
static final ConfigKey<String> ResourceLimitStorageTags = new ConfigKey<>("Advanced", String.class, "resource.limit.storage.tags", "",

View File

@ -0,0 +1,114 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.framework.config;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
/**
* Uses a ScheduledExecutorService and config key to execute a runnable,
* dynamically rescheduling based on the long value of the config key.
* Timing is similar to ScheduledExecutorService.scheduleAtFixedRate(),
* but we look up the next runtime dynamically via the config key.
* <p>
* If config key is zero, this disables the execution. We skip execution
* and check once a minute in order to re-start execution if re-enabled.
*/
public class ConfigKeyScheduledExecutionWrapper implements Runnable {
protected Logger logger = LogManager.getLogger(getClass());
private final ScheduledExecutorService executorService;
private final Runnable command;
private final ConfigKey<?> configKey;
private final TimeUnit unit;
private long enableIntervalSeconds = 60;
private void validateArgs(ScheduledExecutorService executorService, Runnable command, ConfigKey<?> configKey) {
if (executorService == null) {
throw new IllegalArgumentException("ExecutorService cannot be null");
}
if (command == null) {
throw new IllegalArgumentException("Command cannot be null");
}
if (configKey == null) {
throw new IllegalArgumentException("ConfigKey cannot be null");
}
if (!(configKey.value() instanceof Long || configKey.value() instanceof Integer)) {
throw new IllegalArgumentException("ConfigKey value must be a Long or Integer");
}
}
public ConfigKeyScheduledExecutionWrapper(ScheduledExecutorService executorService, Runnable command,
ConfigKey<?> configKey, TimeUnit unit) {
validateArgs(executorService, command, configKey);
this.executorService = executorService;
this.command = command;
this.configKey = configKey;
this.unit = unit;
}
protected ConfigKeyScheduledExecutionWrapper(ScheduledExecutorService executorService, Runnable command,
ConfigKey<?> configKey, int enableIntervalSeconds, TimeUnit unit) {
validateArgs(executorService, command, configKey);
this.executorService = executorService;
this.command = command;
this.configKey = configKey;
this.unit = unit;
this.enableIntervalSeconds = enableIntervalSeconds;
}
public ScheduledFuture<?> start() {
long duration = getConfigValue();
duration = duration < 0 ? 0 : duration;
return this.executorService.schedule(this, duration, this.unit);
}
long getConfigValue() {
if (this.configKey.value() instanceof Long) {
return (Long) this.configKey.value();
} else if (this.configKey.value() instanceof Integer) {
return (Integer) this.configKey.value();
} else {
throw new IllegalArgumentException("ConfigKey value must be a Long or Integer");
}
}
@Override
public void run() {
if (getConfigValue() <= 0) {
executorService.schedule(this, enableIntervalSeconds, TimeUnit.SECONDS);
return;
}
long startTime = System.nanoTime();
try {
command.run();
} catch (Throwable t) {
logger.warn(String.format("Last run of %s encountered an error", this.command.getClass()), t);
} finally {
long elapsed = System.nanoTime() - startTime;
long delay = this.unit.toNanos(getConfigValue()) - elapsed;
delay = delay > 0 ? delay : 0;
executorService.schedule(this, delay, NANOSECONDS);
}
}
}

View File

@ -0,0 +1,177 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.framework.config;
import com.cloud.utils.concurrency.NamedThreadFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isOneOf;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class ConfigKeyScheduledExecutionWrapperTest {
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new NamedThreadFactory("TestExecutor"));
@Mock
ConfigKey<Integer> configKey;
@Test(expected = IllegalArgumentException.class)
public void nullExecutorTest() {
TestRunnable runnable = new TestRunnable();
ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(null, runnable, configKey, TimeUnit.SECONDS);
}
@Test(expected = IllegalArgumentException.class)
public void nullCommandTest() {
ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, null, configKey, TimeUnit.SECONDS);
}
@Test(expected = IllegalArgumentException.class)
public void nullConfigKeyTest() {
TestRunnable runnable = new TestRunnable();
ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, null, TimeUnit.SECONDS);
}
@Test(expected = IllegalArgumentException.class)
public void invalidConfigKeyTest() {
TestRunnable runnable = new TestRunnable();
ConfigKey<String> configKey = new ConfigKey<>(String.class, "test", "test", "test", "test", true,
ConfigKey.Scope.Global, null, null, null, null, null, ConfigKey.Kind.CSV, null);
ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, configKey, TimeUnit.SECONDS);
}
@Test
public void scheduleOncePerSecondTest() {
when(configKey.value()).thenReturn(1);
TestRunnable runnable = new TestRunnable();
ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, configKey, TimeUnit.SECONDS);
runner.start();
waitSeconds(3);
assertThat("Runnable ran once per second", runnable.getRunCount(), isOneOf(2, 3));
}
private void waitSeconds(int seconds) {
try {
Thread.sleep(seconds * 1000L + 100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Test
public void scheduleTwicePerSecondTest() {
when(configKey.value()).thenReturn(500);
TestRunnable runnable = new TestRunnable();
ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, configKey, TimeUnit.MILLISECONDS);
runner.start();
waitSeconds(2);
assertThat("Runnable ran twice per second", runnable.getRunCount(), isOneOf(3, 4));
}
@Test
public void scheduleDynamicTest() {
// start with twice per second, then switch to four times per second
when(configKey.value()).thenReturn(500);
TestRunnable runnable = new TestRunnable();
ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, configKey, TimeUnit.MILLISECONDS);
runner.start();
waitSeconds(2);
assertThat("Runnable ran twice per second", runnable.getRunCount(), isOneOf(3, 4));
runnable.resetRunCount();
when(configKey.value()).thenReturn(250);
waitSeconds(2);
assertThat("Runnable ran four times per second", runnable.getRunCount(), isOneOf(7, 8));
}
@Test
public void noOverlappingRunsTest() {
when(configKey.value()).thenReturn(200);
TestRunnable runnable = new TestRunnable(1);
ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, configKey, TimeUnit.MILLISECONDS);
runner.start();
waitSeconds(3);
assertThat("Slow runnable on tight schedule runs without overlap", runnable.getRunCount(), isOneOf(2, 3));
}
@Test
public void temporaryDisableRunsTest() {
// start with twice per second, then disable, then start again
when(configKey.value()).thenReturn(500);
TestRunnable runnable = new TestRunnable();
ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, configKey, 1, TimeUnit.MILLISECONDS);
runner.start();
waitSeconds(2);
assertThat("Runnable ran twice per second", runnable.getRunCount(), isOneOf(3, 4));
runnable.resetRunCount();
when(configKey.value()).thenReturn(0);
waitSeconds(2);
assertThat("Runnable ran zero times per second", runnable.getRunCount(), is(0));
runnable.resetRunCount();
when(configKey.value()).thenReturn(500);
waitSeconds(2);
assertThat("Runnable ran twice per second", runnable.getRunCount(), isOneOf(3, 4));
}
static class TestRunnable implements Runnable {
private Integer runCount = 0;
private int waitSeconds = 0;
TestRunnable(int waitSeconds) {
this.waitSeconds = waitSeconds;
}
TestRunnable() {
}
@Override
public void run() {
runCount++;
if (waitSeconds > 0) {
try {
Thread.sleep(waitSeconds * 1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public int getRunCount() {
return this.runCount;
}
public void resetRunCount() {
this.runCount = 0;
}
}
}

View File

@ -43,6 +43,7 @@ import org.apache.cloudstack.api.response.TaggedResourceLimitAndCountResponse;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.ConfigKeyScheduledExecutionWrapper;
import org.apache.cloudstack.framework.config.Configurable;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
@ -197,7 +198,6 @@ public class ResourceLimitManagerImpl extends ManagerBase implements ResourceLim
protected SearchBuilder<ResourceCountVO> ResourceCountSearch;
ScheduledExecutorService _rcExecutor;
long _resourceCountCheckInterval = 0;
Map<String, Long> accountResourceLimitMap = new HashMap<>();
Map<String, Long> domainResourceLimitMap = new HashMap<>();
Map<String, Long> projectResourceLimitMap = new HashMap<>();
@ -220,8 +220,9 @@ public class ResourceLimitManagerImpl extends ManagerBase implements ResourceLim
@Override
public boolean start() {
if (_resourceCountCheckInterval > 0) {
_rcExecutor.scheduleAtFixedRate(new ResourceCountCheckTask(), _resourceCountCheckInterval, _resourceCountCheckInterval, TimeUnit.SECONDS);
if (ResourceCountCheckInterval.value() >= 0) {
ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(_rcExecutor, new ResourceCountCheckTask(), ResourceCountCheckInterval, TimeUnit.SECONDS);
runner.start();
}
return true;
}
@ -258,8 +259,7 @@ public class ResourceLimitManagerImpl extends ManagerBase implements ResourceLim
snapshotSizeSearch.join("snapshots", join2, snapshotSizeSearch.entity().getSnapshotId(), join2.entity().getId(), JoinBuilder.JoinType.INNER);
snapshotSizeSearch.done();
_resourceCountCheckInterval = ResourceCountCheckInterval.value();
if (_resourceCountCheckInterval > 0) {
if (ResourceCountCheckInterval.value() >= 0) {
_rcExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("ResourceCountChecker"));
}