#
#
#
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from fnmatch import filter as fnmatch_filter
from hashlib import sha256
from importlib import import_module
from importlib.metadata import PackageNotFoundError
from importlib.metadata import version as module_version
from json import dumps
from logging import INFO, getLogger
from re import compile as re_compile
from sys import stdout
from . import __version__
from .deprecation import deprecated
from .idna import IdnaDict, idna_decode, idna_encode
from .processor.arpa import AutoArpa
from .processor.meta import MetaProcessor
from .provider.base import BaseProvider
from .provider.plan import Plan
from .provider.yaml import SplitYamlProvider, YamlProvider
from .secret.environ import EnvironSecrets
from .yaml import safe_load
from .zone import Zone
[docs]
class _AggregateTarget(object):
id = 'aggregate'
[docs]
def __init__(self, targets):
self.targets = targets
self.SUPPORTS = targets[0].SUPPORTS
for target in targets[1:]:
self.SUPPORTS = self.SUPPORTS & target.SUPPORTS
[docs]
def supports(self, record):
for target in self.targets:
if not target.supports(record):
return False
return True
def __getattr__(self, name):
if name.startswith('SUPPORTS_'):
# special case to handle any current or future SUPPORTS_* by
# returning whether all providers support the requested
# functionality.
for target in self.targets:
if not getattr(target, name):
return False
return True
klass = self.__class__.__name__
raise AttributeError(f'{klass} object has no attribute {name}')
[docs]
class MakeThreadFuture(object):
[docs]
def __init__(self, func, args, kwargs):
self.func = func
self.args = args
self.kwargs = kwargs
[docs]
def result(self):
return self.func(*self.args, **self.kwargs)
[docs]
class MainThreadExecutor(object):
'''
Dummy executor that runs things on the main thread during the invocation
of submit, but still returns a future object with the result. This allows
code to be written to handle async, even in the case where we don't want to
use multiple threads/workers and would prefer that things flow as if
traditionally written.
'''
[docs]
def submit(self, func, *args, **kwargs):
return MakeThreadFuture(func, args, kwargs)
[docs]
class ManagerException(Exception):
pass
[docs]
class Manager(object):
log = getLogger('Manager')
plan_log = getLogger('Plan')
[docs]
@classmethod
def _plan_keyer(cls, p):
plan = p[1]
return len(plan.changes[0].record.zone.name) if plan.changes else 0
[docs]
def __init__(
self,
config_file,
max_workers=None,
include_meta=False,
auto_arpa=False,
enable_checksum=False,
):
version = self._try_version('octodns', version=__version__)
self.log.info(
'__init__: config_file=%s, (octoDNS %s)', config_file, version
)
self._configured_sub_zones = None
# Read our config file
with open(config_file, 'r') as fh:
self.config = safe_load(fh, enforce_order=False)
zones = self.config['zones']
self.config['zones'] = self._config_zones(zones)
manager_config = self.config.get('manager') or {}
self._executor = self._config_executor(manager_config, max_workers)
self.include_meta = self._config_include_meta(
manager_config, include_meta
)
self.enable_checksum = self._config_enable_checksum(
manager_config, enable_checksum
)
# add our hard-coded environ handler first so that other secret
# providers can pull in env variables w/it
self.secret_handlers = {'env': EnvironSecrets('env')}
secret_handlers_config = self.config.get('secret_handlers') or {}
self.secret_handlers.update(
self._config_secret_handlers(secret_handlers_config)
)
self.auto_arpa = self._config_auto_arpa(manager_config, auto_arpa)
self.global_processors = manager_config.get('processors') or []
self.log.info('__init__: global_processors=%s', self.global_processors)
self.global_post_processors = (
manager_config.get('post_processors') or []
)
self.log.info(
'__init__: global_post_processors=%s', self.global_post_processors
)
providers_config = self.config['providers']
self.providers = self._config_providers(providers_config)
processors_config = self.config.get('processors') or {}
self.processors = self._config_processors(processors_config)
if self.auto_arpa:
self.log.info(
'__init__: adding auto-arpa to processors and providers, prepending it to global_post_processors list'
)
kwargs = self.auto_arpa if isinstance(self.auto_arpa, dict) else {}
auto_arpa = AutoArpa('auto-arpa', **kwargs)
self.providers[auto_arpa.name] = auto_arpa
self.processors[auto_arpa.name] = auto_arpa
self.global_post_processors = [
auto_arpa.name
] + self.global_post_processors
if self.include_meta:
self.log.info(
'__init__: adding meta to processors and providers, appending it to global_post_processors list'
)
meta = MetaProcessor(
'meta',
record_name='octodns-meta',
include_time=False,
include_provider=True,
)
self.processors[meta.id] = meta
self.global_post_processors.append(meta.id)
plan_outputs_config = manager_config.get('plan_outputs') or {
'_logger': {
'class': 'octodns.provider.plan.PlanLogger',
'level': 'info',
}
}
self.plan_outputs = self._config_plan_outputs(plan_outputs_config)
[docs]
def _config_zones(self, zones):
# record the set of configured zones we have as they are
configured_zones = set([z.lower() for z in zones.keys()])
# walk the configured zones
for name in configured_zones:
if 'xn--' not in name:
continue
# this is an IDNA format zone name
decoded = idna_decode(name)
# do we also have a config for its utf-8
if decoded in configured_zones:
raise ManagerException(
f'"{decoded}" configured both in utf-8 and idna "{name}"'
)
# convert the zones portion of things into an IdnaDict
return IdnaDict(zones)
[docs]
def _config_executor(self, manager_config, max_workers=None):
max_workers = (
manager_config.get('max_workers') or 1
if max_workers is None
else max_workers
)
self.log.info('_config_executor: max_workers=%d', max_workers)
if max_workers > 1:
return ThreadPoolExecutor(max_workers=max_workers)
return MainThreadExecutor()
[docs]
def _config_enable_checksum(self, manager_config, enable_checksum=False):
enable_checksum = enable_checksum or manager_config.get(
'enable_checksum', False
)
self.log.info(
'_config_enable_checksum: enable_checksum=%s', enable_checksum
)
return enable_checksum
[docs]
def _config_auto_arpa(self, manager_config, auto_arpa=False):
auto_arpa = auto_arpa or manager_config.get('auto_arpa', False)
self.log.info('_config_auto_arpa: auto_arpa=%s', auto_arpa)
return auto_arpa
[docs]
def _config_secret_handlers(self, secret_handlers_config):
self.log.debug('_config_secret_handlers: configuring secret_handlers')
secret_handlers = {}
for sh_name, sh_config in secret_handlers_config.items():
# Get our class and remove it from the secret handler config
try:
_class = sh_config.pop('class')
except KeyError:
self.log.exception('Invalid secret handler class')
raise ManagerException(
f'Secret Handler {sh_name} is missing class, {sh_config.context}'
)
_class, module, version = self._get_named_class(
'secret handler', _class, sh_config.context
)
kwargs = self._build_kwargs(sh_config)
try:
secret_handlers[sh_name] = _class(sh_name, **kwargs)
self.log.info(
'__init__: secret_handler=%s (%s %s)',
sh_name,
module,
version,
)
except TypeError:
self.log.exception('Invalid secret handler config')
raise ManagerException(
f'Incorrect secret handler config for {sh_name}, {sh_config.context}'
)
return secret_handlers
[docs]
def _config_providers(self, providers_config):
self.log.debug('_config_providers: configuring providers')
providers = {}
for provider_name, provider_config in providers_config.items():
# Get our class and remove it from the provider_config
try:
_class = provider_config.pop('class')
except KeyError:
self.log.exception('Invalid provider class')
raise ManagerException(
f'Provider {provider_name} is missing class, {provider_config.context}'
)
_class, module, version = self._get_named_class(
'provider', _class, provider_config.context
)
kwargs = self._build_kwargs(provider_config)
try:
providers[provider_name] = _class(provider_name, **kwargs)
self.log.info(
'__init__: provider=%s (%s %s)',
provider_name,
module,
version,
)
except TypeError:
self.log.exception('Invalid provider config')
raise ManagerException(
f'Incorrect provider config for {provider_name}, {provider_config.context}'
)
return providers
[docs]
def _config_processors(self, processors_config):
processors = {}
for processor_name, processor_config in processors_config.items():
try:
_class = processor_config.pop('class')
except KeyError:
self.log.exception('Invalid processor class')
raise ManagerException(
f'Processor {processor_name} is missing class, {processor_config.context}'
)
_class, module, version = self._get_named_class(
'processor', _class, processor_config.context
)
kwargs = self._build_kwargs(processor_config)
try:
processors[processor_name] = _class(processor_name, **kwargs)
self.log.info(
'__init__: processor=%s (%s %s)',
processor_name,
module,
version,
)
except TypeError:
self.log.exception('Invalid processor config')
raise ManagerException(
f'Incorrect processor config for {processor_name}, {processor_config.context}'
)
return processors
[docs]
def _config_plan_outputs(self, plan_outputs_config):
plan_outputs = {}
for plan_output_name, plan_output_config in plan_outputs_config.items():
context = getattr(plan_output_config, 'context', '')
try:
_class = plan_output_config.pop('class')
except KeyError:
self.log.exception('Invalid plan_output class')
raise ManagerException(
f'plan_output {plan_output_name} is missing class, {context}'
)
_class, module, version = self._get_named_class(
'plan_output', _class, context
)
kwargs = self._build_kwargs(plan_output_config)
try:
plan_outputs[plan_output_name] = _class(
plan_output_name, **kwargs
)
# Don't print out version info for the default output
if plan_output_name != '_logger':
self.log.info(
'__init__: plan_output=%s (%s %s)',
plan_output_name,
module,
version,
)
except TypeError:
self.log.exception('Invalid plan_output config')
raise ManagerException(
f'Incorrect plan_output config for {plan_output_name}, {context}'
)
return plan_outputs
[docs]
def _try_version(self, module_name, module=None, version=None):
try:
# Always try and use the official lookup first
return module_version(module_name)
except PackageNotFoundError:
pass
# If we were passed a version that's next in line
if version is not None:
return version
# finally try and import the module and see if it has a __VERSION__
if module is None:
module = import_module(module_name)
# TODO: remove the __VERSION__ fallback eventually?
return getattr(
module, '__version__', getattr(module, '__VERSION__', None)
)
[docs]
def _import_module(self, module_name):
current = module_name
_next = current.rsplit('.', 1)[0]
module = import_module(current)
version = self._try_version(current, module=module)
# If we didn't find a version in the specific module we're importing,
# we'll try walking up the hierarchy, as long as there is one (`.`),
# looking for it.
while version is None and current != _next:
current = _next
_next = current.rsplit('.', 1)[0]
version = self._try_version(current)
return module, version or 'n/a'
[docs]
def _get_named_class(self, _type, _class, context):
try:
module_name, class_name = _class.rsplit('.', 1)
module, version = self._import_module(module_name)
except (ImportError, ValueError):
self.log.exception(
'_get_{}_class: Unable to import module %s', _class
)
raise ManagerException(
f'Unknown {_type} class: {_class}, {context}'
)
try:
return getattr(module, class_name), module_name, version
except AttributeError:
self.log.exception(
'_get_named_class: Unable to get class %s from module %s',
class_name,
module,
)
raise ManagerException(
f'Unknown {_type} class: {_class}, {context}'
)
[docs]
def _build_kwargs(self, source):
# Build up the arguments we need to pass to the provider
kwargs = {}
for k, v in source.items():
if isinstance(v, dict):
v = self._build_kwargs(v)
elif isinstance(v, str):
if '/' in v:
handler, name = v.split('/', 1)
try:
handler = self.secret_handlers[handler]
except KeyError:
# we don't have a matching handler, but don't want to
# make that an error b/c config values will often
# contain /. We don't want to print the values in case
# they're sensitive so just provide the key, and even
# that only at debug level.
self.log.debug(
'_build_kwargs: failed to find handler for key "%s"',
k,
)
else:
v = handler.fetch(name, source)
kwargs[k] = v
return kwargs
[docs]
def _populate_and_plan(
self,
zone_name,
processors,
sources,
targets,
desired=None,
lenient=False,
):
zone = self.get_zone(zone_name)
self.log.debug(
'sync: populating, zone=%s, lenient=%s',
zone.decoded_name,
lenient,
)
if desired:
# This is an alias zone, rather than populate it we'll copy the
# records over from `desired`.
for _, records in desired._records.items():
for record in records:
zone.add_record(record.copy(zone=zone), lenient=lenient)
else:
for source in sources:
try:
source.populate(zone, lenient=lenient)
except TypeError as e:
if "unexpected keyword argument 'lenient'" not in str(e):
raise
deprecated(
f'`populate` method does not support the `lenient` param, fallback is DEPRECATED. Will be removed in 2.0. Class {source.__class__.__name__}',
stacklevel=99,
)
self.log.warning(
'provider %s does not accept lenient param',
source.__class__.__name__,
)
source.populate(zone)
for processor in processors:
try:
zone = processor.process_source_zone(
zone, sources=sources, lenient=lenient
)
except TypeError as e:
if "unexpected keyword argument 'lenient'" not in str(e):
raise
deprecated(
f'`process_source_zone` method does not support the `lenient` param, fallback is DEPRECATED. Will be removed in 2.0. Class {processor.__class__.__name__}',
stacklevel=99,
)
self.log.warning(
'processor %s does not accept lenient param',
processor.__class__.__name__,
)
zone = processor.process_source_zone(zone, sources=sources)
self.log.debug('sync: planning, zone=%s', zone.decoded_name)
plans = []
for target in targets:
try:
plan = target.plan(zone, processors=processors, lenient=lenient)
except TypeError as e:
e_str = str(e)
if "keyword argument 'lenient'" in e_str:
deprecated(
f'`plan` method does not support the `lenient` param, fallback is DEPRECATED. Will be removed in 2.0. Class {target.__class__.__name__}',
stacklevel=99,
)
self.log.warning(
'provider.plan %s does not accept lenient param',
target.__class__.__name__,
)
try:
plan = target.plan(zone, processors=processors)
except TypeError as e2:
if "keyword argument 'processors'" not in str(e2):
raise
deprecated(
f'`plan` method does not support the `processors` param, fallback is DEPRECATED. Will be removed in 2.0. Class {target.__class__.__name__}',
stacklevel=99,
)
self.log.warning(
'provider.plan %s does not accept processors param',
target.__class__.__name__,
)
plan = target.plan(zone)
elif "keyword argument 'processors'" in e_str:
deprecated(
f'`plan` method does not support the `processors` param, fallback is DEPRECATED. Will be removed in 2.0. Class {target.__class__.__name__}',
stacklevel=99,
)
self.log.warning(
'provider.plan %s does not accept processors param',
target.__class__.__name__,
)
plan = target.plan(zone)
else:
raise
for processor in processors:
try:
plan = processor.process_plan(
plan, sources=sources, target=target, lenient=lenient
)
except TypeError as e:
if "unexpected keyword argument 'lenient'" not in str(e):
raise
deprecated(
f'`process_plan` method does not support the `lenient` param, fallback is DEPRECATED. Will be removed in 2.0. Class {processor.__class__.__name__}',
stacklevel=99,
)
self.log.warning(
'processor %s does not accept lenient param',
processor.__class__.__name__,
)
plan = processor.process_plan(
plan, sources=sources, target=target
)
if plan:
plans.append((target, plan))
# Return the zone as it's the desired state
return plans, zone
[docs]
def _get_sources(self, decoded_zone_name, config, eligible_sources):
try:
sources = config['sources'] or []
except KeyError:
raise ManagerException(
f'Zone {decoded_zone_name} is missing sources'
)
if eligible_sources and not [
s for s in sources if s in eligible_sources
]:
return None
self.log.info('_get_sources: sources=%s', sources)
try:
# rather than using a list comprehension, we break this loop
# out so that the `except` block below can reference the
# `source`
collected = []
for source in sources:
collected.append(self.providers[source])
sources = collected
except KeyError:
raise ManagerException(
f'Zone {decoded_zone_name}, unknown source: {source}'
)
return sources
[docs]
def _get_processors(self, decoded_zone_name, config):
# Build list of processor names
processors = (
self.global_processors
+ (config.get('processors') or [])
+ self.global_post_processors
)
# Translate processor names to processor objects
try:
collected = []
for processor in processors:
collected.append(self.processors[processor])
processors = collected
except KeyError:
raise ManagerException(
f'Zone {decoded_zone_name}, unknown processor: {processor}'
)
return processors
[docs]
def _preprocess_zones(self, zones, eligible_sources=None, sources=None):
'''
This may modify the passed in zone object, it should be ignored after
the call and the zones returned from this function should be used
instead.
'''
source_zones = {}
# list since we'll be modifying zones in the loop
for name, config in list(zones.items()):
if name[0] != '*':
# this isn't a dynamic zone config, move along
continue
# it's dynamic, get a list of zone names from the configured sources
found_sources = sources or self._get_sources(
name, config, eligible_sources
)
self.log.info(
'_preprocess_zones: dynamic zone=%s, sources=%s',
name,
list(s.id for s in found_sources),
)
candidates = set()
for source in found_sources:
if source.id not in source_zones:
if not hasattr(source, 'list_zones'):
raise ManagerException(
f'dynamic zone={name} includes a source, {source.id}, that does not support `list_zones`'
)
# get this source's zones
listed_zones = set(source.list_zones())
# cache them
source_zones[source.id] = listed_zones
self.log.debug(
'_preprocess_zones: source=%s, list_zones=%s',
source.id,
listed_zones,
)
# add this source's zones to the candidates
candidates |= source_zones[source.id]
self.log.debug(
'_preprocess_zones: name=%s, candidates=%s', name, candidates
)
# remove any zones that are already configured, either explicitly or
# from a previous dyanmic config
candidates -= set(zones.keys())
if glob := config.pop('glob', None):
self.log.debug(
'_preprocess_zones: name=%s, glob=%s', name, glob
)
candidates = set(fnmatch_filter(candidates, glob))
elif regex := config.pop('regex', None):
self.log.debug(
'_preprocess_zones: name=%s, regex=%s', name, regex
)
regex = re_compile(regex)
self.log.debug(
'_preprocess_zones: name=%s, compiled=%s', name, regex
)
candidates = set(z for z in candidates if regex.search(z))
else:
# old-style wildcard that uses everything
self.log.debug(
'_preprocess_zones: name=%s, old semantics, catch all', name
)
self.log.debug(
'_preprocess_zones: name=%s, matches=%s', name, candidates
)
for match in candidates:
zones[match] = config
# remove the dynamic config element so we don't try and populate it
del zones[name]
return zones
[docs]
def sync(
self,
eligible_zones=[],
eligible_sources=[],
eligible_targets=[],
dry_run=True,
force=False,
plan_output_fh=stdout,
checksum=None,
):
self.log.info(
'sync: eligible_zones=%s, eligible_targets=%s, dry_run=%s, force=%s, plan_output_fh=%s, checksum=%s',
eligible_zones,
eligible_targets,
dry_run,
force,
getattr(plan_output_fh, 'name', plan_output_fh.__class__.__name__),
checksum,
)
zones = self.config['zones']
zones = self._preprocess_zones(zones, eligible_sources)
if eligible_zones:
zones = IdnaDict({n: zones.get(n) for n in eligible_zones})
includes_arpa = any(e.endswith('arpa.') for e in zones.keys())
if self.auto_arpa and includes_arpa:
# it's not safe to mess with auto_arpa when we don't have a complete
# picture of records, so if any filtering is happening while arpa
# zones are in play we need to abort
if any(e.endswith('arpa.') for e in eligible_zones):
raise ManagerException(
'ARPA zones cannot be synced during partial runs when auto_arpa is enabled'
)
if eligible_sources:
raise ManagerException(
'eligible_sources is incompatible with auto_arpa'
)
if eligible_targets:
raise ManagerException(
'eligible_targets is incompatible with auto_arpa'
)
aliased_zones = {}
delayed_arpa = []
futures = []
for zone_name, config in zones.items():
if config is None:
raise ManagerException(
f'Requested zone "{zone_name}" not found in config'
)
decoded_zone_name = idna_decode(zone_name)
self.log.info('sync: zone=%s', decoded_zone_name)
if 'alias' in config:
source_zone = config['alias']
# Check that the source zone is defined.
if source_zone not in self.config['zones']:
msg = f'Invalid alias zone {decoded_zone_name}: source zone {idna_decode(source_zone)} does not exist'
self.log.error(msg)
raise ManagerException(msg)
# Check that the source zone is not an alias zone itself.
if 'alias' in self.config['zones'][source_zone]:
msg = f'Invalid alias zone {decoded_zone_name}: source zone {idna_decode(source_zone)} is an alias zone'
self.log.error(msg)
raise ManagerException(msg)
aliased_zones[zone_name] = source_zone
continue
lenient = config.get('lenient', False)
sources = self._get_sources(
decoded_zone_name, config, eligible_sources
)
try:
targets = config['targets'] or []
except KeyError:
raise ManagerException(
f'Zone {decoded_zone_name} is missing targets'
)
processors = self._get_processors(decoded_zone_name, config)
self.log.info('sync: processors=%s', [p.id for p in processors])
if not sources:
self.log.info('sync: no eligible sources, skipping')
continue
if eligible_targets:
targets = [t for t in targets if t in eligible_targets]
if not targets:
# Don't bother planning (and more importantly populating) zones
# when we don't have any eligible targets, waste of
# time/resources
self.log.info('sync: no eligible targets, skipping')
continue
self.log.info('sync: targets=%s', targets)
try:
trgs = []
for target in targets:
trg = self.providers[target]
if not isinstance(trg, BaseProvider):
raise ManagerException(
f'{trg} - "{target}" does not support targeting'
)
trgs.append(trg)
targets = trgs
except KeyError:
raise ManagerException(
f'Zone {decoded_zone_name}, unknown ' f'target: {target}'
)
kwargs = {
'zone_name': zone_name,
'processors': processors,
'sources': sources,
'targets': targets,
'lenient': lenient,
}
if self.auto_arpa and zone_name.endswith('arpa.'):
delayed_arpa.append(kwargs)
else:
futures.append(
self._executor.submit(self._populate_and_plan, **kwargs)
)
# Wait on all results and unpack/flatten the plans and store the
# desired states in case we need them below
plans = []
desired = {}
for future in futures:
ps, d = future.result()
desired[d.name] = d
for plan in ps:
plans.append(plan)
# Populate aliases zones.
futures = []
for zone_name, zone_source in aliased_zones.items():
source_config = self.config['zones'][zone_source]
try:
desired_config = desired[zone_source]
except KeyError:
raise ManagerException(
f'Zone {idna_decode(zone_name)} cannot be synced '
f'without zone {zone_source} sinced '
'it is aliased'
)
futures.append(
self._executor.submit(
self._populate_and_plan,
zone_name,
processors,
[],
[self.providers[t] for t in source_config['targets']],
desired=desired_config,
lenient=lenient,
)
)
# Wait on results and unpack/flatten the plans, ignore the desired here
# as these are aliased zones
plans += [p for f in futures for p in f.result()[0]]
if delayed_arpa:
# if delaying arpa all of the non-arpa zones have been processed now
# so it's time to plan them
self.log.info(
'sync: processing %d delayed arpa zones', len(delayed_arpa)
)
# populate and plan them
futures = [
self._executor.submit(self._populate_and_plan, **kwargs)
for kwargs in delayed_arpa
]
# wait on the results and unpack/flatten the plans
plans += [p for f in futures for p in f.result()[0]]
# Best effort sort plans children first so that we create/update
# children zones before parents which should allow us to more safely
# extract things into sub-zones. Combining a child back into a parent
# can't really be done all that safely in general so we'll optimize for
# this direction.
plans.sort(key=self._plan_keyer, reverse=True)
for output in self.plan_outputs.values():
output.run(plans=plans, log=self.plan_log, fh=plan_output_fh)
computed_checksum = None
if plans and self.enable_checksum:
data = [p[1].data for p in plans]
data = dumps(data)
csum = sha256()
csum.update(data.encode('utf-8'))
computed_checksum = csum.hexdigest()
checksum_log = getLogger('Checksum')
checksum_log.setLevel(INFO)
checksum_log.info('checksum=%s', computed_checksum)
if not force:
self.log.debug('sync: checking safety')
for target, plan in plans:
plan.raise_if_unsafe()
if dry_run and not checksum:
return 0
elif computed_checksum and computed_checksum != checksum:
raise ManagerException(
f'checksum={checksum} does not match computed={computed_checksum}'
)
total_changes = 0
self.log.debug('sync: applying')
zones = self.config['zones']
for target, plan in plans:
zone_name = plan.existing.decoded_name
if zones[zone_name].get('always-dry-run', False):
self.log.info(
'sync: zone=%s skipping always-dry-run', zone_name
)
continue
total_changes += target.apply(plan)
self.log.info('sync: %d total changes', total_changes)
return total_changes
[docs]
def compare(self, a, b, zone):
'''
Compare zone data between 2 sources.
Note: only things supported by both sources will be considered
'''
self.log.info('compare: a=%s, b=%s, zone=%s', a, b, zone)
try:
a = [self.providers[source] for source in a]
b = [self.providers[source] for source in b]
except KeyError as e:
raise ManagerException(f'Unknown source: {e.args[0]}')
za = self.get_zone(zone)
for source in a:
source.populate(za)
zb = self.get_zone(zone)
for source in b:
source.populate(zb)
return zb.changes(za, _AggregateTarget(a + b))
[docs]
def dump(
self,
zone,
output_dir,
sources,
lenient=False,
split=False,
output_provider=None,
):
'''
Dump zone data from the specified source
'''
self.log.info(
'dump: zone=%s, output_dir=%s, output_provider=%s, '
'lenient=%s, split=%s, sources=%s',
zone,
output_dir,
output_provider,
lenient,
split,
sources,
)
try:
sources = [self.providers[s] for s in sources]
except KeyError as e:
raise ManagerException(f'Unknown source: {e.args[0]}')
if output_provider:
self.log.info(
'dump: using specified output_provider=%s', output_provider
)
try:
target = self.providers[output_provider]
except KeyError as e:
raise ManagerException(f'Unknown output_provider: {e.args[0]}')
# The chosen output provider has to support a directory property so
# that we can tell it where the user has requested the dumped files
# to reside.
if not hasattr(target, 'directory'):
msg = (
f'output_provider={output_provider}, does not support '
'directory property'
)
raise ManagerException(msg)
if target.directory != output_dir:
# If the requested target doesn't match what's configured in
# the chosen provider then we'll need to set it. Before doing
# that we make a copy of the provider so that it can remain
# unchanged and potentially be used as a source, e.g. copying
# from one yaml to another
if not hasattr(target, 'copy'):
msg = (
f'output_provider={output_provider}, does not '
'support copy method'
)
raise ManagerException(msg)
target = target.copy()
self.log.info(
'dump: setting directory of output_provider copy to %s',
output_dir,
)
target.directory = output_dir
else:
self.log.info('dump: using custom YamlProvider')
clz = YamlProvider
if split:
clz = SplitYamlProvider
target = clz('dump', output_dir)
zones = self.config['zones']
zones = self._preprocess_zones(zones, sources=sources)
if '*' in zone:
# we want to do everything
zones = zones.items()
else:
# we want to do a specific zone
try:
zones = [(zone, zones[zone])]
except KeyError:
raise ManagerException(
f'Requested zone "{zone}" not found in config'
)
for zone_name, config in zones:
decoded_zone_name = idna_decode(zone_name)
self.log.info('dump: zone=%s', decoded_zone_name)
processors = self._get_processors(decoded_zone_name, config)
self.log.info('dump: processors=%s', [p.id for p in processors])
zone = self.get_zone(zone_name)
for source in sources:
source.populate(zone, lenient=lenient)
# Apply processors
for processor in processors:
try:
zone = processor.process_source_zone(
zone, sources=sources, lenient=lenient
)
except TypeError as e:
if "unexpected keyword argument 'lenient'" not in str(e):
raise
deprecated(
f'`process_source_zone` method does not support the `lenient` param, fallback is DEPRECATED. Will be removed in 2.0. Class {processor.__class__.__name__}',
stacklevel=99,
)
self.log.warning(
'processor %s does not accept lenient param',
processor.__class__.__name__,
)
zone = processor.process_source_zone(zone, sources=sources)
plan = target.plan(zone)
if plan is None:
plan = Plan(zone, zone, [], False)
target.apply(plan)
[docs]
def validate_configs(self, lenient=False):
# TODO: this code can probably be shared with stuff in sync
zones = self.config['zones']
zones = self._preprocess_zones(zones)
for zone_name, config in zones.items():
decoded_zone_name = idna_decode(zone_name)
zone = self.get_zone(zone_name)
source_zone = config.get('alias')
if source_zone:
if source_zone not in self.config['zones']:
self.log.exception('Invalid alias zone')
raise ManagerException(
f'Invalid alias zone {decoded_zone_name}: '
f'source zone {source_zone} does '
'not exist'
)
if 'alias' in self.config['zones'][source_zone]:
self.log.exception('Invalid alias zone')
raise ManagerException(
f'Invalid alias zone {decoded_zone_name}: '
'source zone {source_zone} is an '
'alias zone'
)
# this is just here to satisfy coverage, see
# https://github.com/nedbat/coveragepy/issues/198
source_zone = source_zone
continue
try:
sources = config['sources']
except KeyError:
raise ManagerException(
f'Zone {decoded_zone_name} is missing sources'
)
try:
# rather than using a list comprehension, we break this
# loop out so that the `except` block below can reference
# the `source`
collected = []
for source in sources:
collected.append(self.providers[source])
sources = collected
except KeyError:
raise ManagerException(
f'Zone {decoded_zone_name}, unknown source: ' + source
)
lenient = lenient or config.get('lenient', False)
for source in sources:
if isinstance(source, YamlProvider):
source.populate(zone, lenient=lenient)
# check that processors are in order if any are specified
processors = config.get('processors') or []
try:
# same as above, but for processors this time
for processor in processors:
collected.append(self.processors[processor])
except KeyError:
raise ManagerException(
f'Zone {decoded_zone_name}, unknown '
f'processor: {processor}'
)
[docs]
def get_zone(self, zone_name):
if not zone_name[-1] == '.':
raise ManagerException(
f'Invalid zone name {idna_decode(zone_name)}, missing ending dot'
)
zone = self.config['zones'].get(zone_name)
if zone is not None:
sub_zones = self.configured_sub_zones(zone_name)
update_pcent_threshold = zone.get("update_pcent_threshold", None)
delete_pcent_threshold = zone.get("delete_pcent_threshold", None)
return Zone(
idna_encode(zone_name),
sub_zones,
update_pcent_threshold,
delete_pcent_threshold,
)
raise ManagerException(f'Unknown zone name {idna_decode(zone_name)}')