Coverage for apps / core / management / commands / cookie_admin.py: 98%
156 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 13:22 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 13:22 +0000
1"""Admin CLI for Cookie. Covers user lifecycle (passkey mode) and app config (either mode).
3All subcommands support --json for structured output.
5Admin privilege does not exist in passkey mode — all passkey users are peers.
6App configuration is reached exclusively via this CLI (spec 014-remove-is-staff).
8Passkey-only (require AUTH_MODE=passkey — operate on Django User / DeviceCode):
9 cookie_admin list-users [--active-only] [--json]
10 cookie_admin create-user <username> [--json]
11 cookie_admin delete-user <username> [--json]
12 cookie_admin activate <username> [--json]
13 cookie_admin deactivate <username> [--json]
14 cookie_admin set-unlimited <username> [--json]
15 cookie_admin remove-unlimited <username> [--json]
16 cookie_admin usage [--username <name>] [--json]
17 cookie_admin create-session <username> [--ttl N] [--json]
19Mode-agnostic (operate on AppSettings / AIPrompt / SearchSource / Profile):
20 cookie_admin status [--json] # adds a 'cache' block in --json
21 cookie_admin audit [--lines N] [--json]
22 cookie_admin reset [--json --confirm]
23 cookie_admin set-api-key [--key KEY | --stdin]
24 cookie_admin test-api-key [--key KEY | --stdin] [--json]
25 cookie_admin set-default-model <model_id> [--json]
26 cookie_admin prompts list [--json]
27 cookie_admin prompts show <prompt_type> [--json]
28 cookie_admin prompts set <prompt_type> [--system-file PATH] [--user-file PATH]
29 [--model MODEL] [--active {true,false}] [--json]
30 cookie_admin sources list [--attention] [--json]
31 cookie_admin sources toggle <source_id> [--json]
32 cookie_admin sources toggle-all {--enable | --disable} [--json]
33 cookie_admin sources set-selector <source_id> --selector CSS [--json]
34 cookie_admin sources test [--id N | --all] [--json]
35 cookie_admin sources repair <source_id> [--json]
36 cookie_admin quota show [--json]
37 cookie_admin quota set {remix|remix-suggestions|scale|tips|discover|timer} <N> [--json]
38 cookie_admin rename <user_or_profile> --name NEW [--json]
40Implementation split across sibling `_cookie_admin_*.py` mixins to keep
41each file under the 500-line quality gate. Django's management loader
42ignores `_`-prefixed files so only this module registers as the command.
43"""
45import json
46import logging
48from django.conf import settings
49from django.contrib.auth.models import User
50from django.core.management.base import BaseCommand
52from apps.core.management.commands._cookie_admin_app import AppConfigMixin
53from apps.core.management.commands._cookie_admin_resources import ResourcesMixin
54from apps.core.management.commands._cookie_admin_users import UsersStatusMixin
56security_logger = logging.getLogger("security")
class Command(UsersStatusMixin, AppConfigMixin, ResourcesMixin, BaseCommand):
    """Entry point of the ``cookie_admin`` management command.

    This class owns the argparse wiring, the subcommand dispatch, and a few
    output/lookup helpers. Dispatch resolves ``_handle_*`` methods by name;
    none are defined in this class, so they are expected to come from the
    mixin bases.
    """

    help = (
        "Manage Cookie app config, users, and data. "
        "User-lifecycle subcommands require passkey mode; others work in either mode. "
        "All subcommands support --json."
    )

    # Subcommands that operate on Django User / DeviceCode require AUTH_MODE=passkey.
    # App-config subcommands (AppSettings / AIPrompt / SearchSource / Profile) work in either mode.
    PASSKEY_ONLY_SUBCOMMANDS = frozenset(
        {
            "list-users",
            "create-user",
            "delete-user",
            "activate",
            "deactivate",
            "set-unlimited",
            "remove-unlimited",
            "usage",
            "create-session",
        }
    )

    # Top-level subcommands that carry a nested action, mapped to the argparse
    # dest under which that action is stored.
    _NESTED_ACTION_DEST = {
        "prompts": "prompts_action",
        "sources": "sources_action",
        "quota": "quota_action",
    }

    def add_arguments(self, parser):
        """Register the global ``--json`` flag and every subcommand parser.

        Args:
            parser: The argparse-compatible parser handed over by Django.
        """
        # Function-scope import so this block does not depend on the module's
        # import section.
        import argparse

        def json_flag(target):
            # argparse copies every attribute of a sub-parser's namespace onto
            # the parent namespace, defaults included. With an ordinary
            # store_true default (False) that copy would erase a `--json`
            # given *before* the subcommand. SUPPRESS makes the sub-parser
            # set `as_json` only when its own flag is actually present.
            target.add_argument(
                "--json",
                action="store_true",
                dest="as_json",
                default=argparse.SUPPRESS,
            )

        parser.add_argument("--json", action="store_true", dest="as_json", help="Output as JSON (all subcommands)")
        sub = parser.add_subparsers(dest="subcommand")

        # status
        status_p = sub.add_parser("status", help="App status overview (config, DB, users, AI)")
        json_flag(status_p)

        # audit
        audit_p = sub.add_parser("audit", help="Recent security events")
        audit_p.add_argument("--lines", type=int, default=50, help="Max events to show (default 50)")
        json_flag(audit_p)

        # list-users
        list_p = sub.add_parser("list-users", help="List all users")
        list_p.add_argument("--active-only", action="store_true")
        json_flag(list_p)

        # create-user / delete-user / activate / deactivate: one required username each
        for name, help_text in (
            ("create-user", "Create a headless user (no passkey)"),
            ("delete-user", "Delete a user and their profile"),
            ("activate", "Reactivate a user account"),
            ("deactivate", "Deactivate a user account"),
        ):
            user_p = sub.add_parser(name, help=help_text)
            user_p.add_argument("username")
            json_flag(user_p)

        # set-unlimited / remove-unlimited: optional username and/or --profile-id
        for name, help_text in (
            ("set-unlimited", "Grant unlimited AI access"),
            ("remove-unlimited", "Revoke unlimited AI access"),
        ):
            unlimited_p = sub.add_parser(name, help=help_text)
            unlimited_p.add_argument("username", nargs="?", default=None)
            unlimited_p.add_argument("--profile-id", type=int, dest="profile_id")
            json_flag(unlimited_p)

        # usage
        usage_p = sub.add_parser("usage", help="Show AI usage for today")
        usage_p.add_argument("--username", required=False, help="Show usage for a specific user")
        json_flag(usage_p)

        # create-session
        session_p = sub.add_parser("create-session", help="Create a Django session for a user (pentest/automation)")
        session_p.add_argument("username")
        session_p.add_argument("--ttl", type=int, default=3600, help="Session TTL in seconds (default 3600)")
        json_flag(session_p)
        session_p.add_argument("--confirm", action="store_true", help="Required for non-interactive (--json) mode")

        # reset
        reset_p = sub.add_parser("reset", help="Factory reset: delete all data and re-seed defaults")
        reset_p.add_argument("--confirm", action="store_true", help="Skip interactive prompt (required with --json)")
        json_flag(reset_p)

        # set-api-key
        set_key_p = sub.add_parser("set-api-key", help="Set the OpenRouter API key in AppSettings")
        set_key_group = set_key_p.add_mutually_exclusive_group(required=True)
        set_key_group.add_argument("--key", help="API key (avoid in shared shells)")
        set_key_group.add_argument("--stdin", action="store_true", help="Read key from standard input")
        json_flag(set_key_p)

        # test-api-key
        test_key_p = sub.add_parser("test-api-key", help="Validate an OpenRouter API key without saving")
        test_key_group = test_key_p.add_mutually_exclusive_group(required=True)
        test_key_group.add_argument("--key")
        test_key_group.add_argument("--stdin", action="store_true")
        json_flag(test_key_p)

        # set-default-model
        model_p = sub.add_parser("set-default-model", help="Set the default AI model id in AppSettings")
        model_p.add_argument("model_id")
        json_flag(model_p)

        # prompts
        prompts_p = sub.add_parser("prompts", help="AI prompt management")
        prompts_sub = prompts_p.add_subparsers(dest="prompts_action")
        prompts_list = prompts_sub.add_parser("list", help="List AI prompts")
        json_flag(prompts_list)
        prompts_show = prompts_sub.add_parser("show", help="Show one AI prompt by type")
        prompts_show.add_argument("prompt_type")
        json_flag(prompts_show)
        prompts_set = prompts_sub.add_parser("set", help="Update an AI prompt's fields (file-based content)")
        prompts_set.add_argument("prompt_type")
        prompts_set.add_argument("--system-file", dest="system_file", help="Path to new system prompt (UTF-8)")
        prompts_set.add_argument("--user-file", dest="user_file", help="Path to new user template (UTF-8)")
        prompts_set.add_argument("--model", dest="model", help="Model id (must be in AIPrompt.AVAILABLE_MODELS)")
        prompts_set.add_argument("--active", dest="active", choices=["true", "false"], help="Set is_active")
        json_flag(prompts_set)

        # sources
        sources_p = sub.add_parser("sources", help="Search-source management")
        sources_sub = sources_p.add_subparsers(dest="sources_action")
        sources_list = sources_sub.add_parser("list", help="List search sources")
        sources_list.add_argument("--attention", action="store_true", help="Only sources flagged needs_attention")
        json_flag(sources_list)
        sources_toggle = sources_sub.add_parser("toggle", help="Flip a source's enabled state")
        sources_toggle.add_argument("source_id", type=int)
        json_flag(sources_toggle)
        sources_toggle_all = sources_sub.add_parser("toggle-all", help="Set every source's enabled state")
        toggle_all_group = sources_toggle_all.add_mutually_exclusive_group(required=True)
        toggle_all_group.add_argument("--enable", action="store_true")
        toggle_all_group.add_argument("--disable", action="store_true")
        json_flag(sources_toggle_all)
        sources_selector = sources_sub.add_parser("set-selector", help="Overwrite a source's CSS selector")
        sources_selector.add_argument("source_id", type=int)
        sources_selector.add_argument("--selector", required=True)
        json_flag(sources_selector)
        sources_test = sources_sub.add_parser("test", help="Run the health-check on one source or all")
        sources_test_group = sources_test.add_mutually_exclusive_group(required=True)
        sources_test_group.add_argument("--id", dest="source_id", type=int)
        sources_test_group.add_argument("--all", action="store_true", dest="test_all")
        json_flag(sources_test)
        sources_repair = sources_sub.add_parser("repair", help="AI-assisted selector regeneration (requires API key)")
        sources_repair.add_argument("source_id", type=int)
        json_flag(sources_repair)

        # quota
        quota_p = sub.add_parser("quota", help="AI daily quota limits")
        quota_sub = quota_p.add_subparsers(dest="quota_action")
        quota_show = quota_sub.add_parser("show", help="Show all six daily limits")
        json_flag(quota_show)
        quota_set = quota_sub.add_parser("set", help="Set one daily limit")
        quota_set.add_argument(
            "feature",
            choices=["remix", "remix-suggestions", "scale", "tips", "discover", "timer"],
        )
        quota_set.add_argument("value", type=int)
        json_flag(quota_set)

        # rename
        rename_p = sub.add_parser(
            "rename",
            help="Rename a profile (passkey: by user_id|username; home: by profile_id)",
        )
        rename_p.add_argument("target", help="User id/username (passkey) or Profile id (home)")
        rename_p.add_argument("--name", required=True, help="New profile name")
        json_flag(rename_p)

    def handle(self, *args, **options):
        """Validate the parsed subcommand and call its ``_handle_*`` method.

        The handler name is ``_handle_<subcommand>`` (or
        ``_handle_<subcommand>_<action>`` for nested subcommands) with hyphens
        turned into underscores. The handler receives the ``options`` dict.

        Raises:
            SystemExit: with code 1 when the subcommand or nested action is
                missing or has no handler, and with code 2 when a passkey-only
                subcommand is used while ``settings.AUTH_MODE`` is not
                ``"passkey"``.
        """
        subcommand = options.get("subcommand")
        if not subcommand:
            self._error("No subcommand provided. Use --help for usage.", options, code=1)

        if subcommand in self.PASSKEY_ONLY_SUBCOMMANDS and settings.AUTH_MODE != "passkey":
            self._error(f"'{subcommand}' requires AUTH_MODE=passkey.", options, code=2)

        # Nested-subcommand dispatch (prompts/sources/quota have per-action handlers).
        handler_key = subcommand
        action_dest = self._NESTED_ACTION_DEST.get(subcommand)
        if action_dest is not None:
            action = options.get(action_dest)
            if not action:
                self._error(f"'{subcommand}' requires an action (see --help).", options, code=1)
            handler_key = f"{subcommand}_{action}"

        handler = getattr(self, f"_handle_{handler_key.replace('-', '_')}", None)
        if not handler:
            self._error(f"Unknown subcommand '{subcommand}'", options, code=1)
        handler(options)

    # ------------------------------------------------------------------ #
    # Shared helpers (used by every mixin's _handle_* methods)           #
    # ------------------------------------------------------------------ #

    def _error(self, message, options=None, code=1):
        """Report a failure and terminate the command.

        With ``options["as_json"]`` set, writes ``{"ok": false, "error": ...}``
        to stdout; otherwise writes ``Error: <message>`` to stderr.

        Args:
            message: Human-readable failure description.
            options: Parsed options dict; may be ``None``.
            code: Process exit status.

        Raises:
            SystemExit: always, carrying ``code``.
        """
        wants_json = bool(options) and options.get("as_json")
        if wants_json:
            self.stdout.write(json.dumps({"ok": False, "error": message}))
        else:
            self.stderr.write(f"Error: {message}")
        raise SystemExit(code)

    def _success(self, message, options, extra=None):
        """Report a successful outcome on stdout.

        With ``options["as_json"]`` set, emits ``{"ok": true, "message": ...}``
        merged with ``extra``; otherwise prints ``message`` as plain text.

        Args:
            message: Human-readable result description.
            options: Parsed options dict.
            extra: Optional mapping of additional JSON fields; ignored in
                plain-text mode.
        """
        if not options.get("as_json"):
            self.stdout.write(message)
            return

        payload = {"ok": True, "message": message}
        if extra:
            payload.update(extra)
        self.stdout.write(json.dumps(payload))

    def _get_user(self, username, options):
        """Return the ``User`` with this username, or exit with an error.

        Raises:
            SystemExit: via ``_error`` (default code 1) when no such user exists.
        """
        try:
            return User.objects.get(username=username)
        except User.DoesNotExist:
            self._error(f"User '{username}' not found.", options)

    def _user_dict(self, user):
        """Build the JSON-serialisable summary of one user.

        ``unlimited_ai`` falls back to ``False`` when the user has no
        ``profile`` attribute or the profile lacks that field.
        """
        # NOTE(review): `webauthn_credentials` is presumably a reverse relation
        # declared elsewhere in the project; confirm against the model.
        profile = getattr(user, "profile", None)
        return {
            "username": user.username,
            "user_id": user.pk,
            "passkeys": user.webauthn_credentials.count(),
            "is_active": user.is_active,
            "unlimited_ai": getattr(profile, "unlimited_ai", False),
            "date_joined": user.date_joined.strftime("%Y-%m-%d"),
        }