Coverage for apps / core / device_code_api.py: 93%
97 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 13:22 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 13:22 +0000
1"""Device code authorization flow API — only active in passkey mode."""
3import logging
5from django.conf import settings
6from django.contrib.auth import login
7from django.db import IntegrityError, transaction
8from django.http import Http404
9from django.utils import timezone
10from django_ratelimit.decorators import ratelimit
11from ninja import Router, Schema, Status
13from apps.core.auth import SessionAuth
14from apps.core.auth_helpers import passkey_user_profile_response, require_passkey_mode
15from apps.core.models import DeviceCode, generate_device_code
17security_logger = logging.getLogger("security")
19router = Router(tags=["device"])
21_AUTH_BACKEND = "django.contrib.auth.backends.ModelBackend"
24class AuthorizeIn(Schema):
25 code: str
28@router.post("/code/", response={201: dict, 429: dict})
29@ratelimit(key="ip", rate="10/h", method="POST", block=False)
30def request_code(request):
31 """Generate a new device pairing code."""
32 require_passkey_mode(request)
33 if getattr(request, "limited", False):
34 security_logger.warning(
35 "Rate limit hit: device/code/ from %s",
36 request.META.get("REMOTE_ADDR"),
37 )
38 return Status(429, {"error": "Too many attempts. Please try again later."})
40 # Ensure session exists
41 if not request.session.session_key:
42 request.session.create()
44 session_key = request.session.session_key
46 # Invalidate any existing pending/authorized codes for this session
47 DeviceCode.objects.filter(session_key=session_key, status__in=["pending", "authorized"]).update(
48 status="invalidated"
49 )
51 # Clean up expired codes for this session
52 DeviceCode.objects.filter(session_key=session_key, expires_at__lt=timezone.now()).exclude(
53 status="authorized"
54 ).delete()
56 # Generate unique code with retry, handling DB-level unique constraint
57 device_code = None
58 for _ in range(10):
59 code = generate_device_code()
60 try:
61 device_code = DeviceCode.objects.create(
62 code=code,
63 session_key=session_key,
64 expires_at=timezone.now() + timezone.timedelta(seconds=settings.DEVICE_CODE_EXPIRY_SECONDS),
65 )
66 break
67 except IntegrityError:
68 continue
70 if device_code is None:
71 return Status(429, {"error": "Unable to generate code. Please try again."})
73 # F-17: Mark this session as having an in-flight device-code flow so
74 # `poll_status` can distinguish legitimate pollers (410 UX) from
75 # unauthenticated scanner probes (404, indistinguishable from never-existed).
76 request.session["device_code_flow"] = True
78 security_logger.info(
79 "Device code generated: session=%s from %s",
80 session_key[:8],
81 request.META.get("REMOTE_ADDR"),
82 )
84 return Status(
85 201,
86 {
87 "code": device_code.code,
88 "expires_in": settings.DEVICE_CODE_EXPIRY_SECONDS,
89 "poll_interval": 5,
90 "poll_url": "/api/auth/device/poll/",
91 },
92 )
95@router.get("/poll/", response={200: dict, 202: dict, 410: dict})
96@ratelimit(key="ip", rate="180/h", method="GET", block=False)
97@transaction.atomic
98def poll_status(request):
99 """Poll for device code authorization status."""
100 require_passkey_mode(request)
102 # F-17: Callers without an in-flight device-code flow must be
103 # indistinguishable from a never-existed path (v1.42.0+ passkey-mode
104 # invariant). The marker is set in `request_code` on successful code
105 # generation; legitimate pollers keep the 410 UX below.
106 if not request.session.get("device_code_flow"):
107 raise Http404
109 if getattr(request, "limited", False):
110 return Status(410, {"status": "expired", "error": "Too many requests."})
112 session_key = request.session.session_key
113 if not session_key:
114 return Status(410, {"status": "expired", "error": "No active code. Please request a new one."})
116 device_code = (
117 DeviceCode.objects.select_for_update(of=("self",))
118 .select_related("authorizing_user")
119 .filter(session_key=session_key, status__in=["pending", "authorized"])
120 .order_by("-created_at")
121 .first()
122 )
123 if device_code is None:
124 return Status(410, {"status": "expired", "error": "No active code. Please request a new one."})
126 # Check expiry
127 if device_code.is_expired and device_code.status == "pending":
128 device_code.status = "expired"
129 device_code.save(update_fields=["status"])
130 return Status(
131 410,
132 {
133 "status": "expired",
134 "error": "Code has expired. Please request a new one.",
135 },
136 )
138 if device_code.status == "pending":
139 return Status(202, {"status": "pending"})
141 # Authorized — verify authorizing_user exists
142 user = device_code.authorizing_user
143 if user is None:
144 device_code.status = "invalidated"
145 device_code.save(update_fields=["status"])
146 return Status(410, {"status": "expired", "error": "Authorization invalid. Please request a new code."})
148 # Establish session for this device
149 login(request, user, backend=_AUTH_BACKEND)
150 request.session["profile_id"] = user.profile.id
152 security_logger.info(
153 "Device code consumed: user_id=%s, session=%s from %s",
154 user.pk,
155 session_key[:8],
156 request.META.get("REMOTE_ADDR"),
157 )
159 # Mark code as consumed so it can't be re-polled
160 device_code.status = "expired"
161 device_code.save(update_fields=["status"])
163 response = passkey_user_profile_response(user, user.profile)
164 response["status"] = "authorized"
165 return Status(200, response)
168@router.post("/authorize/", response={200: dict, 400: dict, 429: dict}, auth=SessionAuth())
169@ratelimit(key="ip", rate="20/h", method="POST", block=False)
170@transaction.atomic
171def authorize_code(request, data: AuthorizeIn):
172 """Authorize a pending device code (called by authenticated modern device)."""
173 require_passkey_mode(request)
174 if getattr(request, "limited", False):
175 security_logger.warning(
176 "Rate limit hit: device/authorize/ from %s",
177 request.META.get("REMOTE_ADDR"),
178 )
179 return Status(429, {"error": "Too many attempts. Please try again later."})
181 normalized_code = data.code.strip().upper()
183 try:
184 device_code = DeviceCode.objects.select_for_update().get(code=normalized_code, status="pending")
185 except DeviceCode.DoesNotExist:
186 security_logger.warning(
187 "Device code authorize: invalid code from user_id=%s, ip=%s",
188 request.user.pk,
189 request.META.get("REMOTE_ADDR"),
190 )
191 return Status(400, {"error": "Invalid or expired code"})
193 # Check expiry
194 if device_code.is_expired:
195 device_code.status = "expired"
196 device_code.save(update_fields=["status"])
197 return Status(400, {"error": "Invalid or expired code"})
199 # Authorize the code
200 device_code.status = "authorized"
201 device_code.authorizing_user = request.user
202 device_code.save(update_fields=["status", "authorizing_user"])
204 security_logger.info(
205 "Device code authorized by user_id=%s from %s",
206 request.user.pk,
207 request.META.get("REMOTE_ADDR"),
208 )
210 return Status(200, {"message": "Device authorized"})