Coverage for apps / core / passkey_api.py: 98%
190 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 13:22 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 13:22 +0000
1"""Passkey (WebAuthn) authentication API endpoints — only active in passkey mode."""
3import json
4import logging
5import secrets
6import string
7import time
8import uuid
10from django.conf import settings
11from django.contrib.auth import login
12from django.contrib.auth.models import User
13from django.db import transaction
14from django.utils import timezone
15from django_ratelimit.decorators import ratelimit
16from ninja import Router, Status
17from webauthn import (
18 generate_authentication_options,
19 generate_registration_options,
20 options_to_json,
21 verify_authentication_response,
22 verify_registration_response,
23)
24from webauthn.helpers.structs import (
25 AuthenticatorSelectionCriteria,
26 PublicKeyCredentialDescriptor,
27 ResidentKeyRequirement,
28 UserVerificationRequirement,
29)
31from apps.core.auth import SessionAuth
32from apps.core.auth_helpers import passkey_user_profile_response, require_passkey_mode
33from apps.core.models import WebAuthnCredential
34from apps.profiles.models import Profile
# Backend path handed to django.contrib.auth.login() whenever a passkey
# flow establishes a session.
_AUTH_BACKEND = "django.contrib.auth.backends.ModelBackend"

# Router on which every passkey endpoint in this module is registered.
router = Router(tags=["passkey"])

# Dedicated "security" logger used for rate-limit hits, verification
# failures, and successful registrations/logins.
security_logger = logging.getLogger("security")
def _get_rp_id(request):
    """Return the WebAuthn Relying Party ID for this request.

    A configured ``settings.WEBAUTHN_RP_ID`` always wins. Otherwise the ID is
    derived from ``request.get_host()`` with any ``:port`` suffix removed.

    Args:
        request: The current request; only ``get_host()`` is called on it.

    Returns:
        The RP ID string.
    """
    if settings.WEBAUTHN_RP_ID:
        return settings.WEBAUTHN_RP_ID

    host = request.get_host()
    if host.startswith("["):
        # Bracketed IPv6 literal such as "[::1]:8000": the address itself
        # contains colons, so splitting on the first ":" would return "[".
        # Keep everything up to and including the closing bracket instead.
        closing = host.find("]")
        if closing != -1:
            return host[: closing + 1]
    return host.split(":")[0]
def _get_origin(request):
    """Return the origin that WebAuthn responses are verified against.

    If ``settings.WEBAUTHN_RP_ORIGIN`` is set (production), that pinned value
    is returned unchanged. Pinning keeps the expected origin independent of
    the request's Host header, so X-Forwarded-Host injection cannot influence
    it (F-33).

    If it is not set (development without a fixed domain), the origin is
    assembled from the request's scheme and host.
    """
    pinned_origin = settings.WEBAUTHN_RP_ORIGIN
    if pinned_origin:
        return pinned_origin

    if request.is_secure():
        scheme = "https"
    else:
        scheme = "http"
    return scheme + "://" + request.get_host()
66# --- Registration ---
@router.post("/register/options/", response={200: dict, 400: dict, 429: dict})
@ratelimit(key="ip", rate="10/h", method="POST", block=False)
def register_options(request):
    """Issue a WebAuthn registration challenge for a brand-new account.

    Responds with 429 when the rate limiter has flagged the request.
    Otherwise it generates registration options for a fresh random user
    handle, stores the challenge, the handle, and the creation time in the
    session, and responds with 200 and the options as a JSON-compatible dict.
    """
    require_passkey_mode(request)

    rate_limited = getattr(request, "limited", False)
    if rate_limited:
        security_logger.warning(
            "Rate limit hit: passkey register/options/ from %s",
            request.META.get("REMOTE_ADDR"),
        )
        return Status(429, {"error": "Too many attempts. Please try again later."})

    # No account exists yet, so the user handle is a random 16-byte UUID.
    user_handle = uuid.uuid4().bytes
    selection = AuthenticatorSelectionCriteria(
        resident_key=ResidentKeyRequirement.REQUIRED,
        user_verification=UserVerificationRequirement.REQUIRED,
    )
    registration_options = generate_registration_options(
        rp_id=_get_rp_id(request),
        rp_name=settings.WEBAUTHN_RP_NAME,
        user_name="New User",
        user_id=user_handle,
        user_display_name="New User",
        authenticator_selection=selection,
    )

    # Persist what the verify step needs; bytes are hex-encoded for storage.
    session = request.session
    session["webauthn_register_challenge"] = registration_options.challenge.hex()
    session["webauthn_register_user_id"] = user_handle.hex()
    session["webauthn_register_challenge_created_at"] = time.time()

    payload = json.loads(options_to_json(registration_options))
    return Status(200, payload)
@router.post("/register/verify/", response={201: dict, 400: dict, 429: dict})
@ratelimit(key="ip", rate="10/h", method="POST", block=False)
def register_verify(request):
    """Verify a WebAuthn registration response and create the account.

    The pending challenge, user handle, and timestamp are removed from the
    session first, so each challenge can be attempted only once.

    Responses:
        201: verification succeeded; a user, profile, and credential were
            created, the user is logged in, and the user/profile payload is
            returned.
        400: no pending challenge, challenge older than five minutes, request
            body that is not a JSON object, or verification failure.
        429: the rate limiter flagged the request.
    """
    require_passkey_mode(request)

    # Consume challenge BEFORE rate limit check to prevent replay (FR-011)
    challenge_hex = request.session.pop("webauthn_register_challenge", None)
    user_id_hex = request.session.pop("webauthn_register_user_id", None)
    created_at = request.session.pop("webauthn_register_challenge_created_at", None)

    if getattr(request, "limited", False):
        security_logger.warning(
            "Rate limit hit: passkey register/verify/ from %s",
            request.META.get("REMOTE_ADDR"),
        )
        return Status(429, {"error": "Too many attempts. Please try again later."})

    # The stored user handle is only checked for presence; it is not passed
    # to verification or persisted by this function.
    if not challenge_hex or not user_id_hex:
        return Status(400, {"error": "Registration failed: no pending challenge"})

    # Reject expired challenges (FR-010: 5-minute window)
    if created_at and (time.time() - created_at) > 300:
        return Status(400, {"error": "Registration failed: challenge expired"})

    try:
        body = json.loads(request.body)
    except (json.JSONDecodeError, ValueError):
        return Status(400, {"error": "Registration failed: invalid request body"})
    if not isinstance(body, dict):
        # Valid JSON that is not an object (for example a double-encoded
        # credential string) would otherwise reach body.get() below and
        # raise AttributeError, surfacing as a 500.
        return Status(400, {"error": "Registration failed: invalid request body"})

    try:
        verification = verify_registration_response(
            credential=body,
            expected_challenge=bytes.fromhex(challenge_hex),
            expected_rp_id=_get_rp_id(request),
            expected_origin=_get_origin(request),
            require_user_verification=True,
        )
    except Exception as e:
        # Boundary handler: any verification problem becomes a generic 400,
        # with the detail kept in the security log only.
        security_logger.warning(
            "Passkey registration verification failed from %s: %s",
            request.META.get("REMOTE_ADDR"),
            str(e),
        )
        return Status(400, {"error": "Registration failed: verification error"})

    user, profile = _create_passkey_user_and_profile(verification, body.get("transports"))
    login(request, user, backend=_AUTH_BACKEND)
    request.session["profile_id"] = profile.id

    security_logger.info(
        "Passkey registration: user_id=%s from %s",
        user.pk,
        request.META.get("REMOTE_ADDR"),
    )

    return Status(201, passkey_user_profile_response(user, profile))
@transaction.atomic
def _create_passkey_user_and_profile(verification, transports=None):
    """Create the User, Profile, and WebAuthnCredential in one transaction.

    Args:
        verification: Result of a successful registration verification; its
            ``credential_id``, ``credential_public_key``, and ``sign_count``
            are stored on the new credential.
        transports: Optional transports value stored as-is on the credential.

    Returns:
        A ``(user, profile)`` tuple for the newly created account.
    """
    generated_username = f"pk_{uuid.uuid4().hex[:8]}"
    new_user = User.objects.create_user(
        username=generated_username,
        password=None,
        email="",
        is_active=True,
        is_staff=False,
    )
    # Passkey accounts never authenticate with a password.
    new_user.set_unusable_password()
    new_user.save(update_fields=["password"])

    # The display-name suffix starts with a letter so it can never be all
    # digits. A purely hexadecimal suffix is all digits in roughly 6% of
    # draws, and "User <digits>" matches `^User \d+$`, tripping R23's
    # "user count leak" guard. The five hex characters supply the entropy.
    leading_letter = secrets.choice(string.ascii_lowercase)
    hex_tail = uuid.uuid4().hex[:5]
    display_name = f"User {leading_letter + hex_tail}"
    new_profile = Profile.objects.create(
        user=new_user,
        name=display_name,
        avatar_color=Profile.next_avatar_color(),
    )

    credential_fields = {
        "user": new_user,
        "credential_id": verification.credential_id,
        "public_key": verification.credential_public_key,
        "sign_count": verification.sign_count,
        "transports": transports,
    }
    WebAuthnCredential.objects.create(**credential_fields)

    return new_user, new_profile
197# --- Authentication ---
@router.post("/login/options/", response={200: dict, 429: dict})
@ratelimit(key="ip", rate="20/h", method="POST", block=False)
def login_options(request):
    """Issue a WebAuthn authentication challenge.

    Responds with 429 when the rate limiter has flagged the request, and with
    200 ``{"no_credentials": True}`` when no credential is stored at all.
    Otherwise it stores a fresh challenge and its creation time in the session
    and responds with 200 and the authentication options.
    """
    require_passkey_mode(request)

    rate_limited = getattr(request, "limited", False)
    if rate_limited:
        security_logger.warning(
            "Rate limit hit: passkey login/options/ from %s",
            request.META.get("REMOTE_ADDR"),
        )
        return Status(429, {"error": "Too many attempts. Please try again later."})

    # With nothing registered, a challenge would only make the browser show
    # confusing hardware-key / QR prompts that can never match anything.
    any_credentials = WebAuthnCredential.objects.exists()
    if not any_credentials:
        return Status(200, {"no_credentials": True})

    auth_options = generate_authentication_options(
        rp_id=_get_rp_id(request),
        user_verification=UserVerificationRequirement.REQUIRED,
    )

    session = request.session
    session["webauthn_login_challenge"] = auth_options.challenge.hex()
    session["webauthn_login_challenge_created_at"] = time.time()

    payload = json.loads(options_to_json(auth_options))
    return Status(200, payload)
@router.post("/login/verify/", response={200: dict, 401: dict, 429: dict})
@ratelimit(key="ip", rate="20/h", method="POST", block=False)
def login_verify(request):
    """Verify a WebAuthn authentication response and establish a session.

    The pending challenge and its timestamp are removed from the session
    first, so each challenge can be attempted only once.

    Responses:
        200: the assertion verified; the credential's sign count and
            last-used time are updated, the user is logged in, and the
            user/profile payload is returned.
        401: no pending challenge, challenge older than five minutes, request
            body that is not a JSON object, undecodable or unknown credential
            id, inactive user, verification failure, or a sign count that did
            not advance.
        429: the rate limiter flagged the request.
    """
    require_passkey_mode(request)

    # Consume challenge BEFORE rate limit check to prevent replay (FR-011)
    challenge_hex = request.session.pop("webauthn_login_challenge", None)
    created_at = request.session.pop("webauthn_login_challenge_created_at", None)

    if getattr(request, "limited", False):
        security_logger.warning(
            "Rate limit hit: passkey login/verify/ from %s",
            request.META.get("REMOTE_ADDR"),
        )
        return Status(429, {"error": "Too many attempts. Please try again later."})

    if not challenge_hex:
        return Status(401, {"error": "Authentication failed: no pending challenge"})

    # Reject expired challenges (FR-010: 5-minute window)
    if created_at and (time.time() - created_at) > 300:
        return Status(401, {"error": "Authentication failed: challenge expired"})

    try:
        body = json.loads(request.body)
    except (json.JSONDecodeError, ValueError):
        return Status(401, {"error": "Authentication failed: invalid request body"})
    if not isinstance(body, dict):
        # Valid JSON that is not an object ([], 1, "x", null) has no .get();
        # without this guard the lookup below raises AttributeError and the
        # client receives a 500 instead of a 401.
        return Status(401, {"error": "Authentication failed: invalid request body"})

    # Imported outside the try block so that only decoding errors are
    # translated into a 401.
    from webauthn.helpers import base64url_to_bytes

    # Look up credential by credential_id
    raw_id = body.get("rawId", "")
    try:
        credential_id_bytes = base64url_to_bytes(raw_id)
    except Exception:
        # Covers malformed base64url as well as a non-string rawId.
        return Status(401, {"error": "Authentication failed"})

    try:
        credential = WebAuthnCredential.objects.select_related("user").get(
            credential_id=credential_id_bytes
        )
    except WebAuthnCredential.DoesNotExist:
        security_logger.warning(
            "Passkey login: unknown credential from %s",
            request.META.get("REMOTE_ADDR"),
        )
        return Status(401, {"error": "Authentication failed"})

    if not credential.user.is_active:
        return Status(401, {"error": "Authentication failed"})

    try:
        verification = verify_authentication_response(
            credential=body,
            expected_challenge=bytes.fromhex(challenge_hex),
            expected_rp_id=_get_rp_id(request),
            expected_origin=_get_origin(request),
            credential_public_key=bytes(credential.public_key),
            credential_current_sign_count=credential.sign_count,
            require_user_verification=True,
        )
    except Exception as e:
        # Boundary handler: any verification problem becomes a generic 401,
        # with the detail kept in the security log only.
        security_logger.warning(
            "Passkey login verification failed from %s: %s",
            request.META.get("REMOTE_ADDR"),
            str(e),
        )
        return Status(401, {"error": "Authentication failed"})

    # Check sign_count for cloned authenticator. A new count of 0 is let
    # through; only a non-zero count that failed to advance is rejected.
    if verification.new_sign_count > 0 and verification.new_sign_count <= credential.sign_count:
        security_logger.warning(
            "Possible cloned authenticator: user_id=%s, sign_count went from %d to %d",
            credential.user_id,
            credential.sign_count,
            verification.new_sign_count,
        )
        return Status(401, {"error": "Authentication failed"})

    # Update credential
    credential.sign_count = verification.new_sign_count
    credential.last_used_at = timezone.now()
    credential.save(update_fields=["sign_count", "last_used_at"])

    login(request, credential.user, backend=_AUTH_BACKEND)
    request.session["profile_id"] = credential.user.profile.id

    security_logger.info(
        "Passkey login: user_id=%s from %s",
        credential.user_id,
        request.META.get("REMOTE_ADDR"),
    )

    return Status(200, passkey_user_profile_response(credential.user, credential.user.profile))
323# --- Credential Management ---
@router.get("/credentials/", response={200: dict}, auth=SessionAuth())
def list_credentials(request):
    """List the current user's registered passkeys, oldest first.

    Returns:
        Status 200 with ``{"credentials": [...]}``. Each entry carries the
        credential's primary key, ISO-formatted ``created_at``, ISO-formatted
        ``last_used_at`` (or ``None`` if never used), and ``is_deletable``,
        which is true only when the user has more than one passkey.
    """
    require_passkey_mode(request)

    # Evaluate the queryset once. A separate .count() would cost an extra
    # query and could disagree with the rows that are serialised below.
    credentials = list(
        WebAuthnCredential.objects.filter(user=request.user).order_by("created_at")
    )
    is_deletable = len(credentials) > 1

    return Status(
        200,
        {
            "credentials": [
                {
                    "id": c.pk,
                    "created_at": c.created_at.isoformat(),
                    "last_used_at": c.last_used_at.isoformat() if c.last_used_at else None,
                    "is_deletable": is_deletable,
                }
                for c in credentials
            ]
        },
    )
@router.post("/credentials/add/options/", response={200: dict}, auth=SessionAuth())
def add_credential_options(request):
    """Issue a registration challenge for adding another passkey.

    The user's existing credentials are listed as exclusions in the options.
    The challenge and its creation time are stored in the session, and the
    options are returned with status 200.

    NOTE(review): the user handle here is the user's primary key as 8
    big-endian bytes, whereas register_options uses a random UUID. Confirm
    that the difference is intended.
    """
    require_passkey_mode(request)

    current_user = request.user
    already_registered = [
        PublicKeyCredentialDescriptor(id=bytes(stored.credential_id))
        for stored in WebAuthnCredential.objects.filter(user=current_user)
    ]

    selection = AuthenticatorSelectionCriteria(
        resident_key=ResidentKeyRequirement.REQUIRED,
        user_verification=UserVerificationRequirement.REQUIRED,
    )
    registration_options = generate_registration_options(
        rp_id=_get_rp_id(request),
        rp_name=settings.WEBAUTHN_RP_NAME,
        user_name=current_user.username,
        user_id=current_user.pk.to_bytes(8, "big"),
        user_display_name=request.auth.name,
        authenticator_selection=selection,
        exclude_credentials=already_registered,
    )

    session = request.session
    session["webauthn_add_challenge"] = registration_options.challenge.hex()
    session["webauthn_add_challenge_created_at"] = time.time()

    payload = json.loads(options_to_json(registration_options))
    return Status(200, payload)
@router.post("/credentials/add/verify/", response={201: dict, 400: dict}, auth=SessionAuth())
def add_credential_verify(request):
    """Verify and store an additional passkey for the authenticated user.

    The pending challenge and its timestamp are removed from the session
    first, so each challenge can be attempted only once.

    Responses:
        201: verification succeeded; the new credential is stored and a
            summary of it is returned.
        400: no pending challenge, challenge older than five minutes, request
            body that is not a JSON object, or verification failure.
    """
    require_passkey_mode(request)

    challenge_hex = request.session.pop("webauthn_add_challenge", None)
    created_at = request.session.pop("webauthn_add_challenge_created_at", None)
    if not challenge_hex:
        return Status(400, {"error": "No pending challenge"})

    # Reject expired challenges (FR-010: 5-minute window)
    if created_at and (time.time() - created_at) > 300:
        return Status(400, {"error": "Challenge expired"})

    try:
        body = json.loads(request.body)
    except (json.JSONDecodeError, ValueError):
        return Status(400, {"error": "Invalid request body"})
    if not isinstance(body, dict):
        # Valid JSON that is not an object (for example a double-encoded
        # credential string) would otherwise reach body.get() below and
        # raise AttributeError, surfacing as a 500.
        return Status(400, {"error": "Invalid request body"})

    try:
        verification = verify_registration_response(
            credential=body,
            expected_challenge=bytes.fromhex(challenge_hex),
            expected_rp_id=_get_rp_id(request),
            expected_origin=_get_origin(request),
            require_user_verification=True,
        )
    except Exception as e:
        # Boundary handler: any verification problem becomes a generic 400,
        # with the detail kept in the security log only.
        security_logger.warning(
            "Add credential verification failed for user_id=%s: %s",
            request.user.pk,
            str(e),
        )
        return Status(400, {"error": "Verification failed"})

    cred = WebAuthnCredential.objects.create(
        user=request.user,
        credential_id=verification.credential_id,
        public_key=verification.credential_public_key,
        sign_count=verification.sign_count,
        transports=body.get("transports"),
    )

    security_logger.info(
        "Passkey added: user_id=%s, credential_pk=%s",
        request.user.pk,
        cred.pk,
    )

    # The user already had a passkey to reach this endpoint, so the new one
    # is reported as deletable and never used.
    return Status(
        201,
        {
            "credential": {
                "id": cred.pk,
                "created_at": cred.created_at.isoformat(),
                "last_used_at": None,
                "is_deletable": True,
            }
        },
    )
@router.delete(
    "/credentials/{credential_id}/",
    response={200: dict, 400: dict, 404: dict},
    auth=SessionAuth(),
)
@transaction.atomic
def delete_credential(request, credential_id: int):
    """Delete one of the current user's passkeys, never the last one.

    Args:
        request: The authenticated request.
        credential_id: Primary key of the credential to delete.

    Responses:
        200: the credential was deleted.
        400: the credential is the user's only passkey.
        404: no credential with that id belongs to the user.
    """
    require_passkey_mode(request)

    # Lock every credential row of this user by actually fetching them.
    # A COUNT aggregate does not lock the rows it counts, so counting under
    # select_for_update() would let two concurrent deletes of different
    # passkeys both see "2 remaining" and leave the user with none.
    # Ordering by pk keeps lock acquisition order consistent across requests.
    owned = list(
        WebAuthnCredential.objects.select_for_update()
        .filter(user=request.user)
        .order_by("pk")
    )

    cred = next((c for c in owned if c.pk == credential_id), None)
    if cred is None:
        return Status(404, {"error": "Credential not found"})

    if len(owned) <= 1:
        return Status(400, {"error": "Cannot delete your only passkey"})

    cred.delete()
    security_logger.info(
        "Passkey deleted: user_id=%s, credential_pk=%s",
        request.user.pk,
        credential_id,
    )
    return Status(200, {"message": "Passkey deleted"})