bug 8866: Direct Network Usage, TrafficSentinel support added, work in progress

This commit is contained in:
kishan 2011-05-12 19:43:23 +05:30
parent bc360f499d
commit d456f89095
11 changed files with 84 additions and 69 deletions

View File

@ -0,0 +1,28 @@
/**
* Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.agent.api;
import com.cloud.host.Host;
public class StartupTrafficMonitorCommand extends StartupCommand {
public StartupTrafficMonitorCommand() {
super(Host.Type.TrafficMonitor);
}
}

View File

@ -35,6 +35,7 @@ public interface Host {
ExternalFirewall(false),
ExternalLoadBalancer(false),
PxeServer(false),
TrafficMonitor(false),
ExternalDhcp(false);
boolean _virtual;

View File

@ -35,4 +35,6 @@ public interface UsageEventDao extends GenericDao<UsageEventVO, Long> {
List<UsageEventVO> getRecentEvents(Date endDate) throws UsageServerException;
List<UsageEventVO> listIpEvents(Date startDate, Date endDate);
}

View File

@ -28,6 +28,7 @@ import javax.ejb.Local;
import org.apache.log4j.Logger;
import com.cloud.event.EventTypes;
import com.cloud.event.UsageEventVO;
import com.cloud.exception.UsageServerException;
import com.cloud.utils.DateUtil;
@ -43,6 +44,7 @@ public class UsageEventDaoImpl extends GenericDaoBase<UsageEventVO, Long> implem
public static final Logger s_logger = Logger.getLogger(UsageEventDaoImpl.class.getName());
private final SearchBuilder<UsageEventVO> latestEventsSearch;
private final SearchBuilder<UsageEventVO> IpeventsSearch;
private static final String COPY_EVENTS = "INSERT INTO cloud_usage.usage_event (id, type, account_id, created, zone_id, resource_id, resource_name, offering_id, template_id, size, resource_type) " +
"SELECT id, type, account_id, created, zone_id, resource_id, resource_name, offering_id, template_id, size, resource_type FROM cloud.usage_event vmevt WHERE vmevt.id > ? and vmevt.id <= ? ";
private static final String COPY_ALL_EVENTS = "INSERT INTO cloud_usage.usage_event (id, type, account_id, created, zone_id, resource_id, resource_name, offering_id, template_id, size, resource_type) " +
@ -55,7 +57,15 @@ public class UsageEventDaoImpl extends GenericDaoBase<UsageEventVO, Long> implem
latestEventsSearch.and("processed", latestEventsSearch.entity().isProcessed(), SearchCriteria.Op.EQ);
latestEventsSearch.and("enddate", latestEventsSearch.entity().getCreateDate(), SearchCriteria.Op.LTEQ);
latestEventsSearch.done();
}
IpeventsSearch = createSearchBuilder();
IpeventsSearch.and("startdate", IpeventsSearch.entity().getCreateDate(), SearchCriteria.Op.GTEQ);
IpeventsSearch.and("enddate", IpeventsSearch.entity().getCreateDate(), SearchCriteria.Op.LTEQ);
IpeventsSearch.and().op("assignEvent", IpeventsSearch.entity().getType(), SearchCriteria.Op.EQ);
IpeventsSearch.or("releaseEvent", IpeventsSearch.entity().getType(), SearchCriteria.Op.EQ);
IpeventsSearch.closeParen();
IpeventsSearch.done();
}
@Override
public List<UsageEventVO> listLatestEvents(Date endDate) {
@ -159,4 +169,16 @@ public class UsageEventDaoImpl extends GenericDaoBase<UsageEventVO, Long> implem
txn.close();
}
}
}
@Override
public List<UsageEventVO> listIpEvents(Date startDate, Date endDate) {
Filter filter = new Filter(UsageEventVO.class, "createDate", Boolean.TRUE, null, null);
SearchCriteria<UsageEventVO> sc = IpeventsSearch.create();
sc.setParameters("startdate", startDate);
sc.setParameters("enddate", endDate);
sc.setParameters("assignEvent", EventTypes.EVENT_NET_IP_ASSIGN);
sc.setParameters("releaseEvent", EventTypes.EVENT_NET_IP_RELEASE);
return listBy(sc, filter);
}
}

View File

@ -69,6 +69,7 @@ import com.cloud.agent.api.StartupProxyCommand;
import com.cloud.agent.api.StartupPxeServerCommand;
import com.cloud.agent.api.StartupRoutingCommand;
import com.cloud.agent.api.StartupStorageCommand;
import com.cloud.agent.api.StartupTrafficMonitorCommand;
import com.cloud.agent.api.UnsupportedAnswer;
import com.cloud.agent.manager.allocator.HostAllocator;
import com.cloud.agent.manager.allocator.PodAllocator;
@ -2386,6 +2387,8 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, ResourceS
type = Host.Type.PxeServer;
} else if (startup instanceof StartupExternalDhcpCommand) {
type = Host.Type.ExternalDhcp;
} else if (startup instanceof StartupTrafficMonitorCommand) {
type = Host.Type.TrafficMonitor;
} else {
assert false : "Did someone add a new Startup command?";
}
@ -2645,7 +2648,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, ResourceS
}
Long podId = null;
if (p == null) {
if (type != Host.Type.SecondaryStorage && type != Host.Type.ExternalFirewall && type != Host.Type.ExternalLoadBalancer) {
if (type != Host.Type.SecondaryStorage && type != Host.Type.ExternalFirewall && type != Host.Type.ExternalLoadBalancer && type != Host.Type.TrafficMonitor) {
/*
* s_logger.info("Unable to find the pod so we are creating one." ); p = createPod(pod, dcId,

View File

@ -106,7 +106,8 @@ public class AgentMonitor extends Thread implements Listener {
for (HostVO host : hosts) {
if (host.getType().equals(Host.Type.ExternalFirewall) ||
host.getType().equals(Host.Type.ExternalLoadBalancer)) {
host.getType().equals(Host.Type.ExternalLoadBalancer) ||
host.getType().equals(Host.Type.TrafficMonitor)) {
continue;
}

View File

@ -169,4 +169,5 @@ public interface HostDao extends GenericDao<HostVO, Long> {
boolean directConnect(HostVO host, long msId, boolean secondConnect);
HostVO findTrafficMonitorHost();
}

View File

@ -799,4 +799,17 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
return secondaryStorageHosts;
}
@Override
public HostVO findTrafficMonitorHost() {
SearchCriteria<HostVO> sc = TypeSearch.create();
sc.setParameters("type", Host.Type.TrafficMonitor);
List<HostVO> trafficHosts = listBy(sc);
if (trafficHosts == null || trafficHosts.size() < 1) {
return null;
} else {
return trafficHosts.get(0);
}
}
}

View File

@ -50,6 +50,6 @@ public interface IPAddressDao extends GenericDao<IPAddressVO, Long> {
IPAddressVO findByAccountAndIp(long accountId, String ipAddress);
IPAddressVO findByIpAddress(String ipAddress);
List<IPAddressVO> listAllocatedIps();
}

View File

@ -52,6 +52,7 @@ public class IPAddressDaoImpl extends GenericDaoBase<IPAddressVO, Long> implemen
protected final GenericSearchBuilder<IPAddressVO, Integer> AllocatedIpCount;
protected final GenericSearchBuilder<IPAddressVO, Integer> AllIpCountForDashboard;
protected final GenericSearchBuilder<IPAddressVO, Integer> AllocatedIpCountForDashboard;
protected final SearchBuilder<IPAddressVO> AllocatedIpSearch;
// make it public for JUnit test
@ -96,6 +97,10 @@ public class IPAddressDaoImpl extends GenericDaoBase<IPAddressVO, Long> implemen
AllocatedIpCountForDashboard.and("dc", AllocatedIpCountForDashboard.entity().getDataCenterId(), Op.EQ);
AllocatedIpCountForDashboard.and("allocated", AllocatedIpCountForDashboard.entity().getAllocatedTime(), Op.NNULL);
AllocatedIpCountForDashboard.done();
AllocatedIpSearch = createSearchBuilder();
AllocatedIpSearch.and("allocated", AllocatedIpSearch.entity().getAllocatedTime(), Op.NNULL);
AllocatedIpSearch.done();
}
@Override
@ -269,9 +274,8 @@ public class IPAddressDaoImpl extends GenericDaoBase<IPAddressVO, Long> implemen
}
@Override
public IPAddressVO findByIpAddress(String ipAddress) {
SearchCriteria<IPAddressVO> sc = AllFieldsSearch.create();
sc.setParameters("ipAddress", ipAddress);
return findOneBy(sc);
public List<IPAddressVO> listAllocatedIps() {
SearchCriteria<IPAddressVO> sc = AllocatedIpSearch.create();
return listBy(sc);
}
}

View File

@ -627,66 +627,6 @@ public class VirtualNetworkApplianceManagerImpl implements VirtualNetworkApplian
@Override
public void run() {
//Direct Network Usage
URL trafficSentinel;
try {
//Query traffic Sentinel
if(trafficSentinelHostname != null){
trafficSentinel = new URL(trafficSentinelHostname+"/inmsf/Query?script=var+q+%3D+Query.topN(%22historytrmx%22,%0D%0A+++++++++++++++++%22ipsource,bytes%22,%0D%0A+++++++++++++++++%22sourcezone+!%3D+EXTERNAL" +
"+%26+destinationzone+%3D+EXTERNAL%22,%0D%0A+++++++++++++++++%22end+-+5+minutes,+end%22,%0D%0A+++++++++++++++++%22bytes%22,%0D%0A+++++++++++++++++100000);%0D%0A%0D%0Avar+totalsSent+%3D+" +
"{};%0D%0A%0D%0Avar+t+%3D+q.run(%0D%0A++function(row,table)+{%0D%0A++++if(row[0])+{++++%0D%0A++++++totalsSent[row[0]]+%3D+row[1];%0D%0A++++}%0D%0A++});%0D%0A%0D%0Avar+totalsRcvd+%3D+{};" +
"%0D%0A%0D%0Avar+q+%3D+Query.topN(%22historytrmx%22,%0D%0A+++++++++++++++++%22ipdestination,bytes%22,%0D%0A+++++++++++++++++%22destinationzone+!%3D+EXTERNAL+%26+sourcezone+%3D+EXTERNAL%22," +
"%0D%0A+++++++++++++++++%22end+-+5minutes,+end%22,%0D%0A+++++++++++++++++%22bytes%22,%0D%0A+++++++++++++++++100000);%0D%0A%0D%0Avar+t+%3D+q.run(%0D%0A++function(row,table)+{%0D%0A++++" +
"if(row[0])+{%0D%0A++++++totalsRcvd[row[0]]+%3D+row[1];%0D%0A++++}%0D%0A++});%0D%0A%0D%0Afor+(var+addr+in+totalsSent)+{%0D%0A++++var+TS+%3D+0;%0D%0A++++var+TR+%3D+0;%0D%0A++++if(totalsSent[addr])" +
"+TS+%3D+totalsSent[addr];%0D%0A++++if(totalsRcvd[addr])+TR+%3D+totalsRcvd[addr];%0D%0A++++println(addr+%2B+%22,%22+%2B+TS+%2B+%22,%22+%2B+TR);%0D%0A}&authenticate=basic&resultFormat=txt");
BufferedReader in = new BufferedReader(
new InputStreamReader(trafficSentinel.openStream()));
String inputLine;
while ((inputLine = in.readLine()) != null){
//Parse the script output
StringTokenizer st = new StringTokenizer(inputLine, ",");
if(st.countTokens() == 3){
String publicIp = st.nextToken();
//Find the account owning the IP
IPAddressVO ipaddress = _ipAddressDao.findByIpAddress(publicIp);
if(ipaddress == null || ipaddress.getAccountId() == Account.ACCOUNT_ID_SYSTEM){
continue;
}
Long bytesSent = new Long(st.nextToken());
Long bytesRcvd = new Long(st.nextToken());
if(bytesSent == null || bytesRcvd == null){
s_logger.debug("Incorrect bytes for IP: "+publicIp);
}
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
txn.start();
//update user_statistics
UserStatisticsVO stats = _statsDao.lock(ipaddress.getAccountId(), ipaddress.getDataCenterId(), null, 0L, "DirectNetwork");
if (stats == null) {
stats = new UserStatisticsVO(ipaddress.getAccountId(), ipaddress.getDataCenterId(), null, 0L, "DirectNetwork", null);
stats.setCurrentBytesSent(bytesSent);
stats.setCurrentBytesReceived(bytesRcvd);
_statsDao.persist(stats);
} else {
stats.setCurrentBytesSent(stats.getCurrentBytesSent() + bytesSent);
stats.setCurrentBytesReceived(stats.getCurrentBytesReceived() + bytesRcvd);
_statsDao.update(stats.getId(), stats);
}
txn.commit();
txn.close();
}
}
in.close();
}
} catch (MalformedURLException e1) {
s_logger.info("Invalid T raffic Sentinel URL",e1);
} catch (IOException e) {
s_logger.debug("Error in direct network usage accounting",e);
}
final List<DomainRouterVO> routers = _routerDao.listVirtualUpByHostId(null);
s_logger.debug("Found " + routers.size() + " running routers. ");