Coverage for apps / core / passkey_api.py: 98%
186 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-12 10:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-12 10:49 +0000
1"""Passkey (WebAuthn) authentication API endpoints — only active in passkey mode."""
3import json
4import logging
5import time
6import uuid
8from django.conf import settings
9from django.contrib.auth import login
10from django.contrib.auth.models import User
11from django.db import transaction
12from django.utils import timezone
13from django_ratelimit.decorators import ratelimit
14from ninja import Router, Status
15from webauthn import (
16 generate_authentication_options,
17 generate_registration_options,
18 options_to_json,
19 verify_authentication_response,
20 verify_registration_response,
21)
22from webauthn.helpers.structs import (
23 AuthenticatorSelectionCriteria,
24 PublicKeyCredentialDescriptor,
25 ResidentKeyRequirement,
26 UserVerificationRequirement,
27)
29from apps.core.auth import SessionAuth
30from apps.core.auth_helpers import passkey_user_profile_response, require_passkey_mode
31from apps.core.models import WebAuthnCredential
32from apps.profiles.models import Profile
34security_logger = logging.getLogger("security")
36router = Router(tags=["passkey"])
38_AUTH_BACKEND = "django.contrib.auth.backends.ModelBackend"
41def _get_rp_id(request):
42 """Get the Relying Party ID from settings or request hostname."""
43 if settings.WEBAUTHN_RP_ID:
44 return settings.WEBAUTHN_RP_ID
45 return request.get_host().split(":")[0]
48def _get_origin(request):
49 """Get the expected origin for WebAuthn verification.
51 Always derives origin from the actual request host, since the browser sends
52 the page's origin (scheme + host) regardless of RP ID configuration.
53 RP ID can be a parent domain (e.g. "matthewdeaves.com" for
54 "cookie.matthewdeaves.com") but the origin must match the actual hostname.
55 """
56 scheme = "https" if request.is_secure() else "http"
57 return f"{scheme}://{request.get_host()}"
60# --- Registration ---
63@router.post("/register/options/", response={200: dict, 400: dict, 429: dict})
64@ratelimit(key="ip", rate="10/h", method="POST", block=False)
65def register_options(request):
66 """Generate WebAuthn registration challenge."""
67 require_passkey_mode(request)
68 if getattr(request, "limited", False):
69 security_logger.warning(
70 "Rate limit hit: passkey register/options/ from %s",
71 request.META.get("REMOTE_ADDR"),
72 )
73 return Status(429, {"error": "Too many attempts. Please try again later."})
75 user_id = uuid.uuid4().bytes
76 options = generate_registration_options(
77 rp_id=_get_rp_id(request),
78 rp_name=settings.WEBAUTHN_RP_NAME,
79 user_name="New User",
80 user_id=user_id,
81 user_display_name="New User",
82 authenticator_selection=AuthenticatorSelectionCriteria(
83 resident_key=ResidentKeyRequirement.REQUIRED,
84 user_verification=UserVerificationRequirement.REQUIRED,
85 ),
86 )
88 # Store challenge, user_id, and creation timestamp in session
89 request.session["webauthn_register_challenge"] = options.challenge.hex()
90 request.session["webauthn_register_user_id"] = user_id.hex()
91 request.session["webauthn_challenge_created_at"] = time.time()
93 return Status(200, json.loads(options_to_json(options)))
96@router.post("/register/verify/", response={201: dict, 400: dict, 429: dict})
97@ratelimit(key="ip", rate="10/h", method="POST", block=False)
98def register_verify(request):
99 """Verify registration response and create account."""
100 require_passkey_mode(request)
102 # Consume challenge BEFORE rate limit check to prevent replay (FR-011)
103 challenge_hex = request.session.pop("webauthn_register_challenge", None)
104 user_id_hex = request.session.pop("webauthn_register_user_id", None)
105 created_at = request.session.pop("webauthn_challenge_created_at", None)
107 if getattr(request, "limited", False):
108 security_logger.warning(
109 "Rate limit hit: passkey register/verify/ from %s",
110 request.META.get("REMOTE_ADDR"),
111 )
112 return Status(429, {"error": "Too many attempts. Please try again later."})
114 if not challenge_hex or not user_id_hex:
115 return Status(400, {"error": "Registration failed: no pending challenge"})
117 # Reject expired challenges (FR-010: 5-minute window)
118 if created_at and (time.time() - created_at) > 300:
119 return Status(400, {"error": "Registration failed: challenge expired"})
121 try:
122 body = json.loads(request.body)
123 except (json.JSONDecodeError, ValueError):
124 return Status(400, {"error": "Registration failed: invalid request body"})
126 try:
127 verification = verify_registration_response(
128 credential=body,
129 expected_challenge=bytes.fromhex(challenge_hex),
130 expected_rp_id=_get_rp_id(request),
131 expected_origin=_get_origin(request),
132 require_user_verification=True,
133 )
134 except Exception as e:
135 security_logger.warning(
136 "Passkey registration verification failed from %s: %s",
137 request.META.get("REMOTE_ADDR"),
138 str(e),
139 )
140 return Status(400, {"error": "Registration failed: verification error"})
142 user, profile = _create_passkey_user_and_profile(verification, body.get("transports"))
143 login(request, user, backend=_AUTH_BACKEND)
144 request.session["profile_id"] = profile.id
146 security_logger.info(
147 "Passkey registration: user_id=%s from %s",
148 user.pk,
149 request.META.get("REMOTE_ADDR"),
150 )
152 return Status(201, passkey_user_profile_response(user, profile))
155@transaction.atomic
156def _create_passkey_user_and_profile(verification, transports=None):
157 """Create User, Profile, and WebAuthnCredential atomically."""
158 username = f"pk_{uuid.uuid4().hex[:8]}"
159 user = User.objects.create_user(
160 username=username,
161 password=None,
162 email="",
163 is_active=True,
164 is_staff=False,
165 )
166 user.set_unusable_password()
167 user.save(update_fields=["password"])
169 profile_count = Profile.objects.count()
170 profile = Profile.objects.create(
171 user=user,
172 name=f"User {profile_count + 1}",
173 avatar_color=Profile.next_avatar_color(),
174 )
176 WebAuthnCredential.objects.create(
177 user=user,
178 credential_id=verification.credential_id,
179 public_key=verification.credential_public_key,
180 sign_count=verification.sign_count,
181 transports=transports,
182 )
184 return user, profile
187# --- Authentication ---
190@router.post("/login/options/", response={200: dict, 429: dict})
191@ratelimit(key="ip", rate="20/h", method="POST", block=False)
192def login_options(request):
193 """Generate WebAuthn authentication challenge."""
194 require_passkey_mode(request)
195 if getattr(request, "limited", False):
196 security_logger.warning(
197 "Rate limit hit: passkey login/options/ from %s",
198 request.META.get("REMOTE_ADDR"),
199 )
200 return Status(429, {"error": "Too many attempts. Please try again later."})
202 # If no credentials exist, don't issue a challenge — the browser would show
203 # confusing hardware-key / QR prompts with nothing to match against.
204 if not WebAuthnCredential.objects.exists():
205 return Status(200, {"no_credentials": True})
207 options = generate_authentication_options(
208 rp_id=_get_rp_id(request),
209 user_verification=UserVerificationRequirement.REQUIRED,
210 )
212 request.session["webauthn_login_challenge"] = options.challenge.hex()
213 request.session["webauthn_challenge_created_at"] = time.time()
215 return Status(200, json.loads(options_to_json(options)))
218@router.post("/login/verify/", response={200: dict, 401: dict, 429: dict})
219@ratelimit(key="ip", rate="20/h", method="POST", block=False)
220def login_verify(request):
221 """Verify authentication response and establish session."""
222 require_passkey_mode(request)
224 # Consume challenge BEFORE rate limit check to prevent replay (FR-011)
225 challenge_hex = request.session.pop("webauthn_login_challenge", None)
226 created_at = request.session.pop("webauthn_challenge_created_at", None)
228 if getattr(request, "limited", False):
229 security_logger.warning(
230 "Rate limit hit: passkey login/verify/ from %s",
231 request.META.get("REMOTE_ADDR"),
232 )
233 return Status(429, {"error": "Too many attempts. Please try again later."})
235 if not challenge_hex:
236 return Status(401, {"error": "Authentication failed: no pending challenge"})
238 # Reject expired challenges (FR-010: 5-minute window)
239 if created_at and (time.time() - created_at) > 300:
240 return Status(401, {"error": "Authentication failed: challenge expired"})
242 try:
243 body = json.loads(request.body)
244 except (json.JSONDecodeError, ValueError):
245 return Status(401, {"error": "Authentication failed: invalid request body"})
247 # Look up credential by credential_id
248 raw_id = body.get("rawId", "")
249 try:
250 from webauthn.helpers import base64url_to_bytes
252 credential_id_bytes = base64url_to_bytes(raw_id)
253 except Exception:
254 return Status(401, {"error": "Authentication failed"})
256 try:
257 credential = WebAuthnCredential.objects.select_related("user").get(credential_id=credential_id_bytes)
258 except WebAuthnCredential.DoesNotExist:
259 security_logger.warning(
260 "Passkey login: unknown credential from %s",
261 request.META.get("REMOTE_ADDR"),
262 )
263 return Status(401, {"error": "Authentication failed"})
265 if not credential.user.is_active:
266 return Status(401, {"error": "Authentication failed"})
268 try:
269 verification = verify_authentication_response(
270 credential=body,
271 expected_challenge=bytes.fromhex(challenge_hex),
272 expected_rp_id=_get_rp_id(request),
273 expected_origin=_get_origin(request),
274 credential_public_key=bytes(credential.public_key),
275 credential_current_sign_count=credential.sign_count,
276 require_user_verification=True,
277 )
278 except Exception as e:
279 security_logger.warning(
280 "Passkey login verification failed from %s: %s",
281 request.META.get("REMOTE_ADDR"),
282 str(e),
283 )
284 return Status(401, {"error": "Authentication failed"})
286 # Check sign_count for cloned authenticator
287 if verification.new_sign_count > 0 and verification.new_sign_count <= credential.sign_count:
288 security_logger.warning(
289 "Possible cloned authenticator: user_id=%s, sign_count went from %d to %d",
290 credential.user_id,
291 credential.sign_count,
292 verification.new_sign_count,
293 )
294 return Status(401, {"error": "Authentication failed"})
296 # Update credential
297 credential.sign_count = verification.new_sign_count
298 credential.last_used_at = timezone.now()
299 credential.save(update_fields=["sign_count", "last_used_at"])
301 login(request, credential.user, backend=_AUTH_BACKEND)
302 request.session["profile_id"] = credential.user.profile.id
304 security_logger.info(
305 "Passkey login: user_id=%s from %s",
306 credential.user_id,
307 request.META.get("REMOTE_ADDR"),
308 )
310 return Status(200, passkey_user_profile_response(credential.user, credential.user.profile))
313# --- Credential Management ---
316@router.get("/credentials/", response={200: dict}, auth=SessionAuth())
317def list_credentials(request):
318 """List current user's registered passkeys."""
319 require_passkey_mode(request)
320 credentials = WebAuthnCredential.objects.filter(user=request.user).order_by("created_at")
321 total = credentials.count()
323 return Status(
324 200,
325 {
326 "credentials": [
327 {
328 "id": c.pk,
329 "created_at": c.created_at.isoformat(),
330 "last_used_at": c.last_used_at.isoformat() if c.last_used_at else None,
331 "is_deletable": total > 1,
332 }
333 for c in credentials
334 ]
335 },
336 )
339@router.post("/credentials/add/options/", response={200: dict}, auth=SessionAuth())
340def add_credential_options(request):
341 """Generate registration options for adding an additional passkey."""
342 require_passkey_mode(request)
344 existing_creds = WebAuthnCredential.objects.filter(user=request.user)
345 exclude = [PublicKeyCredentialDescriptor(id=bytes(c.credential_id)) for c in existing_creds]
347 options = generate_registration_options(
348 rp_id=_get_rp_id(request),
349 rp_name=settings.WEBAUTHN_RP_NAME,
350 user_name=request.user.username,
351 user_id=request.user.pk.to_bytes(8, "big"),
352 user_display_name=request.auth.name,
353 authenticator_selection=AuthenticatorSelectionCriteria(
354 resident_key=ResidentKeyRequirement.REQUIRED,
355 user_verification=UserVerificationRequirement.REQUIRED,
356 ),
357 exclude_credentials=exclude,
358 )
360 request.session["webauthn_add_challenge"] = options.challenge.hex()
361 request.session["webauthn_challenge_created_at"] = time.time()
363 return Status(200, json.loads(options_to_json(options)))
366@router.post("/credentials/add/verify/", response={201: dict, 400: dict}, auth=SessionAuth())
367def add_credential_verify(request):
368 """Verify and store additional passkey for authenticated user."""
369 require_passkey_mode(request)
371 challenge_hex = request.session.pop("webauthn_add_challenge", None)
372 created_at = request.session.pop("webauthn_challenge_created_at", None)
373 if not challenge_hex:
374 return Status(400, {"error": "No pending challenge"})
376 # Reject expired challenges (FR-010: 5-minute window)
377 if created_at and (time.time() - created_at) > 300:
378 return Status(400, {"error": "Challenge expired"})
380 try:
381 body = json.loads(request.body)
382 except (json.JSONDecodeError, ValueError):
383 return Status(400, {"error": "Invalid request body"})
385 try:
386 verification = verify_registration_response(
387 credential=body,
388 expected_challenge=bytes.fromhex(challenge_hex),
389 expected_rp_id=_get_rp_id(request),
390 expected_origin=_get_origin(request),
391 require_user_verification=True,
392 )
393 except Exception as e:
394 security_logger.warning(
395 "Add credential verification failed for user_id=%s: %s",
396 request.user.pk,
397 str(e),
398 )
399 return Status(400, {"error": "Verification failed"})
401 cred = WebAuthnCredential.objects.create(
402 user=request.user,
403 credential_id=verification.credential_id,
404 public_key=verification.credential_public_key,
405 sign_count=verification.sign_count,
406 transports=body.get("transports"),
407 )
409 security_logger.info(
410 "Passkey added: user_id=%s, credential_pk=%s",
411 request.user.pk,
412 cred.pk,
413 )
415 return Status(
416 201,
417 {
418 "credential": {
419 "id": cred.pk,
420 "created_at": cred.created_at.isoformat(),
421 "last_used_at": None,
422 "is_deletable": True,
423 }
424 },
425 )
428@router.delete(
429 "/credentials/{credential_id}/",
430 response={200: dict, 400: dict, 404: dict},
431 auth=SessionAuth(),
432)
433@transaction.atomic
434def delete_credential(request, credential_id: int):
435 """Delete a registered passkey."""
436 require_passkey_mode(request)
438 try:
439 cred = WebAuthnCredential.objects.select_for_update().get(pk=credential_id, user=request.user)
440 except WebAuthnCredential.DoesNotExist:
441 return Status(404, {"error": "Credential not found"})
443 total = WebAuthnCredential.objects.select_for_update().filter(user=request.user).count()
444 if total <= 1:
445 return Status(400, {"error": "Cannot delete your only passkey"})
447 cred.delete()
448 security_logger.info(
449 "Passkey deleted: user_id=%s, credential_pk=%s",
450 request.user.pk,
451 credential_id,
452 )
453 return Status(200, {"message": "Passkey deleted"})