Coverage for apps / core / device_code_api.py: 95%
105 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-12 10:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-12 10:49 +0000
1"""Device code authorization flow API — only active in passkey mode."""
3import logging
5from django.conf import settings
6from django.contrib.auth import login
7from django.db import IntegrityError, transaction
8from django.db.models import F
9from django.utils import timezone
10from django_ratelimit.decorators import ratelimit
11from ninja import Router, Schema, Status
13from apps.core.auth import SessionAuth
14from apps.core.auth_helpers import passkey_user_profile_response, require_passkey_mode
15from apps.core.models import DeviceCode, generate_device_code
# Logger used by the endpoints in this module for rate-limit hits and
# device-code lifecycle events (generated / authorized / consumed).
security_logger = logging.getLogger("security")

# Router the device-code endpoints below are registered on.
router = Router(tags=["device"])

# Backend path handed to django.contrib.auth.login() when a polling device
# is signed in as the authorizing user.
_AUTH_BACKEND = "django.contrib.auth.backends.ModelBackend"
# Request body accepted by the authorize endpoint.
class AuthorizeIn(Schema):
    # Pairing code as submitted by the caller. The authorize endpoint strips
    # surrounding whitespace and upper-cases it before looking it up.
    code: str
@router.post("/code/", response={201: dict, 429: dict})
@ratelimit(key="ip", rate="10/h", method="POST", block=False)
def request_code(request):
    """Generate a new device pairing code for the caller's session.

    Steps, in order:

    1. Reject the call when the rate limiter flagged the request.
    2. Make sure the request has a session key (creating a session if needed).
    3. Invalidate this session's pending/authorized codes and delete its
       expired, non-authorized ones.
    4. Insert a fresh ``DeviceCode``, retrying on unique-constraint collisions.

    Returns:
        ``Status(201, {...})`` carrying the code, its lifetime in seconds, the
        poll interval and the poll URL, or ``Status(429, {...})`` when the
        request is rate limited or no unique code could be inserted.
    """
    # Use the stdlib class directly: ``django.utils.timezone`` only exposes
    # ``timedelta`` as a side effect of its own imports, not as public API.
    from datetime import timedelta

    require_passkey_mode(request)
    if getattr(request, "limited", False):
        security_logger.warning(
            "Rate limit hit: device/code/ from %s",
            request.META.get("REMOTE_ADDR"),
        )
        return Status(429, {"error": "Too many attempts. Please try again later."})

    # Ensure session exists
    if not request.session.session_key:
        request.session.create()

    session_key = request.session.session_key

    # Invalidate any existing pending/authorized codes for this session
    DeviceCode.objects.filter(session_key=session_key, status__in=["pending", "authorized"]).update(
        status="invalidated"
    )

    # Clean up expired codes for this session
    DeviceCode.objects.filter(session_key=session_key, expires_at__lt=timezone.now()).exclude(
        status="authorized"
    ).delete()

    # Generate unique code with retry, handling DB-level unique constraint.
    device_code = None
    for _ in range(10):
        code = generate_device_code()
        try:
            # Run each INSERT in its own atomic block (a savepoint when a
            # transaction is already open) so that a collision rolls back only
            # this attempt and the connection stays usable for the retry.
            with transaction.atomic():
                device_code = DeviceCode.objects.create(
                    code=code,
                    session_key=session_key,
                    attempts_remaining=settings.DEVICE_CODE_MAX_ATTEMPTS,
                    expires_at=timezone.now() + timedelta(seconds=settings.DEVICE_CODE_EXPIRY_SECONDS),
                )
        except IntegrityError:
            continue
        break

    if device_code is None:
        return Status(429, {"error": "Unable to generate code. Please try again."})

    security_logger.info(
        "Device code generated: session=%s from %s",
        session_key[:8],  # only a prefix of the session key is logged
        request.META.get("REMOTE_ADDR"),
    )

    return Status(
        201,
        {
            "code": device_code.code,
            "expires_in": settings.DEVICE_CODE_EXPIRY_SECONDS,
            "poll_interval": 5,
            "poll_url": "/api/auth/device/poll/",
        },
    )
@router.get("/poll/", response={200: dict, 202: dict, 410: dict})
@ratelimit(key="ip", rate="180/h", method="GET", block=False)
@transaction.atomic
def poll_status(request):
    """Report the state of the calling session's device code.

    Returns:
        * ``202`` while the newest pending/authorized code is still pending.
        * ``200`` once it has been authorized: the request is logged in as the
          authorizing user, the code is marked ``"expired"`` so it cannot be
          polled again, and the profile payload is returned with
          ``"status": "authorized"`` added.
        * ``410`` when the request is rate limited, has no session key, has no
          pending/authorized code, the pending code has expired, or the
          authorized code has no authorizing user.
    """

    def gone(reason):
        # Every terminal failure shares the same 410 payload shape.
        return Status(410, {"status": "expired", "error": reason})

    require_passkey_mode(request)
    if getattr(request, "limited", False):
        return gone("Too many requests.")

    session_key = request.session.session_key
    if not session_key:
        return gone("No active code. Please request a new one.")

    # Lock the newest live code row for this session for the rest of the
    # transaction opened by @transaction.atomic.
    live_codes = DeviceCode.objects.select_for_update(of=("self",)).select_related("authorizing_user")
    live_codes = live_codes.filter(session_key=session_key, status__in=["pending", "authorized"])
    pairing = live_codes.order_by("-created_at").first()
    if pairing is None:
        return gone("No active code. Please request a new one.")

    def close(state):
        # Persist a status transition, writing only the status column.
        pairing.status = state
        pairing.save(update_fields=["status"])

    # Expiry is only enforced while the code is still pending.
    if pairing.is_expired and pairing.status == "pending":
        close("expired")
        return gone("Code has expired. Please request a new one.")

    if pairing.status == "pending":
        return Status(202, {"status": "pending"})

    # From here on the code is authorized; it must point at a user.
    user = pairing.authorizing_user
    if user is None:
        close("invalidated")
        return gone("Authorization invalid. Please request a new code.")

    # Sign this device in as the authorizing user.
    login(request, user, backend=_AUTH_BACKEND)
    profile = user.profile
    request.session["profile_id"] = profile.id

    security_logger.info(
        "Device code consumed: user_id=%s, session=%s from %s",
        user.pk,
        session_key[:8],
        request.META.get("REMOTE_ADDR"),
    )

    # One-shot: a consumed code is stored as "expired" so later polls miss it.
    close("expired")

    payload = passkey_user_profile_response(user, profile)
    payload["status"] = "authorized"
    return Status(200, payload)
@router.post("/authorize/", response={200: dict, 400: dict, 429: dict}, auth=SessionAuth())
@ratelimit(key="ip", rate="20/h", method="POST", block=False)
@transaction.atomic
def authorize_code(request, data: AuthorizeIn):
    """Authorize a pending device code on behalf of the authenticated caller.

    Args:
        request: The incoming request; ``request.user`` becomes the code's
            authorizing user on success.
        data: Body carrying the pairing code. It is stripped and upper-cased
            before lookup.

    Returns:
        ``Status(200, ...)`` when the code was authorized, ``Status(400, ...)``
        when it is unknown, expired or out of attempts, and
        ``Status(429, ...)`` when the request is rate limited.

    NOTE(review): in this function ``attempts_remaining`` is decremented only
    after the submitted code matched a pending row; a non-matching submission
    touches no row. A matching code whose counter reaches 0 after the
    decrement is invalidated instead of authorized. Confirm both points
    against FR-006/FR-007.
    """

    def reject(message):
        return Status(400, {"error": message})

    require_passkey_mode(request)
    client_ip = request.META.get("REMOTE_ADDR")

    if getattr(request, "limited", False):
        security_logger.warning("Rate limit hit: device/authorize/ from %s", client_ip)
        return Status(429, {"error": "Too many attempts. Please try again later."})

    submitted = data.code.strip().upper()

    # Lock the matching pending row for the duration of the transaction.
    try:
        pairing = DeviceCode.objects.select_for_update().get(code=submitted, status="pending")
    except DeviceCode.DoesNotExist:
        security_logger.warning(
            "Device code authorize: invalid code from user_id=%s, ip=%s",
            request.user.pk,
            client_ip,
        )
        return reject("Invalid or expired code")

    def close(state):
        # Persist a status transition, writing only the status column.
        pairing.status = state
        pairing.save(update_fields=["status"])

    # Expired codes get the same message as unknown ones.
    if pairing.is_expired:
        close("expired")
        return reject("Invalid or expired code")

    # Attempt budget already used up before this call (FR-006, FR-007).
    if pairing.attempts_remaining <= 0:
        close("invalidated")
        security_logger.warning(
            "Device code exhausted attempts: code=%s, user_id=%s",
            submitted,
            request.user.pk,
        )
        return reject("Code invalidated: too many attempts")

    # Spend one attempt in the database, then reload the row to see the result.
    DeviceCode.objects.filter(pk=pairing.pk).update(attempts_remaining=F("attempts_remaining") - 1)
    pairing.refresh_from_db()

    # This call used the last attempt: invalidate rather than authorize.
    if pairing.attempts_remaining <= 0:
        close("invalidated")
        return reject("Code invalidated: too many attempts")

    # Record who approved the device and flip the code to "authorized".
    pairing.status = "authorized"
    pairing.authorizing_user = request.user
    pairing.save(update_fields=["status", "authorizing_user"])

    security_logger.info(
        "Device code authorized by user_id=%s from %s",
        request.user.pk,
        client_ip,
    )

    return Status(200, {"message": "Device authorized"})