Coverage for apps / core / management / commands / cookie_admin.py: 89%

384 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-12 10:49 +0000

1"""Admin CLI tool for managing Cookie in passkey mode. 

2 

3All subcommands support --json for structured output, making this tool 

4suitable for automation via SSM, scripts, or AI assistants. 

5 

6Usage: 

7 manage.py cookie_admin status [--json] 

8 manage.py cookie_admin audit [--lines N] [--json] 

9 manage.py cookie_admin list-users [--active-only] [--admins-only] [--json] 

10 manage.py cookie_admin create-user <username> [--admin] [--json] 

11 manage.py cookie_admin delete-user <username> [--json] 

12 manage.py cookie_admin promote <username> [--json] 

13 manage.py cookie_admin demote <username> [--json] 

14 manage.py cookie_admin activate <username> [--json] 

15 manage.py cookie_admin deactivate <username> [--json] 

16 manage.py cookie_admin set-unlimited <username> [--json] 

17 manage.py cookie_admin remove-unlimited <username> [--json] 

18 manage.py cookie_admin usage [--username <name>] [--json] 

19 manage.py cookie_admin create-session <username> [--ttl N] [--json] 

20 manage.py cookie_admin reset [--json --confirm] 

21""" 

22 

23import json 

24import logging 

25import os 

26import shutil 

27 

28from django.conf import settings 

29from django.contrib.auth.models import User 

30from django.core.management.base import BaseCommand 

31from django.db.models import Count 

32 

33security_logger = logging.getLogger("security") 

34 

35 

36class Command(BaseCommand): 

37 help = "Manage Cookie app and user accounts (passkey mode only). All subcommands support --json." 

38 

def add_arguments(self, parser):
    """Register the global ``--json`` flag and one sub-parser per subcommand.

    ``--json`` is accepted both before and after the subcommand name. The
    per-subcommand copies use ``default=argparse.SUPPRESS`` so a subcommand
    that did not receive the flag leaves a ``--json`` given ahead of the
    subcommand untouched instead of resetting ``as_json`` to ``False``.

    Args:
        parser: Argument parser supplied by the management framework.
    """
    import argparse

    def add_json_flag(target):
        # argparse copies every attribute of a sub-parser's namespace onto the
        # parent namespace, so an unset flag must not produce an attribute.
        target.add_argument("--json", action="store_true", dest="as_json", default=argparse.SUPPRESS)

    parser.add_argument("--json", action="store_true", dest="as_json", help="Output as JSON (all subcommands)")
    sub = parser.add_subparsers(dest="subcommand")

    # status
    status_cmd = sub.add_parser("status", help="App status overview (config, DB, users, AI)")
    add_json_flag(status_cmd)

    # audit
    audit_cmd = sub.add_parser("audit", help="Recent security events")
    audit_cmd.add_argument("--lines", type=int, default=50, help="Max events to show (default 50)")
    add_json_flag(audit_cmd)

    # list-users
    list_cmd = sub.add_parser("list-users", help="List all users")
    list_cmd.add_argument("--active-only", action="store_true")
    list_cmd.add_argument("--admins-only", action="store_true")
    add_json_flag(list_cmd)

    # create-user
    create_cmd = sub.add_parser("create-user", help="Create a headless user (no passkey)")
    create_cmd.add_argument("username")
    create_cmd.add_argument("--admin", action="store_true", help="Grant admin privileges")
    add_json_flag(create_cmd)

    # Subcommands whose only positional argument is the target username.
    username_only = (
        ("delete-user", "Delete a user and their profile"),
        ("promote", "Grant admin privileges"),
        ("demote", "Revoke admin privileges"),
        ("activate", "Reactivate a user account"),
        ("deactivate", "Deactivate a user account"),
        ("set-unlimited", "Grant unlimited AI access"),
        ("remove-unlimited", "Revoke unlimited AI access"),
    )
    for name, help_text in username_only:
        user_cmd = sub.add_parser(name, help=help_text)
        user_cmd.add_argument("username")
        add_json_flag(user_cmd)

    # usage
    usage_cmd = sub.add_parser("usage", help="Show AI usage for today")
    usage_cmd.add_argument("--username", required=False, help="Show usage for a specific user")
    add_json_flag(usage_cmd)

    # create-session
    session_cmd = sub.add_parser("create-session", help="Create a Django session for a user (pentest/automation)")
    session_cmd.add_argument("username")
    session_cmd.add_argument("--ttl", type=int, default=3600, help="Session TTL in seconds (default 3600)")
    add_json_flag(session_cmd)

    # reset
    reset_cmd = sub.add_parser("reset", help="Factory reset: delete all data and re-seed defaults")
    reset_cmd.add_argument("--confirm", action="store_true", help="Skip interactive prompt (required with --json)")
    add_json_flag(reset_cmd)

114 

def handle(self, *args, **options):
    """Entry point: validate the auth mode and dispatch to ``_handle_<subcommand>``.

    Exits with code 2 when the app is not in passkey mode and with code 1
    when the subcommand is missing or has no matching handler method.
    """
    if settings.AUTH_MODE != "passkey":
        self._error("cookie_admin is only available in passkey mode (AUTH_MODE=passkey).", options, code=2)

    subcommand = options.get("subcommand")
    if not subcommand:
        self._error("No subcommand provided. Use --help for usage.", options, code=1)

    # "create-user" -> "_handle_create_user"
    method_name = "_handle_" + subcommand.replace("-", "_")
    handler = getattr(self, method_name, None)
    if not handler:
        self._error(f"Unknown subcommand '{subcommand}'", options, code=1)
        return
    handler(options)

128 

129 def _error(self, message, options=None, code=1): 

130 if options and options.get("as_json"): 

131 self.stdout.write(json.dumps({"ok": False, "error": message})) 

132 else: 

133 self.stderr.write(f"Error: {message}") 

134 raise SystemExit(code) 

135 

136 def _success(self, message, options, extra=None): 

137 if options.get("as_json"): 

138 result = {"ok": True, "message": message} 

139 if extra: 

140 result.update(extra) 

141 self.stdout.write(json.dumps(result)) 

142 else: 

143 self.stdout.write(message) 

144 

def _get_user(self, username, options):
    """Return the user named *username*, or exit via ``_error`` if none exists."""
    try:
        found = User.objects.get(username=username)
    except User.DoesNotExist:
        self._error(f"User '{username}' not found.", options)
    else:
        return found

150 

151 def _user_dict(self, user): 

152 passkey_count = user.webauthn_credentials.count() 

153 unlimited_ai = getattr(getattr(user, "profile", None), "unlimited_ai", False) 

154 return { 

155 "username": user.username, 

156 "user_id": user.pk, 

157 "passkeys": passkey_count, 

158 "is_admin": user.is_staff, 

159 "is_active": user.is_active, 

160 "unlimited_ai": unlimited_ai, 

161 "date_joined": user.date_joined.strftime("%Y-%m-%d"), 

162 } 

163 

164 def _handle_status(self, options): 

165 status = self._collect_status() 

166 

167 if options.get("as_json"): 

168 self.stdout.write(json.dumps({"ok": True, **status}, indent=2)) 

169 return 

170 

171 self._print_status(status) 

172 

def _collect_status(self):
    """Gather the data shown by the ``status`` subcommand.

    Returns:
        dict with the keys ``auth_mode``, ``database``, ``migrations``,
        ``users``, ``passkeys``, ``device_codes``, ``openrouter``,
        ``webauthn`` and ``maintenance``.

    The database probe and the migration check both record failures as an
    ``"error: ..."`` string instead of raising, so one broken check does not
    prevent those two fields from being reported.
    """
    from io import StringIO

    from django.core.cache import cache
    from django.core.management import call_command
    from django.db import connection
    from django.utils import timezone

    from apps.core.management.commands.cleanup_device_codes import CLEANUP_CACHE_KEY as DC_KEY
    from apps.core.management.commands.cleanup_sessions import CLEANUP_CACHE_KEY as SESS_KEY
    from apps.core.models import AppSettings, DeviceCode, WebAuthnCredential
    from apps.recipes.management.commands.cleanup_search_images import CLEANUP_CACHE_KEY as IMG_KEY

    status = {"auth_mode": settings.AUTH_MODE}

    # Database: a trivial query proves the connection works.
    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT 1")
        status["database"] = "ok"
    except Exception as e:
        status["database"] = f"error: {e}"

    # Migrations: `migrate --check` signals unapplied migrations via SystemExit.
    # Its output is captured so it does not mix into the status report.
    captured = StringIO()
    try:
        call_command("migrate", "--check", stdout=captured, stderr=captured)
    except SystemExit:
        status["migrations"] = "pending"
    except Exception as e:
        # Any other failure is reported the same way as a database error
        # rather than aborting the whole status command.
        status["migrations"] = f"error: {e}"
    else:
        status["migrations"] = "up to date"

    # Users
    status["users"] = {
        "total": User.objects.count(),
        "active": User.objects.filter(is_active=True).count(),
        "admins": User.objects.filter(is_staff=True, is_active=True).count(),
    }

    # Passkeys
    status["passkeys"] = WebAuthnCredential.objects.count()

    # Device codes: still-valid pending codes vs. expired codes that were
    # never authorized or invalidated.
    now = timezone.now()
    status["device_codes"] = {
        "pending": DeviceCode.objects.filter(status="pending", expires_at__gt=now).count(),
        "stale_expired": DeviceCode.objects.filter(expires_at__lte=now)
        .exclude(status__in=["authorized", "invalidated"])
        .count(),
    }

    # AI / OpenRouter: the environment variable is reported as the source
    # whenever it is set, otherwise the stored key, otherwise "none".
    app_settings = AppSettings.get()
    has_env_key = bool(os.environ.get("OPENROUTER_API_KEY", ""))
    has_db_key = bool(app_settings._openrouter_api_key)
    status["openrouter"] = {
        "configured": bool(app_settings.openrouter_api_key),
        "source": "env" if has_env_key else ("database" if has_db_key else "none"),
        "model": app_settings.default_ai_model,
    }

    # WebAuthn config
    status["webauthn"] = {
        "rp_id": settings.WEBAUTHN_RP_ID or "(from request)",
        "rp_name": settings.WEBAUTHN_RP_NAME,
    }

    # Maintenance — last cleanup runs, as recorded in the cache by each job.
    status["maintenance"] = {
        "device_code_cleanup": cache.get(DC_KEY) or "never run",
        "session_cleanup": cache.get(SESS_KEY) or "never run",
        "search_image_cleanup": cache.get(IMG_KEY) or "never run",
    }

    return status

248 

249 def _print_status(self, status): 

250 users = status["users"] 

251 dc = status["device_codes"] 

252 src = status["openrouter"]["source"] 

253 

254 self.stdout.write(f"Auth mode: {status['auth_mode']}") 

255 self.stdout.write(f"Database: {status['database']}") 

256 self.stdout.write(f"Migrations: {status['migrations']}") 

257 self.stdout.write(f"Users: {users['active']} active ({users['admins']} admin) / {users['total']} total") 

258 self.stdout.write(f"Passkeys: {status['passkeys']}") 

259 self.stdout.write(f"Device codes: {dc['pending']} pending, {dc['stale_expired']} stale") 

260 self.stdout.write( 

261 f"OpenRouter: {'configured' if status['openrouter']['configured'] else 'not configured'} (source: {src})" 

262 ) 

263 self.stdout.write(f"WebAuthn RP: {status['webauthn']['rp_id']} ({status['webauthn']['rp_name']})") 

264 self.stdout.write("Maintenance:") 

265 for label, key in [ 

266 (" Device codes", "device_code_cleanup"), 

267 (" Sessions", "session_cleanup"), 

268 (" Search images", "search_image_cleanup"), 

269 ]: 

270 info = status["maintenance"][key] 

271 if isinstance(info, dict): 

272 self.stdout.write( 

273 f"{label}: last ran {info['time'][:19]}, deleted {info['deleted']}, {info['remaining']} remaining" 

274 ) 

275 else: 

276 self.stdout.write(f"{label}: {info}") 

277 

def _handle_audit(self, options):
    """Handle ``audit``: show security events from the last 24 hours.

    Registration, passkey-login and device-code events are collected
    separately (each capped at ``--lines``), merged, sorted newest first and
    truncated to ``--lines`` entries. A negative ``--lines`` is rejected up
    front because it would otherwise be used as a slice bound on querysets.
    """
    from datetime import timedelta

    from django.utils import timezone

    max_lines = options.get("lines", 50)
    if max_lines < 0:
        self._error("--lines must be a non-negative integer.", options)

    now = timezone.now()
    # timedelta comes from the standard library rather than relying on the
    # name happening to be importable from django.utils.timezone.
    since = now - timedelta(hours=24)

    events = (
        self._collect_registration_events(since, max_lines)
        + self._collect_login_events(since, max_lines)
        + self._collect_device_code_events(since, max_lines)
    )
    # "time" holds ISO-8601 strings, so this is a string sort, newest first.
    events.sort(key=lambda e: e["time"], reverse=True)
    events = events[:max_lines]

    if options.get("as_json"):
        self.stdout.write(json.dumps({"ok": True, "since": since.isoformat(), "events": events}, indent=2))
        return

    self.stdout.write(f"Security events (last 24h, max {max_lines}):")
    self.stdout.write("-" * 70)
    if not events:
        self.stdout.write("No events found.")
        return
    for e in events:
        self.stdout.write(self._format_audit_event(e))

305 

def _collect_registration_events(self, since, max_lines):
    """Return up to *max_lines* registration events for users who joined at or after *since*.

    Events are ordered newest first; each is a dict with ``time`` (ISO-8601
    string), ``type``, ``username`` and ``is_admin``.
    """
    newest_first = User.objects.filter(date_joined__gte=since).order_by("-date_joined")
    events = []
    for account in newest_first[:max_lines]:
        events.append(
            {
                "time": account.date_joined.isoformat(),
                "type": "registration",
                "username": account.username,
                "is_admin": account.is_staff,
            }
        )
    return events

318 

def _collect_login_events(self, since, max_lines):
    """Return up to *max_lines* passkey-login events for credentials used at or after *since*.

    Events are ordered by most recent use; each is a dict with ``time``
    (ISO-8601 string), ``type``, ``username`` and ``credential_id``.
    """
    from apps.core.models import WebAuthnCredential

    used_recently = WebAuthnCredential.objects.filter(last_used_at__gte=since)
    # Fetch the owning user in the same query since its username is read below.
    ordered = used_recently.select_related("user").order_by("-last_used_at")

    events = []
    for credential in ordered[:max_lines]:
        events.append(
            {
                "time": credential.last_used_at.isoformat(),
                "type": "passkey_login",
                "username": credential.user.username,
                "credential_id": credential.pk,
            }
        )
    return events

337 

def _collect_device_code_events(self, since, max_lines):
    """Return up to *max_lines* device-code events for codes created at or after *since*.

    Events are ordered newest first; each is a dict with ``time`` (ISO-8601
    string), ``type`` (``device_code_<status>``), ``code`` and ``authorizer``
    (the authorizing user's username, or ``None`` when there is none).
    """
    from apps.core.models import DeviceCode

    newest_first = DeviceCode.objects.filter(created_at__gte=since).order_by("-created_at")

    events = []
    for device_code in newest_first[:max_lines]:
        if device_code.authorizing_user:
            authorizer = device_code.authorizing_user.username
        else:
            authorizer = None
        events.append(
            {
                "time": device_code.created_at.isoformat(),
                "type": f"device_code_{device_code.status}",
                "code": device_code.code,
                "authorizer": authorizer,
            }
        )
    return events

352 

353 @staticmethod 

354 def _format_audit_event(event): 

355 """Format a single audit event as a human-readable line.""" 

356 ts = event["time"][:19].replace("T", " ") 

357 etype = event["type"] 

358 detail = event.get("username") or event.get("code") or "" 

359 extra = "" 

360 if etype == "passkey_login": 

361 extra = f" (credential #{event['credential_id']})" 

362 elif etype.startswith("device_code_") and event.get("authorizer"): 

363 extra = f" (by {event['authorizer']})" 

364 return f" {ts} {etype:<25} {detail}{extra}" 

365 

def _handle_list_users(self, options):
    """Handle ``list-users``: list accounts in join order, as JSON or a table.

    ``--active-only`` and ``--admins-only`` narrow the listing. The queryset
    is evaluated once; the table rows and the totals line are both derived
    from that single result, so they always agree and no extra COUNT queries
    are issued.
    """
    queryset = User.objects.all().order_by("date_joined")
    if options.get("active_only"):
        queryset = queryset.filter(is_active=True)
    if options.get("admins_only"):
        queryset = queryset.filter(is_staff=True)

    users = list(queryset.annotate(passkey_count=Count("webauthn_credentials")))

    if options.get("as_json"):
        data = [
            {
                "username": u.username,
                "user_id": u.pk,
                "passkeys": u.passkey_count,
                "is_admin": u.is_staff,
                "is_active": u.is_active,
                # Falls back to False when the user has no profile.
                "unlimited_ai": getattr(getattr(u, "profile", None), "unlimited_ai", False),
                "date_joined": u.date_joined.strftime("%Y-%m-%d"),
            }
            for u in users
        ]
        self.stdout.write(json.dumps({"ok": True, "users": data}, indent=2))
        return

    self.stdout.write(f"{'USERNAME':<15} {'ID':<6} {'PASSKEYS':<10} {'ADMIN':<7} {'ACTIVE':<8} {'JOINED'}")
    self.stdout.write("-" * 65)
    for u in users:
        self.stdout.write(
            f"{u.username:<15} {u.pk:<6} {u.passkey_count:<10} "
            f"{'yes' if u.is_staff else 'no':<7} "
            f"{'yes' if u.is_active else 'no':<8} "
            f"{u.date_joined.strftime('%Y-%m-%d')}"
        )

    active = sum(1 for u in users if u.is_active)
    admins = sum(1 for u in users if u.is_staff)
    self.stdout.write(f"\nTotal: {len(users)} users ({active} active, {admins} admin)")

403 

def _handle_create_user(self, options):
    """Handle ``create-user``: create a passwordless user together with its profile.

    Exits via ``_error`` when the username is already taken. The user and
    the profile are created inside one transaction so a failure while
    creating the profile does not leave a user without a profile behind.
    """
    from django.db import transaction

    from apps.profiles.models import Profile

    username = options["username"]
    is_admin = options.get("admin", False)

    if User.objects.filter(username=username).exists():
        self._error(f"User '{username}' already exists.", options)

    with transaction.atomic():
        # create_user() with password=None stores an unusable password, so no
        # separate set_unusable_password() call and save are needed.
        user = User.objects.create_user(
            username=username,
            password=None,
            email="",
            is_active=True,
            is_staff=is_admin,
        )

        # The default display name is numbered from the current profile count.
        profile_count = Profile.objects.count()
        Profile.objects.create(
            user=user,
            name=f"User {profile_count + 1}",
            avatar_color=Profile.next_avatar_color(),
        )

    role = "admin" if is_admin else "regular"
    self._success(
        f"Created {role} user '{username}'.",
        options,
        {"user": self._user_dict(user)},
    )

436 

437 def _handle_delete_user(self, options): 

438 user = self._get_user(options["username"], options) 

439 username = user.username 

440 user_data = self._user_dict(user) 

441 user.delete() 

442 self._success( 

443 f"Deleted user '{username}' and associated data.", 

444 options, 

445 {"deleted_user": user_data}, 

446 ) 

447 

448 def _handle_promote(self, options): 

449 user = self._get_user(options["username"], options) 

450 if user.is_staff: 

451 self._success(f"User '{user.username}' is already an admin.", options, {"user": self._user_dict(user)}) 

452 return 

453 user.is_staff = True 

454 user.save(update_fields=["is_staff"]) 

455 self._success(f"'{user.username}' is now an admin.", options, {"user": self._user_dict(user)}) 

456 

def _handle_demote(self, options):
    """Handle ``demote``: revoke admin (staff) rights from the named user.

    Demoting a non-admin is reported as a success and changes nothing. The
    demotion is refused when it would leave no usable admin: either the user
    is the only admin at all, or the user is active and no other *active*
    admin exists (inactive admins do not count as a fallback).
    """
    user = self._get_user(options["username"], options)
    if not user.is_staff:
        self._success(f"User '{user.username}' is not an admin.", options, {"user": self._user_dict(user)})
        return

    admins = User.objects.filter(is_staff=True)
    is_last_admin = admins.count() <= 1
    is_last_active_admin = user.is_active and not admins.filter(is_active=True).exclude(pk=user.pk).exists()
    if is_last_admin or is_last_active_admin:
        self._error("Cannot demote the last remaining admin. Promote another user first.", options)

    user.is_staff = False
    user.save(update_fields=["is_staff"])
    self._success(f"'{user.username}' is no longer an admin.", options, {"user": self._user_dict(user)})

467 

468 def _handle_activate(self, options): 

469 user = self._get_user(options["username"], options) 

470 user.is_active = True 

471 user.save(update_fields=["is_active"]) 

472 self._success(f"'{user.username}' has been reactivated.", options, {"user": self._user_dict(user)}) 

473 

def _handle_deactivate(self, options):
    """Handle ``deactivate``: mark the user inactive and delete their live sessions.

    Every unexpired session is decoded and removed when its stored
    ``_auth_user_id`` matches the user; the number removed is reported.
    """
    from django.contrib.sessions.models import Session
    from django.utils import timezone

    account = self._get_user(options["username"], options)
    account.is_active = False
    account.save(update_fields=["is_active"])

    # The owner is only stored inside the encoded session payload, so each
    # unexpired session has to be decoded to find the ones to drop.
    owner_id = str(account.pk)
    unexpired = Session.objects.filter(expire_date__gte=timezone.now())
    owned = [entry for entry in unexpired if str(entry.get_decoded().get("_auth_user_id")) == owner_id]
    for entry in owned:
        entry.delete()
    count = len(owned)

    self._success(
        f"'{account.username}' has been deactivated and {count} session(s) invalidated.",
        options,
        {"user": self._user_dict(account), "sessions_invalidated": count},
    )

493 

494 def _handle_set_unlimited(self, options): 

495 user = self._get_user(options["username"], options) 

496 profile = user.profile 

497 profile.unlimited_ai = True 

498 profile.save(update_fields=["unlimited_ai"]) 

499 self._success( 

500 f"Updated {user.username}: unlimited AI access granted", 

501 options, 

502 {"username": user.username, "user_id": user.pk, "unlimited_ai": True, "action": "set-unlimited"}, 

503 ) 

504 

505 def _handle_remove_unlimited(self, options): 

506 user = self._get_user(options["username"], options) 

507 profile = user.profile 

508 profile.unlimited_ai = False 

509 profile.save(update_fields=["unlimited_ai"]) 

510 self._success( 

511 f"Updated {user.username}: unlimited AI access revoked", 

512 options, 

513 {"username": user.username, "user_id": user.pk, "unlimited_ai": False, "action": "remove-unlimited"}, 

514 ) 

515 

def _handle_usage(self, options):
    """Handle ``usage``: show AI usage for one user or for every profile with a user.

    JSON output contains today's date and, per user, the username, profile
    name, admin flag, unlimited flag and usage. Text output prints one
    section per user via ``_print_user_usage``.
    """
    from datetime import date as date_cls

    from apps.ai.services.quota import ALL_FEATURES, FEATURE_LIMIT_FIELDS, get_usage
    from apps.core.models import AppSettings
    from apps.profiles.models import Profile

    app_settings = AppSettings.get()
    # Per-feature limit, read from the settings field mapped to each feature.
    limits = {}
    for feature in ALL_FEATURES:
        limits[feature] = getattr(app_settings, FEATURE_LIMIT_FIELDS[feature])

    today = date_cls.today().isoformat()
    rows = self._collect_usage_data(options, get_usage, Profile)

    if not options.get("as_json"):
        for row in rows:
            self._print_user_usage(row, limits, ALL_FEATURES)
        return

    exported_keys = ("username", "profile_name", "is_admin", "unlimited_ai", "usage")
    json_users = [{key: row[key] for key in exported_keys} for row in rows]
    self.stdout.write(json.dumps({"ok": True, "date": today, "users": json_users}, indent=2))

537 

538 def _collect_usage_data(self, options, get_usage, Profile): 

539 if options.get("username"): 

540 user = self._get_user(options["username"], options) 

541 profiles = [(user.profile, user)] 

542 else: 

543 profiles = [(p, p.user) for p in Profile.objects.select_related("user").filter(user__isnull=False)] 

544 return [ 

545 { 

546 "username": user.username, 

547 "profile_name": profile.name, 

548 "is_admin": user.is_staff, 

549 "unlimited_ai": profile.unlimited_ai, 

550 "is_exempt": user.is_staff or profile.unlimited_ai, 

551 "usage": get_usage(profile.pk), 

552 } 

553 for profile, user in profiles 

554 ] 

555 

556 def _print_user_usage(self, u, limits, all_features): 

557 tags = [] 

558 if u["is_admin"]: 

559 tags.append("admin") 

560 if u["unlimited_ai"]: 

561 tags.append("unlimited") 

562 tag_str = f" [{'/'.join(tags)}]" if tags else "" 

563 self.stdout.write(f"{u['username']} ({u['profile_name']}){tag_str}") 

564 for feature in all_features: 

565 count = u["usage"][feature] 

566 suffix = "" if u["is_exempt"] else f"/{limits[feature]}" 

567 self.stdout.write(f" {feature}: {count}{suffix}") 

568 self.stdout.write("") 

569 

def _handle_create_session(self, options):
    """Handle ``create-session``: create a logged-in database session for a user.

    Exits via ``_error`` when the user is missing, inactive or has no
    profile, or when ``--ttl`` is outside 60..86400 seconds. On success the
    creation is written to the ``security`` logger and the session key and
    expiry details are reported.
    """
    import datetime
    import logging

    from django.contrib.auth import BACKEND_SESSION_KEY, HASH_SESSION_KEY, SESSION_KEY
    from django.contrib.sessions.backends.db import SessionStore
    from django.utils import timezone

    audit_log = logging.getLogger("security")

    account = self._get_user(options["username"], options)
    if not account.is_active:
        self._error(f"User '{account.username}' is inactive.", options)

    profile = getattr(account, "profile", None)
    if profile is None:
        self._error(f"User '{account.username}' has no profile.", options)

    ttl = options.get("ttl", 3600)
    if not (60 <= ttl <= 86400):
        self._error("TTL must be between 60 and 86400 seconds.", options)

    # Populate the same auth keys that django.contrib.auth.login() stores,
    # plus the profile the session belongs to.
    store = SessionStore()
    store[SESSION_KEY] = str(account.pk)
    store[BACKEND_SESSION_KEY] = "django.contrib.auth.backends.ModelBackend"
    store[HASH_SESSION_KEY] = account.get_session_auth_hash()
    store["profile_id"] = profile.id
    store.set_expiry(ttl)
    store.create()

    audit_log.info(
        "CLI session created: user_id=%s, username=%s, ttl=%ds",
        account.pk,
        account.username,
        ttl,
    )

    expires_at = timezone.now() + datetime.timedelta(seconds=ttl)
    details = {
        "session_key": store.session_key,
        "profile_id": profile.id,
        "user": self._user_dict(account),
        "expires_in_seconds": ttl,
        "expires_at": expires_at.isoformat(),
    }
    self._success(f"Session created for '{account.username}' (expires in {ttl}s).", options, details)

619 

def _handle_reset(self, options):
    """Handle ``reset``: delete all application data and re-seed defaults.

    Confirmation:
        ``--confirm`` skips the interactive prompt in both text and JSON
        mode. Without it, JSON mode refuses to run and text mode asks the
        operator to type ``RESET``; a closed stdin counts as a refusal.

    The start and end of the reset are written to the ``security`` logger.
    Seeding is best-effort: a failing seed command is logged and listed in
    the reported actions, and does not abort the reset.
    """
    from django.contrib.sessions.models import Session
    from django.core.cache import cache
    from django.core.management import call_command

    from apps.ai.models import AIDiscoverySuggestion
    from apps.profiles.models import Profile
    from apps.recipes.models import (
        CachedSearchImage,
        Recipe,
        RecipeCollection,
        RecipeCollectionItem,
        RecipeFavorite,
        RecipeViewHistory,
        SearchSource,
        ServingAdjustment,
    )

    if not options.get("confirm"):
        if options.get("as_json"):
            self._error(
                "--confirm flag required for non-interactive reset. Usage: cookie_admin reset --json --confirm",
                options,
            )
        else:
            self.stderr.write("WARNING: This will permanently delete ALL data.")
            try:
                confirm = input("Type RESET to confirm: ")
            except EOFError:
                # No stdin available (e.g. run from a script): treat as "no".
                confirm = ""
            if confirm != "RESET":
                self._error("Aborted.", options)

    security_logger.warning("DATABASE RESET initiated via CLI (cookie_admin reset)")

    actions = []

    # Delete in FK-safe order
    AIDiscoverySuggestion.objects.all().delete()
    ServingAdjustment.objects.all().delete()
    RecipeViewHistory.objects.all().delete()
    RecipeCollectionItem.objects.all().delete()
    RecipeCollection.objects.all().delete()
    RecipeFavorite.objects.all().delete()
    CachedSearchImage.objects.all().delete()
    Recipe.objects.all().delete()
    Profile.objects.all().delete()
    actions.extend([
        "Deleted all profiles",
        "Deleted all recipes",
        "Cleared favorites, collections, view history",
        "Cleared AI suggestions and serving adjustments",
    ])

    if settings.AUTH_MODE == "passkey":
        from apps.core.models import DeviceCode

        DeviceCode.objects.all().delete()
        User.objects.all().delete()
        actions.append("Deleted all user accounts and device codes")

    SearchSource.objects.all().update(
        consecutive_failures=0,
        needs_attention=False,
        last_validated_at=None,
    )
    actions.append("Reset search source counters")

    # Clear media: each existing image directory is removed and recreated empty.
    for subdir in ("recipe_images", "search_images"):
        path = os.path.join(settings.MEDIA_ROOT, subdir)
        if os.path.exists(path):
            shutil.rmtree(path)
            os.makedirs(path)
    actions.append("Cleared recipe and search images")

    cache.clear()
    Session.objects.all().delete()
    actions.extend(["Cleared application cache", "Cleared all sessions"])

    call_command("migrate", verbosity=0)
    actions.append("Re-ran migrations")

    for cmd in ("seed_search_sources", "seed_ai_prompts"):
        label = cmd.replace("seed_", "")
        try:
            call_command(cmd, verbosity=0)
        except Exception as exc:
            # Keep the reset going, but make the failure visible.
            security_logger.warning("Seed command %s failed during reset: %s", cmd, exc)
            actions.append(f"Failed to seed {label}: {exc}")
        else:
            actions.append(f"Seeded {label}")

    security_logger.warning("DATABASE RESET completed successfully via CLI")

    self._success(
        "Database reset complete.",
        options,
        {"actions_performed": actions},
    )

← Back to Dashboard