Coverage for apps / core / validators.py: 95%

60 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-12 10:49 +0000

1"""URL validation utilities for SSRF protection.""" 

2 

3import ipaddress 

4import logging 

5import socket 

6from dataclasses import dataclass 

7from urllib.parse import urlparse 

8 

logger = logging.getLogger(__name__)

# Networks that outbound fetches must never reach. Built from CIDR strings so
# the policy reads as a plain list of ranges.
BLOCKED_NETWORKS = [
    ipaddress.ip_network(cidr)
    for cidr in (
        "127.0.0.0/8",  # IPv4 loopback
        "10.0.0.0/8",  # IPv4 private
        "172.16.0.0/12",  # IPv4 private
        "192.168.0.0/16",  # IPv4 private
        "169.254.0.0/16",  # IPv4 link-local
        "0.0.0.0/8",  # IPv4 "this network"
        "::1/128",  # IPv6 loopback
        "fc00::/7",  # IPv6 unique local
        "fe80::/10",  # IPv6 link-local
    )
]

# Upper bounds applied to responses fetched from external hosts.
MAX_HTML_SIZE = 10 * 1024**2  # 10 MB
MAX_IMAGE_SIZE = 50 * 1024**2  # 50 MB
MAX_REDIRECT_HOPS = 5

27 

28 

def is_blocked_ip(ip_str):
    """Check if an IP address falls within any blocked range.

    IPv4-mapped IPv6 addresses (``::ffff:a.b.c.d``) are unwrapped to their
    embedded IPv4 address before the check. Without this, an address such as
    ``::ffff:127.0.0.1`` is an IPv6 object that never matches the IPv4 entries
    of the blocklist, even though it designates an IPv4 destination.

    Args:
        ip_str: Textual IPv4 or IPv6 address.

    Returns:
        True if the address lies inside any network in BLOCKED_NETWORKS, or
        if ``ip_str`` cannot be parsed as an IP address; False otherwise.
    """
    try:
        addr = ipaddress.ip_address(ip_str)
    except ValueError:
        # Fail closed: anything that is not a parseable address is blocked.
        return True

    # Only IPv6Address exposes ipv4_mapped; it is None for ordinary IPv6.
    mapped = getattr(addr, "ipv4_mapped", None)
    if mapped is not None:
        addr = mapped

    return any(addr in network for network in BLOCKED_NETWORKS)

36 

37 

def resolve_hostname(hostname):
    """Resolve a hostname to its IP address via DNS.

    Args:
        hostname: Host name or IP literal to look up.

    Returns:
        The address string of the first entry returned by
        ``socket.getaddrinfo``.

    Raises:
        ValueError: If the lookup fails, the name cannot be encoded for the
            lookup, or no addresses are returned.
    """
    try:
        results = socket.getaddrinfo(hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM)
    except (socket.gaierror, UnicodeError) as e:
        # getaddrinfo IDNA-encodes the name before the lookup; an empty or
        # over-long label raises UnicodeError instead of gaierror. Report both
        # with the same message so callers see one failure mode.
        raise ValueError(f"Could not resolve hostname: {hostname}") from e
    if not results:
        raise ValueError(f"Could not resolve hostname: {hostname}")
    # Each entry is (family, type, proto, canonname, sockaddr); the first
    # element of sockaddr is the address text.
    return results[0][4][0]

47 

48 

@dataclass(frozen=True)
class ResolvedURL:
    """URL with pinned DNS resolution to prevent TOCTOU rebinding attacks.

    Pass curl_resolve to curl_cffi session.get() to ensure the HTTP client
    connects to the same IP that was validated, not a second DNS lookup.
    """

    url: str  # URL string; returned unchanged by str()
    hostname: str  # host name the pinned entries are keyed on
    ip: str  # address the host name is pinned to

    def __str__(self):
        return self.url

    @property
    def curl_resolve(self):
        """Resolve list for curl_cffi (maps to libcurl CURLOPT_RESOLVE).

        Entries have the form ``HOST:PORT:ADDRESS`` and only apply to the
        exact port named. Ports 80 and 443 are always pinned; if the URL
        carries an explicit port, it is pinned as well, so a request to a
        non-default port does not fall back to a fresh DNS lookup.

        Returns:
            List of ``"hostname:port:ip"`` strings.
        """
        ports = [80, 443]
        try:
            explicit_port = urlparse(self.url).port
        except ValueError:
            # Non-numeric or out-of-range port in the URL: nothing extra to pin.
            explicit_port = None
        if explicit_port is not None and explicit_port not in ports:
            ports.append(explicit_port)
        return [f"{self.hostname}:{port}:{self.ip}" for port in ports]

68 

69 

def validate_url(url):
    """Check a URL against the SSRF rules and pin its DNS resolution.

    The URL must use http or https and contain a host name. The host is
    resolved once, and the resulting address is rejected if it is blocked.

    Returns:
        ResolvedURL carrying the URL, its host name and the resolved address.

    Raises:
        ValueError: On a disallowed scheme, a missing host name, a failed
            resolution, or a blocked address.
    """
    parts = urlparse(url)

    scheme = parts.scheme
    if scheme != "http" and scheme != "https":
        raise ValueError(f"URL scheme not allowed: {scheme}")

    host = parts.hostname
    if not host:
        raise ValueError("URL has no hostname")

    pinned_ip = resolve_hostname(host)
    if is_blocked_ip(pinned_ip):
        raise ValueError("URL not allowed: resolves to blocked IP range.")

    return ResolvedURL(url=url, hostname=host, ip=pinned_ip)

87 

88 

def validate_redirect_url(url):
    """Validate the target of a redirect against the SSRF blocklist.

    Delegates to validate_url; the only addition is a warning log entry when
    the target is rejected, so blocked hops in a redirect chain are visible.

    Returns:
        Whatever validate_url returns for the target.

    Raises:
        ValueError: Re-raised unchanged from validate_url.
    """
    try:
        resolved = validate_url(url)
    except ValueError:
        logger.warning("Blocked redirect to SSRF-unsafe URL: %s", url)
        raise
    return resolved

100 

101 

def check_response_size(response, max_size):
    """Compare the declared Content-Length with a limit. Returns True if safe.

    This is advisory only: the header can be missing or wrong, so callers
    should still check the size of the body they actually read.

    Returns:
        False only when the header parses as an integer greater than
        max_size; True in every other case, including a missing, empty or
        unparseable header.
    """
    declared = response.headers.get("content-length")
    if not declared:
        return True

    try:
        exceeds_limit = int(declared) > max_size
    except (ValueError, TypeError):
        # An unusable header value is treated the same as no header.
        return True

    return not exceeds_limit

116 

117 

def check_content_size(content, max_size):
    """Enforce a size limit on content that has already been read.

    Raises:
        ValueError: If len(content) is greater than max_size.
    """
    size = len(content)
    if size > max_size:
        raise ValueError(f"Response too large: {size} bytes (limit: {max_size})")

← Back to Dashboard