#
#
#
import re
from ..equality import EqualityTupleMixin
from ..idna import idna_encode
from .base import Record, ValuesMixin, unquote
from .rr import RrParseError
from .target import (
_check_target_format,
_check_target_not_ip,
_check_target_trailing_dot,
)
from .validator import RecordValidator, ValueValidator
[docs]
class SrvNameValidator(RecordValidator):
'''
Validates that an SRV record's name matches the
``_service._protocol`` pattern required by RFC 2782 (e.g.
``_http._tcp``), or is a wildcard.
'''
_name_re = re.compile(r'^(\*|_[^\.]+)\.[^\.]+')
[docs]
def validate(self, record_cls, name, fqdn, data):
if not self._name_re.match(name):
return ['invalid name for SRV record']
return []
[docs]
class SrvValueValidator(ValueValidator):
'''
Validates SRV rdata: priority, weight, and port are present and
integer-parsable, and target is a valid FQDN.
'''
[docs]
def validate(self, value_cls, data, _type):
reasons = []
for value in data:
# TODO: validate algorithm and fingerprint_type values
try:
int(value['priority'])
except KeyError:
reasons.append('missing priority')
except ValueError:
reasons.append(f'invalid priority "{value["priority"]}"')
try:
int(value['weight'])
except KeyError:
reasons.append('missing weight')
except ValueError:
reasons.append(f'invalid weight "{value["weight"]}"')
try:
int(value['port'])
except KeyError:
reasons.append('missing port')
except ValueError:
reasons.append(f'invalid port "{value["port"]}"')
try:
target = value['target']
reasons += _check_target_format(target, _type, 'target')
except KeyError:
reasons.append('missing target')
return reasons
[docs]
class SrvNameRfcValidator(RecordValidator):
'''
Strict SRV name validator per RFC 2782 and RFC 6335 §5.1.
Requires the first two labels of the record name to be
``_service._proto`` (``*._proto`` is still accepted for wildcards).
Both the service and proto label bodies (after the leading ``_``)
must conform to the RFC 6335 §5.1 service name syntax: 1-15
characters, starting with a letter, ending with a letter or digit,
containing only letters, digits, and hyphens, and with no
consecutive hyphens.
Enabled as part of the ``rfc`` validator set::
manager:
enabled:
- rfc
'''
_max_len = 15
[docs]
@classmethod
def _is_valid_service_name(cls, body):
if not body or len(body) > cls._max_len:
return False
if not body[0].isalpha():
return False
if not body[-1].isalnum():
return False
if '--' in body:
return False
return all(c.isalnum() or c == '-' for c in body)
[docs]
def validate(self, record_cls, name, fqdn, data):
labels = name.split('.') if name else []
if len(labels) < 2:
return ['SRV name must have at least two labels (_service._proto)']
reasons = []
service, proto = labels[0], labels[1]
if service != '*' and not (
service.startswith('_') and self._is_valid_service_name(service[1:])
):
reasons.append(f'invalid SRV service label "{service}"')
if not (
proto.startswith('_') and self._is_valid_service_name(proto[1:])
):
reasons.append(f'invalid SRV proto label "{proto}"')
return reasons
[docs]
class SrvValueRfcValidator(ValueValidator):
'''
Strict SRV rdata validator per RFC 2782.
- ``priority``, ``weight``, and ``port`` must each be in the
0-65535 range.
- When ``target`` is ``"."``, ``priority``, ``weight``, and
``port`` must all be ``0`` (RFC 2782 "service not available"
convention).
- When ``target`` is not ``"."``, ``port`` must be greater than
0 (port 0 is IANA-reserved).
Assumes the base ``SrvValueValidator`` has already caught missing
or non-integer fields; entries that fail those checks are skipped
here to avoid duplicated reasons. Enabled as part of the ``rfc``
validator set::
manager:
enabled:
- rfc
'''
[docs]
@staticmethod
def _as_int(value, field):
try:
return int(value[field])
except (KeyError, ValueError, TypeError):
return None
[docs]
def validate(self, value_cls, data, _type):
reasons = []
for value in data:
fields = {
name: self._as_int(value, name)
for name in ('priority', 'weight', 'port')
}
for name, v in fields.items():
if v is not None and not 0 <= v <= 65535:
reasons.append(f'{name} "{v}" out of range 0-65535')
target = value.get('target')
if target == '.':
for name, v in fields.items():
if v is not None and v != 0:
reasons.append(f'{name} must be 0 when target is "."')
elif target and fields['port'] == 0:
reasons.append(
'port 0 is reserved; must be > 0 when target is not "."'
)
return reasons
[docs]
class SrvValueBestPracticeValidator(ValueValidator):
'''
Checks that the SRV ``target`` field ends with a trailing ``.``
(fully-qualified name).
Enabled as part of the ``best-practice`` validator set::
manager:
enabled:
- best-practice
'''
[docs]
def validate(self, value_cls, data, _type):
reasons = []
for value in data:
target = value.get('target')
if target:
reasons += _check_target_trailing_dot(target, _type, 'target')
return reasons
[docs]
class SrvValueNotIpValidator(ValueValidator):
'''
Checks that the SRV ``target`` field is not an IP address.
'''
[docs]
def validate(self, value_cls, data, _type):
reasons = []
for value in data:
target = value.get('target')
if target:
reasons += _check_target_not_ip(target, _type, 'target')
return reasons
[docs]
class SrvValue(EqualityTupleMixin, dict):
VALIDATORS = [
SrvValueValidator('srv-value', sets={'legacy'}),
SrvValueRfcValidator('srv-value-rfc', sets={'strict'}),
SrvValueNotIpValidator(
'srv-value-not-ip', sets={'strict', 'best-practice'}
),
SrvValueBestPracticeValidator(
'srv-value-best-practice', sets={'best-practice'}
),
]
[docs]
@classmethod
def _schema(cls):
return {
'type': 'object',
'required': ['priority', 'weight', 'port', 'target'],
'properties': {
'priority': {'type': 'integer', 'minimum': 0, 'maximum': 65535},
'weight': {'type': 'integer', 'minimum': 0, 'maximum': 65535},
'port': {'type': 'integer', 'minimum': 0, 'maximum': 65535},
'target': {'type': 'string'},
},
}
[docs]
@classmethod
def parse_rdata_text(self, value):
try:
priority, weight, port, target = value.split(' ')
except ValueError:
raise RrParseError()
try:
priority = int(priority)
except ValueError:
pass
try:
weight = int(weight)
except ValueError:
pass
try:
port = int(port)
except ValueError:
pass
target = unquote(target)
return {
'priority': priority,
'weight': weight,
'port': port,
'target': target,
}
[docs]
@classmethod
def process(cls, values):
return [cls(v) for v in values]
[docs]
def __init__(self, value):
super().__init__(
{
'priority': int(value['priority']),
'weight': int(value['weight']),
'port': int(value['port']),
'target': idna_encode(value['target']),
}
)
@property
def priority(self):
return self['priority']
@priority.setter
def priority(self, value):
self['priority'] = value
@property
def weight(self):
return self['weight']
@weight.setter
def weight(self, value):
self['weight'] = value
@property
def port(self):
return self['port']
@port.setter
def port(self, value):
self['port'] = value
@property
def target(self):
return self['target']
@target.setter
def target(self, value):
self['target'] = value
@property
def data(self):
return self
@property
def rdata_text(self):
return f"{self.priority} {self.weight} {self.port} {self.target}"
[docs]
def template(self, params):
if '{' not in self.target:
return self
new = self.__class__(self)
new.target = new.target.format(**params)
return new
def __hash__(self):
return hash(self.__repr__())
[docs]
def _equality_tuple(self):
return (self.priority, self.weight, self.port, self.target)
[docs]
def __repr__(self):
return f"'{self.priority} {self.weight} {self.port} {self.target}'"
[docs]
class SrvRecord(ValuesMixin, Record):
REFERENCES = (
'https://datatracker.ietf.org/doc/html/rfc2782',
'https://datatracker.ietf.org/doc/html/rfc6335',
)
_type = 'SRV'
_value_type = SrvValue
VALIDATORS = [
SrvNameValidator('srv-name', sets={'legacy'}),
SrvNameRfcValidator('srv-name-rfc', sets={'strict'}),
]
Record.register_type(SrvRecord)