Coverage for apps/core/validators.py: 95%
60 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-12 10:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-12 10:49 +0000
1"""URL validation utilities for SSRF protection."""
3import ipaddress
4import logging
5import socket
6from dataclasses import dataclass
7from urllib.parse import urlparse
logger = logging.getLogger(__name__)

# Networks that outbound fetches must never reach (SSRF blocklist).
# Membership tests across IP versions are always False, so IPv4 and IPv6
# ranges have to be listed separately.
BLOCKED_NETWORKS = [
    ipaddress.ip_network("127.0.0.0/8"),  # IPv4 loopback
    ipaddress.ip_network("10.0.0.0/8"),  # RFC 1918 private
    ipaddress.ip_network("172.16.0.0/12"),  # RFC 1918 private
    ipaddress.ip_network("192.168.0.0/16"),  # RFC 1918 private
    ipaddress.ip_network("169.254.0.0/16"),  # IPv4 link-local
    ipaddress.ip_network("0.0.0.0/8"),  # "this network" / unspecified
    ipaddress.ip_network("100.64.0.0/10"),  # RFC 6598 shared address space
    ipaddress.ip_network("::/128"),  # IPv6 unspecified (analogue of 0.0.0.0)
    ipaddress.ip_network("::1/128"),  # IPv6 loopback
    # IPv4-mapped IPv6 (e.g. ::ffff:127.0.0.1) would otherwise slip past
    # every IPv4 entry above, because it is an IPv6 address.
    ipaddress.ip_network("::ffff:0:0/96"),
    ipaddress.ip_network("fc00::/7"),  # IPv6 unique local
    ipaddress.ip_network("fe80::/10"),  # IPv6 link-local
]

# Response size limits for external fetches
MAX_HTML_SIZE = 10 * 1024 * 1024  # 10 MB
MAX_IMAGE_SIZE = 50 * 1024 * 1024  # 50 MB
# NOTE(review): not referenced in this module; presumably the cap on redirects
# followed by the fetch code -- confirm against callers.
MAX_REDIRECT_HOPS = 5
def is_blocked_ip(ip_str):
    """Check if an IP address falls within any blocked range.

    Args:
        ip_str: Textual IPv4 or IPv6 address.

    Returns:
        True if the address is inside any network in BLOCKED_NETWORKS, or if
        ``ip_str`` cannot be parsed as an IP address (fail closed); False
        otherwise.
    """
    try:
        addr = ipaddress.ip_address(ip_str)
    except ValueError:
        # Unparseable input is treated as blocked rather than allowed.
        return True

    # An IPv4-mapped IPv6 address (::ffff:a.b.c.d) reaches the IPv4 host
    # a.b.c.d, but as an IPv6 object it never matches an IPv4 network.
    # Unwrap it so that e.g. ::ffff:127.0.0.1 is judged as 127.0.0.1.
    # IPv4Address objects have no ``ipv4_mapped`` attribute, hence getattr.
    mapped = getattr(addr, "ipv4_mapped", None)
    if mapped is not None:
        addr = mapped

    return any(addr in network for network in BLOCKED_NETWORKS)
def resolve_hostname(hostname):
    """Look up ``hostname`` and return the first address the resolver yields.

    Args:
        hostname: Host name or literal IP address to resolve.

    Returns:
        The address string taken from the first ``getaddrinfo`` entry.

    Raises:
        ValueError: If the lookup fails or produces no entries.
    """
    try:
        addrinfo = socket.getaddrinfo(
            hostname,
            None,
            socket.AF_UNSPEC,
            socket.SOCK_STREAM,
        )
    except socket.gaierror as exc:
        raise ValueError(f"Could not resolve hostname: {hostname}") from exc

    if addrinfo:
        # Each entry is (family, type, proto, canonname, sockaddr); the host
        # address is the first element of sockaddr.
        sockaddr = addrinfo[0][4]
        return sockaddr[0]

    raise ValueError(f"Could not resolve hostname: {hostname}")
@dataclass(frozen=True)
class ResolvedURL:
    """URL with pinned DNS resolution to prevent TOCTOU rebinding attacks.

    Pass curl_resolve to curl_cffi session.get() to ensure the HTTP client
    connects to the same IP that was validated, not a second DNS lookup.
    """

    # The URL as it was given to the validator; also the str() of this object.
    url: str
    # Host component extracted from ``url``.
    hostname: str
    # Address that ``hostname`` resolved to when it was validated.
    ip: str

    def __str__(self):
        """Return the original URL so the object can stand in for a URL string."""
        return self.url

    @property
    def curl_resolve(self):
        """Resolve list for curl_cffi (maps to libcurl CURLOPT_RESOLVE).

        Returns:
            A list of ``"host:port:ip"`` entries. Ports 80 and 443 always come
            first; a non-default port written in the URL is appended.
            CURLOPT_RESOLVE entries are matched per host *and* port, so
            without that extra entry a URL such as ``http://host:8080/``
            would not be pinned and the client would resolve DNS again.
        """
        ports = [80, 443]
        try:
            explicit_port = urlparse(self.url).port
        except ValueError:
            # Malformed or out-of-range port: fall back to the default pins.
            explicit_port = None
        if explicit_port and explicit_port not in ports:
            ports.append(explicit_port)
        return [f"{self.hostname}:{port}:{self.ip}" for port in ports]
def validate_url(url):
    """Check a URL against the SSRF rules and pin its DNS resolution.

    Args:
        url: The URL to validate.

    Returns:
        A ResolvedURL carrying the URL, its hostname and the resolved IP.

    Raises:
        ValueError: If the scheme is not http/https, the URL has no hostname,
            the hostname cannot be resolved, or the resolved IP is blocked.
    """
    components = urlparse(url)

    scheme = components.scheme
    if not (scheme == "http" or scheme == "https"):
        raise ValueError(f"URL scheme not allowed: {scheme}")

    host = components.hostname
    if not host:
        raise ValueError("URL has no hostname")

    # Resolve once here; the same address is handed back to the caller so the
    # HTTP client can be pinned to it.
    pinned_ip = resolve_hostname(host)
    if not is_blocked_ip(pinned_ip):
        return ResolvedURL(url=url, hostname=host, ip=pinned_ip)

    raise ValueError("URL not allowed: resolves to blocked IP range.")
def validate_redirect_url(url):
    """Run validate_url on a redirect target, logging when it is rejected.

    Behaves exactly like validate_url, except that a rejection is also
    recorded as a warning so blocked hops in a redirect chain show up in logs.

    Args:
        url: The redirect destination to validate.

    Returns:
        Whatever validate_url returns for ``url``.

    Raises:
        ValueError: Re-raised unchanged from validate_url.
    """
    try:
        resolved = validate_url(url)
    except ValueError:
        logger.warning("Blocked redirect to SSRF-unsafe URL: %s", url)
        raise
    else:
        return resolved
def check_response_size(response, max_size):
    """Compare the declared Content-Length with ``max_size``.

    This is a best-effort pre-check only: the header may be missing or wrong,
    so callers should still measure the body once it has been read.

    Args:
        response: Object whose ``headers.get("content-length")`` gives the
            declared size.
        max_size: Largest acceptable size in bytes.

    Returns:
        False only when the header parses to an integer greater than
        ``max_size``; True when it is absent, empty, unparseable or within
        the limit.
    """
    declared = response.headers.get("content-length")
    if not declared:
        return True

    try:
        oversized = bool(int(declared) > max_size)
    except (ValueError, TypeError):
        # A header that cannot be interpreted is not grounds for rejection.
        oversized = False
    return not oversized
def check_content_size(content, max_size):
    """Enforce the size limit on content that has already been read.

    Args:
        content: Sized object (anything supporting ``len``) to measure.
        max_size: Largest acceptable length.

    Raises:
        ValueError: If ``len(content)`` is greater than ``max_size``.
    """
    actual = len(content)
    if not actual > max_size:
        return None
    raise ValueError(f"Response too large: {actual} bytes (limit: {max_size})")