Coverage for apps / core / device_code_api.py: 93%

97 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-02 13:22 +0000

1"""Device code authorization flow API — only active in passkey mode.""" 

2 

3import logging 

4 

5from django.conf import settings 

6from django.contrib.auth import login 

7from django.db import IntegrityError, transaction 

8from django.http import Http404 

9from django.utils import timezone 

10from django_ratelimit.decorators import ratelimit 

11from ninja import Router, Schema, Status 

12 

13from apps.core.auth import SessionAuth 

14from apps.core.auth_helpers import passkey_user_profile_response, require_passkey_mode 

15from apps.core.models import DeviceCode, generate_device_code 

16 

# Security-relevant events (rate-limit hits, code issuance/consumption) are
# sent to the dedicated "security" logger rather than a per-module logger.
security_logger = logging.getLogger("security")

# django-ninja router holding the three device-code endpoints defined below.
router = Router(tags=["device"])

# Backend path handed to `login()` when a polling device is signed in.
_AUTH_BACKEND = "django.contrib.auth.backends.ModelBackend"

22 

23 

# Request body accepted by the `/authorize/` endpoint.
class AuthorizeIn(Schema):
    # Pairing code as submitted by the client; `authorize_code` strips
    # surrounding whitespace and upper-cases it before the lookup.
    code: str

26 

27 

@router.post("/code/", response={201: dict, 429: dict})
@ratelimit(key="ip", rate="10/h", method="POST", block=False)
def request_code(request):
    """Generate a new device pairing code bound to the caller's session.

    Any code this session still holds in the ``pending`` or ``authorized``
    state is invalidated first and the session's expired codes are deleted,
    then a fresh code is stored and the session is flagged as having a
    device-code flow in progress.

    Returns:
        ``Status(201, ...)`` carrying the code, its lifetime in seconds and
        the polling interval/URL, or ``Status(429, ...)`` when the per-IP
        rate limit is hit or no unique code could be stored after ten
        attempts.
    """
    # Function-scope import: `timezone.timedelta` only works because
    # django.utils.timezone happens to import it internally, which is not a
    # documented part of that module's API.
    from datetime import timedelta

    require_passkey_mode(request)
    client_ip = request.META.get("REMOTE_ADDR")

    if getattr(request, "limited", False):
        security_logger.warning("Rate limit hit: device/code/ from %s", client_ip)
        return Status(429, {"error": "Too many attempts. Please try again later."})

    # The code is keyed on the session, so an anonymous caller needs a
    # persisted session before anything can be stored.
    if not request.session.session_key:
        request.session.create()
    session_key = request.session.session_key

    # Invalidate any existing pending/authorized codes for this session.
    DeviceCode.objects.filter(
        session_key=session_key,
        status__in=["pending", "authorized"],
    ).update(status="invalidated")

    # Clean up expired codes for this session.
    DeviceCode.objects.filter(
        session_key=session_key,
        expires_at__lt=timezone.now(),
    ).exclude(status="authorized").delete()

    lifetime = timedelta(seconds=settings.DEVICE_CODE_EXPIRY_SECONDS)
    max_attempts = 10

    # Generate a unique code with retry, relying on the DB-level unique
    # constraint to detect collisions.
    device_code = None
    for _ in range(max_attempts):
        candidate = generate_device_code()
        try:
            # Each insert runs in its own atomic block so that a collision
            # rolls back to a savepoint. Without it, an IntegrityError raised
            # inside an enclosing transaction (e.g. ATOMIC_REQUESTS) leaves
            # that transaction unusable and the retry itself would fail.
            with transaction.atomic():
                device_code = DeviceCode.objects.create(
                    code=candidate,
                    session_key=session_key,
                    expires_at=timezone.now() + lifetime,
                )
        except IntegrityError:
            continue
        break

    if device_code is None:
        # Every attempt collided; record it so the condition is visible
        # instead of surfacing only as an unexplained 429.
        security_logger.error(
            "Device code generation exhausted %d attempts: session=%s from %s",
            max_attempts,
            session_key[:8],
            client_ip,
        )
        return Status(429, {"error": "Unable to generate code. Please try again."})

    # F-17: Mark this session as having an in-flight device-code flow so
    # `poll_status` can distinguish legitimate pollers (410 UX) from
    # unauthenticated scanner probes (404, indistinguishable from never-existed).
    request.session["device_code_flow"] = True

    # Only a prefix of the session key is logged.
    security_logger.info(
        "Device code generated: session=%s from %s",
        session_key[:8],
        client_ip,
    )

    return Status(
        201,
        {
            "code": device_code.code,
            "expires_in": settings.DEVICE_CODE_EXPIRY_SECONDS,
            "poll_interval": 5,
            "poll_url": "/api/auth/device/poll/",
        },
    )

93 

94 

@router.get("/poll/", response={200: dict, 202: dict, 410: dict})
@ratelimit(key="ip", rate="180/h", method="GET", block=False)
@transaction.atomic
def poll_status(request):
    """Report the state of the calling session's device code.

    Outcomes:
        * ``Http404`` is raised when the session carries no
          ``device_code_flow`` marker.
        * ``410`` when rate limited, when the session has no key or no
          pending/authorized code, when a pending code has expired, or when
          an authorized code has no authorizing user.
        * ``202`` while the code is still pending.
        * ``200`` once the code is authorized: the authorizing user is
          logged into this session and the code is marked ``expired`` so it
          cannot be consumed again.
    """

    def gone(reason):
        # Every terminal failure shares the same 410 envelope.
        return Status(410, {"status": "expired", "error": reason})

    require_passkey_mode(request)

    # F-17: Callers without an in-flight device-code flow must be
    # indistinguishable from a never-existed path (v1.42.0+ passkey-mode
    # invariant). The marker is set in `request_code` on successful code
    # generation; legitimate pollers keep the 410 UX below.
    if not request.session.get("device_code_flow"):
        raise Http404

    if getattr(request, "limited", False):
        return gone("Too many requests.")

    # Captured before `login()` runs further down; the log entry below uses
    # this value.
    sid = request.session.session_key
    if not sid:
        return gone("No active code. Please request a new one.")

    # Lock only the DeviceCode row (`of=("self",)`), not the joined user row,
    # and pick the session's most recent live code.
    live_codes = DeviceCode.objects.filter(
        session_key=sid,
        status__in=["pending", "authorized"],
    )
    record = (
        live_codes.select_related("authorizing_user")
        .select_for_update(of=("self",))
        .order_by("-created_at")
        .first()
    )
    if record is None:
        return gone("No active code. Please request a new one.")

    if record.status == "pending":
        if not record.is_expired:
            return Status(202, {"status": "pending"})
        # A pending code past its expiry is retired on first sight.
        record.status = "expired"
        record.save(update_fields=["status"])
        return gone("Code has expired. Please request a new one.")

    # Authorized from here on; the authorizing user must still be attached.
    approver = record.authorizing_user
    if approver is None:
        record.status = "invalidated"
        record.save(update_fields=["status"])
        return gone("Authorization invalid. Please request a new code.")

    # Establish the session for this device.
    login(request, approver, backend=_AUTH_BACKEND)
    request.session["profile_id"] = approver.profile.id

    security_logger.info(
        "Device code consumed: user_id=%s, session=%s from %s",
        approver.pk,
        sid[:8],
        request.META.get("REMOTE_ADDR"),
    )

    # Mark the code as consumed so it can't be re-polled.
    record.status = "expired"
    record.save(update_fields=["status"])

    payload = passkey_user_profile_response(approver, approver.profile)
    payload["status"] = "authorized"
    return Status(200, payload)

166 

167 

@router.post("/authorize/", response={200: dict, 400: dict, 429: dict}, auth=SessionAuth())
@ratelimit(key="ip", rate="20/h", method="POST", block=False)
@transaction.atomic
def authorize_code(request, data: AuthorizeIn):
    """Authorize a pending device code on behalf of the signed-in user.

    The submitted code is stripped and upper-cased, then looked up among
    pending codes under a row lock.

    Returns:
        ``Status(200, ...)`` once the code is marked ``authorized`` and linked
        to ``request.user``; ``Status(400, ...)`` when no pending code matches
        or the match has expired (it is then marked ``expired``);
        ``Status(429, ...)`` when the per-IP rate limit is hit.
    """

    def rejected():
        # Unknown and expired codes get the same answer.
        return Status(400, {"error": "Invalid or expired code"})

    require_passkey_mode(request)
    remote_addr = request.META.get("REMOTE_ADDR")

    if getattr(request, "limited", False):
        security_logger.warning("Rate limit hit: device/authorize/ from %s", remote_addr)
        return Status(429, {"error": "Too many attempts. Please try again later."})

    wanted = data.code.strip().upper()
    pending_codes = DeviceCode.objects.select_for_update().filter(status="pending")

    try:
        record = pending_codes.get(code=wanted)
    except DeviceCode.DoesNotExist:
        security_logger.warning(
            "Device code authorize: invalid code from user_id=%s, ip=%s",
            request.user.pk,
            remote_addr,
        )
        return rejected()

    # A pending code past its expiry is retired rather than authorized.
    if record.is_expired:
        record.status = "expired"
        record.save(update_fields=["status"])
        return rejected()

    # Record who approved the code; `poll_status` signs that user in.
    record.status = "authorized"
    record.authorizing_user = request.user
    record.save(update_fields=["status", "authorizing_user"])

    security_logger.info(
        "Device code authorized by user_id=%s from %s",
        request.user.pk,
        remote_addr,
    )

    return Status(200, {"message": "Device authorized"})

← Back to Dashboard