Coverage for apps/core/management/commands/_cookie_admin_users.py: 89%

247 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-02 13:22 +0000

1"""User-lifecycle + status/audit handlers for `cookie_admin`. 

2 

3Split out of `cookie_admin.py` to keep individual files under the 500-line 

4quality gate. Methods here assume `self` is a `Command` instance (see the 

5main module) so `self.stdout`, `self._error`, `self._success`, etc. are 

6always available. 

7""" 

8 

9from __future__ import annotations 

10 

11import json 

12import logging 

13import os 

14 

15from django.conf import settings 

16from django.contrib.auth.models import User 

17from django.db.models import Count 

18 

19security_logger = logging.getLogger("security") 

20 

21 

class UsersStatusMixin:
    """Status/audit + user-lifecycle subcommand handlers.

    Intended to be mixed into the ``cookie_admin`` ``Command``.  The methods
    use ``self.stdout``, ``self._error``, ``self._success``, ``self._get_user``
    and ``self._user_dict`` from the host class.

    NOTE(review): the code below relies on ``self._error`` not returning
    (statements following it assume the command was aborted) -- presumably it
    raises or exits; confirm against the main module.
    """

    # ------------------------------------------------------------------ #
    # status                                                             #
    # ------------------------------------------------------------------ #

    def _handle_status(self, options):
        """Write the collected status as JSON (``as_json``) or as plain text."""
        status = self._collect_status()

        if options.get("as_json"):
            self.stdout.write(json.dumps({"ok": True, **status}, indent=2))
            return

        self._print_status(status)

    def _collect_status(self):
        """Build the status dictionary consumed by ``_handle_status``.

        Returns a dict with the keys ``auth_mode``, ``database``,
        ``migrations``, ``users``, ``passkeys``, ``device_codes``,
        ``openrouter``, ``webauthn``, ``maintenance`` and ``cache``.
        Failures of the database probe, the migration check and the cache
        health lookup are reported in-band as ``"error: ..."`` strings.
        """
        from io import StringIO

        from django.core.cache import cache
        from django.core.management import call_command
        from django.db import connection
        from django.utils import timezone

        from apps.core.management.commands.cleanup_device_codes import CLEANUP_CACHE_KEY as DC_KEY
        from apps.core.management.commands.cleanup_sessions import CLEANUP_CACHE_KEY as SESS_KEY
        from apps.core.models import AppSettings, DeviceCode, WebAuthnCredential
        from apps.recipes.management.commands.cleanup_search_images import CLEANUP_CACHE_KEY as IMG_KEY

        status = {"auth_mode": settings.AUTH_MODE}

        # Database
        try:
            with connection.cursor() as cursor:
                cursor.execute("SELECT 1")
            status["database"] = "ok"
        except Exception as e:
            status["database"] = f"error: {e}"

        # Migrations
        try:
            out = StringIO()
            call_command("migrate", "--check", stdout=out, stderr=out)
            status["migrations"] = "up to date"
        except SystemExit:
            # `migrate --check` exits non-zero when unapplied migrations exist.
            status["migrations"] = "pending"
        except Exception as e:
            # Any other failure is reported in-band, like the database and
            # cache probes, instead of aborting the whole status report.
            status["migrations"] = f"error: {e}"

        # Users
        status["users"] = {
            "total": User.objects.count(),
            "active": User.objects.filter(is_active=True).count(),
        }

        # Passkeys
        status["passkeys"] = WebAuthnCredential.objects.count()

        # Device codes
        now = timezone.now()
        status["device_codes"] = {
            "pending": DeviceCode.objects.filter(status="pending", expires_at__gt=now).count(),
            "stale_expired": DeviceCode.objects.filter(expires_at__lte=now)
            .exclude(status__in=["authorized", "invalidated"])
            .count(),
        }

        # AI / OpenRouter
        app_settings = AppSettings.get()
        has_env_key = bool(os.environ.get("OPENROUTER_API_KEY", ""))
        has_db_key = bool(app_settings._openrouter_api_key)
        if has_env_key:
            key_source = "env"
        elif has_db_key:
            key_source = "database"
        else:
            key_source = "none"
        status["openrouter"] = {
            "configured": bool(app_settings.openrouter_api_key),
            "source": key_source,
            "model": app_settings.default_ai_model,
        }

        # WebAuthn config
        status["webauthn"] = {
            "rp_id": settings.WEBAUTHN_RP_ID or "(from request)",
            "rp_name": settings.WEBAUTHN_RP_NAME,
        }

        # Maintenance — last cleanup runs
        status["maintenance"] = {
            "device_code_cleanup": cache.get(DC_KEY) or "never run",
            "session_cleanup": cache.get(SESS_KEY) or "never run",
            "search_image_cleanup": cache.get(IMG_KEY) or "never run",
        }

        # Cache (image-cache health) — parity with the now-gated
        # GET /api/recipes/cache/health/ endpoint.
        try:
            from apps.recipes.api import get_cache_health_dict

            status["cache"] = get_cache_health_dict()
        except Exception as e:
            status["cache"] = {"status": f"error: {e}"}

        return status

    def _print_status(self, status):
        """Write the human-readable status report to ``self.stdout``.

        NOTE(review): the ``cache`` section of ``status`` is not printed
        here, so it is only visible in the JSON output.
        """
        users = status["users"]
        dc = status["device_codes"]
        openrouter = status["openrouter"]
        webauthn = status["webauthn"]
        configured = "configured" if openrouter["configured"] else "not configured"

        self.stdout.write(f"Auth mode: {status['auth_mode']}")
        self.stdout.write(f"Database: {status['database']}")
        self.stdout.write(f"Migrations: {status['migrations']}")
        self.stdout.write(f"Users: {users['active']} active / {users['total']} total")
        self.stdout.write(f"Passkeys: {status['passkeys']}")
        self.stdout.write(f"Device codes: {dc['pending']} pending, {dc['stale_expired']} stale")
        self.stdout.write(f"OpenRouter: {configured} (source: {openrouter['source']})")
        self.stdout.write(f"WebAuthn RP: {webauthn['rp_id']} ({webauthn['rp_name']})")
        self.stdout.write("Maintenance:")
        for label, key in [
            (" Device codes", "device_code_cleanup"),
            (" Sessions", "session_cleanup"),
            (" Search images", "search_image_cleanup"),
        ]:
            info = status["maintenance"][key]
            if isinstance(info, dict):
                # A recorded run: trim the timestamp to seconds precision.
                self.stdout.write(
                    f"{label}: last ran {info['time'][:19]}, deleted {info['deleted']}, {info['remaining']} remaining"
                )
            else:
                self.stdout.write(f"{label}: {info}")

    # ------------------------------------------------------------------ #
    # audit                                                              #
    # ------------------------------------------------------------------ #

    def _handle_audit(self, options):
        """List registration, passkey-login and device-code events of the last 24h.

        ``options["lines"]`` caps the number of events (default 50); a
        negative value is rejected via ``self._error`` because it cannot be
        used as a queryset slice bound.
        """
        import datetime

        from django.utils import timezone

        max_lines = options.get("lines", 50)
        if max_lines is None:
            max_lines = 50
        if max_lines < 0:
            self._error("Line limit must not be negative.", options)

        now = timezone.now()
        since = now - datetime.timedelta(hours=24)

        events = (
            self._collect_registration_events(since, max_lines)
            + self._collect_login_events(since, max_lines)
            + self._collect_device_code_events(since, max_lines)
        )
        # Newest first across all sources, then apply the overall cap.
        events.sort(key=lambda e: e["time"], reverse=True)
        events = events[:max_lines]

        if options.get("as_json"):
            self.stdout.write(json.dumps({"ok": True, "since": since.isoformat(), "events": events}, indent=2))
            return

        self.stdout.write(f"Security events (last 24h, max {max_lines}):")
        self.stdout.write("-" * 70)
        if not events:
            self.stdout.write("No events found.")
            return
        for e in events:
            self.stdout.write(self._format_audit_event(e))

    def _collect_registration_events(self, since, max_lines):
        """Collect user registration events since the given time."""
        recent_users = User.objects.filter(date_joined__gte=since).order_by("-date_joined")[:max_lines]
        return [
            {
                "time": u.date_joined.isoformat(),
                "type": "registration",
                "username": u.username,
            }
            for u in recent_users
        ]

    def _collect_login_events(self, since, max_lines):
        """Collect passkey login events since the given time."""
        from apps.core.models import WebAuthnCredential

        recent_logins = (
            WebAuthnCredential.objects.filter(last_used_at__gte=since)
            .select_related("user")
            .order_by("-last_used_at")[:max_lines]
        )
        return [
            {
                "time": c.last_used_at.isoformat(),
                "type": "passkey_login",
                "username": c.user.username,
                "credential_id": c.pk,
            }
            for c in recent_logins
        ]

    def _collect_device_code_events(self, since, max_lines):
        """Collect device code events since the given time."""
        from apps.core.models import DeviceCode

        # select_related avoids one extra query per row for the authorizer.
        recent_codes = (
            DeviceCode.objects.filter(created_at__gte=since)
            .select_related("authorizing_user")
            .order_by("-created_at")[:max_lines]
        )
        return [
            {
                "time": dc.created_at.isoformat(),
                "type": f"device_code_{dc.status}",
                "code": dc.code,
                "authorizer": dc.authorizing_user.username if dc.authorizing_user else None,
            }
            for dc in recent_codes
        ]

    @staticmethod
    def _format_audit_event(event):
        """Format a single audit event as a human-readable line."""
        ts = event["time"][:19].replace("T", " ")
        etype = event["type"]
        detail = event.get("username") or event.get("code") or ""
        extra = ""
        if etype == "passkey_login":
            extra = f" (credential #{event['credential_id']})"
        elif etype.startswith("device_code_") and event.get("authorizer"):
            extra = f" (by {event['authorizer']})"
        return f" {ts} {etype:<25} {detail}{extra}"

    # ------------------------------------------------------------------ #
    # user lifecycle                                                     #
    # ------------------------------------------------------------------ #

    def _require_profile(self, user, options):
        """Return ``user.profile``; report via ``self._error`` when there is none."""
        profile = getattr(user, "profile", None)
        if profile is None:
            self._error(f"User '{user.username}' has no profile.", options)
        return profile

    def _handle_list_users(self, options):
        """List users (optionally only active ones) as JSON or as a table."""
        # select_related pulls the profile in the same query instead of one
        # lookup per listed user.
        users = User.objects.select_related("profile").order_by("date_joined")
        if options.get("active_only"):
            users = users.filter(is_active=True)

        users = users.annotate(passkey_count=Count("webauthn_credentials"))
        rows = list(users)

        if options.get("as_json"):
            data = [
                {
                    "username": u.username,
                    "user_id": u.pk,
                    "passkeys": u.passkey_count,
                    "is_active": u.is_active,
                    "unlimited_ai": getattr(getattr(u, "profile", None), "unlimited_ai", False),
                    "date_joined": u.date_joined.strftime("%Y-%m-%d"),
                }
                for u in rows
            ]
            self.stdout.write(json.dumps({"ok": True, "users": data}, indent=2))
            return

        self.stdout.write(f"{'USERNAME':<15} {'ID':<6} {'PASSKEYS':<10} {'ACTIVE':<8} {'UNLIMITED':<10} {'JOINED'}")
        self.stdout.write("-" * 70)
        for u in rows:
            unlimited = getattr(getattr(u, "profile", None), "unlimited_ai", False)
            self.stdout.write(
                f"{u.username:<15} {u.pk:<6} {u.passkey_count:<10} "
                f"{'yes' if u.is_active else 'no':<8} "
                f"{'yes' if unlimited else 'no':<10} "
                f"{u.date_joined.strftime('%Y-%m-%d')}"
            )
        # Totals come from the rows just printed, so they always match the
        # listing and need no further queries.
        active = sum(1 for u in rows if u.is_active)
        self.stdout.write(f"\nTotal: {len(rows)} users ({active} active)")

    def _handle_create_user(self, options):
        """Create an active, non-staff user with no usable password plus a profile."""
        from django.db import transaction

        from apps.profiles.models import Profile

        username = options["username"]

        if User.objects.filter(username=username).exists():
            self._error(f"User '{username}' already exists.", options)

        # Atomic so a failed profile insert cannot leave a user without one.
        with transaction.atomic():
            user = User.objects.create_user(
                username=username,
                password=None,
                email="",
                is_active=True,
                is_staff=False,
            )
            user.set_unusable_password()
            user.save(update_fields=["password"])

            profile_count = Profile.objects.count()
            Profile.objects.create(
                user=user,
                name=f"User {profile_count + 1}",
                avatar_color=Profile.next_avatar_color(),
            )

        self._success(
            f"Created user '{username}'.",
            options,
            {"user": self._user_dict(user)},
        )

    def _handle_delete_user(self, options):
        """Delete the named user; the summary is captured before deletion."""
        user = self._get_user(options["username"], options)
        username = user.username
        user_data = self._user_dict(user)
        user.delete()
        self._success(
            f"Deleted user '{username}' and associated data.",
            options,
            {"deleted_user": user_data},
        )

    def _handle_activate(self, options):
        """Set ``is_active`` on the named user."""
        user = self._get_user(options["username"], options)
        user.is_active = True
        user.save(update_fields=["is_active"])
        self._success(f"'{user.username}' has been reactivated.", options, {"user": self._user_dict(user)})

    def _handle_deactivate(self, options):
        """Clear ``is_active`` on the named user and delete their live sessions."""
        from django.contrib.sessions.models import Session
        from django.utils import timezone

        user = self._get_user(options["username"], options)
        user.is_active = False
        user.save(update_fields=["is_active"])

        # The owning user id lives inside the encoded session payload, so
        # every unexpired session has to be decoded and inspected.
        target_id = str(user.pk)
        count = 0
        for session in Session.objects.filter(expire_date__gte=timezone.now()):
            if str(session.get_decoded().get("_auth_user_id")) == target_id:
                session.delete()
                count += 1

        self._success(
            f"'{user.username}' has been deactivated and {count} session(s) invalidated.",
            options,
            {"user": self._user_dict(user), "sessions_invalidated": count},
        )

    def _resolve_user_by_username_or_profile_id(self, options):
        """Return the user named by exactly one of ``username`` / ``profile_id``."""
        from apps.profiles.models import Profile

        username = options.get("username")
        profile_id = options.get("profile_id")

        if username and profile_id:
            self._error("Pass either username or --profile-id, not both.", options)
        if not username and not profile_id:
            self._error("Must pass either username or --profile-id.", options)

        if profile_id:
            try:
                profile = Profile.objects.select_related("user").get(id=profile_id)
            except Profile.DoesNotExist:
                self._error(f"Profile with id {profile_id} not found.", options)
            if not profile.user:
                self._error(f"Profile {profile_id} has no linked user.", options)
            return profile.user
        return self._get_user(username, options)

    def _set_unlimited_flag(self, options, enabled):
        """Persist ``unlimited_ai = enabled`` on the resolved user's profile."""
        user = self._resolve_user_by_username_or_profile_id(options)
        profile = self._require_profile(user, options)
        profile.unlimited_ai = enabled
        profile.save(update_fields=["unlimited_ai"])
        outcome = "granted" if enabled else "revoked"
        action = "set-unlimited" if enabled else "remove-unlimited"
        self._success(
            f"Updated {user.username}: unlimited AI access {outcome}",
            options,
            {"username": user.username, "user_id": user.pk, "unlimited_ai": enabled, "action": action},
        )

    def _handle_set_unlimited(self, options):
        """Grant unlimited AI access to the resolved user."""
        self._set_unlimited_flag(options, True)

    def _handle_remove_unlimited(self, options):
        """Revoke unlimited AI access from the resolved user."""
        self._set_unlimited_flag(options, False)

    def _handle_usage(self, options):
        """Report AI feature usage for one user or for every linked profile."""
        from datetime import date as date_cls

        from apps.ai.services.quota import ALL_FEATURES, FEATURE_LIMIT_FIELDS, get_usage
        from apps.core.models import AppSettings
        from apps.profiles.models import Profile

        app_settings = AppSettings.get()
        limits = {f: getattr(app_settings, FEATURE_LIMIT_FIELDS[f]) for f in ALL_FEATURES}
        today = date_cls.today().isoformat()
        users_data = self._collect_usage_data(options, get_usage, Profile)

        if options.get("as_json"):
            json_users = [{k: u[k] for k in ("username", "profile_name", "unlimited_ai", "usage")} for u in users_data]
            self.stdout.write(json.dumps({"ok": True, "date": today, "users": json_users}, indent=2))
            return

        for u in users_data:
            self._print_user_usage(u, limits, ALL_FEATURES)

    def _collect_usage_data(self, options, get_usage, Profile):
        """Return one usage record per selected profile.

        With ``options["username"]`` only that user's profile is reported (a
        missing profile is reported via ``self._error``); otherwise every
        profile that has a linked user is included.
        """
        if options.get("username"):
            user = self._get_user(options["username"], options)
            profiles = [(self._require_profile(user, options), user)]
        else:
            profiles = [(p, p.user) for p in Profile.objects.select_related("user").filter(user__isnull=False)]
        return [
            {
                "username": user.username,
                "profile_name": profile.name,
                "unlimited_ai": profile.unlimited_ai,
                "is_exempt": profile.unlimited_ai,
                "usage": get_usage(profile.pk),
            }
            for profile, user in profiles
        ]

    def _print_user_usage(self, u, limits, all_features):
        """Write one user's per-feature usage; limits are omitted when exempt."""
        tag_str = " [unlimited]" if u["unlimited_ai"] else ""
        self.stdout.write(f"{u['username']} ({u['profile_name']}){tag_str}")
        for feature in all_features:
            count = u["usage"][feature]
            suffix = "" if u["is_exempt"] else f"/{limits[feature]}"
            self.stdout.write(f" {feature}: {count}{suffix}")
        self.stdout.write("")

    def _handle_create_session(self, options):
        """Create a database-backed login session for an active user.

        JSON mode requires ``confirm``.  ``ttl`` (seconds, default 3600) must
        lie within 60..86400.  The creation is written to the ``security``
        logger and the session key is returned through ``self._success``.
        """
        import datetime

        from django.contrib.auth import BACKEND_SESSION_KEY, HASH_SESSION_KEY, SESSION_KEY
        from django.contrib.sessions.backends.db import SessionStore
        from django.utils import timezone

        if options.get("as_json") and not options.get("confirm"):
            self._error(
                f"--confirm flag required for non-interactive create-session. "
                f"Re-run with: cookie_admin create-session {options['username']} --json --confirm",
                options,
            )

        user = self._get_user(options["username"], options)
        if not user.is_active:
            self._error(f"User '{user.username}' is inactive.", options)

        profile = self._require_profile(user, options)

        ttl = options.get("ttl", 3600)
        if ttl is None:
            # Option present but unset: fall back to the default.
            ttl = 3600
        if ttl < 60 or ttl > 86400:
            self._error("TTL must be between 60 and 86400 seconds.", options)

        # Create session with Django auth keys (same as django.contrib.auth.login())
        session = SessionStore()
        session[SESSION_KEY] = str(user.pk)
        session[BACKEND_SESSION_KEY] = "django.contrib.auth.backends.ModelBackend"
        session[HASH_SESSION_KEY] = user.get_session_auth_hash()
        session["profile_id"] = profile.id
        session.set_expiry(ttl)
        session.create()

        security_logger.info(
            "CLI session created: user_id=%s, username=%s, ttl=%ds",
            user.pk,
            user.username,
            ttl,
        )

        self._success(
            f"Session created for '{user.username}' (expires in {ttl}s).",
            options,
            {
                "session_key": session.session_key,
                "profile_id": profile.id,
                "user": self._user_dict(user),
                "expires_in_seconds": ttl,
                "expires_at": (timezone.now() + datetime.timedelta(seconds=ttl)).isoformat(),
            },
        )

← Back to Dashboard