Source code for octodns.record.uri

#
#
#

import re

from ..equality import EqualityTupleMixin
from ..idna import idna_encode
from .base import Record, ValuesMixin, unquote
from .rr import RrParseError
from .validator import RecordValidator, ValueValidator


[docs] class UriNameValidator(RecordValidator): ''' Validates that a URI record's name matches the ``_service._protocol`` pattern required by RFC 7553, or is a wildcard. ''' _name_re = re.compile(r'^(\*|_[^\.]+)\.[^\.]+')
[docs] def validate(self, record_cls, name, fqdn, data): if not self._name_re.match(name): return ['invalid name for URI record'] return []
[docs] class UriValueValidator(ValueValidator): ''' Validates URI rdata: priority and weight are present and integer-parsable, and target is non-empty. '''
[docs] def validate(self, value_cls, data, _type): reasons = [] for value in data: # TODO: validate algorithm and fingerprint_type values try: int(value['priority']) except KeyError: reasons.append('missing priority') except ValueError: reasons.append(f'invalid priority "{value["priority"]}"') try: int(value['weight']) except KeyError: reasons.append('missing weight') except ValueError: reasons.append(f'invalid weight "{value["weight"]}"') try: target = value['target'] if not target: reasons.append('missing target') continue # actual validation of the target is non-trivial and specific # to the details of the schema etc. rfc3986 has support for # validation, but we don't currently require the module and # this seems too esoteric a use case to add it except KeyError: reasons.append('missing target') return reasons
[docs] class UriValueRfcValidator(ValueValidator): ''' Strict URI rdata validator per RFC 7553 §4. - ``priority`` must be an integer in [0, 65535] (uint16). - ``weight`` must be an integer in [0, 65535] (uint16). Enabled as part of the ``strict`` validator set:: manager: enabled: - strict '''
[docs] def validate(self, value_cls, data, _type): reasons = [] for value in data: for field in ('priority', 'weight'): if field not in value: reasons.append(f'missing {field}') else: try: int_val = int(value[field]) if not 0 <= int_val <= 65535: reasons.append( f'invalid {field} "{int_val}"; must be 0-65535' ) except (ValueError, TypeError): reasons.append(f'invalid {field} "{value[field]}"') if 'target' not in value: reasons.append('missing target') return reasons
[docs] class UriValue(EqualityTupleMixin, dict): VALIDATORS = [ UriValueValidator('uri-value', sets={'legacy'}), UriValueRfcValidator('uri-value-rfc', sets={'strict'}), ]
[docs] @classmethod def _schema(cls): return { 'type': 'object', 'required': ['priority', 'weight', 'target'], 'properties': { 'priority': {'type': 'integer', 'minimum': 0, 'maximum': 65535}, 'weight': {'type': 'integer', 'minimum': 0, 'maximum': 65535}, 'target': {'type': 'string'}, }, }
[docs] @classmethod def parse_rdata_text(self, value): try: priority, weight, target = value.split(' ') except ValueError: raise RrParseError() try: priority = int(priority) except ValueError: pass try: weight = int(weight) except ValueError: pass target = unquote(target) return {'priority': priority, 'weight': weight, 'target': target}
[docs] @classmethod def process(cls, values): return [cls(v) for v in values]
[docs] def __init__(self, value): super().__init__( { 'priority': int(value['priority']), 'weight': int(value['weight']), 'target': idna_encode(value['target']), } )
@property def priority(self): return self['priority'] @priority.setter def priority(self, value): self['priority'] = value @property def weight(self): return self['weight'] @weight.setter def weight(self, value): self['weight'] = value @property def target(self): return self['target'] @target.setter def target(self, value): self['target'] = value @property def data(self): return self @property def rdata_text(self): return f'{self.priority} {self.weight} "{self.target}"'
[docs] def template(self, params): if '{' not in self.target: return self new = self.__class__(self) new.target = new.target.format(**params) return new
def __hash__(self): return hash(self.__repr__())
[docs] def _equality_tuple(self): return (self.priority, self.weight, self.target)
[docs] def __repr__(self): return f"'{self.priority} {self.weight} \"{self.target}\"'"
[docs] class UriNameRfcValidator(RecordValidator): ''' Strict URI name validator per RFC 7553 §3 and RFC 6335 §5.1. Requires the first two labels of the record name to be ``_service._proto`` (``*._proto`` is still accepted for wildcards). Both label bodies (after the leading ``_``) must conform to the RFC 6335 §5.1 service name syntax: 1-15 characters, starting with a letter, ending with a letter or digit, containing only letters, digits, and hyphens, and with no consecutive hyphens. Enabled as part of the ``strict`` validator set:: manager: enabled: - strict ''' _max_len = 15
[docs] @classmethod def _is_valid_service_name(cls, body): if not body or len(body) > cls._max_len: return False if not body[0].isalpha(): return False if not body[-1].isalnum(): return False if '--' in body: return False return all(c.isalnum() or c == '-' for c in body)
[docs] def validate(self, record_cls, name, fqdn, data): labels = name.split('.') if name else [] if len(labels) < 2: return ['URI name must have at least two labels (_service._proto)'] reasons = [] service, proto = labels[0], labels[1] if service != '*' and not ( service.startswith('_') and self._is_valid_service_name(service[1:]) ): reasons.append(f'invalid URI service label "{service}"') if not ( proto.startswith('_') and self._is_valid_service_name(proto[1:]) ): reasons.append(f'invalid URI proto label "{proto}"') return reasons
[docs] class UriRecord(ValuesMixin, Record): REFERENCES = ('https://datatracker.ietf.org/doc/html/rfc7553',) _type = 'URI' _value_type = UriValue VALIDATORS = [ UriNameValidator('uri-name', sets={'legacy'}), UriNameRfcValidator('uri-name-rfc', sets={'strict'}), ]
Record.register_type(UriRecord)