Coverage for apps / core / management / commands / _cookie_admin_app.py: 75%

177 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-02 13:22 +0000

"""App-config handlers for `cookie_admin`: reset, api key, prompts.

Split out of `cookie_admin.py` to stay under the 500-line quality gate.
Methods assume `self` is a `Command` instance — see main module for the
shared helpers (`self._error`, `self._success`).
"""

7 

8from __future__ import annotations 

9 

10import json 

11import logging 

12import os 

13import pwd 

14import shutil 

15 

16from django.conf import settings 

17from django.contrib.auth.models import User 

18 

# Dedicated "security" logger: the handlers in this module record destructive
# or sensitive admin actions (reset, API-key / model / prompt changes) on it.
security_logger = logging.getLogger("security")

20 

21 

class AppConfigMixin:
    """reset + OpenRouter/prompts subcommand handlers.

    Intended to be mixed into the `cookie_admin` ``Command`` class, which is
    expected to provide ``self._error``, ``self._success``, ``self.stdout``
    and ``self.stderr``.

    NOTE(review): the handlers are written on the assumption that
    ``self._error`` does not return normally (several of them use values
    that are only bound when the error branch was not taken) — confirm
    against the main module.
    """

    # ------------------------------------------------------------------ #
    # reset                                                              #
    # ------------------------------------------------------------------ #

    def _handle_reset(self, options):
        """Delete all application data, clear media/cache/sessions and re-seed.

        Args:
            options: Parsed command options. ``as_json`` selects the
                non-interactive mode (which additionally requires
                ``confirm``); otherwise the operator is prompted.

        The list of performed actions is reported through ``self._success``
        under the ``actions_performed`` key.
        """
        # Confirm first, before importing or touching anything. The explicit
        # return is a safety net: nothing destructive may run unconfirmed even
        # if ``self._error`` were ever to return instead of aborting.
        if not self._confirm_reset(options):
            return

        from django.contrib.sessions.models import Session
        from django.core.cache import cache
        from django.core.management import call_command

        from apps.ai.models import AIDiscoverySuggestion
        from apps.profiles.models import Profile
        from apps.recipes.models import (
            CachedSearchImage,
            Recipe,
            RecipeCollection,
            RecipeCollectionItem,
            RecipeFavorite,
            RecipeViewHistory,
            SearchSource,
            ServingAdjustment,
        )

        security_logger.warning("DATABASE RESET initiated via CLI (cookie_admin reset)")

        # Delete in FK-safe order
        for model in (
            AIDiscoverySuggestion,
            ServingAdjustment,
            RecipeViewHistory,
            RecipeCollectionItem,
            RecipeCollection,
            RecipeFavorite,
            CachedSearchImage,
            Recipe,
            Profile,
        ):
            model.objects.all().delete()
        actions = [
            "Deleted all profiles",
            "Deleted all recipes",
            "Cleared favorites, collections, view history",
            "Cleared AI suggestions and serving adjustments",
        ]

        if settings.AUTH_MODE == "passkey":
            from apps.core.models import DeviceCode

            DeviceCode.objects.all().delete()
            User.objects.all().delete()
            actions.append("Deleted all user accounts and device codes")

        SearchSource.objects.all().update(
            consecutive_failures=0,
            needs_attention=False,
            last_validated_at=None,
        )
        actions.append("Reset search source counters")

        self._clear_media_dirs()
        actions.append("Cleared recipe and search images")

        cache.clear()
        Session.objects.all().delete()
        actions.extend(["Cleared application cache", "Cleared all sessions"])

        call_command("migrate", verbosity=0)
        actions.append("Re-ran migrations")

        for cmd in ("seed_search_sources", "seed_ai_prompts"):
            try:
                call_command(cmd, verbosity=0)
            except Exception as exc:
                # Seed commands are optional (may not be installed in every deployment);
                # log at debug level so operators can diagnose if a reset doesn't repopulate.
                logging.getLogger(__name__).debug("seed command %s skipped: %s", cmd, exc)
            else:
                actions.append(f"Seeded {cmd.replace('seed_', '')}")

        security_logger.warning("DATABASE RESET completed successfully via CLI")

        self._success(
            "Database reset complete.",
            options,
            {"actions_performed": actions},
        )

    def _confirm_reset(self, options):
        """Return True only when the operator has confirmed the reset.

        JSON (non-interactive) mode requires the ``confirm`` option; otherwise
        the operator must type ``RESET`` at a prompt. Every refusal is
        reported through ``self._error`` and then yields ``False``.
        """
        if options.get("as_json"):
            if options.get("confirm"):
                return True
            self._error(
                "--confirm flag required for non-interactive reset. Usage: cookie_admin reset --json --confirm",
                options,
            )
            return False

        self.stderr.write("WARNING: This will permanently delete ALL data.")
        try:
            answer = input("Type RESET to confirm: ")
        except EOFError:
            # stdin closed / not interactive: treat as "not confirmed" rather
            # than crashing with a traceback.
            answer = ""
        if answer == "RESET":
            return True
        self._error("Aborted.", options)
        return False

    def _clear_media_dirs(self):
        """Empty the recipe/search image directories under ``MEDIA_ROOT``.

        Only directories that already exist are removed and recreated.
        """
        # Clear media (re-chown to app user — this command runs as root via docker exec)
        try:
            app_pw = pwd.getpwnam("app")
        except KeyError:
            # No "app" account on this host: leave ownership untouched.
            app_uid, app_gid = -1, -1
        else:
            app_uid, app_gid = app_pw.pw_uid, app_pw.pw_gid

        for subdir in ("recipe_images", "search_images"):
            path = os.path.join(settings.MEDIA_ROOT, subdir)
            if os.path.exists(path):
                shutil.rmtree(path)
                os.makedirs(path)
                if app_uid != -1:
                    os.chown(path, app_uid, app_gid)

    # ------------------------------------------------------------------ #
    # api key + default model                                            #
    # ------------------------------------------------------------------ #

    def _read_key(self, options):
        """Return a non-empty API key read from --key or stdin. Errors on empty.

        When the ``stdin`` option is set the whole of standard input is read;
        otherwise the ``key`` option is used. Surrounding whitespace is
        stripped in both cases. An empty result is reported through
        ``self._error`` with ``code=2``.
        """
        import sys

        if options.get("stdin"):
            value = sys.stdin.read().strip()
        else:
            value = (options.get("key") or "").strip()
        if not value:
            self._error("API key must be a non-empty value.", options, code=2)
        return value

    def _handle_set_api_key(self, options):
        """Store the supplied key as ``openrouter_api_key`` on the app settings."""
        from apps.core.models import AppSettings

        value = self._read_key(options)
        app = AppSettings.get()
        app.openrouter_api_key = value
        app.save()
        # Log that the key changed, never the key itself.
        security_logger.warning("cookie_admin set-api-key: key changed")
        self._success("API key saved.", options, {"saved": True})

    def _handle_test_api_key(self, options):
        """Check the supplied key via ``OpenRouterService.test_connection``.

        Prints the verdict (JSON when ``as_json`` is set, plain text
        otherwise) and raises ``SystemExit(1)`` when the key is not valid.
        Any exception from the connection test is reported as an invalid key
        with the exception text as the reason.
        """
        from apps.ai.services.openrouter import OpenRouterService

        value = self._read_key(options)
        try:
            ok, reason = OpenRouterService.test_connection(value)
        except Exception as exc:
            ok, reason = False, str(exc)

        if options.get("as_json"):
            # "ok" reports that the command ran; "valid" carries the verdict.
            self.stdout.write(json.dumps({"ok": True, "valid": ok, "reason": reason}))
        else:
            self.stdout.write("valid" if ok else f"invalid: {reason}")
        if not ok:
            raise SystemExit(1)

    def _require_known_model(self, model_id, options):
        """Report an error (``code=2``) unless *model_id* is in ``AIPrompt.AVAILABLE_MODELS``."""
        from apps.ai.models import AIPrompt

        valid = {m[0] for m in AIPrompt.AVAILABLE_MODELS}
        if model_id not in valid:
            self._error(
                f"Unknown model '{model_id}'. Valid: {sorted(valid)}",
                options,
                code=2,
            )

    def _handle_set_default_model(self, options):
        """Validate ``options["model_id"]`` and store it as ``default_ai_model``."""
        from apps.core.models import AppSettings

        model_id = options["model_id"]
        self._require_known_model(model_id, options)
        app = AppSettings.get()
        app.default_ai_model = model_id
        app.save()
        security_logger.warning("cookie_admin set-default-model: %s", model_id)
        self._success(f"Default model set to {model_id}.", options, {"default_ai_model": model_id})

    # ------------------------------------------------------------------ #
    # prompts                                                            #
    # ------------------------------------------------------------------ #

    def _handle_prompts_list(self, options):
        """List all prompts ordered by ``prompt_type`` (JSON or aligned text)."""
        from apps.ai.models import AIPrompt

        rows = list(AIPrompt.objects.order_by("prompt_type").values("prompt_type", "name", "model", "is_active"))
        if options.get("as_json"):
            self.stdout.write(json.dumps({"ok": True, "prompts": rows}))
            return
        for r in rows:
            self.stdout.write(
                f"{r['prompt_type']:<22} model={r['model']:<35} active={r['is_active']} name={r['name']!r}"
            )

    def _handle_prompts_show(self, options):
        """Print every field of the prompt selected by ``options["prompt_type"]``.

        An unknown prompt type is reported through ``self._error`` with
        ``code=2``.
        """
        from apps.ai.models import AIPrompt

        try:
            p = AIPrompt.objects.get(prompt_type=options["prompt_type"])
        except AIPrompt.DoesNotExist:
            self._error(f"Prompt '{options['prompt_type']}' not found.", options, code=2)

        payload = {
            "prompt_type": p.prompt_type,
            "name": p.name,
            "description": p.description,
            "model": p.model,
            "is_active": p.is_active,
            "system_prompt": p.system_prompt,
            "user_prompt_template": p.user_prompt_template,
        }
        if options.get("as_json"):
            self.stdout.write(json.dumps({"ok": True, "prompt": payload}))
            return

        # Short fields go on one "key: value" line; the two prompt bodies get
        # a header line followed by their text.
        for key in ("prompt_type", "name", "description", "model", "is_active"):
            self.stdout.write(f"{key}: {payload[key]}")
        for key in ("system_prompt", "user_prompt_template"):
            self.stdout.write(f"{key}:")
            self.stdout.write(payload[key])

    def _handle_prompts_set(self, options):
        """Update selected fields of the prompt ``options["prompt_type"]``.

        Recognised options: ``system_file`` and ``user_file`` (paths whose
        contents replace the system prompt / user template), ``model`` (must
        be one of ``AIPrompt.AVAILABLE_MODELS``) and ``active`` (the prompt
        becomes active only when the value is the string ``"true"``). At
        least one of them must be supplied; problems are reported through
        ``self._error`` with ``code=2``.
        """
        from apps.ai.models import AIPrompt

        prompt_type = options["prompt_type"]
        try:
            prompt = AIPrompt.objects.get(prompt_type=prompt_type)
        except AIPrompt.DoesNotExist:
            self._error(f"Prompt '{prompt_type}' not found.", options, code=2)

        # Read files FIRST (before any DB write) so a missing file doesn't leave
        # a half-updated row.
        updated_fields = []
        new_system = new_user = None
        if options.get("system_file"):
            new_system = self._read_text_file(options["system_file"], options)
            updated_fields.append("system_prompt")
        if options.get("user_file"):
            new_user = self._read_text_file(options["user_file"], options)
            updated_fields.append("user_prompt_template")
        if options.get("model"):
            self._require_known_model(options["model"], options)
            updated_fields.append("model")
        if options.get("active"):
            updated_fields.append("is_active")
        if not updated_fields:
            self._error(
                "prompts set: specify at least one of --system-file, --user-file, --model, --active.", options, code=2
            )

        if new_system is not None:
            prompt.system_prompt = new_system
        if new_user is not None:
            prompt.user_prompt_template = new_user
        if options.get("model"):
            prompt.model = options["model"]
        if options.get("active"):
            prompt.is_active = options["active"] == "true"
        prompt.save()

        security_logger.warning("cookie_admin prompts set %s: fields=%s", prompt_type, updated_fields)
        self._success(
            f"Prompt {prompt_type} updated: fields={updated_fields}",
            options,
            {"prompt_type": prompt_type, "updated_fields": updated_fields},
        )

    def _read_text_file(self, path, options):
        """Return the UTF-8 text of *path*.

        Unreadable or undecodable files are reported through ``self._error``
        with ``code=2``.
        """
        try:
            with open(path, encoding="utf-8") as fh:
                return fh.read()
        except (OSError, UnicodeDecodeError) as exc:
            self._error(f"Cannot read file '{path}': {exc}", options, code=2)

← Back to Dashboard