Coverage for apps / core / device_code_api.py: 95%

105 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-12 10:49 +0000

1"""Device code authorization flow API — only active in passkey mode.""" 

2 

3import logging 

4 

5from django.conf import settings 

6from django.contrib.auth import login 

7from django.db import IntegrityError, transaction 

8from django.db.models import F 

9from django.utils import timezone 

10from django_ratelimit.decorators import ratelimit 

11from ninja import Router, Schema, Status 

12 

13from apps.core.auth import SessionAuth 

14from apps.core.auth_helpers import passkey_user_profile_response, require_passkey_mode 

15from apps.core.models import DeviceCode, generate_device_code 

16 

# Logger named "security"; the endpoints below use it for rate-limit hits and
# device-code lifecycle events (generated / authorized / consumed).
security_logger = logging.getLogger("security")

# Ninja router the three device-code endpoints register on, tagged "device".
router = Router(tags=["device"])

# Dotted path of the auth backend passed explicitly to ``login()`` when the
# polling device is signed in.
_AUTH_BACKEND = "django.contrib.auth.backends.ModelBackend"

22 

23 

# Request body accepted by the ``/authorize/`` endpoint.
class AuthorizeIn(Schema):
    # Pairing code as submitted by the client; ``authorize_code`` strips
    # surrounding whitespace and upper-cases it before the lookup.
    code: str

26 

27 

@router.post("/code/", response={201: dict, 429: dict})
@ratelimit(key="ip", rate="10/h", method="POST", block=False)
def request_code(request):
    """Generate a new device pairing code for the caller's session.

    A session is created if the request does not have one yet. Codes of this
    session that are still pending or authorized are invalidated, expired
    leftovers are deleted, and a fresh code is then created (up to ten tries
    in case the generated value collides with an existing one).

    Returns:
        ``Status(201, ...)`` carrying the code, its lifetime in seconds, the
        poll interval and the poll URL; ``Status(429, ...)`` when the rate
        limit was hit or no unique code could be created.
    """
    # Imported here rather than reached through ``django.utils.timezone``,
    # which only exposes ``timedelta`` as an incidental, undocumented import.
    from datetime import timedelta

    require_passkey_mode(request)
    if getattr(request, "limited", False):
        security_logger.warning(
            "Rate limit hit: device/code/ from %s",
            request.META.get("REMOTE_ADDR"),
        )
        return Status(429, {"error": "Too many attempts. Please try again later."})

    # Ensure session exists
    if not request.session.session_key:
        request.session.create()

    session_key = request.session.session_key

    # Invalidate any existing pending/authorized codes for this session
    DeviceCode.objects.filter(session_key=session_key, status__in=["pending", "authorized"]).update(
        status="invalidated"
    )

    # Clean up expired codes for this session
    DeviceCode.objects.filter(session_key=session_key, expires_at__lt=timezone.now()).exclude(
        status="authorized"
    ).delete()

    # Generate unique code with retry, handling DB-level unique constraint
    device_code = None
    for _ in range(10):
        code = generate_device_code()
        try:
            # The savepoint confines a unique-constraint failure to this one
            # INSERT. Without it, an enclosing transaction (ATOMIC_REQUESTS or
            # a caller's ``atomic`` block) would be left broken and every
            # retry would fail as well.
            with transaction.atomic():
                device_code = DeviceCode.objects.create(
                    code=code,
                    session_key=session_key,
                    attempts_remaining=settings.DEVICE_CODE_MAX_ATTEMPTS,
                    expires_at=timezone.now() + timedelta(seconds=settings.DEVICE_CODE_EXPIRY_SECONDS),
                )
        except IntegrityError:
            continue
        break

    if device_code is None:
        # Every attempt collided; record it instead of failing silently.
        security_logger.warning(
            "Device code generation failed after retries: session=%s from %s",
            session_key[:8],
            request.META.get("REMOTE_ADDR"),
        )
        return Status(429, {"error": "Unable to generate code. Please try again."})

    security_logger.info(
        "Device code generated: session=%s from %s",
        session_key[:8],
        request.META.get("REMOTE_ADDR"),
    )

    return Status(
        201,
        {
            "code": device_code.code,
            "expires_in": settings.DEVICE_CODE_EXPIRY_SECONDS,
            "poll_interval": 5,
            "poll_url": "/api/auth/device/poll/",
        },
    )

89 

90 

@router.get("/poll/", response={200: dict, 202: dict, 410: dict})
@ratelimit(key="ip", rate="180/h", method="GET", block=False)
@transaction.atomic
def poll_status(request):
    """Report whether the session's newest live device code has been authorized.

    Returns:
        ``Status(202, ...)`` while the code is still pending;
        ``Status(200, ...)`` with the user/profile payload once it has been
        authorized (the request's session is logged in as the authorizing
        user and the code is retired);
        ``Status(410, ...)`` when rate limited, when the session has no live
        code, when the pending code has expired, or when an authorized code
        has no authorizing user attached.
    """
    require_passkey_mode(request)
    if getattr(request, "limited", False):
        return Status(410, {"status": "expired", "error": "Too many requests."})

    session_key = request.session.session_key

    # Only hit the database when there is a session to look codes up for.
    pairing = None
    if session_key:
        live_codes = DeviceCode.objects.filter(
            session_key=session_key,
            status__in=["pending", "authorized"],
        )
        # Lock only the DeviceCode row while still joining the user in.
        pairing = (
            live_codes.select_for_update(of=("self",))
            .select_related("authorizing_user")
            .order_by("-created_at")
            .first()
        )

    if pairing is None:
        return Status(410, {"status": "expired", "error": "No active code. Please request a new one."})

    if pairing.status == "pending":
        if not pairing.is_expired:
            return Status(202, {"status": "pending"})

        # Pending but past its lifetime: persist the expiry and tell the client.
        pairing.status = "expired"
        pairing.save(update_fields=["status"])
        return Status(
            410,
            {
                "status": "expired",
                "error": "Code has expired. Please request a new one.",
            },
        )

    # From here on the code is authorized; it must carry the approving user.
    approver = pairing.authorizing_user
    if approver is None:
        pairing.status = "invalidated"
        pairing.save(update_fields=["status"])
        return Status(410, {"status": "expired", "error": "Authorization invalid. Please request a new code."})

    # Sign the polling device in as the approving user.
    login(request, approver, backend=_AUTH_BACKEND)
    request.session["profile_id"] = approver.profile.id

    security_logger.info(
        "Device code consumed: user_id=%s, session=%s from %s",
        approver.pk,
        session_key[:8],
        request.META.get("REMOTE_ADDR"),
    )

    # Retire the code so a later poll cannot consume it again.
    pairing.status = "expired"
    pairing.save(update_fields=["status"])

    payload = passkey_user_profile_response(approver, approver.profile)
    payload["status"] = "authorized"
    return Status(200, payload)

154 

155 

@router.post("/authorize/", response={200: dict, 400: dict, 429: dict}, auth=SessionAuth())
@ratelimit(key="ip", rate="20/h", method="POST", block=False)
@transaction.atomic
def authorize_code(request, data: AuthorizeIn):
    """Approve a pending device code on behalf of the authenticated caller.

    The submitted code is stripped and upper-cased, then looked up among
    pending codes with a row lock. An expired code is marked expired; a code
    whose attempt counter is (or becomes, after this call's decrement) zero
    or less is invalidated. Otherwise the code is marked authorized and bound
    to ``request.user``.

    Returns:
        ``Status(200, ...)`` on success; ``Status(400, ...)`` for an unknown,
        expired or attempt-exhausted code; ``Status(429, ...)`` when rate
        limited.
    """
    require_passkey_mode(request)
    client_ip = request.META.get("REMOTE_ADDR")

    if getattr(request, "limited", False):
        security_logger.warning("Rate limit hit: device/authorize/ from %s", client_ip)
        return Status(429, {"error": "Too many attempts. Please try again later."})

    submitted = data.code.strip().upper()
    caller = request.user

    try:
        pairing = DeviceCode.objects.select_for_update().get(code=submitted, status="pending")
    except DeviceCode.DoesNotExist:
        security_logger.warning(
            "Device code authorize: invalid code from user_id=%s, ip=%s",
            caller.pk,
            client_ip,
        )
        return Status(400, {"error": "Invalid or expired code"})

    def _set_status(new_status):
        # Persist a status transition, writing only the status column.
        pairing.status = new_status
        pairing.save(update_fields=["status"])

    if pairing.is_expired:
        _set_status("expired")
        return Status(400, {"error": "Invalid or expired code"})

    exhausted = Status(400, {"error": "Code invalidated: too many attempts"})

    # Counter already used up before this call (FR-006, FR-007).
    if pairing.attempts_remaining <= 0:
        _set_status("invalidated")
        security_logger.warning(
            "Device code exhausted attempts: code=%s, user_id=%s",
            submitted,
            caller.pk,
        )
        return exhausted

    # Decrement in the database, then reload to see the stored value.
    this_row = DeviceCode.objects.filter(pk=pairing.pk)
    this_row.update(attempts_remaining=F("attempts_remaining") - 1)
    pairing.refresh_from_db()

    # NOTE(review): this call's own decrement counts, so a code whose counter
    # was 1 is rejected here even though the submitted code matched — confirm
    # that this is the intended reading of FR-006/FR-007.
    if pairing.attempts_remaining <= 0:
        _set_status("invalidated")
        return exhausted

    # Bind the code to the approving user.
    pairing.status = "authorized"
    pairing.authorizing_user = caller
    pairing.save(update_fields=["status", "authorizing_user"])

    security_logger.info("Device code authorized by user_id=%s from %s", caller.pk, client_ip)

    return Status(200, {"message": "Device authorized"})

← Back to Dashboard