added processing for events

This commit is contained in:
Alex Huang 2011-01-06 15:06:55 -08:00
parent 2e29f21e5a
commit e496a99c1c
9 changed files with 199 additions and 81 deletions

View File

@ -85,6 +85,7 @@ public interface NetworkOffering {
Availability getAvailability();
boolean isDnsService();
boolean isGatewayService();

View File

@ -115,9 +115,11 @@ import com.cloud.user.dao.UserDaoImpl;
import com.cloud.user.dao.UserStatisticsDaoImpl;
import com.cloud.utils.Pair;
import com.cloud.utils.component.Adapter;
import com.cloud.utils.component.AnnotationInterceptor;
import com.cloud.utils.component.ComponentLibrary;
import com.cloud.utils.component.ComponentLocator.ComponentInfo;
import com.cloud.utils.component.Manager;
import com.cloud.utils.db.DatabaseCallback;
import com.cloud.utils.db.GenericDao;
import com.cloud.vm.ItWorkDaoImpl;
import com.cloud.vm.UserVmManagerImpl;
@ -334,4 +336,9 @@ public class DefaultComponentLibrary implements ComponentLibrary {
factories.put(EntityManager.class, EntityManagerImpl.class);
return factories;
}
@Override
public void addInterceptors(List<AnnotationInterceptor<?>> interceptors) {
interceptors.add(new DatabaseCallback());
}
}

View File

@ -17,7 +17,6 @@
*/
package com.cloud.vm;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -62,10 +61,6 @@ import com.cloud.hypervisor.Hypervisor.HypervisorType;
import com.cloud.hypervisor.HypervisorGuru;
import com.cloud.network.NetworkManager;
import com.cloud.network.NetworkVO;
import com.cloud.network.element.NetworkElement;
import com.cloud.network.guru.NetworkGuru;
import com.cloud.resource.Resource;
import com.cloud.resource.Resource.ReservationStrategy;
import com.cloud.service.ServiceOfferingVO;
import com.cloud.service.dao.ServiceOfferingDao;
import com.cloud.storage.DiskOfferingVO;
@ -208,53 +203,53 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Cluster
}
protected void reserveNics(VirtualMachineProfile<? extends VMInstanceVO> vmProfile, DeployDestination dest, ReservationContext context) throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException {
List<NicVO> nics = _nicsDao.listBy(vmProfile.getId());
for (NicVO nic : nics) {
Pair<NetworkGuru, NetworkVO> implemented = _networkMgr.implementNetwork(nic.getNetworkId(), dest, context);
NetworkGuru concierge = implemented.first();
NetworkVO network = implemented.second();
NicProfile profile = null;
if (nic.getReservationStrategy() == ReservationStrategy.Start) {
nic.setState(Resource.State.Reserving);
nic.setReservationId(context.getReservationId());
_nicsDao.update(nic.getId(), nic);
URI broadcastUri = nic.getBroadcastUri();
if (broadcastUri == null) {
network.getBroadcastUri();
}
URI isolationUri = nic.getIsolationUri();
profile = new NicProfile(nic, network, broadcastUri, isolationUri);
concierge.reserve(profile, network, vmProfile, dest, context);
nic.setIp4Address(profile.getIp4Address());
nic.setIp6Address(profile.getIp6Address());
nic.setMacAddress(profile.getMacAddress());
nic.setIsolationUri(profile.getIsolationUri());
nic.setBroadcastUri(profile.getBroadCastUri());
nic.setReserver(concierge.getName());
nic.setState(Resource.State.Reserved);
nic.setNetmask(profile.getNetmask());
nic.setGateway(profile.getGateway());
nic.setAddressFormat(profile.getFormat());
_nicsDao.update(nic.getId(), nic);
} else {
profile = new NicProfile(nic, network, nic.getBroadcastUri(), nic.getIsolationUri());
}
for (NetworkElement element : _networkElements) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Asking " + element.getName() + " to prepare for " + nic);
}
element.prepare(network, profile, vmProfile, dest, context);
}
vmProfile.addNic(profile);
_networksDao.changeActiveNicsBy(network.getId(), 1);
}
// List<NicVO> nics = _nicsDao.listBy(vmProfile.getId());
// for (NicVO nic : nics) {
// Pair<NetworkGuru, NetworkVO> implemented = _networkMgr.implementNetwork(nic.getNetworkId(), dest, context);
// NetworkGuru concierge = implemented.first();
// NetworkVO network = implemented.second();
// NicProfile profile = null;
// if (nic.getReservationStrategy() == ReservationStrategy.Start) {
// nic.setState(Resource.State.Reserving);
// nic.setReservationId(context.getReservationId());
// _nicsDao.update(nic.getId(), nic);
// URI broadcastUri = nic.getBroadcastUri();
// if (broadcastUri == null) {
// network.getBroadcastUri();
// }
//
// URI isolationUri = nic.getIsolationUri();
//
// profile = new NicProfile(nic, network, broadcastUri, isolationUri);
// concierge.reserve(profile, network, vmProfile, dest, context);
// nic.setIp4Address(profile.getIp4Address());
// nic.setIp6Address(profile.getIp6Address());
// nic.setMacAddress(profile.getMacAddress());
// nic.setIsolationUri(profile.getIsolationUri());
// nic.setBroadcastUri(profile.getBroadCastUri());
// nic.setReserver(concierge.getName());
// nic.setState(Resource.State.Reserved);
// nic.setNetmask(profile.getNetmask());
// nic.setGateway(profile.getGateway());
// nic.setAddressFormat(profile.getFormat());
// _nicsDao.update(nic.getId(), nic);
// } else {
// profile = new NicProfile(nic, network, nic.getBroadcastUri(), nic.getIsolationUri());
// }
//
// for (NetworkElement element : _networkElements) {
// if (s_logger.isDebugEnabled()) {
// s_logger.debug("Asking " + element.getName() + " to prepare for " + nic);
// }
// element.prepare(network, profile, vmProfile, dest, context);
// }
//
// vmProfile.addNic(profile);
// _networksDao.changeActiveNicsBy(network.getId(), 1);
// }
}
protected void prepareNics(VirtualMachineProfile<? extends VMInstanceVO> vmProfile, DeployDestionation dest, ReservationContext context) {
protected void prepareNics(VirtualMachineProfile<? extends VMInstanceVO> vmProfile, DeployDestination dest, ReservationContext context) {
}
@ -416,7 +411,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Cluster
Journal journal = new Journal.LogJournal("Creating " + vm, s_logger);
ItWorkVO work = new ItWorkVO(UUID.randomUUID().toString(), _nodeId, ItWorkVO.Type.Start);
ItWorkVO work = new ItWorkVO(UUID.randomUUID().toString(), _nodeId, ItWorkVO.Type.Start, vm.getId());
work = _workDao.persist(work);
ReservationContextImpl context = new ReservationContextImpl(work.getId(), journal, caller, account);
@ -626,7 +621,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Cluster
stateTransitTo(vm, Event.OperationSucceeded, null);
if (cleanup) {
ItWorkVO work = new ItWorkVO(reservationId, _nodeId, Type.Cleanup);
ItWorkVO work = new ItWorkVO(reservationId, _nodeId, Type.Cleanup, vm.getId());
_workDao.persist(work);
}

View File

@ -103,6 +103,9 @@ CREATE TABLE `cloud`.`op_it_work` (
`type` char(32) NOT NULL COMMENT 'type of work',
`state` char(32) NOT NULL COMMENT 'state',
`cancel_taken` timestamp COMMENT 'time it was taken over',
`instance_id` bigint unsigned NOT NULL COMMENT 'vm instance',
`resource_type` char(32) COMMENT 'type of resource being worked on',
`resource_id` bigint unsigned COMMENT 'resource id being worked on',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

View File

@ -0,0 +1,23 @@
/**
*
*/
package com.cloud.utils.component;
import java.lang.reflect.AnnotatedElement;
import net.sf.cglib.proxy.Callback;
/**
* AnnotationIntercepter says it can intercept an annotation.
*/
public interface AnnotationInterceptor<T> {
boolean needToIntercept(AnnotatedElement element);
T interceptStart(AnnotatedElement element);
void interceptComplete(AnnotatedElement element, T attach);
void interceptException(AnnotatedElement element, T attach);
Callback getCallback();
}

View File

@ -50,4 +50,6 @@ public interface ComponentLibrary {
Map<String, List<ComponentInfo<Adapter>>> getAdapters();
Map<Class<?>, Class<?>> getFactories();
void addInterceptors(List<AnnotationInterceptor<?>> interceptors);
}

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
@ -48,6 +49,8 @@ import net.sf.cglib.proxy.Callback;
import net.sf.cglib.proxy.CallbackFilter;
import net.sf.cglib.proxy.Enhancer;
import net.sf.cglib.proxy.Factory;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;
import net.sf.cglib.proxy.NoOp;
import org.apache.log4j.Logger;
@ -59,8 +62,6 @@ import org.xml.sax.helpers.DefaultHandler;
import com.cloud.utils.Pair;
import com.cloud.utils.PropertiesUtil;
import com.cloud.utils.db.DatabaseCallback;
import com.cloud.utils.db.DatabaseCallbackFilter;
import com.cloud.utils.db.GenericDao;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.mgmt.JmxUtil;
@ -79,18 +80,18 @@ public class ComponentLocator implements ComponentLocatorMBean {
protected static final ThreadLocal<ComponentLocator> s_tl = new ThreadLocal<ComponentLocator>();
protected static final ConcurrentHashMap<Class<?>, Singleton> s_singletons = new ConcurrentHashMap<Class<?>, Singleton>(111);
protected static final HashMap<String, ComponentLocator> s_locators = new HashMap<String, ComponentLocator>();
protected static final Callback[] s_callbacks = new Callback[] { NoOp.INSTANCE, new DatabaseCallback() };
protected static final CallbackFilter s_callbackFilter = new DatabaseCallbackFilter();
protected static final HashMap<Class<?>, InjectInfo> s_factories = new HashMap<Class<?>, InjectInfo>();
protected static Boolean s_once = false;
protected static Callback[] s_callbacks;
protected static CallbackFilter s_callbackFilter;
protected static final List<AnnotationInterceptor<?>> s_interceptors = new ArrayList<AnnotationInterceptor<?>>();
protected HashMap<String, Adapters<? extends Adapter>> _adapterMap;
protected HashMap<String, ComponentInfo<Manager>> _managerMap;
protected LinkedHashMap<String, ComponentInfo<GenericDao<?, ?>>> _daoMap;
protected String _serverName;
protected Object _component;
protected HashMap<Class<?>, Class<?>> _factories;
protected List<Injector> _injectors;
static {
Runtime.getRuntime().addShutdownHook(new CleanupThread());
@ -150,6 +151,9 @@ public class ComponentLocator implements ComponentLocatorMBean {
_managerMap.putAll(library.getManagers());
adapters.putAll(library.getAdapters());
_factories.putAll(library.getFactories());
synchronized(s_interceptors) {
library.addInterceptors(s_interceptors);
}
}
_daoMap.putAll(handler.daos);
@ -183,6 +187,17 @@ public class ComponentLocator implements ComponentLocatorMBean {
s_logger.info("Skipping configuration using " + filename);
return;
}
synchronized(s_interceptors) {
s_callbacks = new Callback[s_interceptors.size() + 2];
int i = 0;
s_callbacks[i++] = NoOp.INSTANCE;
s_callbacks[i++] = new InterceptorDispatcher();
for (AnnotationInterceptor<?> interceptor : s_interceptors) {
s_callbacks[i++] = interceptor.getCallback();
}
s_callbackFilter = new InterceptorFilter();
}
XmlHandler handler = result.first();
HashMap<String, List<ComponentInfo<Adapter>>> adapters = result.second();
@ -830,7 +845,7 @@ public class ComponentLocator implements ComponentLocatorMBean {
if (info.name == null) {
throw new CloudRuntimeException("Missing name attribute for " + interphace.getName());
}
info.name = info.name + "-" + clazzName;
info.name = info.name;
s_logger.debug("Looking for class " + clazzName);
try {
info.clazz = Class.forName(clazzName);
@ -1013,4 +1028,51 @@ public class ComponentLocator implements ComponentLocatorMBean {
this.state = State.Instantiated;
}
}
protected class InterceptorDispatcher implements MethodInterceptor {
@Override
public Object intercept(Object object, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
ArrayList<Pair<AnnotationInterceptor<Object>, Object>> interceptors = new ArrayList<Pair<AnnotationInterceptor<Object>, Object>>();
for (AnnotationInterceptor<?> interceptor : s_interceptors) {
if (interceptor.needToIntercept(method)) {
Object obj = interceptor.interceptStart(method);
interceptors.add(new Pair<AnnotationInterceptor<Object>, Object>((AnnotationInterceptor<Object>)interceptor, obj));
}
}
boolean success = false;
try {
Object obj = methodProxy.invokeSuper(object, args);
success = true;
return obj;
} finally {
for (Pair<AnnotationInterceptor<Object>, Object> interceptor : interceptors) {
if (success) {
interceptor.first().interceptComplete(method, interceptor.second());
} else {
interceptor.first().interceptException(method, interceptor.second());
}
}
}
}
}
protected static class InterceptorFilter implements CallbackFilter {
@Override
public int accept(Method method) {
int index = 0;
for (int i = 2; i < s_callbacks.length; i++) {
AnnotationInterceptor<?> interceptor = (AnnotationInterceptor<?>)s_callbacks[i];
if (interceptor.needToIntercept(method)) {
if (index == 0) {
index = i;
} else {
return 1;
}
}
}
return index;
}
}
}

View File

@ -1,19 +0,0 @@
/**
*
*/
package com.cloud.utils.component;
import java.lang.annotation.Annotation;
import java.lang.reflect.AnnotatedElement;
/**
* Injector implements customized Injectors for ComponentLocator.
*
*/
public interface Injector {
/**
* Can this injector handle injecting into this type of class?
*/
boolean canInject(AnnotatedElement element, Annotation ann);
}

View File

@ -17,20 +17,64 @@
*/
package com.cloud.utils.db;
import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method;
import net.sf.cglib.proxy.Callback;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;
public class DatabaseCallback implements MethodInterceptor {
import com.cloud.utils.component.AnnotationInterceptor;
public class DatabaseCallback implements MethodInterceptor, AnnotationInterceptor<Transaction> {
@Override
public Object intercept(Object object, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
Transaction txn = Transaction.open(method.getName());
Transaction txn = interceptStart(method);
try {
return methodProxy.invokeSuper(object, args);
} finally {
txn.close();
interceptComplete(method, txn);
}
}
@Override
public boolean needToIntercept(AnnotatedElement element) {
DB db = element.getAnnotation(DB.class);
if (db != null) {
return db.txn();
}
Class<?> clazz = element.getClass().getDeclaringClass();
do {
db = clazz.getAnnotation(DB.class);
if (db != null) {
return db.txn();
}
clazz = clazz.getSuperclass();
} while (clazz != Object.class && clazz != null);
return false;
}
@Override
public Transaction interceptStart(AnnotatedElement element) {
return Transaction.open(((Method)element).getName());
}
@Override
public void interceptComplete(AnnotatedElement element, Transaction txn) {
txn.close();
}
@Override
public void interceptException(AnnotatedElement element, Transaction txn) {
txn.close();
}
@Override
public Callback getCallback() {
return this;
}
}