Source code for octodns.record.srv

#
#
#

import re

from ..equality import EqualityTupleMixin
from ..idna import idna_encode
from .base import Record, ValuesMixin, unquote
from .rr import RrParseError
from .target import (
    _check_target_format,
    _check_target_not_ip,
    _check_target_trailing_dot,
)
from .validator import RecordValidator, ValueValidator


class SrvNameValidator(RecordValidator):
    '''
    Legacy check on the owner name of an SRV record.

    A name passes when it begins with either a ``*`` wildcard label or an
    underscore-prefixed label (e.g. ``_http``), followed by a dot and at
    least one further non-dot character, as in ``_http._tcp``.
    '''

    # Anchored at the start only; anything after the second label is ignored.
    _name_re = re.compile(r'^(\*|_[^\.]+)\.[^\.]+')

    def validate(self, record_cls, name, fqdn, data):
        '''
        Return the list of reasons ``name`` is unacceptable for SRV.

        Only ``name`` is inspected; ``record_cls``, ``fqdn`` and ``data``
        are accepted for interface compatibility and not consulted. An
        empty list means the name matched ``_name_re``.
        '''
        problems = []
        if self._name_re.match(name) is None:
            problems.append('invalid name for SRV record')
        return problems
class SrvValueValidator(ValueValidator):
    '''
    Validates SRV rdata: priority, weight, and port are present and
    integer-parsable, and target is present and passes
    ``_check_target_format``.
    '''

    # Fields that must be present and convertible with ``int()``, in the
    # order their problems are reported.
    _int_fields = ('priority', 'weight', 'port')

    @staticmethod
    def _int_field_reasons(value, field):
        '''
        Return zero or one reasons why ``value[field]`` is not a usable
        integer.

        :param value: a single SRV value mapping
        :param field: name of the key to check
        :return: ``[]`` when ``int(value[field])`` succeeds, otherwise a
            one-element list with a ``missing ...`` or ``invalid ...``
            message
        '''
        try:
            raw = value[field]
        except KeyError:
            return [f'missing {field}']
        try:
            int(raw)
        except (TypeError, ValueError):
            # TypeError covers inputs such as ``None`` or a list, which
            # ``int()`` rejects with a different exception than it uses for
            # unparsable strings; report them instead of aborting validation.
            return [f'invalid {field} "{raw}"']
        return []

    def validate(self, value_cls, data, _type):
        '''
        Collect the problems found in every value of ``data``.

        :param value_cls: unused; accepted for interface compatibility
        :param data: iterable of SRV value mappings
        :param _type: record type string forwarded to
            ``_check_target_format``
        :return: list of human-readable reason strings, empty when all
            values are acceptable
        '''
        reasons = []
        for value in data:
            for field in self._int_fields:
                reasons += self._int_field_reasons(value, field)
            try:
                target = value['target']
            except KeyError:
                reasons.append('missing target')
            else:
                # Kept outside the ``try`` so that a KeyError raised inside
                # the helper is not misreported as a missing target.
                reasons += _check_target_format(target, _type, 'target')
        return reasons
class SrvNameRfcValidator(RecordValidator):
    '''
    Strict SRV name validator per RFC 2782 and RFC 6335 §5.1.

    Requires the first two labels of the record name to be
    ``_service._proto`` (``*._proto`` is still accepted for wildcards).
    Both the service and proto label bodies (after the leading ``_``) must
    conform to the RFC 6335 §5.1 service name syntax: 1-15 characters,
    starting with a letter, ending with a letter or digit, containing only
    US-ASCII letters, digits, and hyphens, and with no consecutive hyphens.

    Enabled as part of the ``rfc`` validator set::

        manager:
          enabled:
            - rfc
    '''

    # NOTE(review): ``SrvRecord.VALIDATORS`` in this module registers this
    # validator under the ``strict`` set, not ``rfc`` - confirm which name
    # the docstring should use.

    _max_len = 15

    # ASCII-only shape of a label body: a leading letter, optionally followed
    # by letters/digits/hyphens that end in a letter or digit. Length and the
    # "no consecutive hyphens" rule are checked separately.
    _service_name_re = re.compile(r'[A-Za-z](?:[A-Za-z0-9-]*[A-Za-z0-9])?')

    @classmethod
    def _is_valid_service_name(cls, body):
        '''
        Return ``True`` when ``body`` satisfies the service name syntax
        described on the class.

        :param body: label text with the leading ``_`` already removed
        '''
        if not body or len(body) > cls._max_len:
            return False
        if '--' in body:
            return False
        # ``str.isalpha``/``str.isalnum`` would also accept non-ASCII letters
        # and digits, so the character classes are spelled out explicitly.
        return cls._service_name_re.fullmatch(body) is not None

    def validate(self, record_cls, name, fqdn, data):
        '''
        Return the list of reasons ``name`` is not a valid SRV owner name.

        Only ``name`` is inspected; ``record_cls``, ``fqdn`` and ``data``
        are accepted for interface compatibility and not consulted.

        :return: a single "at least two labels" reason when fewer than two
            labels are present, otherwise zero to two reasons naming the
            offending service and/or proto label
        '''
        labels = name.split('.') if name else []
        if len(labels) < 2:
            return ['SRV name must have at least two labels (_service._proto)']

        service, proto = labels[0], labels[1]
        # Only the service position may be a wildcard.
        service_ok = service == '*' or (
            service.startswith('_')
            and self._is_valid_service_name(service[1:])
        )
        proto_ok = proto.startswith('_') and self._is_valid_service_name(
            proto[1:]
        )

        reasons = []
        if not service_ok:
            reasons.append(f'invalid SRV service label "{service}"')
        if not proto_ok:
            reasons.append(f'invalid SRV proto label "{proto}"')
        return reasons
class SrvValueRfcValidator(ValueValidator):
    '''
    Strict SRV rdata validator per RFC 2782.

    - ``priority``, ``weight``, and ``port`` must each lie in 0-65535.
    - A ``target`` of ``"."`` requires ``priority``, ``weight``, and
      ``port`` to all be ``0`` (RFC 2782 "service not available"
      convention).
    - Any other non-empty ``target`` requires ``port`` to be greater than 0
      (port 0 is IANA-reserved).

    Fields that are missing or cannot be converted with ``int()`` are
    skipped rather than reported, on the assumption that the base
    ``SrvValueValidator`` has already flagged them; this avoids duplicated
    reasons.

    Enabled as part of the ``rfc`` validator set::

        manager:
          enabled:
            - rfc
    '''

    # NOTE(review): ``SrvValue.VALIDATORS`` in this module registers this
    # validator under the ``strict`` set, not ``rfc`` - confirm which name
    # the docstring should use.

    @staticmethod
    def _as_int(value, field):
        '''
        Return ``int(value[field])``, or ``None`` when the field is absent
        or not convertible.
        '''
        try:
            number = int(value[field])
        except (KeyError, ValueError, TypeError):
            return None
        return number

    def validate(self, value_cls, data, _type):
        '''
        Return the list of RFC 2782 violations found across ``data``.

        ``value_cls`` and ``_type`` are accepted for interface compatibility
        and not consulted.
        '''
        reasons = []
        for entry in data:
            parsed = {}
            for field in ('priority', 'weight', 'port'):
                parsed[field] = self._as_int(entry, field)
            # Fields that could not be parsed are left to the base validator.
            usable = [
                (field, number)
                for field, number in parsed.items()
                if number is not None
            ]

            for field, number in usable:
                if number < 0 or number > 65535:
                    reasons.append(f'{field} "{number}" out of range 0-65535')

            target = entry.get('target')
            if target == '.':
                reasons.extend(
                    f'{field} must be 0 when target is "."'
                    for field, number in usable
                    if number != 0
                )
            elif target and parsed['port'] == 0:
                reasons.append(
                    'port 0 is reserved; must be > 0 when target is not "."'
                )
        return reasons
class SrvValueBestPracticeValidator(ValueValidator):
    '''
    Checks that the SRV ``target`` field ends with a trailing ``.``
    (fully-qualified name) by delegating to ``_check_target_trailing_dot``.

    Enabled as part of the ``best-practice`` validator set::

        manager:
          enabled:
            - best-practice
    '''

    def validate(self, value_cls, data, _type):
        '''
        Return the reasons reported by ``_check_target_trailing_dot`` for
        every value in ``data`` that has a non-empty ``target``.

        Values with a missing or empty target are skipped. ``value_cls`` is
        accepted for interface compatibility and not consulted.
        '''
        found = []
        for entry in data:
            target = entry.get('target')
            if not target:
                continue
            found.extend(_check_target_trailing_dot(target, _type, 'target'))
        return found
class SrvValueNotIpValidator(ValueValidator):
    '''
    Checks that the SRV ``target`` field is not an IP address by delegating
    to ``_check_target_not_ip``.
    '''

    def validate(self, value_cls, data, _type):
        '''
        Return the reasons reported by ``_check_target_not_ip`` for every
        value in ``data`` that has a non-empty ``target``.

        Values with a missing or empty target are skipped. ``value_cls`` is
        accepted for interface compatibility and not consulted.
        '''
        found = []
        for entry in data:
            target = entry.get('target')
            if not target:
                continue
            found.extend(_check_target_not_ip(target, _type, 'target'))
        return found
class SrvValue(EqualityTupleMixin, dict):
    '''
    A single SRV rdata value, stored as a dict with ``priority``,
    ``weight``, ``port`` and ``target`` keys and exposed through matching
    properties.
    '''

    VALIDATORS = [
        SrvValueValidator('srv-value', sets={'legacy'}),
        SrvValueRfcValidator('srv-value-rfc', sets={'strict'}),
        SrvValueNotIpValidator(
            'srv-value-not-ip', sets={'strict', 'best-practice'}
        ),
        SrvValueBestPracticeValidator(
            'srv-value-best-practice', sets={'best-practice'}
        ),
    ]

    @classmethod
    def _schema(cls):
        '''
        Return a JSON-Schema-shaped dict describing one SRV value: all four
        fields required, the three numeric ones integers in 0-65535 and
        ``target`` a string.
        '''
        return {
            'type': 'object',
            'required': ['priority', 'weight', 'port', 'target'],
            'properties': {
                'priority': {'type': 'integer', 'minimum': 0, 'maximum': 65535},
                'weight': {'type': 'integer', 'minimum': 0, 'maximum': 65535},
                'port': {'type': 'integer', 'minimum': 0, 'maximum': 65535},
                'target': {'type': 'string'},
            },
        }

    @classmethod
    def parse_rdata_text(cls, value):
        '''
        Split rdata text of the form ``"priority weight port target"`` into
        a dict.

        :param value: rdata string with exactly four single-space-separated
            pieces
        :return: dict with ``priority``, ``weight``, ``port`` and ``target``
            keys; numeric pieces are converted to ``int`` when possible and
            otherwise left as the original string
        :raises RrParseError: when ``value`` does not split into exactly
            four pieces
        '''

        def _int_or_text(text):
            # Deliberately best-effort: an unparsable piece is passed through
            # unchanged so that validation can report it later.
            try:
                return int(text)
            except ValueError:
                return text

        try:
            priority, weight, port, target = value.split(' ')
        except ValueError as err:
            raise RrParseError() from err
        return {
            'priority': _int_or_text(priority),
            'weight': _int_or_text(weight),
            'port': _int_or_text(port),
            'target': unquote(target),
        }

    @classmethod
    def process(cls, values):
        '''Return a list with each item of ``values`` wrapped in ``cls``.'''
        return [cls(v) for v in values]

    def __init__(self, value):
        '''
        Build the value from a mapping, coercing ``priority``, ``weight``
        and ``port`` with ``int()`` and passing ``target`` through
        ``idna_encode``.
        '''
        super().__init__(
            {
                'priority': int(value['priority']),
                'weight': int(value['weight']),
                'port': int(value['port']),
                'target': idna_encode(value['target']),
            }
        )

    @property
    def priority(self):
        '''The ``priority`` entry of the underlying dict.'''
        return self['priority']

    @priority.setter
    def priority(self, value):
        self['priority'] = value

    @property
    def weight(self):
        '''The ``weight`` entry of the underlying dict.'''
        return self['weight']

    @weight.setter
    def weight(self, value):
        self['weight'] = value

    @property
    def port(self):
        '''The ``port`` entry of the underlying dict.'''
        return self['port']

    @port.setter
    def port(self, value):
        self['port'] = value

    @property
    def target(self):
        '''The ``target`` entry of the underlying dict.'''
        return self['target']

    @target.setter
    def target(self, value):
        self['target'] = value

    @property
    def data(self):
        '''The value itself; it already is the dict representation.'''
        return self

    @property
    def rdata_text(self):
        '''The value rendered as ``"priority weight port target"``.'''
        return f"{self.priority} {self.weight} {self.port} {self.target}"

    def template(self, params):
        '''
        Return a value with ``params`` substituted into ``target``.

        When ``target`` contains no ``{`` the instance itself is returned;
        otherwise a new instance is created and its ``target`` replaced by
        ``target.format(**params)``.
        '''
        if '{' not in self.target:
            return self
        new = self.__class__(self)
        new.target = new.target.format(**params)
        return new

    def __hash__(self):
        # dict is unhashable by default; hash the textual form instead.
        return hash(self.__repr__())

    def _equality_tuple(self):
        '''Return the ``(priority, weight, port, target)`` comparison key.'''
        return (self.priority, self.weight, self.port, self.target)

    def __repr__(self):
        return f"'{self.priority} {self.weight} {self.port} {self.target}'"
class SrvRecord(ValuesMixin, Record):
    '''
    SRV record type: a multi-value record whose values are ``SrvValue``
    instances and whose name is checked by the SRV name validators.
    '''

    REFERENCES = (
        'https://datatracker.ietf.org/doc/html/rfc2782',
        'https://datatracker.ietf.org/doc/html/rfc6335',
    )

    _type = 'SRV'
    _value_type = SrvValue

    # Name-level validators; value-level ones live on ``SrvValue``.
    VALIDATORS = [
        SrvNameValidator('srv-name', sets={'legacy'}),
        SrvNameRfcValidator('srv-name-rfc', sets={'strict'}),
    ]
# Register SrvRecord with the Record base class at import time (presumably
# keyed by its ``_type``, 'SRV' - confirm in Record.register_type).
Record.register_type(SrvRecord)