Source code for octodns.manager

#
#
#

from collections import deque
from concurrent.futures import ThreadPoolExecutor
from fnmatch import filter as fnmatch_filter
from hashlib import sha256
from importlib import import_module
from importlib.metadata import PackageNotFoundError
from importlib.metadata import version as module_version
from json import dumps
from logging import INFO, getLogger
from re import compile as re_compile
from sys import stdout

from . import __version__
from .deprecation import deprecated
from .idna import IdnaDict, idna_decode, idna_encode
from .processor.arpa import AutoArpa
from .processor.meta import MetaProcessor
from .provider.base import BaseProvider
from .provider.plan import Plan
from .provider.yaml import SplitYamlProvider, YamlProvider
from .secret.environ import EnvironSecrets
from .yaml import safe_load
from .zone import Zone


[docs] class _AggregateTarget(object): id = 'aggregate'
[docs] def __init__(self, targets): self.targets = targets self.SUPPORTS = targets[0].SUPPORTS for target in targets[1:]: self.SUPPORTS = self.SUPPORTS & target.SUPPORTS
[docs] def supports(self, record): for target in self.targets: if not target.supports(record): return False return True
def __getattr__(self, name): if name.startswith('SUPPORTS_'): # special case to handle any current or future SUPPORTS_* by # returning whether all providers support the requested # functionality. for target in self.targets: if not getattr(target, name): return False return True klass = self.__class__.__name__ raise AttributeError(f'{klass} object has no attribute {name}')
[docs] class MakeThreadFuture(object):
[docs] def __init__(self, func, args, kwargs): self.func = func self.args = args self.kwargs = kwargs
[docs] def result(self): return self.func(*self.args, **self.kwargs)
[docs] class MainThreadExecutor(object): ''' Dummy executor that runs things on the main thread during the invocation of submit, but still returns a future object with the result. This allows code to be written to handle async, even in the case where we don't want to use multiple threads/workers and would prefer that things flow as if traditionally written. '''
[docs] def submit(self, func, *args, **kwargs): return MakeThreadFuture(func, args, kwargs)
[docs] class ManagerException(Exception): pass
[docs] class Manager(object): log = getLogger('Manager') plan_log = getLogger('Plan')
[docs] @classmethod def _plan_keyer(cls, p): plan = p[1] return len(plan.changes[0].record.zone.name) if plan.changes else 0
[docs] def __init__( self, config_file, max_workers=None, include_meta=False, auto_arpa=False, enable_checksum=False, ): version = self._try_version('octodns', version=__version__) self.log.info( '__init__: config_file=%s, (octoDNS %s)', config_file, version ) self._configured_sub_zones = None # Read our config file with open(config_file, 'r') as fh: self.config = safe_load(fh, enforce_order=False) zones = self.config['zones'] self.config['zones'] = self._config_zones(zones) manager_config = self.config.get('manager') or {} self._executor = self._config_executor(manager_config, max_workers) self.include_meta = self._config_include_meta( manager_config, include_meta ) self.enable_checksum = self._config_enable_checksum( manager_config, enable_checksum ) # add our hard-coded environ handler first so that other secret # providers can pull in env variables w/it self.secret_handlers = {'env': EnvironSecrets('env')} secret_handlers_config = self.config.get('secret_handlers') or {} self.secret_handlers.update( self._config_secret_handlers(secret_handlers_config) ) self.auto_arpa = self._config_auto_arpa(manager_config, auto_arpa) self.global_processors = manager_config.get('processors') or [] self.log.info('__init__: global_processors=%s', self.global_processors) self.global_post_processors = ( manager_config.get('post_processors') or [] ) self.log.info( '__init__: global_post_processors=%s', self.global_post_processors ) providers_config = self.config['providers'] self.providers = self._config_providers(providers_config) processors_config = self.config.get('processors') or {} self.processors = self._config_processors(processors_config) if self.auto_arpa: self.log.info( '__init__: adding auto-arpa to processors and providers, prepending it to global_post_processors list' ) kwargs = self.auto_arpa if isinstance(self.auto_arpa, dict) else {} auto_arpa = AutoArpa('auto-arpa', **kwargs) self.providers[auto_arpa.name] = auto_arpa self.processors[auto_arpa.name] = auto_arpa self.global_post_processors = [ auto_arpa.name ] + self.global_post_processors if self.include_meta: self.log.info( '__init__: adding meta to processors and providers, appending it to global_post_processors list' ) meta = MetaProcessor( 'meta', record_name='octodns-meta', include_time=False, include_provider=True, ) self.processors[meta.id] = meta self.global_post_processors.append(meta.id) plan_outputs_config = manager_config.get('plan_outputs') or { '_logger': { 'class': 'octodns.provider.plan.PlanLogger', 'level': 'info', } } self.plan_outputs = self._config_plan_outputs(plan_outputs_config)
[docs] def _config_zones(self, zones): # record the set of configured zones we have as they are configured_zones = set([z.lower() for z in zones.keys()]) # walk the configured zones for name in configured_zones: if 'xn--' not in name: continue # this is an IDNA format zone name decoded = idna_decode(name) # do we also have a config for its utf-8 if decoded in configured_zones: raise ManagerException( f'"{decoded}" configured both in utf-8 and idna "{name}"' ) # convert the zones portion of things into an IdnaDict return IdnaDict(zones)
[docs] def _config_executor(self, manager_config, max_workers=None): max_workers = ( manager_config.get('max_workers') or 1 if max_workers is None else max_workers ) self.log.info('_config_executor: max_workers=%d', max_workers) if max_workers > 1: return ThreadPoolExecutor(max_workers=max_workers) return MainThreadExecutor()
[docs] def _config_include_meta(self, manager_config, include_meta=False): include_meta = include_meta or manager_config.get('include_meta', False) self.log.info('_config_include_meta: include_meta=%s', include_meta) return include_meta
[docs] def _config_enable_checksum(self, manager_config, enable_checksum=False): enable_checksum = enable_checksum or manager_config.get( 'enable_checksum', False ) self.log.info( '_config_enable_checksum: enable_checksum=%s', enable_checksum ) return enable_checksum
[docs] def _config_auto_arpa(self, manager_config, auto_arpa=False): auto_arpa = auto_arpa or manager_config.get('auto_arpa', False) self.log.info('_config_auto_arpa: auto_arpa=%s', auto_arpa) return auto_arpa
[docs] def _config_secret_handlers(self, secret_handlers_config): self.log.debug('_config_secret_handlers: configuring secret_handlers') secret_handlers = {} for sh_name, sh_config in secret_handlers_config.items(): # Get our class and remove it from the secret handler config try: _class = sh_config.pop('class') except KeyError: self.log.exception('Invalid secret handler class') raise ManagerException( f'Secret Handler {sh_name} is missing class, {sh_config.context}' ) _class, module, version = self._get_named_class( 'secret handler', _class, sh_config.context ) kwargs = self._build_kwargs(sh_config) try: secret_handlers[sh_name] = _class(sh_name, **kwargs) self.log.info( '__init__: secret_handler=%s (%s %s)', sh_name, module, version, ) except TypeError: self.log.exception('Invalid secret handler config') raise ManagerException( f'Incorrect secret handler config for {sh_name}, {sh_config.context}' ) return secret_handlers
[docs] def _config_providers(self, providers_config): self.log.debug('_config_providers: configuring providers') providers = {} for provider_name, provider_config in providers_config.items(): # Get our class and remove it from the provider_config try: _class = provider_config.pop('class') except KeyError: self.log.exception('Invalid provider class') raise ManagerException( f'Provider {provider_name} is missing class, {provider_config.context}' ) _class, module, version = self._get_named_class( 'provider', _class, provider_config.context ) kwargs = self._build_kwargs(provider_config) try: providers[provider_name] = _class(provider_name, **kwargs) self.log.info( '__init__: provider=%s (%s %s)', provider_name, module, version, ) except TypeError: self.log.exception('Invalid provider config') raise ManagerException( f'Incorrect provider config for {provider_name}, {provider_config.context}' ) return providers
[docs] def _config_processors(self, processors_config): processors = {} for processor_name, processor_config in processors_config.items(): try: _class = processor_config.pop('class') except KeyError: self.log.exception('Invalid processor class') raise ManagerException( f'Processor {processor_name} is missing class, {processor_config.context}' ) _class, module, version = self._get_named_class( 'processor', _class, processor_config.context ) kwargs = self._build_kwargs(processor_config) try: processors[processor_name] = _class(processor_name, **kwargs) self.log.info( '__init__: processor=%s (%s %s)', processor_name, module, version, ) except TypeError: self.log.exception('Invalid processor config') raise ManagerException( f'Incorrect processor config for {processor_name}, {processor_config.context}' ) return processors
[docs] def _config_plan_outputs(self, plan_outputs_config): plan_outputs = {} for plan_output_name, plan_output_config in plan_outputs_config.items(): context = getattr(plan_output_config, 'context', '') try: _class = plan_output_config.pop('class') except KeyError: self.log.exception('Invalid plan_output class') raise ManagerException( f'plan_output {plan_output_name} is missing class, {context}' ) _class, module, version = self._get_named_class( 'plan_output', _class, context ) kwargs = self._build_kwargs(plan_output_config) try: plan_outputs[plan_output_name] = _class( plan_output_name, **kwargs ) # Don't print out version info for the default output if plan_output_name != '_logger': self.log.info( '__init__: plan_output=%s (%s %s)', plan_output_name, module, version, ) except TypeError: self.log.exception('Invalid plan_output config') raise ManagerException( f'Incorrect plan_output config for {plan_output_name}, {context}' ) return plan_outputs
[docs] def _try_version(self, module_name, module=None, version=None): try: # Always try and use the official lookup first return module_version(module_name) except PackageNotFoundError: pass # If we were passed a version that's next in line if version is not None: return version # finally try and import the module and see if it has a __VERSION__ if module is None: module = import_module(module_name) # TODO: remove the __VERSION__ fallback eventually? return getattr( module, '__version__', getattr(module, '__VERSION__', None) )
[docs] def _import_module(self, module_name): current = module_name _next = current.rsplit('.', 1)[0] module = import_module(current) version = self._try_version(current, module=module) # If we didn't find a version in the specific module we're importing, # we'll try walking up the hierarchy, as long as there is one (`.`), # looking for it. while version is None and current != _next: current = _next _next = current.rsplit('.', 1)[0] version = self._try_version(current) return module, version or 'n/a'
[docs] def _get_named_class(self, _type, _class, context): try: module_name, class_name = _class.rsplit('.', 1) module, version = self._import_module(module_name) except (ImportError, ValueError): self.log.exception( '_get_{}_class: Unable to import module %s', _class ) raise ManagerException( f'Unknown {_type} class: {_class}, {context}' ) try: return getattr(module, class_name), module_name, version except AttributeError: self.log.exception( '_get_named_class: Unable to get class %s from module %s', class_name, module, ) raise ManagerException( f'Unknown {_type} class: {_class}, {context}' )
[docs] def _build_kwargs(self, source): # Build up the arguments we need to pass to the provider kwargs = {} for k, v in source.items(): if isinstance(v, dict): v = self._build_kwargs(v) elif isinstance(v, str): if '/' in v: handler, name = v.split('/', 1) try: handler = self.secret_handlers[handler] except KeyError: # we don't have a matching handler, but don't want to # make that an error b/c config values will often # contain /. We don't want to print the values in case # they're sensitive so just provide the key, and even # that only at debug level. self.log.debug( '_build_kwargs: failed to find handler for key "%s"', k, ) else: v = handler.fetch(name, source) kwargs[k] = v return kwargs
[docs] def configured_sub_zones(self, zone_name): ''' Accepts either UTF-8 or IDNA encoded zone name and returns the list of any configured sub-zones in IDNA form. E.g. for the following Configured zones: - some.com. - other.some.com. - deep.thing.some.com. It would return - other - deep.thing ''' if self._configured_sub_zones is None: # First time through we compute all the sub-zones configured_sub_zones = IdnaDict() # Get a list of all of our zone names. Sort them from shortest to # longest so that parents will always come before their subzones zones = sorted( self.config['zones'].keys(), key=lambda z: len(z), reverse=True ) zones = deque(zones) # Until we're done processing zones while zones: # Grab the one we'lre going to work on now zone = zones.pop() dotted = f'.{zone}' trimmer = len(dotted) subs = set() # look at all the zone names that come after it for candidate in zones: # If they end with this zone's dotted name, it's a sub if candidate.endswith(dotted): # We want subs to exclude the zone portion subs.add(candidate[:-trimmer]) configured_sub_zones[zone] = subs self._configured_sub_zones = configured_sub_zones return self._configured_sub_zones.get(zone_name, set())
[docs] def _populate_and_plan( self, zone_name, processors, sources, targets, desired=None, lenient=False, ): zone = self.get_zone(zone_name) self.log.debug( 'sync: populating, zone=%s, lenient=%s', zone.decoded_name, lenient, ) if desired: # This is an alias zone, rather than populate it we'll copy the # records over from `desired`. for _, records in desired._records.items(): for record in records: zone.add_record(record.copy(zone=zone), lenient=lenient) else: for source in sources: try: source.populate(zone, lenient=lenient) except TypeError as e: if "unexpected keyword argument 'lenient'" not in str(e): raise deprecated( f'`populate` method does not support the `lenient` param, fallback is DEPRECATED. Will be removed in 2.0. Class {source.__class__.__name__}', stacklevel=99, ) self.log.warning( 'provider %s does not accept lenient param', source.__class__.__name__, ) source.populate(zone) for processor in processors: try: zone = processor.process_source_zone( zone, sources=sources, lenient=lenient ) except TypeError as e: if "unexpected keyword argument 'lenient'" not in str(e): raise deprecated( f'`process_source_zone` method does not support the `lenient` param, fallback is DEPRECATED. Will be removed in 2.0. Class {processor.__class__.__name__}', stacklevel=99, ) self.log.warning( 'processor %s does not accept lenient param', processor.__class__.__name__, ) zone = processor.process_source_zone(zone, sources=sources) self.log.debug('sync: planning, zone=%s', zone.decoded_name) plans = [] for target in targets: try: plan = target.plan(zone, processors=processors, lenient=lenient) except TypeError as e: e_str = str(e) if "keyword argument 'lenient'" in e_str: deprecated( f'`plan` method does not support the `lenient` param, fallback is DEPRECATED. Will be removed in 2.0. Class {target.__class__.__name__}', stacklevel=99, ) self.log.warning( 'provider.plan %s does not accept lenient param', target.__class__.__name__, ) try: plan = target.plan(zone, processors=processors) except TypeError as e2: if "keyword argument 'processors'" not in str(e2): raise deprecated( f'`plan` method does not support the `processors` param, fallback is DEPRECATED. Will be removed in 2.0. Class {target.__class__.__name__}', stacklevel=99, ) self.log.warning( 'provider.plan %s does not accept processors param', target.__class__.__name__, ) plan = target.plan(zone) elif "keyword argument 'processors'" in e_str: deprecated( f'`plan` method does not support the `processors` param, fallback is DEPRECATED. Will be removed in 2.0. Class {target.__class__.__name__}', stacklevel=99, ) self.log.warning( 'provider.plan %s does not accept processors param', target.__class__.__name__, ) plan = target.plan(zone) else: raise for processor in processors: try: plan = processor.process_plan( plan, sources=sources, target=target, lenient=lenient ) except TypeError as e: if "unexpected keyword argument 'lenient'" not in str(e): raise deprecated( f'`process_plan` method does not support the `lenient` param, fallback is DEPRECATED. Will be removed in 2.0. Class {processor.__class__.__name__}', stacklevel=99, ) self.log.warning( 'processor %s does not accept lenient param', processor.__class__.__name__, ) plan = processor.process_plan( plan, sources=sources, target=target ) if plan: plans.append((target, plan)) # Return the zone as it's the desired state return plans, zone
[docs] def _get_sources(self, decoded_zone_name, config, eligible_sources): try: sources = config['sources'] or [] except KeyError: raise ManagerException( f'Zone {decoded_zone_name} is missing sources' ) if eligible_sources and not [ s for s in sources if s in eligible_sources ]: return None self.log.info('_get_sources: sources=%s', sources) try: # rather than using a list comprehension, we break this loop # out so that the `except` block below can reference the # `source` collected = [] for source in sources: collected.append(self.providers[source]) sources = collected except KeyError: raise ManagerException( f'Zone {decoded_zone_name}, unknown source: {source}' ) return sources
[docs] def _get_processors(self, decoded_zone_name, config): # Build list of processor names processors = ( self.global_processors + (config.get('processors') or []) + self.global_post_processors ) # Translate processor names to processor objects try: collected = [] for processor in processors: collected.append(self.processors[processor]) processors = collected except KeyError: raise ManagerException( f'Zone {decoded_zone_name}, unknown processor: {processor}' ) return processors
[docs] def _preprocess_zones(self, zones, eligible_sources=None, sources=None): ''' This may modify the passed in zone object, it should be ignored after the call and the zones returned from this function should be used instead. ''' source_zones = {} # list since we'll be modifying zones in the loop for name, config in list(zones.items()): if name[0] != '*': # this isn't a dynamic zone config, move along continue # it's dynamic, get a list of zone names from the configured sources found_sources = sources or self._get_sources( name, config, eligible_sources ) self.log.info( '_preprocess_zones: dynamic zone=%s, sources=%s', name, list(s.id for s in found_sources), ) candidates = set() for source in found_sources: if source.id not in source_zones: if not hasattr(source, 'list_zones'): raise ManagerException( f'dynamic zone={name} includes a source, {source.id}, that does not support `list_zones`' ) # get this source's zones listed_zones = set(source.list_zones()) # cache them source_zones[source.id] = listed_zones self.log.debug( '_preprocess_zones: source=%s, list_zones=%s', source.id, listed_zones, ) # add this source's zones to the candidates candidates |= source_zones[source.id] self.log.debug( '_preprocess_zones: name=%s, candidates=%s', name, candidates ) # remove any zones that are already configured, either explicitly or # from a previous dyanmic config candidates -= set(zones.keys()) if glob := config.pop('glob', None): self.log.debug( '_preprocess_zones: name=%s, glob=%s', name, glob ) candidates = set(fnmatch_filter(candidates, glob)) elif regex := config.pop('regex', None): self.log.debug( '_preprocess_zones: name=%s, regex=%s', name, regex ) regex = re_compile(regex) self.log.debug( '_preprocess_zones: name=%s, compiled=%s', name, regex ) candidates = set(z for z in candidates if regex.search(z)) else: # old-style wildcard that uses everything self.log.debug( '_preprocess_zones: name=%s, old semantics, catch all', name ) self.log.debug( '_preprocess_zones: name=%s, matches=%s', name, candidates ) for match in candidates: zones[match] = config # remove the dynamic config element so we don't try and populate it del zones[name] return zones
[docs] def sync( self, eligible_zones=[], eligible_sources=[], eligible_targets=[], dry_run=True, force=False, plan_output_fh=stdout, checksum=None, ): self.log.info( 'sync: eligible_zones=%s, eligible_targets=%s, dry_run=%s, force=%s, plan_output_fh=%s, checksum=%s', eligible_zones, eligible_targets, dry_run, force, getattr(plan_output_fh, 'name', plan_output_fh.__class__.__name__), checksum, ) zones = self.config['zones'] zones = self._preprocess_zones(zones, eligible_sources) if eligible_zones: zones = IdnaDict({n: zones.get(n) for n in eligible_zones}) includes_arpa = any(e.endswith('arpa.') for e in zones.keys()) if self.auto_arpa and includes_arpa: # it's not safe to mess with auto_arpa when we don't have a complete # picture of records, so if any filtering is happening while arpa # zones are in play we need to abort if any(e.endswith('arpa.') for e in eligible_zones): raise ManagerException( 'ARPA zones cannot be synced during partial runs when auto_arpa is enabled' ) if eligible_sources: raise ManagerException( 'eligible_sources is incompatible with auto_arpa' ) if eligible_targets: raise ManagerException( 'eligible_targets is incompatible with auto_arpa' ) aliased_zones = {} delayed_arpa = [] futures = [] for zone_name, config in zones.items(): if config is None: raise ManagerException( f'Requested zone "{zone_name}" not found in config' ) decoded_zone_name = idna_decode(zone_name) self.log.info('sync: zone=%s', decoded_zone_name) if 'alias' in config: source_zone = config['alias'] # Check that the source zone is defined. if source_zone not in self.config['zones']: msg = f'Invalid alias zone {decoded_zone_name}: source zone {idna_decode(source_zone)} does not exist' self.log.error(msg) raise ManagerException(msg) # Check that the source zone is not an alias zone itself. if 'alias' in self.config['zones'][source_zone]: msg = f'Invalid alias zone {decoded_zone_name}: source zone {idna_decode(source_zone)} is an alias zone' self.log.error(msg) raise ManagerException(msg) aliased_zones[zone_name] = source_zone continue lenient = config.get('lenient', False) sources = self._get_sources( decoded_zone_name, config, eligible_sources ) try: targets = config['targets'] or [] except KeyError: raise ManagerException( f'Zone {decoded_zone_name} is missing targets' ) processors = self._get_processors(decoded_zone_name, config) self.log.info('sync: processors=%s', [p.id for p in processors]) if not sources: self.log.info('sync: no eligible sources, skipping') continue if eligible_targets: targets = [t for t in targets if t in eligible_targets] if not targets: # Don't bother planning (and more importantly populating) zones # when we don't have any eligible targets, waste of # time/resources self.log.info('sync: no eligible targets, skipping') continue self.log.info('sync: targets=%s', targets) try: trgs = [] for target in targets: trg = self.providers[target] if not isinstance(trg, BaseProvider): raise ManagerException( f'{trg} - "{target}" does not support targeting' ) trgs.append(trg) targets = trgs except KeyError: raise ManagerException( f'Zone {decoded_zone_name}, unknown ' f'target: {target}' ) kwargs = { 'zone_name': zone_name, 'processors': processors, 'sources': sources, 'targets': targets, 'lenient': lenient, } if self.auto_arpa and zone_name.endswith('arpa.'): delayed_arpa.append(kwargs) else: futures.append( self._executor.submit(self._populate_and_plan, **kwargs) ) # Wait on all results and unpack/flatten the plans and store the # desired states in case we need them below plans = [] desired = {} for future in futures: ps, d = future.result() desired[d.name] = d for plan in ps: plans.append(plan) # Populate aliases zones. futures = [] for zone_name, zone_source in aliased_zones.items(): source_config = self.config['zones'][zone_source] try: desired_config = desired[zone_source] except KeyError: raise ManagerException( f'Zone {idna_decode(zone_name)} cannot be synced ' f'without zone {zone_source} sinced ' 'it is aliased' ) futures.append( self._executor.submit( self._populate_and_plan, zone_name, processors, [], [self.providers[t] for t in source_config['targets']], desired=desired_config, lenient=lenient, ) ) # Wait on results and unpack/flatten the plans, ignore the desired here # as these are aliased zones plans += [p for f in futures for p in f.result()[0]] if delayed_arpa: # if delaying arpa all of the non-arpa zones have been processed now # so it's time to plan them self.log.info( 'sync: processing %d delayed arpa zones', len(delayed_arpa) ) # populate and plan them futures = [ self._executor.submit(self._populate_and_plan, **kwargs) for kwargs in delayed_arpa ] # wait on the results and unpack/flatten the plans plans += [p for f in futures for p in f.result()[0]] # Best effort sort plans children first so that we create/update # children zones before parents which should allow us to more safely # extract things into sub-zones. Combining a child back into a parent # can't really be done all that safely in general so we'll optimize for # this direction. plans.sort(key=self._plan_keyer, reverse=True) for output in self.plan_outputs.values(): output.run(plans=plans, log=self.plan_log, fh=plan_output_fh) computed_checksum = None if plans and self.enable_checksum: data = [p[1].data for p in plans] data = dumps(data) csum = sha256() csum.update(data.encode('utf-8')) computed_checksum = csum.hexdigest() checksum_log = getLogger('Checksum') checksum_log.setLevel(INFO) checksum_log.info('checksum=%s', computed_checksum) if not force: self.log.debug('sync: checking safety') for target, plan in plans: plan.raise_if_unsafe() if dry_run and not checksum: return 0 elif computed_checksum and computed_checksum != checksum: raise ManagerException( f'checksum={checksum} does not match computed={computed_checksum}' ) total_changes = 0 self.log.debug('sync: applying') zones = self.config['zones'] for target, plan in plans: zone_name = plan.existing.decoded_name if zones[zone_name].get('always-dry-run', False): self.log.info( 'sync: zone=%s skipping always-dry-run', zone_name ) continue total_changes += target.apply(plan) self.log.info('sync: %d total changes', total_changes) return total_changes
[docs] def compare(self, a, b, zone): ''' Compare zone data between 2 sources. Note: only things supported by both sources will be considered ''' self.log.info('compare: a=%s, b=%s, zone=%s', a, b, zone) try: a = [self.providers[source] for source in a] b = [self.providers[source] for source in b] except KeyError as e: raise ManagerException(f'Unknown source: {e.args[0]}') za = self.get_zone(zone) for source in a: source.populate(za) zb = self.get_zone(zone) for source in b: source.populate(zb) return zb.changes(za, _AggregateTarget(a + b))
[docs] def dump( self, zone, output_dir, sources, lenient=False, split=False, output_provider=None, ): ''' Dump zone data from the specified source ''' self.log.info( 'dump: zone=%s, output_dir=%s, output_provider=%s, ' 'lenient=%s, split=%s, sources=%s', zone, output_dir, output_provider, lenient, split, sources, ) try: sources = [self.providers[s] for s in sources] except KeyError as e: raise ManagerException(f'Unknown source: {e.args[0]}') if output_provider: self.log.info( 'dump: using specified output_provider=%s', output_provider ) try: target = self.providers[output_provider] except KeyError as e: raise ManagerException(f'Unknown output_provider: {e.args[0]}') # The chosen output provider has to support a directory property so # that we can tell it where the user has requested the dumped files # to reside. if not hasattr(target, 'directory'): msg = ( f'output_provider={output_provider}, does not support ' 'directory property' ) raise ManagerException(msg) if target.directory != output_dir: # If the requested target doesn't match what's configured in # the chosen provider then we'll need to set it. Before doing # that we make a copy of the provider so that it can remain # unchanged and potentially be used as a source, e.g. copying # from one yaml to another if not hasattr(target, 'copy'): msg = ( f'output_provider={output_provider}, does not ' 'support copy method' ) raise ManagerException(msg) target = target.copy() self.log.info( 'dump: setting directory of output_provider copy to %s', output_dir, ) target.directory = output_dir else: self.log.info('dump: using custom YamlProvider') clz = YamlProvider if split: clz = SplitYamlProvider target = clz('dump', output_dir) zones = self.config['zones'] zones = self._preprocess_zones(zones, sources=sources) if '*' in zone: # we want to do everything zones = zones.items() else: # we want to do a specific zone try: zones = [(zone, zones[zone])] except KeyError: raise ManagerException( f'Requested zone "{zone}" not found in config' ) for zone_name, config in zones: decoded_zone_name = idna_decode(zone_name) self.log.info('dump: zone=%s', decoded_zone_name) processors = self._get_processors(decoded_zone_name, config) self.log.info('dump: processors=%s', [p.id for p in processors]) zone = self.get_zone(zone_name) for source in sources: source.populate(zone, lenient=lenient) # Apply processors for processor in processors: try: zone = processor.process_source_zone( zone, sources=sources, lenient=lenient ) except TypeError as e: if "unexpected keyword argument 'lenient'" not in str(e): raise deprecated( f'`process_source_zone` method does not support the `lenient` param, fallback is DEPRECATED. Will be removed in 2.0. Class {processor.__class__.__name__}', stacklevel=99, ) self.log.warning( 'processor %s does not accept lenient param', processor.__class__.__name__, ) zone = processor.process_source_zone(zone, sources=sources) plan = target.plan(zone) if plan is None: plan = Plan(zone, zone, [], False) target.apply(plan)
[docs] def validate_configs(self, lenient=False): # TODO: this code can probably be shared with stuff in sync zones = self.config['zones'] zones = self._preprocess_zones(zones) for zone_name, config in zones.items(): decoded_zone_name = idna_decode(zone_name) zone = self.get_zone(zone_name) source_zone = config.get('alias') if source_zone: if source_zone not in self.config['zones']: self.log.exception('Invalid alias zone') raise ManagerException( f'Invalid alias zone {decoded_zone_name}: ' f'source zone {source_zone} does ' 'not exist' ) if 'alias' in self.config['zones'][source_zone]: self.log.exception('Invalid alias zone') raise ManagerException( f'Invalid alias zone {decoded_zone_name}: ' 'source zone {source_zone} is an ' 'alias zone' ) # this is just here to satisfy coverage, see # https://github.com/nedbat/coveragepy/issues/198 source_zone = source_zone continue try: sources = config['sources'] except KeyError: raise ManagerException( f'Zone {decoded_zone_name} is missing sources' ) try: # rather than using a list comprehension, we break this # loop out so that the `except` block below can reference # the `source` collected = [] for source in sources: collected.append(self.providers[source]) sources = collected except KeyError: raise ManagerException( f'Zone {decoded_zone_name}, unknown source: ' + source ) lenient = lenient or config.get('lenient', False) for source in sources: if isinstance(source, YamlProvider): source.populate(zone, lenient=lenient) # check that processors are in order if any are specified processors = config.get('processors') or [] try: # same as above, but for processors this time for processor in processors: collected.append(self.processors[processor]) except KeyError: raise ManagerException( f'Zone {decoded_zone_name}, unknown ' f'processor: {processor}' )
[docs] def get_zone(self, zone_name): if not zone_name[-1] == '.': raise ManagerException( f'Invalid zone name {idna_decode(zone_name)}, missing ending dot' ) zone = self.config['zones'].get(zone_name) if zone is not None: sub_zones = self.configured_sub_zones(zone_name) update_pcent_threshold = zone.get("update_pcent_threshold", None) delete_pcent_threshold = zone.get("delete_pcent_threshold", None) return Zone( idna_encode(zone_name), sub_zones, update_pcent_threshold, delete_pcent_threshold, ) raise ManagerException(f'Unknown zone name {idna_decode(zone_name)}')