Coverage for apps / core / management / commands / cookie_admin.py: 98%

156 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-02 13:22 +0000

1"""Admin CLI for Cookie. Covers user lifecycle (passkey mode) and app config (either mode). 

2 

3All subcommands support --json for structured output. 

4 

5Admin privilege does not exist in passkey mode — all passkey users are peers. 

6App configuration is reached exclusively via this CLI (spec 014-remove-is-staff). 

7 

8Passkey-only (require AUTH_MODE=passkey — operate on Django User / DeviceCode): 

9 cookie_admin list-users [--active-only] [--json] 

10 cookie_admin create-user <username> [--json] 

11 cookie_admin delete-user <username> [--json] 

12 cookie_admin activate <username> [--json] 

13 cookie_admin deactivate <username> [--json] 

14 cookie_admin set-unlimited <username> [--json] 

15 cookie_admin remove-unlimited <username> [--json] 

16 cookie_admin usage [--username <name>] [--json] 

17 cookie_admin create-session <username> [--ttl N] [--json] 

18 

19Mode-agnostic (operate on AppSettings / AIPrompt / SearchSource / Profile): 

20 cookie_admin status [--json] # adds a 'cache' block in --json 

21 cookie_admin audit [--lines N] [--json] 

22 cookie_admin reset [--json --confirm] 

23 cookie_admin set-api-key [--key KEY | --stdin] 

24 cookie_admin test-api-key [--key KEY | --stdin] [--json] 

25 cookie_admin set-default-model <model_id> [--json] 

26 cookie_admin prompts list [--json] 

27 cookie_admin prompts show <prompt_type> [--json] 

28 cookie_admin prompts set <prompt_type> [--system-file PATH] [--user-file PATH] 

29 [--model MODEL] [--active {true,false}] [--json] 

30 cookie_admin sources list [--attention] [--json] 

31 cookie_admin sources toggle <source_id> [--json] 

32 cookie_admin sources toggle-all {--enable | --disable} [--json] 

33 cookie_admin sources set-selector <source_id> --selector CSS [--json] 

34 cookie_admin sources test [--id N | --all] [--json] 

35 cookie_admin sources repair <source_id> [--json] 

36 cookie_admin quota show [--json] 

37 cookie_admin quota set {remix|remix-suggestions|scale|tips|discover|timer} <N> [--json] 

38 cookie_admin rename <user_or_profile> --name NEW [--json] 

39 

40Implementation split across sibling `_cookie_admin_*.py` mixins to keep 

41each file under the 500-line quality gate. Django's management loader 

42ignores `_`-prefixed files so only this module registers as the command. 

43""" 

44 

45import json 

46import logging 

47 

48from django.conf import settings 

49from django.contrib.auth.models import User 

50from django.core.management.base import BaseCommand 

51 

52from apps.core.management.commands._cookie_admin_app import AppConfigMixin 

53from apps.core.management.commands._cookie_admin_resources import ResourcesMixin 

54from apps.core.management.commands._cookie_admin_users import UsersStatusMixin 

55 

56security_logger = logging.getLogger("security") 

57 

58 

59class Command(UsersStatusMixin, AppConfigMixin, ResourcesMixin, BaseCommand): 

60 help = "Manage Cookie app config, users, and data. User-lifecycle subcommands require passkey mode; others work in either mode. All subcommands support --json." 

61 

62 # Subcommands that operate on Django User / DeviceCode require AUTH_MODE=passkey. 

63 # App-config subcommands (AppSettings / AIPrompt / SearchSource / Profile) work in either mode. 

64 PASSKEY_ONLY_SUBCOMMANDS = frozenset( 

65 { 

66 "list-users", 

67 "create-user", 

68 "delete-user", 

69 "activate", 

70 "deactivate", 

71 "set-unlimited", 

72 "remove-unlimited", 

73 "usage", 

74 "create-session", 

75 } 

76 ) 

77 

78 def add_arguments(self, parser): 

79 parser.add_argument("--json", action="store_true", dest="as_json", help="Output as JSON (all subcommands)") 

80 sub = parser.add_subparsers(dest="subcommand") 

81 

82 # status 

83 st = sub.add_parser("status", help="App status overview (config, DB, users, AI)") 

84 st.add_argument("--json", action="store_true", dest="as_json") 

85 

86 # audit 

87 au = sub.add_parser("audit", help="Recent security events") 

88 au.add_argument("--lines", type=int, default=50, help="Max events to show (default 50)") 

89 au.add_argument("--json", action="store_true", dest="as_json") 

90 

91 # list-users 

92 ls = sub.add_parser("list-users", help="List all users") 

93 ls.add_argument("--active-only", action="store_true") 

94 ls.add_argument("--json", action="store_true", dest="as_json") 

95 

96 # create-user 

97 cu = sub.add_parser("create-user", help="Create a headless user (no passkey)") 

98 cu.add_argument("username") 

99 cu.add_argument("--json", action="store_true", dest="as_json") 

100 

101 # delete-user 

102 du = sub.add_parser("delete-user", help="Delete a user and their profile") 

103 du.add_argument("username") 

104 du.add_argument("--json", action="store_true", dest="as_json") 

105 

106 # activate 

107 a = sub.add_parser("activate", help="Reactivate a user account") 

108 a.add_argument("username") 

109 a.add_argument("--json", action="store_true", dest="as_json") 

110 

111 # deactivate 

112 da = sub.add_parser("deactivate", help="Deactivate a user account") 

113 da.add_argument("username") 

114 da.add_argument("--json", action="store_true", dest="as_json") 

115 

116 # set-unlimited 

117 su = sub.add_parser("set-unlimited", help="Grant unlimited AI access") 

118 su.add_argument("username", nargs="?", default=None) 

119 su.add_argument("--profile-id", type=int, dest="profile_id") 

120 su.add_argument("--json", action="store_true", dest="as_json") 

121 

122 # remove-unlimited 

123 ru = sub.add_parser("remove-unlimited", help="Revoke unlimited AI access") 

124 ru.add_argument("username", nargs="?", default=None) 

125 ru.add_argument("--profile-id", type=int, dest="profile_id") 

126 ru.add_argument("--json", action="store_true", dest="as_json") 

127 

128 # usage 

129 us = sub.add_parser("usage", help="Show AI usage for today") 

130 us.add_argument("--username", required=False, help="Show usage for a specific user") 

131 us.add_argument("--json", action="store_true", dest="as_json") 

132 

133 # create-session 

134 cs = sub.add_parser("create-session", help="Create a Django session for a user (pentest/automation)") 

135 cs.add_argument("username") 

136 cs.add_argument("--ttl", type=int, default=3600, help="Session TTL in seconds (default 3600)") 

137 cs.add_argument("--json", action="store_true", dest="as_json") 

138 cs.add_argument("--confirm", action="store_true", help="Required for non-interactive (--json) mode") 

139 

140 # reset 

141 rs = sub.add_parser("reset", help="Factory reset: delete all data and re-seed defaults") 

142 rs.add_argument("--confirm", action="store_true", help="Skip interactive prompt (required with --json)") 

143 rs.add_argument("--json", action="store_true", dest="as_json") 

144 

145 # set-api-key 

146 sak = sub.add_parser("set-api-key", help="Set the OpenRouter API key in AppSettings") 

147 sak_group = sak.add_mutually_exclusive_group(required=True) 

148 sak_group.add_argument("--key", help="API key (avoid in shared shells)") 

149 sak_group.add_argument("--stdin", action="store_true", help="Read key from standard input") 

150 sak.add_argument("--json", action="store_true", dest="as_json") 

151 

152 # test-api-key 

153 tak = sub.add_parser("test-api-key", help="Validate an OpenRouter API key without saving") 

154 tak_group = tak.add_mutually_exclusive_group(required=True) 

155 tak_group.add_argument("--key") 

156 tak_group.add_argument("--stdin", action="store_true") 

157 tak.add_argument("--json", action="store_true", dest="as_json") 

158 

159 # set-default-model 

160 sdm = sub.add_parser("set-default-model", help="Set the default AI model id in AppSettings") 

161 sdm.add_argument("model_id") 

162 sdm.add_argument("--json", action="store_true", dest="as_json") 

163 

164 # prompts 

165 pr = sub.add_parser("prompts", help="AI prompt management") 

166 pr_sub = pr.add_subparsers(dest="prompts_action") 

167 pr_list = pr_sub.add_parser("list", help="List AI prompts") 

168 pr_list.add_argument("--json", action="store_true", dest="as_json") 

169 pr_show = pr_sub.add_parser("show", help="Show one AI prompt by type") 

170 pr_show.add_argument("prompt_type") 

171 pr_show.add_argument("--json", action="store_true", dest="as_json") 

172 pr_set = pr_sub.add_parser("set", help="Update an AI prompt's fields (file-based content)") 

173 pr_set.add_argument("prompt_type") 

174 pr_set.add_argument("--system-file", dest="system_file", help="Path to new system prompt (UTF-8)") 

175 pr_set.add_argument("--user-file", dest="user_file", help="Path to new user template (UTF-8)") 

176 pr_set.add_argument("--model", dest="model", help="Model id (must be in AIPrompt.AVAILABLE_MODELS)") 

177 pr_set.add_argument("--active", dest="active", choices=["true", "false"], help="Set is_active") 

178 pr_set.add_argument("--json", action="store_true", dest="as_json") 

179 

180 # sources 

181 sr = sub.add_parser("sources", help="Search-source management") 

182 sr_sub = sr.add_subparsers(dest="sources_action") 

183 sr_list = sr_sub.add_parser("list", help="List search sources") 

184 sr_list.add_argument("--attention", action="store_true", help="Only sources flagged needs_attention") 

185 sr_list.add_argument("--json", action="store_true", dest="as_json") 

186 sr_tog = sr_sub.add_parser("toggle", help="Flip a source's enabled state") 

187 sr_tog.add_argument("source_id", type=int) 

188 sr_tog.add_argument("--json", action="store_true", dest="as_json") 

189 sr_tall = sr_sub.add_parser("toggle-all", help="Set every source's enabled state") 

190 sr_tall_group = sr_tall.add_mutually_exclusive_group(required=True) 

191 sr_tall_group.add_argument("--enable", action="store_true") 

192 sr_tall_group.add_argument("--disable", action="store_true") 

193 sr_tall.add_argument("--json", action="store_true", dest="as_json") 

194 sr_sel = sr_sub.add_parser("set-selector", help="Overwrite a source's CSS selector") 

195 sr_sel.add_argument("source_id", type=int) 

196 sr_sel.add_argument("--selector", required=True) 

197 sr_sel.add_argument("--json", action="store_true", dest="as_json") 

198 sr_test = sr_sub.add_parser("test", help="Run the health-check on one source or all") 

199 sr_test_group = sr_test.add_mutually_exclusive_group(required=True) 

200 sr_test_group.add_argument("--id", dest="source_id", type=int) 

201 sr_test_group.add_argument("--all", action="store_true", dest="test_all") 

202 sr_test.add_argument("--json", action="store_true", dest="as_json") 

203 sr_rep = sr_sub.add_parser("repair", help="AI-assisted selector regeneration (requires API key)") 

204 sr_rep.add_argument("source_id", type=int) 

205 sr_rep.add_argument("--json", action="store_true", dest="as_json") 

206 

207 # quota 

208 qt = sub.add_parser("quota", help="AI daily quota limits") 

209 qt_sub = qt.add_subparsers(dest="quota_action") 

210 qt_show = qt_sub.add_parser("show", help="Show all six daily limits") 

211 qt_show.add_argument("--json", action="store_true", dest="as_json") 

212 qt_set = qt_sub.add_parser("set", help="Set one daily limit") 

213 qt_set.add_argument( 

214 "feature", 

215 choices=["remix", "remix-suggestions", "scale", "tips", "discover", "timer"], 

216 ) 

217 qt_set.add_argument("value", type=int) 

218 qt_set.add_argument("--json", action="store_true", dest="as_json") 

219 

220 # rename 

221 rn = sub.add_parser( 

222 "rename", 

223 help="Rename a profile (passkey: by user_id|username; home: by profile_id)", 

224 ) 

225 rn.add_argument("target", help="User id/username (passkey) or Profile id (home)") 

226 rn.add_argument("--name", required=True, help="New profile name") 

227 rn.add_argument("--json", action="store_true", dest="as_json") 

228 

229 def handle(self, *args, **options): 

230 subcommand = options.get("subcommand") 

231 if not subcommand: 

232 self._error("No subcommand provided. Use --help for usage.", options, code=1) 

233 

234 if subcommand in self.PASSKEY_ONLY_SUBCOMMANDS and settings.AUTH_MODE != "passkey": 

235 self._error(f"'{subcommand}' requires AUTH_MODE=passkey.", options, code=2) 

236 

237 # Nested-subcommand dispatch (prompts/sources/quota have per-action handlers). 

238 NESTED = {"prompts": "prompts_action", "sources": "sources_action", "quota": "quota_action"} 

239 if subcommand in NESTED: 

240 action = options.get(NESTED[subcommand]) 

241 if not action: 

242 self._error(f"'{subcommand}' requires an action (see --help).", options, code=1) 

243 handler_name = f"_handle_{subcommand}_{action.replace('-', '_')}" 

244 else: 

245 handler_name = f"_handle_{subcommand.replace('-', '_')}" 

246 

247 handler = getattr(self, handler_name, None) 

248 if handler: 

249 handler(options) 

250 else: 

251 self._error(f"Unknown subcommand '{subcommand}'", options, code=1) 

252 

253 # ------------------------------------------------------------------ # 

254 # Shared helpers (used by every mixin's _handle_* methods) # 

255 # ------------------------------------------------------------------ # 

256 

257 def _error(self, message, options=None, code=1): 

258 if options and options.get("as_json"): 

259 self.stdout.write(json.dumps({"ok": False, "error": message})) 

260 else: 

261 self.stderr.write(f"Error: {message}") 

262 raise SystemExit(code) 

263 

264 def _success(self, message, options, extra=None): 

265 if options.get("as_json"): 

266 result = {"ok": True, "message": message} 

267 if extra: 

268 result.update(extra) 

269 self.stdout.write(json.dumps(result)) 

270 else: 

271 self.stdout.write(message) 

272 

273 def _get_user(self, username, options): 

274 try: 

275 return User.objects.get(username=username) 

276 except User.DoesNotExist: 

277 self._error(f"User '{username}' not found.", options) 

278 

279 def _user_dict(self, user): 

280 passkey_count = user.webauthn_credentials.count() 

281 unlimited_ai = getattr(getattr(user, "profile", None), "unlimited_ai", False) 

282 return { 

283 "username": user.username, 

284 "user_id": user.pk, 

285 "passkeys": passkey_count, 

286 "is_active": user.is_active, 

287 "unlimited_ai": unlimited_ai, 

288 "date_joined": user.date_joined.strftime("%Y-%m-%d"), 

289 } 

← Back to Dashboard