Coverage for apps / core / passkey_api.py: 98%

186 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-12 10:49 +0000

1"""Passkey (WebAuthn) authentication API endpoints — only active in passkey mode.""" 

2 

3import json 

4import logging 

5import time 

6import uuid 

7 

8from django.conf import settings 

9from django.contrib.auth import login 

10from django.contrib.auth.models import User 

11from django.db import transaction 

12from django.utils import timezone 

13from django_ratelimit.decorators import ratelimit 

14from ninja import Router, Status 

15from webauthn import ( 

16 generate_authentication_options, 

17 generate_registration_options, 

18 options_to_json, 

19 verify_authentication_response, 

20 verify_registration_response, 

21) 

22from webauthn.helpers.structs import ( 

23 AuthenticatorSelectionCriteria, 

24 PublicKeyCredentialDescriptor, 

25 ResidentKeyRequirement, 

26 UserVerificationRequirement, 

27) 

28 

29from apps.core.auth import SessionAuth 

30from apps.core.auth_helpers import passkey_user_profile_response, require_passkey_mode 

31from apps.core.models import WebAuthnCredential 

32from apps.profiles.models import Profile 

33 

# Dedicated logger for security-relevant events emitted by the endpoints in
# this module (rate-limit hits, verification failures, logins, passkey changes).
security_logger = logging.getLogger("security")

# Router that every passkey endpoint below is registered on.
router = Router(tags=["passkey"])

# Backend named explicitly in each login() call made by this module.
_AUTH_BACKEND = "django.contrib.auth.backends.ModelBackend"

39 

40 

def _get_rp_id(request):
    """Return the WebAuthn Relying Party ID for this request.

    A non-empty ``settings.WEBAUTHN_RP_ID`` is returned as-is. Otherwise the
    RP ID is the request host with everything from the first ``":"`` onward
    removed (i.e. the port is stripped).
    """
    configured_rp_id = settings.WEBAUTHN_RP_ID
    if not configured_rp_id:
        hostname, _, _port = request.get_host().partition(":")
        return hostname
    return configured_rp_id

46 

47 

48def _get_origin(request): 

49 """Get the expected origin for WebAuthn verification. 

50 

51 Always derives origin from the actual request host, since the browser sends 

52 the page's origin (scheme + host) regardless of RP ID configuration. 

53 RP ID can be a parent domain (e.g. "matthewdeaves.com" for 

54 "cookie.matthewdeaves.com") but the origin must match the actual hostname. 

55 """ 

56 scheme = "https" if request.is_secure() else "http" 

57 return f"{scheme}://{request.get_host()}" 

58 

59 

60# --- Registration --- 

61 

62 

@router.post("/register/options/", response={200: dict, 400: dict, 429: dict})
@ratelimit(key="ip", rate="10/h", method="POST", block=False)
def register_options(request):
    """Issue a WebAuthn registration challenge for a new account.

    Returns 429 when the rate limiter has flagged the request. Otherwise a
    random user handle and a fresh challenge are generated, both are stored in
    the session together with the current timestamp, and the registration
    options are returned as a JSON-compatible dict with status 200.
    """
    require_passkey_mode(request)

    throttled = getattr(request, "limited", False)
    if throttled:
        security_logger.warning(
            "Rate limit hit: passkey register/options/ from %s",
            request.META.get("REMOTE_ADDR"),
        )
        return Status(429, {"error": "Too many attempts. Please try again later."})

    # Random handle for the account that does not exist yet.
    new_user_handle = uuid.uuid4().bytes
    selection = AuthenticatorSelectionCriteria(
        resident_key=ResidentKeyRequirement.REQUIRED,
        user_verification=UserVerificationRequirement.REQUIRED,
    )
    registration_options = generate_registration_options(
        rp_id=_get_rp_id(request),
        rp_name=settings.WEBAUTHN_RP_NAME,
        user_name="New User",
        user_id=new_user_handle,
        user_display_name="New User",
        authenticator_selection=selection,
    )

    # Remember the challenge, the user handle and the issue time; the verify
    # endpoint pops all three from the session.
    session = request.session
    session["webauthn_register_challenge"] = registration_options.challenge.hex()
    session["webauthn_register_user_id"] = new_user_handle.hex()
    session["webauthn_challenge_created_at"] = time.time()

    payload = json.loads(options_to_json(registration_options))
    return Status(200, payload)

94 

95 

@router.post("/register/verify/", response={201: dict, 400: dict, 429: dict})
@ratelimit(key="ip", rate="10/h", method="POST", block=False)
def register_verify(request):
    """Verify a registration response and create the account.

    Flow:
      1. Pop the pending challenge, user handle and timestamp from the session
         (always, so a challenge can be presented only once).
      2. Return 429 if the rate limiter flagged the request.
      3. Return 400 if there is no pending challenge, the challenge is older
         than 300 seconds, the body is not a JSON object, or WebAuthn
         verification fails.
      4. Otherwise create the user/profile/credential, log the user in, store
         ``profile_id`` in the session and return 201 with the profile payload.
    """
    require_passkey_mode(request)

    # Consume challenge BEFORE rate limit check to prevent replay (FR-011)
    challenge_hex = request.session.pop("webauthn_register_challenge", None)
    user_id_hex = request.session.pop("webauthn_register_user_id", None)
    created_at = request.session.pop("webauthn_challenge_created_at", None)

    if getattr(request, "limited", False):
        security_logger.warning(
            "Rate limit hit: passkey register/verify/ from %s",
            request.META.get("REMOTE_ADDR"),
        )
        return Status(429, {"error": "Too many attempts. Please try again later."})

    if not challenge_hex or not user_id_hex:
        return Status(400, {"error": "Registration failed: no pending challenge"})

    # Reject expired challenges (FR-010: 5-minute window)
    # NOTE(review): "webauthn_challenge_created_at" is also written and popped
    # by the login and add-credential flows in this module. If another flow
    # consumed it, created_at is None here and this check is skipped — confirm
    # whether a missing timestamp should be treated as expired.
    if created_at and (time.time() - created_at) > 300:
        return Status(400, {"error": "Registration failed: challenge expired"})

    try:
        body = json.loads(request.body)
    except ValueError:  # includes json.JSONDecodeError and bad encodings
        return Status(400, {"error": "Registration failed: invalid request body"})

    # Syntactically valid JSON can still be a list, string, number or null.
    # The code below calls body.get(...), so anything other than an object
    # must be rejected here instead of surfacing as an unhandled AttributeError.
    if not isinstance(body, dict):
        return Status(400, {"error": "Registration failed: invalid request body"})

    try:
        verification = verify_registration_response(
            credential=body,
            expected_challenge=bytes.fromhex(challenge_hex),
            expected_rp_id=_get_rp_id(request),
            expected_origin=_get_origin(request),
            require_user_verification=True,
        )
    except Exception as e:
        # Any verification failure is reported to the client generically; the
        # detail only goes to the security log.
        security_logger.warning(
            "Passkey registration verification failed from %s: %s",
            request.META.get("REMOTE_ADDR"),
            str(e),
        )
        return Status(400, {"error": "Registration failed: verification error"})

    user, profile = _create_passkey_user_and_profile(verification, body.get("transports"))
    login(request, user, backend=_AUTH_BACKEND)
    request.session["profile_id"] = profile.id

    security_logger.info(
        "Passkey registration: user_id=%s from %s",
        user.pk,
        request.META.get("REMOTE_ADDR"),
    )

    return Status(201, passkey_user_profile_response(user, profile))

153 

154 

@transaction.atomic
def _create_passkey_user_and_profile(verification, transports=None):
    """Create the User, its Profile and its first WebAuthnCredential in one transaction.

    ``verification`` supplies ``credential_id``, ``credential_public_key`` and
    ``sign_count``; ``transports`` is stored on the credential unchanged.
    Returns ``(user, profile)``.
    """
    # Passkey accounts get a generated username and no usable password.
    generated_username = f"pk_{uuid.uuid4().hex[:8]}"
    new_user = User.objects.create_user(
        username=generated_username,
        password=None,
        email="",
        is_active=True,
        is_staff=False,
    )
    new_user.set_unusable_password()
    new_user.save(update_fields=["password"])

    # Default display name is based on how many profiles exist right now.
    existing_profiles = Profile.objects.count()
    display_name = f"User {existing_profiles + 1}"
    new_profile = Profile.objects.create(
        user=new_user,
        name=display_name,
        avatar_color=Profile.next_avatar_color(),
    )

    credential_fields = {
        "user": new_user,
        "credential_id": verification.credential_id,
        "public_key": verification.credential_public_key,
        "sign_count": verification.sign_count,
        "transports": transports,
    }
    WebAuthnCredential.objects.create(**credential_fields)

    return new_user, new_profile

185 

186 

187# --- Authentication --- 

188 

189 

@router.post("/login/options/", response={200: dict, 429: dict})
@ratelimit(key="ip", rate="20/h", method="POST", block=False)
def login_options(request):
    """Issue a WebAuthn authentication challenge.

    Returns 429 when rate limited, ``{"no_credentials": True}`` when no passkey
    is registered at all, and otherwise the authentication options (status 200)
    after storing the challenge and its issue time in the session.
    """
    require_passkey_mode(request)

    throttled = getattr(request, "limited", False)
    if throttled:
        security_logger.warning(
            "Rate limit hit: passkey login/options/ from %s",
            request.META.get("REMOTE_ADDR"),
        )
        return Status(429, {"error": "Too many attempts. Please try again later."})

    # With zero registered credentials a challenge is pointless: the browser
    # would only show confusing hardware-key / QR prompts with nothing to match.
    any_credential_registered = WebAuthnCredential.objects.exists()
    if not any_credential_registered:
        return Status(200, {"no_credentials": True})

    auth_options = generate_authentication_options(
        rp_id=_get_rp_id(request),
        user_verification=UserVerificationRequirement.REQUIRED,
    )

    session = request.session
    session["webauthn_login_challenge"] = auth_options.challenge.hex()
    session["webauthn_challenge_created_at"] = time.time()

    payload = json.loads(options_to_json(auth_options))
    return Status(200, payload)

216 

217 

@router.post("/login/verify/", response={200: dict, 401: dict, 429: dict})
@ratelimit(key="ip", rate="20/h", method="POST", block=False)
def login_verify(request):
    """Verify an authentication response and establish a session.

    Flow:
      1. Pop the pending login challenge and timestamp from the session
         (always, so a challenge can be presented only once).
      2. Return 429 if the rate limiter flagged the request.
      3. Return 401 if there is no pending challenge, the challenge is older
         than 300 seconds, the body is not a JSON object, ``rawId`` cannot be
         decoded, the credential is unknown, its user is inactive, WebAuthn
         verification fails, or the signature counter did not advance.
      4. Otherwise update the credential's counter and last-used time, log the
         user in, store ``profile_id`` in the session and return 200 with the
         profile payload.
    """
    require_passkey_mode(request)

    # Consume challenge BEFORE rate limit check to prevent replay (FR-011)
    challenge_hex = request.session.pop("webauthn_login_challenge", None)
    created_at = request.session.pop("webauthn_challenge_created_at", None)

    if getattr(request, "limited", False):
        security_logger.warning(
            "Rate limit hit: passkey login/verify/ from %s",
            request.META.get("REMOTE_ADDR"),
        )
        return Status(429, {"error": "Too many attempts. Please try again later."})

    if not challenge_hex:
        return Status(401, {"error": "Authentication failed: no pending challenge"})

    # Reject expired challenges (FR-010: 5-minute window)
    # NOTE(review): "webauthn_challenge_created_at" is also written and popped
    # by the registration and add-credential flows in this module. If another
    # flow consumed it, created_at is None here and this check is skipped —
    # confirm whether a missing timestamp should be treated as expired.
    if created_at and (time.time() - created_at) > 300:
        return Status(401, {"error": "Authentication failed: challenge expired"})

    try:
        body = json.loads(request.body)
    except ValueError:  # includes json.JSONDecodeError and bad encodings
        return Status(401, {"error": "Authentication failed: invalid request body"})

    # Syntactically valid JSON can still be a list, string, number or null.
    # body.get(...) below would raise AttributeError on those, turning a bad
    # request into an unhandled server error, so reject them up front.
    if not isinstance(body, dict):
        return Status(401, {"error": "Authentication failed: invalid request body"})

    # Imported outside the try block so that an import problem is not
    # silently reported to the client as a failed authentication.
    from webauthn.helpers import base64url_to_bytes

    # Look up credential by credential_id
    raw_id = body.get("rawId", "")
    try:
        credential_id_bytes = base64url_to_bytes(raw_id)
    except Exception:
        # Missing padding, wrong type, invalid alphabet, ... — all the same
        # generic failure for the client.
        return Status(401, {"error": "Authentication failed"})

    try:
        credential = WebAuthnCredential.objects.select_related("user").get(credential_id=credential_id_bytes)
    except WebAuthnCredential.DoesNotExist:
        security_logger.warning(
            "Passkey login: unknown credential from %s",
            request.META.get("REMOTE_ADDR"),
        )
        return Status(401, {"error": "Authentication failed"})

    user = credential.user
    if not user.is_active:
        return Status(401, {"error": "Authentication failed"})

    try:
        verification = verify_authentication_response(
            credential=body,
            expected_challenge=bytes.fromhex(challenge_hex),
            expected_rp_id=_get_rp_id(request),
            expected_origin=_get_origin(request),
            credential_public_key=bytes(credential.public_key),
            credential_current_sign_count=credential.sign_count,
            require_user_verification=True,
        )
    except Exception as e:
        # Detail goes to the security log only; the client gets a generic error.
        security_logger.warning(
            "Passkey login verification failed from %s: %s",
            request.META.get("REMOTE_ADDR"),
            str(e),
        )
        return Status(401, {"error": "Authentication failed"})

    # Check sign_count for cloned authenticator: a non-zero counter that did
    # not move past the stored value is rejected. A new count of 0 is accepted.
    new_count = verification.new_sign_count
    if new_count > 0 and new_count <= credential.sign_count:
        security_logger.warning(
            "Possible cloned authenticator: user_id=%s, sign_count went from %d to %d",
            credential.user_id,
            credential.sign_count,
            new_count,
        )
        return Status(401, {"error": "Authentication failed"})

    # Update credential
    credential.sign_count = new_count
    credential.last_used_at = timezone.now()
    credential.save(update_fields=["sign_count", "last_used_at"])

    login(request, user, backend=_AUTH_BACKEND)
    profile = user.profile
    request.session["profile_id"] = profile.id

    security_logger.info(
        "Passkey login: user_id=%s from %s",
        credential.user_id,
        request.META.get("REMOTE_ADDR"),
    )

    return Status(200, passkey_user_profile_response(user, profile))

311 

312 

313# --- Credential Management --- 

314 

315 

@router.get("/credentials/", response={200: dict}, auth=SessionAuth())
def list_credentials(request):
    """Return the current user's passkeys, oldest first.

    Each entry carries ``id``, ``created_at``, ``last_used_at`` (or ``None``)
    and ``is_deletable``, which is true only when the user has more than one
    passkey.
    """
    require_passkey_mode(request)

    owned = WebAuthnCredential.objects.filter(user=request.user).order_by("created_at")
    # Every entry shares the same flag: only deletable when more than one exists.
    deletable = owned.count() > 1

    entries = []
    for passkey in owned:
        if passkey.last_used_at:
            last_used = passkey.last_used_at.isoformat()
        else:
            last_used = None
        entries.append(
            {
                "id": passkey.pk,
                "created_at": passkey.created_at.isoformat(),
                "last_used_at": last_used,
                "is_deletable": deletable,
            }
        )

    return Status(200, {"credentials": entries})

337 

338 

@router.post("/credentials/add/options/", response={200: dict}, auth=SessionAuth())
def add_credential_options(request):
    """Issue a registration challenge for adding another passkey to the logged-in user.

    The user's existing credential IDs are passed as ``exclude_credentials``.
    The challenge and its issue time are stored in the session, and the
    registration options are returned with status 200.
    """
    require_passkey_mode(request)

    already_registered = []
    for existing in WebAuthnCredential.objects.filter(user=request.user):
        already_registered.append(PublicKeyCredentialDescriptor(id=bytes(existing.credential_id)))

    account = request.user
    selection = AuthenticatorSelectionCriteria(
        resident_key=ResidentKeyRequirement.REQUIRED,
        user_verification=UserVerificationRequirement.REQUIRED,
    )
    registration_options = generate_registration_options(
        rp_id=_get_rp_id(request),
        rp_name=settings.WEBAUTHN_RP_NAME,
        user_name=account.username,
        # User handle is the primary key as 8 big-endian bytes.
        user_id=account.pk.to_bytes(8, "big"),
        user_display_name=request.auth.name,
        authenticator_selection=selection,
        exclude_credentials=already_registered,
    )

    session = request.session
    session["webauthn_add_challenge"] = registration_options.challenge.hex()
    session["webauthn_challenge_created_at"] = time.time()

    payload = json.loads(options_to_json(registration_options))
    return Status(200, payload)

364 

365 

@router.post("/credentials/add/verify/", response={201: dict, 400: dict}, auth=SessionAuth())
def add_credential_verify(request):
    """Verify and store an additional passkey for the authenticated user.

    Pops the pending add-credential challenge and timestamp from the session.
    Returns 400 when there is no pending challenge, the challenge is older
    than 300 seconds, the body is not a JSON object, or WebAuthn verification
    fails. On success the credential is stored and returned with status 201.
    """
    require_passkey_mode(request)

    challenge_hex = request.session.pop("webauthn_add_challenge", None)
    created_at = request.session.pop("webauthn_challenge_created_at", None)
    if not challenge_hex:
        return Status(400, {"error": "No pending challenge"})

    # Reject expired challenges (FR-010: 5-minute window)
    # NOTE(review): "webauthn_challenge_created_at" is also written and popped
    # by the registration and login flows in this module. If another flow
    # consumed it, created_at is None here and this check is skipped — confirm
    # whether a missing timestamp should be treated as expired.
    if created_at and (time.time() - created_at) > 300:
        return Status(400, {"error": "Challenge expired"})

    try:
        body = json.loads(request.body)
    except ValueError:  # includes json.JSONDecodeError and bad encodings
        return Status(400, {"error": "Invalid request body"})

    # Syntactically valid JSON can still be a list, string, number or null.
    # The code below calls body.get(...), so anything other than an object
    # must be rejected here instead of surfacing as an unhandled AttributeError.
    if not isinstance(body, dict):
        return Status(400, {"error": "Invalid request body"})

    try:
        verification = verify_registration_response(
            credential=body,
            expected_challenge=bytes.fromhex(challenge_hex),
            expected_rp_id=_get_rp_id(request),
            expected_origin=_get_origin(request),
            require_user_verification=True,
        )
    except Exception as e:
        # Detail goes to the security log only; the client gets a generic error.
        security_logger.warning(
            "Add credential verification failed for user_id=%s: %s",
            request.user.pk,
            str(e),
        )
        return Status(400, {"error": "Verification failed"})

    cred = WebAuthnCredential.objects.create(
        user=request.user,
        credential_id=verification.credential_id,
        public_key=verification.credential_public_key,
        sign_count=verification.sign_count,
        transports=body.get("transports"),
    )

    security_logger.info(
        "Passkey added: user_id=%s, credential_pk=%s",
        request.user.pk,
        cred.pk,
    )

    return Status(
        201,
        {
            "credential": {
                "id": cred.pk,
                "created_at": cred.created_at.isoformat(),
                # A passkey that was just added has never been used, and the
                # user now has at least two, so it can be deleted.
                "last_used_at": None,
                "is_deletable": True,
            }
        },
    )

426 

427 

@router.delete(
    "/credentials/{credential_id}/",
    response={200: dict, 400: dict, 404: dict},
    auth=SessionAuth(),
)
@transaction.atomic
def delete_credential(request, credential_id: int):
    """Delete one of the current user's passkeys.

    Returns 404 when the credential does not exist or belongs to another user,
    400 when it is the user's only passkey, and 200 after deleting it.
    """
    require_passkey_mode(request)

    # Lock ALL of this user's passkey rows and decide from that locked set.
    # Locking only the target row and then calling .count() is not enough:
    # an aggregate query does not take row locks, so two concurrent requests
    # deleting two different passkeys could each see a count of 2 and leave
    # the user with none. Ordering by pk keeps the lock order consistent
    # between concurrent requests.
    owned = list(
        WebAuthnCredential.objects.select_for_update()
        .filter(user=request.user)
        .order_by("pk")
    )

    cred = next((c for c in owned if c.pk == credential_id), None)
    if cred is None:
        return Status(404, {"error": "Credential not found"})

    if len(owned) <= 1:
        return Status(400, {"error": "Cannot delete your only passkey"})

    cred.delete()
    security_logger.info(
        "Passkey deleted: user_id=%s, credential_pk=%s",
        request.user.pk,
        credential_id,
    )
    return Status(200, {"message": "Passkey deleted"})

← Back to Dashboard