# -- coding: utf-8 -- # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from .CsDatabag import CsCmdLine, CsGuestNetwork from .CsAddress import CsAddress import logging class CsConfig(object): """ A class to cache all the stuff that the other classes need """ __LOG_FILE = "/var/log/cloud.log" __LOG_LEVEL = "INFO" __LOG_FORMAT = "%(asctime)s %(levelname)-8s %(message)s" cl = None gn = None def __init__(self): self.fw = [] # Each nftables rule contains # 1. type. If not set, it is a rule. Another valid option is "chain". # 2. chain. The chain of the rule (if type is not set), or the name of chain (if type is "chain"). # 3. rule. The configuration of the rule or chain. self.nft_ipv4_acl = [] self.nft_ipv4_fw = [] self.nft_ipv6_acl = [] self.nft_ipv6_fw = [] def set_address(self): self.ips = CsAddress("ips", self) @classmethod def get_cmdline_instance(cls): if cls.cl is None: cls.cl = CsCmdLine("cmdline") return cls.cl @classmethod def get_guestnetwork_instance(cls): if cls.gn is None: cls.gn = CsGuestNetwork("guestnetwork") return cls.gn def cmdline(self): return self.get_cmdline_instance() def guestnetwork(self): return self.get_guestnetwork_instance() def address(self): return self.ips def get_fw(self): return self.fw def get_nft_ipv4_acl(self): return self.nft_ipv4_acl def get_nft_ipv4_fw(self): return self.nft_ipv4_fw def get_ipv6_acl(self): return self.nft_ipv6_acl def get_ipv6_fw(self): return self.nft_ipv6_fw def get_logger(self): return self.__LOG_FILE def get_level(self): return self.__LOG_LEVEL def is_vpc(self): return self.cl.get_type() == 'vpcrouter' def is_router(self): return self.cl.get_type() == 'router' def is_routed(self): return self.cmdline().idata().get('is_routed', 'false') == 'true' def is_dhcp(self): return self.cl.get_type() == 'dhcpsrvr' def has_dns(self): return not self.use_extdns() def has_metadata(self): return any((self.is_vpc(), self.is_router(), self.is_dhcp())) def get_domain(self): return self.cl.get_domain() def use_extdns(self): return self.cmdline().idata().get('useextdns', 'false') == 'true' def expose_dns(self): return self.cmdline().idata().get('exposedns', 'false') == 'true' def use_router_ip_as_resolver(self): return self.cl.get_use_router_ip_as_resolver() def get_dns(self): conf = self.cmdline().idata() dns = [] if not self.use_extdns(): if not self.is_vpc() and self.cl.is_redundant() and self.cl.get_guest_gw(): dns.append(self.cl.get_guest_gw()) else: dns.append(self.address().get_guest_ip()) if 'userouteripresolver' not in conf: for name in ('dns1', 'dns2'): if name in conf: dns.append(conf[name]) return dns def get_format(self): return self.__LOG_FORMAT def get_ingress_chain(self, device, ip): if self.is_vpc(): return "ACL_INBOUND_%s" % device else: return "FIREWALL_%s" % ip def get_egress_chain(self, device, ip): if self.is_vpc(): return "ACL_OUTBOUND_%s" % device else: return "FW_EGRESS_RULES" def get_egress_table(self): if self.is_vpc(): return 'mangle' else: return "" def has_public_network(self): return self.cmdline().idata().get('has_public_network', 'true') == 'true'