server: fix TransactionLegacy DB connection leaks due to DB switching by B&R thread (#4121)

BackupSync task would switch between databases to update backup usage
metrics in the cloud_usage.usage_backup table. The current framework
and the usage in ManagedContext causes database connection
(LegacyTransaction) leaks. When the thread runs faster, the issue is
easily reproducible and checking via heap dump analysis or using JMX
MBeans. This fixes by moving the task of backup data updation for
usage data to the usage server by publishing usage events instead of
switching between databases in a local thread while in a
ManagedContextRunnable.

Signed-off-by: Rohit Yadav <rohit.yadav@shapeblue.com>
This commit is contained in:
Rohit Yadav 2020-06-16 13:30:31 +05:30 committed by GitHub
parent 77947f23fd
commit b54d19b3b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 75 additions and 79 deletions

View File

@ -492,6 +492,7 @@ public class EventTypes {
public static final String EVENT_VM_BACKUP_RESTORE_VOLUME_TO_VM = "BACKUP.RESTORE.VOLUME.TO.VM";
public static final String EVENT_VM_BACKUP_SCHEDULE_CONFIGURE = "BACKUP.SCHEDULE.CONFIGURE";
public static final String EVENT_VM_BACKUP_SCHEDULE_DELETE = "BACKUP.SCHEDULE.DELETE";
public static final String EVENT_VM_BACKUP_USAGE_METRIC = "BACKUP.USAGE.METRIC";
// external network device events
public static final String EVENT_EXTERNAL_NVP_CONTROLLER_ADD = "PHYSICAL.NVPCONTROLLER.ADD";

View File

@ -20,14 +20,11 @@ package com.cloud.usage.dao;
import java.util.Date;
import java.util.List;
import org.apache.cloudstack.backup.Backup;
import com.cloud.usage.UsageBackupVO;
import com.cloud.utils.db.GenericDao;
import com.cloud.vm.VirtualMachine;
public interface UsageBackupDao extends GenericDao<UsageBackupVO, Long> {
void updateMetrics(VirtualMachine vm, Backup.Metric metric);
void removeUsage(Long accountId, Long zoneId, Long backupId);
void updateMetrics(Long vmId, Long size, Long virtualSize);
void removeUsage(Long accountId, Long vmId, Date eventDate);
List<UsageBackupVO> getUsageRecords(Long accountId, Date startDate, Date endDate);
}

View File

@ -19,69 +19,68 @@ package com.cloud.usage.dao;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import org.apache.cloudstack.backup.Backup;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;
import com.cloud.exception.CloudException;
import com.cloud.usage.UsageBackupVO;
import com.cloud.utils.DateUtil;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.QueryBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.db.TransactionCallback;
import com.cloud.utils.db.TransactionLegacy;
import com.cloud.utils.db.TransactionStatus;
import com.cloud.vm.VirtualMachine;
@Component
public class UsageBackupDaoImpl extends GenericDaoBase<UsageBackupVO, Long> implements UsageBackupDao {
public static final Logger LOGGER = Logger.getLogger(UsageBackupDaoImpl.class);
protected static final String GET_USAGE_RECORDS_BY_ACCOUNT = "SELECT id, zone_id, account_id, domain_id, vm_id, backup_offering_id, size, protected_size, created, removed FROM cloud_usage.usage_backup WHERE " +
protected static final String UPDATE_DELETED = "UPDATE usage_backup SET removed = ? WHERE account_id = ? AND vm_id = ? and removed IS NULL";
protected static final String GET_USAGE_RECORDS_BY_ACCOUNT = "SELECT id, zone_id, account_id, domain_id, vm_id, backup_offering_id, size, protected_size, created, removed FROM usage_backup WHERE " +
" account_id = ? AND ((removed IS NULL AND created <= ?) OR (created BETWEEN ? AND ?) OR (removed BETWEEN ? AND ?) " +
" OR ((created <= ?) AND (removed >= ?)))";
@Override
public void updateMetrics(final VirtualMachine vm, Backup.Metric metric) {
boolean result = Transaction.execute(TransactionLegacy.USAGE_DB, new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(final TransactionStatus status) {
final QueryBuilder<UsageBackupVO> qb = QueryBuilder.create(UsageBackupVO.class);
qb.and(qb.entity().getVmId(), SearchCriteria.Op.EQ, vm.getId());
final UsageBackupVO entry = findOneBy(qb.create());
if (entry == null) {
return false;
}
entry.setSize(metric.getBackupSize());
entry.setProtectedSize(metric.getDataSize());
return update(entry.getId(), entry);
public void updateMetrics(final Long vmId, final Long size, final Long virtualSize) {
try (TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.USAGE_DB)) {
SearchCriteria<UsageBackupVO> sc = this.createSearchCriteria();
sc.addAnd("vmId", SearchCriteria.Op.EQ, vmId);
UsageBackupVO vo = findOneBy(sc);
if (vo != null) {
vo.setSize(size);
vo.setProtectedSize(virtualSize);
update(vo.getId(), vo);
}
});
if (!result) {
LOGGER.trace("Failed to update backup metrics for VM ID: " + vm.getId());
} catch (final Exception e) {
LOGGER.error("Error updating backup metrics: " + e.getMessage(), e);
}
}
@Override
public void removeUsage(Long accountId, Long zoneId, Long vmId) {
boolean result = Transaction.execute(TransactionLegacy.USAGE_DB, new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(final TransactionStatus status) {
final QueryBuilder<UsageBackupVO> qb = QueryBuilder.create(UsageBackupVO.class);
qb.and(qb.entity().getAccountId(), SearchCriteria.Op.EQ, accountId);
qb.and(qb.entity().getZoneId(), SearchCriteria.Op.EQ, zoneId);
qb.and(qb.entity().getVmId(), SearchCriteria.Op.EQ, vmId);
final UsageBackupVO entry = findOneBy(qb.create());
return remove(qb.create()) > 0;
public void removeUsage(Long accountId, Long vmId, Date eventDate) {
TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.USAGE_DB);
try {
txn.start();
try (PreparedStatement pstmt = txn.prepareStatement(UPDATE_DELETED);) {
if (pstmt != null) {
pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), eventDate));
pstmt.setLong(2, accountId);
pstmt.setLong(3, vmId);
pstmt.executeUpdate();
}
} catch (SQLException e) {
LOGGER.error("Error removing UsageBackupVO: " + e.getMessage(), e);
throw new CloudException("Remove backup usage exception: " + e.getMessage(), e);
}
});
if (!result) {
LOGGER.warn("Failed to remove usage entry for backup of VM ID: " + vmId);
txn.commit();
} catch (Exception e) {
txn.rollback();
LOGGER.error("Exception caught while removing UsageBackupVO: " + e.getMessage(), e);
} finally {
txn.close();
}
}

View File

@ -150,30 +150,28 @@ public class TransactionLegacy implements Closeable {
public static TransactionLegacy open(final String name, final short databaseId, final boolean forceDbChange) {
TransactionLegacy txn = tls.get();
boolean isNew = false;
if (txn == null) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Creating the transaction: " + name);
}
txn = new TransactionLegacy(name, false, databaseId);
tls.set(txn);
isNew = true;
s_mbean.addTransaction(txn);
} else if (forceDbChange) {
final short currentDbId = txn.getDatabaseId();
if (currentDbId != databaseId) {
// we need to end the current transaction and switch databases
txn.close(txn.getName());
if (txn.close(txn.getName()) && txn.getCurrentConnection() == null) {
s_mbean.removeTransaction(txn);
}
txn = new TransactionLegacy(name, false, databaseId);
tls.set(txn);
isNew = true;
s_mbean.addTransaction(txn);
}
}
txn.checkConnection();
txn.takeOver(name, false);
if (isNew) {
s_mbean.addTransaction(txn);
}
return txn;
}
@ -762,8 +760,8 @@ public class TransactionLegacy implements Closeable {
}
_conn.close();
_conn = null;
s_mbean.removeTransaction(this);
}
} catch (final SQLException e) {
s_logger.warn("Unable to close connection", e);
}

View File

@ -87,8 +87,13 @@ public class DummyBackupProvider extends AdapterBase implements BackupProvider {
public Map<VirtualMachine, Backup.Metric> getBackupMetrics(Long zoneId, List<VirtualMachine> vms) {
final Map<VirtualMachine, Backup.Metric> metrics = new HashMap<>();
final Backup.Metric metric = new Backup.Metric(1000L, 100L);
if (vms == null || vms.isEmpty()) {
return metrics;
}
for (VirtualMachine vm : vms) {
metrics.put(vm, metric);
if (vm != null) {
metrics.put(vm, metric);
}
}
return metrics;
}

View File

@ -216,9 +216,12 @@ public class VeeamBackupProvider extends AdapterBase implements BackupProvider,
@Override
public Map<VirtualMachine, Backup.Metric> getBackupMetrics(final Long zoneId, final List<VirtualMachine> vms) {
final Map<VirtualMachine, Backup.Metric> metrics = new HashMap<>();
if (vms == null || vms.isEmpty()) {
return metrics;
}
final Map<String, Backup.Metric> backendMetrics = getClient(zoneId).getBackupMetrics();
for (final VirtualMachine vm : vms) {
if (!backendMetrics.containsKey(vm.getUuid())) {
if (vm == null || !backendMetrics.containsKey(vm.getUuid())) {
continue;
}
metrics.put(vm, backendMetrics.get(vm.getUuid()));

View File

@ -84,7 +84,6 @@ import com.cloud.storage.Volume;
import com.cloud.storage.VolumeVO;
import com.cloud.storage.dao.DiskOfferingDao;
import com.cloud.storage.dao.VolumeDao;
import com.cloud.usage.dao.UsageBackupDao;
import com.cloud.user.Account;
import com.cloud.user.AccountManager;
import com.cloud.user.AccountService;
@ -126,8 +125,6 @@ public class BackupManagerImpl extends ManagerBase implements BackupManager {
@Inject
private AccountManager accountManager;
@Inject
private UsageBackupDao usageBackupDao;
@Inject
private VolumeDao volumeDao;
@Inject
private DataCenterDao dataCenterDao;
@ -1001,7 +998,6 @@ public class BackupManagerImpl extends ManagerBase implements BackupManager {
@Override
protected void runInContext() {
final int SYNC_INTERVAL = BackupSyncPollingInterval.value().intValue();
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Backup sync background task is running...");
@ -1022,31 +1018,23 @@ public class BackupManagerImpl extends ManagerBase implements BackupManager {
continue;
}
// Sync backup usage metrics
final Map<VirtualMachine, Backup.Metric> metrics = backupProvider.getBackupMetrics(dataCenter.getId(), new ArrayList<>(vms));
final GlobalLock syncBackupMetricsLock = GlobalLock.getInternLock("BackupSyncTask_metrics_zone_" + dataCenter.getId());
if (syncBackupMetricsLock.lock(SYNC_INTERVAL)) {
try {
for (final VirtualMachine vm : metrics.keySet()) {
final Backup.Metric metric = metrics.get(vm);
if (metric != null) {
usageBackupDao.updateMetrics(vm, metric);
}
try {
for (final VirtualMachine vm : metrics.keySet()) {
final Backup.Metric metric = metrics.get(vm);
if (metric != null) {
// Sync out-of-band backups
backupProvider.syncBackups(vm, metric);
// Emit a usage event, update usage metric for the VM by the usage server
UsageEventUtils.publishUsageEvent(EventTypes.EVENT_VM_BACKUP_USAGE_METRIC, vm.getAccountId(),
vm.getDataCenterId(), vm.getId(), "Backup-" + vm.getHostName() + "-" + vm.getUuid(),
vm.getBackupOfferingId(), null, metric.getBackupSize(), metric.getDataSize(),
Backup.class.getSimpleName(), vm.getUuid());
}
} finally {
syncBackupMetricsLock.unlock();
}
}
// Sync out-of-band backups
for (final VirtualMachine vm : vms) {
final GlobalLock syncBackupsLock = GlobalLock.getInternLock("BackupSyncTask_backup_vm_" + vm.getId());
if (syncBackupsLock.lock(SYNC_INTERVAL)) {
try {
backupProvider.syncBackups(vm, metrics.get(vm));
} finally {
syncBackupsLock.unlock();
}
} catch (final Throwable e) {
if (LOG.isTraceEnabled()) {
LOG.trace("Failed to sync backup usage metrics and out-of-band backups");
}
}
}

View File

@ -1081,7 +1081,10 @@ public class UsageManagerImpl extends ManagerBase implements UsageManager, Runna
}
private boolean isBackupEvent(String eventType) {
return eventType != null && (eventType.equals(EventTypes.EVENT_VM_BACKUP_OFFERING_ASSIGN) || eventType.equals(EventTypes.EVENT_VM_BACKUP_OFFERING_REMOVE));
return eventType != null && (
eventType.equals(EventTypes.EVENT_VM_BACKUP_OFFERING_ASSIGN) ||
eventType.equals(EventTypes.EVENT_VM_BACKUP_OFFERING_REMOVE) ||
eventType.equals(EventTypes.EVENT_VM_BACKUP_USAGE_METRIC));
}
private void createVMHelperEvent(UsageEventVO event) {
@ -1913,7 +1916,9 @@ public class UsageManagerImpl extends ManagerBase implements UsageManager, Runna
final UsageBackupVO backupVO = new UsageBackupVO(zoneId, accountId, domainId, vmId, backupOfferingId, created);
usageBackupDao.persist(backupVO);
} else if (EventTypes.EVENT_VM_BACKUP_OFFERING_REMOVE.equals(event.getType())) {
usageBackupDao.removeUsage(accountId, zoneId, vmId);
usageBackupDao.removeUsage(accountId, vmId, event.getCreateDate());
} else if (EventTypes.EVENT_VM_BACKUP_USAGE_METRIC.equals(event.getType())) {
usageBackupDao.updateMetrics(vmId, event.getSize(), event.getVirtualSize());
}
}