Coverage for apps / core / management / commands / cookie_admin.py: 89%
384 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-12 10:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-12 10:49 +0000
1"""Admin CLI tool for managing Cookie in passkey mode.
3All subcommands support --json for structured output, making this tool
4suitable for automation via SSM, scripts, or AI assistants.
6Usage:
7 manage.py cookie_admin status [--json]
8 manage.py cookie_admin audit [--lines N] [--json]
9 manage.py cookie_admin list-users [--active-only] [--admins-only] [--json]
10 manage.py cookie_admin create-user <username> [--admin] [--json]
11 manage.py cookie_admin delete-user <username> [--json]
12 manage.py cookie_admin promote <username> [--json]
13 manage.py cookie_admin demote <username> [--json]
14 manage.py cookie_admin activate <username> [--json]
15 manage.py cookie_admin deactivate <username> [--json]
16 manage.py cookie_admin set-unlimited <username> [--json]
17 manage.py cookie_admin remove-unlimited <username> [--json]
18 manage.py cookie_admin usage [--username <name>] [--json]
19 manage.py cookie_admin create-session <username> [--ttl N] [--json]
20 manage.py cookie_admin reset [--json --confirm]
21"""
23import json
24import logging
25import os
26import shutil
28from django.conf import settings
29from django.contrib.auth.models import User
30from django.core.management.base import BaseCommand
31from django.db.models import Count
33security_logger = logging.getLogger("security")
36class Command(BaseCommand):
37 help = "Manage Cookie app and user accounts (passkey mode only). All subcommands support --json."
39 def add_arguments(self, parser):
40 parser.add_argument("--json", action="store_true", dest="as_json", help="Output as JSON (all subcommands)")
41 sub = parser.add_subparsers(dest="subcommand")
43 # status
44 st = sub.add_parser("status", help="App status overview (config, DB, users, AI)")
45 st.add_argument("--json", action="store_true", dest="as_json")
47 # audit
48 au = sub.add_parser("audit", help="Recent security events")
49 au.add_argument("--lines", type=int, default=50, help="Max events to show (default 50)")
50 au.add_argument("--json", action="store_true", dest="as_json")
52 # list-users
53 ls = sub.add_parser("list-users", help="List all users")
54 ls.add_argument("--active-only", action="store_true")
55 ls.add_argument("--admins-only", action="store_true")
56 ls.add_argument("--json", action="store_true", dest="as_json")
58 # create-user
59 cu = sub.add_parser("create-user", help="Create a headless user (no passkey)")
60 cu.add_argument("username")
61 cu.add_argument("--admin", action="store_true", help="Grant admin privileges")
62 cu.add_argument("--json", action="store_true", dest="as_json")
64 # delete-user
65 du = sub.add_parser("delete-user", help="Delete a user and their profile")
66 du.add_argument("username")
67 du.add_argument("--json", action="store_true", dest="as_json")
69 # promote
70 p = sub.add_parser("promote", help="Grant admin privileges")
71 p.add_argument("username")
72 p.add_argument("--json", action="store_true", dest="as_json")
74 # demote
75 d = sub.add_parser("demote", help="Revoke admin privileges")
76 d.add_argument("username")
77 d.add_argument("--json", action="store_true", dest="as_json")
79 # activate
80 a = sub.add_parser("activate", help="Reactivate a user account")
81 a.add_argument("username")
82 a.add_argument("--json", action="store_true", dest="as_json")
84 # deactivate
85 da = sub.add_parser("deactivate", help="Deactivate a user account")
86 da.add_argument("username")
87 da.add_argument("--json", action="store_true", dest="as_json")
89 # set-unlimited
90 su = sub.add_parser("set-unlimited", help="Grant unlimited AI access")
91 su.add_argument("username")
92 su.add_argument("--json", action="store_true", dest="as_json")
94 # remove-unlimited
95 ru = sub.add_parser("remove-unlimited", help="Revoke unlimited AI access")
96 ru.add_argument("username")
97 ru.add_argument("--json", action="store_true", dest="as_json")
99 # usage
100 us = sub.add_parser("usage", help="Show AI usage for today")
101 us.add_argument("--username", required=False, help="Show usage for a specific user")
102 us.add_argument("--json", action="store_true", dest="as_json")
104 # create-session
105 cs = sub.add_parser("create-session", help="Create a Django session for a user (pentest/automation)")
106 cs.add_argument("username")
107 cs.add_argument("--ttl", type=int, default=3600, help="Session TTL in seconds (default 3600)")
108 cs.add_argument("--json", action="store_true", dest="as_json")
110 # reset
111 rs = sub.add_parser("reset", help="Factory reset: delete all data and re-seed defaults")
112 rs.add_argument("--confirm", action="store_true", help="Skip interactive prompt (required with --json)")
113 rs.add_argument("--json", action="store_true", dest="as_json")
115 def handle(self, *args, **options):
116 if settings.AUTH_MODE != "passkey":
117 self._error("cookie_admin is only available in passkey mode (AUTH_MODE=passkey).", options, code=2)
119 subcommand = options.get("subcommand")
120 if not subcommand:
121 self._error("No subcommand provided. Use --help for usage.", options, code=1)
123 handler = getattr(self, f"_handle_{subcommand.replace('-', '_')}", None)
124 if handler:
125 handler(options)
126 else:
127 self._error(f"Unknown subcommand '{subcommand}'", options, code=1)
129 def _error(self, message, options=None, code=1):
130 if options and options.get("as_json"):
131 self.stdout.write(json.dumps({"ok": False, "error": message}))
132 else:
133 self.stderr.write(f"Error: {message}")
134 raise SystemExit(code)
136 def _success(self, message, options, extra=None):
137 if options.get("as_json"):
138 result = {"ok": True, "message": message}
139 if extra:
140 result.update(extra)
141 self.stdout.write(json.dumps(result))
142 else:
143 self.stdout.write(message)
145 def _get_user(self, username, options):
146 try:
147 return User.objects.get(username=username)
148 except User.DoesNotExist:
149 self._error(f"User '{username}' not found.", options)
151 def _user_dict(self, user):
152 passkey_count = user.webauthn_credentials.count()
153 unlimited_ai = getattr(getattr(user, "profile", None), "unlimited_ai", False)
154 return {
155 "username": user.username,
156 "user_id": user.pk,
157 "passkeys": passkey_count,
158 "is_admin": user.is_staff,
159 "is_active": user.is_active,
160 "unlimited_ai": unlimited_ai,
161 "date_joined": user.date_joined.strftime("%Y-%m-%d"),
162 }
164 def _handle_status(self, options):
165 status = self._collect_status()
167 if options.get("as_json"):
168 self.stdout.write(json.dumps({"ok": True, **status}, indent=2))
169 return
171 self._print_status(status)
173 def _collect_status(self):
174 from django.core.cache import cache
175 from django.db import connection
176 from django.utils import timezone
178 from apps.core.management.commands.cleanup_device_codes import CLEANUP_CACHE_KEY as DC_KEY
179 from apps.core.management.commands.cleanup_sessions import CLEANUP_CACHE_KEY as SESS_KEY
180 from apps.core.models import AppSettings, DeviceCode, WebAuthnCredential
181 from apps.recipes.management.commands.cleanup_search_images import CLEANUP_CACHE_KEY as IMG_KEY
183 status = {"auth_mode": settings.AUTH_MODE}
185 # Database
186 try:
187 with connection.cursor() as cursor:
188 cursor.execute("SELECT 1")
189 status["database"] = "ok"
190 except Exception as e:
191 status["database"] = f"error: {e}"
193 # Migrations
194 try:
195 from io import StringIO
197 from django.core.management import call_command
199 out = StringIO()
200 call_command("migrate", "--check", stdout=out, stderr=out)
201 status["migrations"] = "up to date"
202 except SystemExit:
203 status["migrations"] = "pending"
205 # Users
206 status["users"] = {
207 "total": User.objects.count(),
208 "active": User.objects.filter(is_active=True).count(),
209 "admins": User.objects.filter(is_staff=True, is_active=True).count(),
210 }
212 # Passkeys
213 status["passkeys"] = WebAuthnCredential.objects.count()
215 # Device codes
216 now = timezone.now()
217 status["device_codes"] = {
218 "pending": DeviceCode.objects.filter(status="pending", expires_at__gt=now).count(),
219 "stale_expired": DeviceCode.objects.filter(expires_at__lte=now)
220 .exclude(status__in=["authorized", "invalidated"])
221 .count(),
222 }
224 # AI / OpenRouter
225 app_settings = AppSettings.get()
226 has_env_key = bool(os.environ.get("OPENROUTER_API_KEY", ""))
227 has_db_key = bool(app_settings._openrouter_api_key)
228 status["openrouter"] = {
229 "configured": bool(app_settings.openrouter_api_key),
230 "source": "env" if has_env_key else ("database" if has_db_key else "none"),
231 "model": app_settings.default_ai_model,
232 }
234 # WebAuthn config
235 status["webauthn"] = {
236 "rp_id": settings.WEBAUTHN_RP_ID or "(from request)",
237 "rp_name": settings.WEBAUTHN_RP_NAME,
238 }
240 # Maintenance — last cleanup runs
241 status["maintenance"] = {
242 "device_code_cleanup": cache.get(DC_KEY) or "never run",
243 "session_cleanup": cache.get(SESS_KEY) or "never run",
244 "search_image_cleanup": cache.get(IMG_KEY) or "never run",
245 }
247 return status
249 def _print_status(self, status):
250 users = status["users"]
251 dc = status["device_codes"]
252 src = status["openrouter"]["source"]
254 self.stdout.write(f"Auth mode: {status['auth_mode']}")
255 self.stdout.write(f"Database: {status['database']}")
256 self.stdout.write(f"Migrations: {status['migrations']}")
257 self.stdout.write(f"Users: {users['active']} active ({users['admins']} admin) / {users['total']} total")
258 self.stdout.write(f"Passkeys: {status['passkeys']}")
259 self.stdout.write(f"Device codes: {dc['pending']} pending, {dc['stale_expired']} stale")
260 self.stdout.write(
261 f"OpenRouter: {'configured' if status['openrouter']['configured'] else 'not configured'} (source: {src})"
262 )
263 self.stdout.write(f"WebAuthn RP: {status['webauthn']['rp_id']} ({status['webauthn']['rp_name']})")
264 self.stdout.write("Maintenance:")
265 for label, key in [
266 (" Device codes", "device_code_cleanup"),
267 (" Sessions", "session_cleanup"),
268 (" Search images", "search_image_cleanup"),
269 ]:
270 info = status["maintenance"][key]
271 if isinstance(info, dict):
272 self.stdout.write(
273 f"{label}: last ran {info['time'][:19]}, deleted {info['deleted']}, {info['remaining']} remaining"
274 )
275 else:
276 self.stdout.write(f"{label}: {info}")
278 def _handle_audit(self, options):
279 max_lines = options.get("lines", 50)
281 from django.utils import timezone
283 now = timezone.now()
284 since = now - timezone.timedelta(hours=24)
286 events = (
287 self._collect_registration_events(since, max_lines)
288 + self._collect_login_events(since, max_lines)
289 + self._collect_device_code_events(since, max_lines)
290 )
291 events.sort(key=lambda e: e["time"], reverse=True)
292 events = events[:max_lines]
294 if options.get("as_json"):
295 self.stdout.write(json.dumps({"ok": True, "since": since.isoformat(), "events": events}, indent=2))
296 return
298 self.stdout.write(f"Security events (last 24h, max {max_lines}):")
299 self.stdout.write("-" * 70)
300 if not events:
301 self.stdout.write("No events found.")
302 return
303 for e in events:
304 self.stdout.write(self._format_audit_event(e))
306 def _collect_registration_events(self, since, max_lines):
307 """Collect user registration events since the given time."""
308 recent_users = User.objects.filter(date_joined__gte=since).order_by("-date_joined")[:max_lines]
309 return [
310 {
311 "time": u.date_joined.isoformat(),
312 "type": "registration",
313 "username": u.username,
314 "is_admin": u.is_staff,
315 }
316 for u in recent_users
317 ]
319 def _collect_login_events(self, since, max_lines):
320 """Collect passkey login events since the given time."""
321 from apps.core.models import WebAuthnCredential
323 recent_logins = (
324 WebAuthnCredential.objects.filter(last_used_at__gte=since)
325 .select_related("user")
326 .order_by("-last_used_at")[:max_lines]
327 )
328 return [
329 {
330 "time": c.last_used_at.isoformat(),
331 "type": "passkey_login",
332 "username": c.user.username,
333 "credential_id": c.pk,
334 }
335 for c in recent_logins
336 ]
338 def _collect_device_code_events(self, since, max_lines):
339 """Collect device code events since the given time."""
340 from apps.core.models import DeviceCode
342 recent_codes = DeviceCode.objects.filter(created_at__gte=since).order_by("-created_at")[:max_lines]
343 return [
344 {
345 "time": dc.created_at.isoformat(),
346 "type": f"device_code_{dc.status}",
347 "code": dc.code,
348 "authorizer": dc.authorizing_user.username if dc.authorizing_user else None,
349 }
350 for dc in recent_codes
351 ]
353 @staticmethod
354 def _format_audit_event(event):
355 """Format a single audit event as a human-readable line."""
356 ts = event["time"][:19].replace("T", " ")
357 etype = event["type"]
358 detail = event.get("username") or event.get("code") or ""
359 extra = ""
360 if etype == "passkey_login":
361 extra = f" (credential #{event['credential_id']})"
362 elif etype.startswith("device_code_") and event.get("authorizer"):
363 extra = f" (by {event['authorizer']})"
364 return f" {ts} {etype:<25} {detail}{extra}"
366 def _handle_list_users(self, options):
367 users = User.objects.all().order_by("date_joined")
368 if options.get("active_only"):
369 users = users.filter(is_active=True)
370 if options.get("admins_only"):
371 users = users.filter(is_staff=True)
373 users = users.annotate(passkey_count=Count("webauthn_credentials"))
375 if options.get("as_json"):
376 data = [
377 {
378 "username": u.username,
379 "user_id": u.pk,
380 "passkeys": u.passkey_count,
381 "is_admin": u.is_staff,
382 "is_active": u.is_active,
383 "unlimited_ai": getattr(getattr(u, "profile", None), "unlimited_ai", False),
384 "date_joined": u.date_joined.strftime("%Y-%m-%d"),
385 }
386 for u in users
387 ]
388 self.stdout.write(json.dumps({"ok": True, "users": data}, indent=2))
389 return
391 self.stdout.write(f"{'USERNAME':<15} {'ID':<6} {'PASSKEYS':<10} {'ADMIN':<7} {'ACTIVE':<8} {'JOINED'}")
392 self.stdout.write("-" * 65)
393 for u in users:
394 self.stdout.write(
395 f"{u.username:<15} {u.pk:<6} {u.passkey_count:<10} "
396 f"{'yes' if u.is_staff else 'no':<7} "
397 f"{'yes' if u.is_active else 'no':<8} "
398 f"{u.date_joined.strftime('%Y-%m-%d')}"
399 )
400 active = users.filter(is_active=True).count()
401 admins = users.filter(is_staff=True).count()
402 self.stdout.write(f"\nTotal: {users.count()} users ({active} active, {admins} admin)")
404 def _handle_create_user(self, options):
405 from apps.profiles.models import Profile
407 username = options["username"]
408 is_admin = options.get("admin", False)
410 if User.objects.filter(username=username).exists():
411 self._error(f"User '{username}' already exists.", options)
413 user = User.objects.create_user(
414 username=username,
415 password=None,
416 email="",
417 is_active=True,
418 is_staff=is_admin,
419 )
420 user.set_unusable_password()
421 user.save(update_fields=["password"])
423 profile_count = Profile.objects.count()
424 Profile.objects.create(
425 user=user,
426 name=f"User {profile_count + 1}",
427 avatar_color=Profile.next_avatar_color(),
428 )
430 role = "admin" if is_admin else "regular"
431 self._success(
432 f"Created {role} user '{username}'.",
433 options,
434 {"user": self._user_dict(user)},
435 )
437 def _handle_delete_user(self, options):
438 user = self._get_user(options["username"], options)
439 username = user.username
440 user_data = self._user_dict(user)
441 user.delete()
442 self._success(
443 f"Deleted user '{username}' and associated data.",
444 options,
445 {"deleted_user": user_data},
446 )
448 def _handle_promote(self, options):
449 user = self._get_user(options["username"], options)
450 if user.is_staff:
451 self._success(f"User '{user.username}' is already an admin.", options, {"user": self._user_dict(user)})
452 return
453 user.is_staff = True
454 user.save(update_fields=["is_staff"])
455 self._success(f"'{user.username}' is now an admin.", options, {"user": self._user_dict(user)})
457 def _handle_demote(self, options):
458 user = self._get_user(options["username"], options)
459 if not user.is_staff:
460 self._success(f"User '{user.username}' is not an admin.", options, {"user": self._user_dict(user)})
461 return
462 if User.objects.filter(is_staff=True).count() <= 1:
463 self._error("Cannot demote the last remaining admin. Promote another user first.", options)
464 user.is_staff = False
465 user.save(update_fields=["is_staff"])
466 self._success(f"'{user.username}' is no longer an admin.", options, {"user": self._user_dict(user)})
468 def _handle_activate(self, options):
469 user = self._get_user(options["username"], options)
470 user.is_active = True
471 user.save(update_fields=["is_active"])
472 self._success(f"'{user.username}' has been reactivated.", options, {"user": self._user_dict(user)})
474 def _handle_deactivate(self, options):
475 user = self._get_user(options["username"], options)
476 user.is_active = False
477 user.save(update_fields=["is_active"])
478 from django.contrib.sessions.models import Session
479 from django.utils import timezone
481 count = 0
482 sessions = Session.objects.filter(expire_date__gte=timezone.now())
483 for session in sessions:
484 data = session.get_decoded()
485 if str(data.get("_auth_user_id")) == str(user.pk):
486 session.delete()
487 count += 1
488 self._success(
489 f"'{user.username}' has been deactivated and {count} session(s) invalidated.",
490 options,
491 {"user": self._user_dict(user), "sessions_invalidated": count},
492 )
494 def _handle_set_unlimited(self, options):
495 user = self._get_user(options["username"], options)
496 profile = user.profile
497 profile.unlimited_ai = True
498 profile.save(update_fields=["unlimited_ai"])
499 self._success(
500 f"Updated {user.username}: unlimited AI access granted",
501 options,
502 {"username": user.username, "user_id": user.pk, "unlimited_ai": True, "action": "set-unlimited"},
503 )
505 def _handle_remove_unlimited(self, options):
506 user = self._get_user(options["username"], options)
507 profile = user.profile
508 profile.unlimited_ai = False
509 profile.save(update_fields=["unlimited_ai"])
510 self._success(
511 f"Updated {user.username}: unlimited AI access revoked",
512 options,
513 {"username": user.username, "user_id": user.pk, "unlimited_ai": False, "action": "remove-unlimited"},
514 )
516 def _handle_usage(self, options):
517 from datetime import date as date_cls
519 from apps.ai.services.quota import ALL_FEATURES, FEATURE_LIMIT_FIELDS, get_usage
520 from apps.core.models import AppSettings
521 from apps.profiles.models import Profile
523 app_settings = AppSettings.get()
524 limits = {f: getattr(app_settings, FEATURE_LIMIT_FIELDS[f]) for f in ALL_FEATURES}
525 today = date_cls.today().isoformat()
526 users_data = self._collect_usage_data(options, get_usage, Profile)
528 if options.get("as_json"):
529 json_users = [
530 {k: u[k] for k in ("username", "profile_name", "is_admin", "unlimited_ai", "usage")} for u in users_data
531 ]
532 self.stdout.write(json.dumps({"ok": True, "date": today, "users": json_users}, indent=2))
533 return
535 for u in users_data:
536 self._print_user_usage(u, limits, ALL_FEATURES)
538 def _collect_usage_data(self, options, get_usage, Profile):
539 if options.get("username"):
540 user = self._get_user(options["username"], options)
541 profiles = [(user.profile, user)]
542 else:
543 profiles = [(p, p.user) for p in Profile.objects.select_related("user").filter(user__isnull=False)]
544 return [
545 {
546 "username": user.username,
547 "profile_name": profile.name,
548 "is_admin": user.is_staff,
549 "unlimited_ai": profile.unlimited_ai,
550 "is_exempt": user.is_staff or profile.unlimited_ai,
551 "usage": get_usage(profile.pk),
552 }
553 for profile, user in profiles
554 ]
556 def _print_user_usage(self, u, limits, all_features):
557 tags = []
558 if u["is_admin"]:
559 tags.append("admin")
560 if u["unlimited_ai"]:
561 tags.append("unlimited")
562 tag_str = f" [{'/'.join(tags)}]" if tags else ""
563 self.stdout.write(f"{u['username']} ({u['profile_name']}){tag_str}")
564 for feature in all_features:
565 count = u["usage"][feature]
566 suffix = "" if u["is_exempt"] else f"/{limits[feature]}"
567 self.stdout.write(f" {feature}: {count}{suffix}")
568 self.stdout.write("")
570 def _handle_create_session(self, options):
571 import datetime
572 import logging
574 from django.contrib.auth import BACKEND_SESSION_KEY, HASH_SESSION_KEY, SESSION_KEY
575 from django.contrib.sessions.backends.db import SessionStore
576 from django.utils import timezone
578 security_logger = logging.getLogger("security")
580 user = self._get_user(options["username"], options)
581 if not user.is_active:
582 self._error(f"User '{user.username}' is inactive.", options)
584 profile = getattr(user, "profile", None)
585 if profile is None:
586 self._error(f"User '{user.username}' has no profile.", options)
588 ttl = options.get("ttl", 3600)
589 if ttl < 60 or ttl > 86400:
590 self._error("TTL must be between 60 and 86400 seconds.", options)
592 # Create session with Django auth keys (same as django.contrib.auth.login())
593 session = SessionStore()
594 session[SESSION_KEY] = str(user.pk)
595 session[BACKEND_SESSION_KEY] = "django.contrib.auth.backends.ModelBackend"
596 session[HASH_SESSION_KEY] = user.get_session_auth_hash()
597 session["profile_id"] = profile.id
598 session.set_expiry(ttl)
599 session.create()
601 security_logger.info(
602 "CLI session created: user_id=%s, username=%s, ttl=%ds",
603 user.pk,
604 user.username,
605 ttl,
606 )
608 self._success(
609 f"Session created for '{user.username}' (expires in {ttl}s).",
610 options,
611 {
612 "session_key": session.session_key,
613 "profile_id": profile.id,
614 "user": self._user_dict(user),
615 "expires_in_seconds": ttl,
616 "expires_at": (timezone.now() + datetime.timedelta(seconds=ttl)).isoformat(),
617 },
618 )
620 def _handle_reset(self, options):
621 from django.contrib.sessions.models import Session
622 from django.core.cache import cache
623 from django.core.management import call_command
625 from apps.ai.models import AIDiscoverySuggestion
626 from apps.profiles.models import Profile
627 from apps.recipes.models import (
628 CachedSearchImage,
629 Recipe,
630 RecipeCollection,
631 RecipeCollectionItem,
632 RecipeFavorite,
633 RecipeViewHistory,
634 SearchSource,
635 ServingAdjustment,
636 )
638 if options.get("as_json"):
639 if not options.get("confirm"):
640 self._error("--confirm flag required for non-interactive reset. Usage: cookie_admin reset --json --confirm", options)
641 else:
642 self.stderr.write("WARNING: This will permanently delete ALL data.")
643 confirm = input("Type RESET to confirm: ")
644 if confirm != "RESET":
645 self._error("Aborted.", options)
647 security_logger.warning("DATABASE RESET initiated via CLI (cookie_admin reset)")
649 actions = []
651 # Delete in FK-safe order
652 AIDiscoverySuggestion.objects.all().delete()
653 ServingAdjustment.objects.all().delete()
654 RecipeViewHistory.objects.all().delete()
655 RecipeCollectionItem.objects.all().delete()
656 RecipeCollection.objects.all().delete()
657 RecipeFavorite.objects.all().delete()
658 CachedSearchImage.objects.all().delete()
659 Recipe.objects.all().delete()
660 Profile.objects.all().delete()
661 actions.extend([
662 "Deleted all profiles",
663 "Deleted all recipes",
664 "Cleared favorites, collections, view history",
665 "Cleared AI suggestions and serving adjustments",
666 ])
668 if settings.AUTH_MODE == "passkey":
669 from apps.core.models import DeviceCode
671 DeviceCode.objects.all().delete()
672 User.objects.all().delete()
673 actions.append("Deleted all user accounts and device codes")
675 SearchSource.objects.all().update(
676 consecutive_failures=0,
677 needs_attention=False,
678 last_validated_at=None,
679 )
680 actions.append("Reset search source counters")
682 # Clear media
683 for subdir in ("recipe_images", "search_images"):
684 path = os.path.join(settings.MEDIA_ROOT, subdir)
685 if os.path.exists(path):
686 shutil.rmtree(path)
687 os.makedirs(path)
688 actions.append("Cleared recipe and search images")
690 cache.clear()
691 Session.objects.all().delete()
692 actions.extend(["Cleared application cache", "Cleared all sessions"])
694 call_command("migrate", verbosity=0)
695 actions.append("Re-ran migrations")
697 for cmd in ("seed_search_sources", "seed_ai_prompts"):
698 try:
699 call_command(cmd, verbosity=0)
700 actions.append(f"Seeded {cmd.replace('seed_', '')}")
701 except Exception:
702 pass
704 security_logger.warning("DATABASE RESET completed successfully via CLI")
706 self._success(
707 "Database reset complete.",
708 options,
709 {"actions_performed": actions},
710 )