Coverage for apps / core / passkey_api.py: 98%

190 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-02 13:22 +0000

1"""Passkey (WebAuthn) authentication API endpoints — only active in passkey mode.""" 

2 

3import json 

4import logging 

5import secrets 

6import string 

7import time 

8import uuid 

9 

10from django.conf import settings 

11from django.contrib.auth import login 

12from django.contrib.auth.models import User 

13from django.db import transaction 

14from django.utils import timezone 

15from django_ratelimit.decorators import ratelimit 

16from ninja import Router, Status 

17from webauthn import ( 

18 generate_authentication_options, 

19 generate_registration_options, 

20 options_to_json, 

21 verify_authentication_response, 

22 verify_registration_response, 

23) 

24from webauthn.helpers.structs import ( 

25 AuthenticatorSelectionCriteria, 

26 PublicKeyCredentialDescriptor, 

27 ResidentKeyRequirement, 

28 UserVerificationRequirement, 

29) 

30 

31from apps.core.auth import SessionAuth 

32from apps.core.auth_helpers import passkey_user_profile_response, require_passkey_mode 

33from apps.core.models import WebAuthnCredential 

34from apps.profiles.models import Profile 

35 

# Logger named "security"; the endpoints in this module log rate-limit hits,
# verification failures and successful passkey operations to it.
security_logger = logging.getLogger("security")

# django-ninja router the passkey endpoints below register themselves on.
router = Router(tags=["passkey"])

# Dotted path of the auth backend passed explicitly to
# django.contrib.auth.login() by the register and login verify endpoints.
_AUTH_BACKEND = "django.contrib.auth.backends.ModelBackend"

41 

42 

def _get_rp_id(request):
    """Return the WebAuthn Relying Party ID to use for this request.

    A truthy ``settings.WEBAUTHN_RP_ID`` takes precedence. Otherwise the ID
    is the request host with everything from the first ``:`` onwards
    (normally the port) removed.
    """
    configured_rp_id = settings.WEBAUTHN_RP_ID
    if not configured_rp_id:
        host = request.get_host()
        return host.partition(":")[0]
    return configured_rp_id

48 

49 

def _get_origin(request):
    """Return the origin that WebAuthn responses are verified against.

    A truthy ``settings.WEBAUTHN_RP_ORIGIN`` (production) is returned as-is.
    Pinning the value in configuration keeps the expected origin independent
    of the request Host header, so an injected X-Forwarded-Host cannot
    influence it (F-33).

    When the setting is not set (development without a fixed domain) the
    origin is derived from the request: ``https`` if ``request.is_secure()``
    is true, ``http`` otherwise, followed by ``request.get_host()``.
    """
    pinned_origin = settings.WEBAUTHN_RP_ORIGIN
    if pinned_origin:
        return pinned_origin

    if request.is_secure():
        scheme = "https"
    else:
        scheme = "http"
    host = request.get_host()
    return f"{scheme}://{host}"

64 

65 

66# --- Registration --- 

67 

68 

@router.post("/register/options/", response={200: dict, 400: dict, 429: dict})
@ratelimit(key="ip", rate="10/h", method="POST", block=False)
def register_options(request):
    """Issue a WebAuthn registration challenge for a new account.

    Responses:
        200: the registration options as a JSON-compatible dict.
        429: the rate limiter flagged this request (``request.limited``).

    On success the challenge, the generated user handle and the issue time
    are stored in the session for the matching verify call.
    """
    require_passkey_mode(request)

    rate_limited = getattr(request, "limited", False)
    if rate_limited:
        security_logger.warning(
            "Rate limit hit: passkey register/options/ from %s",
            request.META.get("REMOTE_ADDR"),
        )
        return Status(429, {"error": "Too many attempts. Please try again later."})

    # No account exists yet, so the user handle is a fresh random UUID.
    new_user_handle = uuid.uuid4().bytes
    selection = AuthenticatorSelectionCriteria(
        resident_key=ResidentKeyRequirement.REQUIRED,
        user_verification=UserVerificationRequirement.REQUIRED,
    )
    registration_options = generate_registration_options(
        rp_id=_get_rp_id(request),
        rp_name=settings.WEBAUTHN_RP_NAME,
        user_name="New User",
        user_id=new_user_handle,
        user_display_name="New User",
        authenticator_selection=selection,
    )

    # Keep challenge, user handle and creation timestamp in the session.
    session = request.session
    session["webauthn_register_challenge"] = registration_options.challenge.hex()
    session["webauthn_register_user_id"] = new_user_handle.hex()
    session["webauthn_register_challenge_created_at"] = time.time()

    payload = json.loads(options_to_json(registration_options))
    return Status(200, payload)

100 

101 

@router.post("/register/verify/", response={201: dict, 400: dict, 429: dict})
@ratelimit(key="ip", rate="10/h", method="POST", block=False)
def register_verify(request):
    """Verify a registration response and create the account.

    Responses:
        201: user, profile and credential were created and the user is
            logged in on this session.
        400: no pending challenge, expired challenge, a request body that is
            not a JSON object, or a failed WebAuthn verification.
        429: the rate limiter flagged this request (``request.limited``).

    The pending challenge is always removed from the session, whatever the
    outcome, so each challenge can be used for at most one attempt.
    """
    require_passkey_mode(request)

    # Consume challenge BEFORE rate limit check to prevent replay (FR-011)
    challenge_hex = request.session.pop("webauthn_register_challenge", None)
    user_id_hex = request.session.pop("webauthn_register_user_id", None)
    created_at = request.session.pop("webauthn_register_challenge_created_at", None)

    if getattr(request, "limited", False):
        security_logger.warning(
            "Rate limit hit: passkey register/verify/ from %s",
            request.META.get("REMOTE_ADDR"),
        )
        return Status(429, {"error": "Too many attempts. Please try again later."})

    if not challenge_hex or not user_id_hex:
        return Status(400, {"error": "Registration failed: no pending challenge"})

    # Reject expired challenges (FR-010: 5-minute window).
    # A missing timestamp skips this check.
    if created_at and (time.time() - created_at) > 300:
        return Status(400, {"error": "Registration failed: challenge expired"})

    try:
        body = json.loads(request.body)
    except (json.JSONDecodeError, ValueError):
        return Status(400, {"error": "Registration failed: invalid request body"})

    # json.loads also succeeds for non-object JSON (string, list, number,
    # null). body.get() is called further down, so anything but a dict is
    # rejected here instead of surfacing later as an unhandled AttributeError.
    if not isinstance(body, dict):
        return Status(400, {"error": "Registration failed: invalid request body"})

    try:
        verification = verify_registration_response(
            credential=body,
            expected_challenge=bytes.fromhex(challenge_hex),
            expected_rp_id=_get_rp_id(request),
            expected_origin=_get_origin(request),
            require_user_verification=True,
        )
    except Exception as e:
        # Any verification problem is logged and reported as a generic 400.
        security_logger.warning(
            "Passkey registration verification failed from %s: %s",
            request.META.get("REMOTE_ADDR"),
            str(e),
        )
        return Status(400, {"error": "Registration failed: verification error"})

    user, profile = _create_passkey_user_and_profile(verification, body.get("transports"))
    login(request, user, backend=_AUTH_BACKEND)
    request.session["profile_id"] = profile.id

    security_logger.info(
        "Passkey registration: user_id=%s from %s",
        user.pk,
        request.META.get("REMOTE_ADDR"),
    )

    return Status(201, passkey_user_profile_response(user, profile))

159 

160 

@transaction.atomic
def _create_passkey_user_and_profile(verification, transports=None):
    """Create the User, its Profile and its WebAuthnCredential in one transaction.

    ``verification`` supplies ``credential_id``, ``credential_public_key`` and
    ``sign_count`` for the stored credential; ``transports`` is stored as
    given. Returns the ``(user, profile)`` pair.
    """
    account = User.objects.create_user(
        username=f"pk_{uuid.uuid4().hex[:8]}",
        password=None,
        email="",
        is_active=True,
        is_staff=False,
    )
    account.set_unusable_password()
    account.save(update_fields=["password"])

    # The display-name suffix starts with a letter on purpose: a purely
    # hexadecimal suffix can come out as digits only, which would make the
    # name match `^User \d+$` and trip R23's "user count leak" guard. The
    # five hex characters after the letter carry the entropy.
    leading_letter = secrets.choice(string.ascii_lowercase)
    hex_tail = uuid.uuid4().hex[:5]
    display_name = f"User {leading_letter + hex_tail}"
    new_profile = Profile.objects.create(
        user=account,
        name=display_name,
        avatar_color=Profile.next_avatar_color(),
    )

    WebAuthnCredential.objects.create(
        user=account,
        credential_id=verification.credential_id,
        public_key=verification.credential_public_key,
        sign_count=verification.sign_count,
        transports=transports,
    )

    return account, new_profile

195 

196 

197# --- Authentication --- 

198 

199 

@router.post("/login/options/", response={200: dict, 429: dict})
@ratelimit(key="ip", rate="20/h", method="POST", block=False)
def login_options(request):
    """Issue a WebAuthn authentication challenge.

    Responses:
        200: the authentication options as a JSON-compatible dict, or
            ``{"no_credentials": True}`` when no credential is stored at all.
        429: the rate limiter flagged this request (``request.limited``).

    When a challenge is issued, it and its issue time are stored in the
    session for the matching verify call.
    """
    require_passkey_mode(request)

    rate_limited = getattr(request, "limited", False)
    if rate_limited:
        security_logger.warning(
            "Rate limit hit: passkey login/options/ from %s",
            request.META.get("REMOTE_ADDR"),
        )
        return Status(429, {"error": "Too many attempts. Please try again later."})

    # With nothing registered there is nothing to match against; issuing a
    # challenge would only make the browser show confusing hardware-key / QR
    # prompts, so answer without one.
    any_credentials = WebAuthnCredential.objects.exists()
    if not any_credentials:
        return Status(200, {"no_credentials": True})

    auth_options = generate_authentication_options(
        rp_id=_get_rp_id(request),
        user_verification=UserVerificationRequirement.REQUIRED,
    )

    session = request.session
    session["webauthn_login_challenge"] = auth_options.challenge.hex()
    session["webauthn_login_challenge_created_at"] = time.time()

    payload = json.loads(options_to_json(auth_options))
    return Status(200, payload)

226 

227 

@router.post("/login/verify/", response={200: dict, 401: dict, 429: dict})
@ratelimit(key="ip", rate="20/h", method="POST", block=False)
def login_verify(request):
    """Verify an authentication response and establish the session.

    Responses:
        200: the assertion verified; the credential's sign count and
            last-used time are updated and the user is logged in.
        401: no pending challenge, expired challenge, a request body that is
            not a JSON object, an undecodable or unknown credential id, an
            inactive user, a failed verification, or a sign count that did
            not increase.
        429: the rate limiter flagged this request (``request.limited``).

    The pending challenge is always removed from the session, whatever the
    outcome, so each challenge can be used for at most one attempt.
    """
    require_passkey_mode(request)

    # Consume challenge BEFORE rate limit check to prevent replay (FR-011)
    challenge_hex = request.session.pop("webauthn_login_challenge", None)
    created_at = request.session.pop("webauthn_login_challenge_created_at", None)

    if getattr(request, "limited", False):
        security_logger.warning(
            "Rate limit hit: passkey login/verify/ from %s",
            request.META.get("REMOTE_ADDR"),
        )
        return Status(429, {"error": "Too many attempts. Please try again later."})

    if not challenge_hex:
        return Status(401, {"error": "Authentication failed: no pending challenge"})

    # Reject expired challenges (FR-010: 5-minute window).
    # A missing timestamp skips this check.
    if created_at and (time.time() - created_at) > 300:
        return Status(401, {"error": "Authentication failed: challenge expired"})

    try:
        body = json.loads(request.body)
    except (json.JSONDecodeError, ValueError):
        return Status(401, {"error": "Authentication failed: invalid request body"})

    # json.loads also succeeds for non-object JSON (string, list, number,
    # null). Without this guard body.get() below raises AttributeError and
    # the unauthenticated caller gets an unhandled server error.
    if not isinstance(body, dict):
        return Status(401, {"error": "Authentication failed: invalid request body"})

    # Look up credential by credential_id
    raw_id = body.get("rawId", "")
    try:
        from webauthn.helpers import base64url_to_bytes

        credential_id_bytes = base64url_to_bytes(raw_id)
    except Exception:
        return Status(401, {"error": "Authentication failed"})

    try:
        credential = WebAuthnCredential.objects.select_related("user").get(credential_id=credential_id_bytes)
    except WebAuthnCredential.DoesNotExist:
        security_logger.warning(
            "Passkey login: unknown credential from %s",
            request.META.get("REMOTE_ADDR"),
        )
        return Status(401, {"error": "Authentication failed"})

    # Same generic message as for an unknown credential.
    if not credential.user.is_active:
        return Status(401, {"error": "Authentication failed"})

    try:
        verification = verify_authentication_response(
            credential=body,
            expected_challenge=bytes.fromhex(challenge_hex),
            expected_rp_id=_get_rp_id(request),
            expected_origin=_get_origin(request),
            credential_public_key=bytes(credential.public_key),
            credential_current_sign_count=credential.sign_count,
            require_user_verification=True,
        )
    except Exception as e:
        # Any verification problem is logged and reported as a generic 401.
        security_logger.warning(
            "Passkey login verification failed from %s: %s",
            request.META.get("REMOTE_ADDR"),
            str(e),
        )
        return Status(401, {"error": "Authentication failed"})

    # Check sign_count for cloned authenticator: a non-zero counter that did
    # not move past the stored value is rejected.
    if verification.new_sign_count > 0 and verification.new_sign_count <= credential.sign_count:
        security_logger.warning(
            "Possible cloned authenticator: user_id=%s, sign_count went from %d to %d",
            credential.user_id,
            credential.sign_count,
            verification.new_sign_count,
        )
        return Status(401, {"error": "Authentication failed"})

    # Update credential
    credential.sign_count = verification.new_sign_count
    credential.last_used_at = timezone.now()
    credential.save(update_fields=["sign_count", "last_used_at"])

    login(request, credential.user, backend=_AUTH_BACKEND)
    request.session["profile_id"] = credential.user.profile.id

    security_logger.info(
        "Passkey login: user_id=%s from %s",
        credential.user_id,
        request.META.get("REMOTE_ADDR"),
    )

    return Status(200, passkey_user_profile_response(credential.user, credential.user.profile))

321 

322 

323# --- Credential Management --- 

324 

325 

@router.get("/credentials/", response={200: dict}, auth=SessionAuth())
def list_credentials(request):
    """List the current user's registered passkeys, ordered by creation time.

    Returns 200 with ``{"credentials": [...]}``; each entry carries ``id``,
    ``created_at``, ``last_used_at`` (ISO 8601 strings, or ``None`` when the
    passkey was never used) and ``is_deletable``, which is true only when the
    user has more than one passkey.
    """
    require_passkey_mode(request)

    # Evaluate the queryset once and derive the total from the fetched rows.
    # This avoids a separate COUNT query and guarantees that is_deletable
    # agrees with the list that is actually returned.
    credentials = list(WebAuthnCredential.objects.filter(user=request.user).order_by("created_at"))
    is_deletable = len(credentials) > 1

    return Status(
        200,
        {
            "credentials": [
                {
                    "id": c.pk,
                    "created_at": c.created_at.isoformat(),
                    "last_used_at": c.last_used_at.isoformat() if c.last_used_at else None,
                    "is_deletable": is_deletable,
                }
                for c in credentials
            ]
        },
    )

347 

348 

@router.post("/credentials/add/options/", response={200: dict}, auth=SessionAuth())
def add_credential_options(request):
    """Issue a registration challenge for adding another passkey to the signed-in user.

    The user's already stored credentials are passed as the exclusion list.
    Returns 200 with the registration options as a JSON-compatible dict and
    stores the challenge and its issue time in the session.
    """
    require_passkey_mode(request)

    current_user = request.user
    already_registered = [
        PublicKeyCredentialDescriptor(id=bytes(stored.credential_id))
        for stored in WebAuthnCredential.objects.filter(user=current_user)
    ]

    selection = AuthenticatorSelectionCriteria(
        resident_key=ResidentKeyRequirement.REQUIRED,
        user_verification=UserVerificationRequirement.REQUIRED,
    )
    registration_options = generate_registration_options(
        rp_id=_get_rp_id(request),
        rp_name=settings.WEBAUTHN_RP_NAME,
        user_name=current_user.username,
        # User handle: the primary key as 8 big-endian bytes.
        user_id=current_user.pk.to_bytes(8, "big"),
        user_display_name=request.auth.name,
        authenticator_selection=selection,
        exclude_credentials=already_registered,
    )

    session = request.session
    session["webauthn_add_challenge"] = registration_options.challenge.hex()
    session["webauthn_add_challenge_created_at"] = time.time()

    payload = json.loads(options_to_json(registration_options))
    return Status(200, payload)

374 

375 

@router.post("/credentials/add/verify/", response={201: dict, 400: dict}, auth=SessionAuth())
def add_credential_verify(request):
    """Verify and store an additional passkey for the authenticated user.

    Responses:
        201: the credential was stored; the body describes it under
            ``"credential"``.
        400: no pending challenge, expired challenge, a request body that is
            not a JSON object, or a failed WebAuthn verification.

    The pending challenge is always removed from the session, so each
    challenge can be used for at most one attempt.
    """
    require_passkey_mode(request)

    challenge_hex = request.session.pop("webauthn_add_challenge", None)
    created_at = request.session.pop("webauthn_add_challenge_created_at", None)
    if not challenge_hex:
        return Status(400, {"error": "No pending challenge"})

    # Reject expired challenges (FR-010: 5-minute window).
    # A missing timestamp skips this check.
    if created_at and (time.time() - created_at) > 300:
        return Status(400, {"error": "Challenge expired"})

    try:
        body = json.loads(request.body)
    except (json.JSONDecodeError, ValueError):
        return Status(400, {"error": "Invalid request body"})

    # json.loads also succeeds for non-object JSON (string, list, number,
    # null). body.get() is called further down, so anything but a dict is
    # rejected here instead of surfacing later as an unhandled AttributeError.
    if not isinstance(body, dict):
        return Status(400, {"error": "Invalid request body"})

    try:
        verification = verify_registration_response(
            credential=body,
            expected_challenge=bytes.fromhex(challenge_hex),
            expected_rp_id=_get_rp_id(request),
            expected_origin=_get_origin(request),
            require_user_verification=True,
        )
    except Exception as e:
        # Any verification problem is logged and reported as a generic 400.
        security_logger.warning(
            "Add credential verification failed for user_id=%s: %s",
            request.user.pk,
            str(e),
        )
        return Status(400, {"error": "Verification failed"})

    cred = WebAuthnCredential.objects.create(
        user=request.user,
        credential_id=verification.credential_id,
        public_key=verification.credential_public_key,
        sign_count=verification.sign_count,
        transports=body.get("transports"),
    )

    security_logger.info(
        "Passkey added: user_id=%s, credential_pk=%s",
        request.user.pk,
        cred.pk,
    )

    return Status(
        201,
        {
            "credential": {
                "id": cred.pk,
                "created_at": cred.created_at.isoformat(),
                "last_used_at": None,
                # Adding to an existing account, so the user now has more
                # than one passkey.
                "is_deletable": True,
            }
        },
    )

436 

437 

@router.delete(
    "/credentials/{credential_id}/",
    response={200: dict, 400: dict, 404: dict},
    auth=SessionAuth(),
)
@transaction.atomic
def delete_credential(request, credential_id: int):
    """Delete one of the current user's registered passkeys.

    Responses:
        200: the passkey was deleted.
        400: the passkey is the user's only one and must be kept.
        404: the user has no passkey with this primary key.
    """
    require_passkey_mode(request)

    # Lock every credential row the user owns, not only the target, and do it
    # by fetching the rows. Locking just the target and then counting lets two
    # concurrent requests that delete different passkeys both see "2" and
    # leave the user with none. Ordering by pk keeps the lock order stable
    # across concurrent requests.
    owned = list(WebAuthnCredential.objects.select_for_update().filter(user=request.user).order_by("pk"))

    cred = next((c for c in owned if c.pk == credential_id), None)
    if cred is None:
        return Status(404, {"error": "Credential not found"})

    if len(owned) <= 1:
        return Status(400, {"error": "Cannot delete your only passkey"})

    cred.delete()
    security_logger.info(
        "Passkey deleted: user_id=%s, credential_pk=%s",
        request.user.pk,
        credential_id,
    )
    return Status(200, {"message": "Passkey deleted"})

← Back to Dashboard