Coverage for apps / core / management / commands / _cookie_admin_resources.py: 69%

143 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-02 13:22 +0000

1"""Resource-oriented handlers for `cookie_admin`: sources, quota, rename. 

2 

3Split out of `cookie_admin.py` to stay under the 500-line quality gate. 

4Methods assume `self` is a `Command` instance. 

5""" 

6 

7from __future__ import annotations 

8 

9import json 

10import logging 

11 

12from django.conf import settings 

13from django.contrib.auth.models import User 

14 

15security_logger = logging.getLogger("security") 

16 

17 

class ResourcesMixin:
    """sources, quota, rename subcommand handlers.

    Mixin for the `cookie_admin` command. The handlers write to
    ``self.stdout`` and report outcomes through ``self._error`` and
    ``self._success``; none of those are defined in this class. Every handler
    receives ``options``, the mapping of parsed command options.

    NOTE(review): each handler keeps going after ``self._error(...)`` as if the
    call never returns (presumably it raises or exits) -- confirm against the
    ``Command`` implementation.
    """

    # ------------------------------------------------------------------ #
    # sources                                                            #
    # ------------------------------------------------------------------ #

    def _get_source_or_error(self, sid, options):
        """Fetch the ``SearchSource`` whose id is ``sid``.

        A missing row is reported through ``self._error`` with ``code=1``.
        Shared by the single-source handlers so the lookup and its error
        message stay identical across subcommands.
        """
        from apps.recipes.models import SearchSource

        try:
            return SearchSource.objects.get(id=sid)
        except SearchSource.DoesNotExist:
            self._error(f"Source {sid} not found.", options, code=1)

    def _handle_sources_list(self, options):
        """List search sources ordered by name.

        With the ``attention`` option set, only sources flagged
        ``needs_attention`` are listed. With ``as_json`` set, a single JSON
        document ``{"ok": true, "sources": [...]}`` is written; otherwise one
        text line per source.
        """
        from apps.recipes.models import SearchSource

        qs = SearchSource.objects.order_by("name")
        if options.get("attention"):
            qs = qs.filter(needs_attention=True)
        rows = [
            {
                "id": s.id,
                "name": s.name,
                "host": s.host,
                "url": s.search_url_template,
                "enabled": s.is_enabled,
                "needs_attention": s.needs_attention,
                "selector": s.result_selector,
            }
            for s in qs
        ]
        if options.get("as_json"):
            self.stdout.write(json.dumps({"ok": True, "sources": rows}))
            return
        # The text view is a subset of the JSON view: url and selector are omitted.
        for r in rows:
            self.stdout.write(
                f"{r['id']:>3} enabled={r['enabled']} attention={r['needs_attention']} {r['name']} {r['host']}"
            )

    def _handle_sources_toggle(self, options):
        """Flip ``is_enabled`` on the source identified by ``options["source_id"]``.

        Only the ``is_enabled`` column is saved. The change is recorded on the
        "security" logger and reported through ``self._success``.
        """
        sid = options["source_id"]
        source = self._get_source_or_error(sid, options)
        source.is_enabled = not source.is_enabled
        source.save(update_fields=["is_enabled"])
        security_logger.warning("cookie_admin sources toggle %d: %s", sid, source.is_enabled)
        self._success(
            f"Source {sid} ({source.name}): enabled={source.is_enabled}",
            options,
            {"source_id": sid, "enabled": source.is_enabled},
        )

    def _handle_sources_toggle_all(self, options):
        """Set ``is_enabled`` on every source to the truthiness of ``options["enable"]``.

        Uses a single queryset ``update()``; the count it returns is included
        in the log line and in the success report.
        """
        from apps.recipes.models import SearchSource

        value = bool(options.get("enable"))
        count = SearchSource.objects.all().update(is_enabled=value)
        security_logger.warning("cookie_admin sources toggle-all: enabled=%s count=%d", value, count)
        self._success(
            f"Set enabled={value} for {count} sources.",
            options,
            {"enabled": value, "count": count},
        )

    def _handle_sources_set_selector(self, options):
        """Replace ``result_selector`` on one source with ``options["selector"]``.

        The selector is stripped of surrounding whitespace; an empty (or
        missing) value is rejected through ``self._error`` with ``code=2``
        before the source is looked up.
        """
        sid = options["source_id"]
        # `or ""` so a None selector yields the validation error below instead
        # of an AttributeError -- same handling as --name in _handle_rename.
        selector = (options["selector"] or "").strip()
        if not selector:
            self._error("--selector must be a non-empty string.", options, code=2)
        source = self._get_source_or_error(sid, options)
        source.result_selector = selector
        source.save(update_fields=["result_selector"])
        security_logger.warning("cookie_admin sources set-selector %d", sid)
        self._success(
            f"Source {sid} ({source.name}): selector updated.",
            options,
            {"source_id": sid, "selector": selector},
        )

    def _handle_sources_test(self, options):
        """Run the source health check for one source or for all of them.

        With ``test_all`` set, ``check_all_sources()`` is awaited; otherwise
        ``check_source()`` is awaited for ``options["source_id"]``. Each result
        is read as a mapping with ``ok``, ``source_id``, ``name`` and
        ``message`` keys. Output is either one JSON document (``as_json``) or
        one line per result followed by an ok/failed tally.
        """
        import asyncio

        from apps.recipes.services.source_health import check_all_sources, check_source

        if options.get("test_all"):
            results = asyncio.run(check_all_sources())
        else:
            source = self._get_source_or_error(options["source_id"], options)
            results = [asyncio.run(check_source(source))]

        ok_count = sum(1 for r in results if r["ok"])
        fail_count = len(results) - ok_count
        if options.get("as_json"):
            # Top-level "ok" is always True here; per-source outcomes live in
            # "results" and "summary".
            self.stdout.write(
                json.dumps({"ok": True, "results": results, "summary": {"ok": ok_count, "failed": fail_count}})
            )
            return
        for r in results:
            marker = "[OK] " if r["ok"] else "[FAIL]"
            self.stdout.write(f"{marker} Source {r['source_id']} ({r['name']}) — {r['message']}")
        self.stdout.write(f"{ok_count} ok / {fail_count} failed")

    def _handle_sources_repair(self, options):
        """Call ``repair_selector`` for one source and report its result.

        Rejected through ``self._error`` (``code=2``) when
        ``AppSettings.get().openrouter_api_key`` is falsy. ``repair_selector``
        is invoked with ``html_sample=None`` and ``auto_update=False``; any
        exception it raises is reported through ``self._error`` (``code=1``).
        """
        from apps.ai.services.selector import repair_selector
        from apps.core.models import AppSettings

        if not AppSettings.get().openrouter_api_key:
            self._error(
                "sources repair requires OPENROUTER_API_KEY or AppSettings.openrouter_api_key to be set.",
                options,
                code=2,
            )
        sid = options["source_id"]
        source = self._get_source_or_error(sid, options)
        try:
            result = repair_selector(source_id=sid, html_sample=None, auto_update=False)
        except Exception as exc:
            # Command boundary: surface whatever the repair service raised as a
            # regular command error rather than a traceback.
            self._error(f"repair failed: {exc}", options, code=1)
        security_logger.warning("cookie_admin sources repair %d", sid)
        self._success(
            f"Source {sid} ({source.name}): selector repaired.",
            options,
            {"source_id": sid, "result": result},
        )

    # ------------------------------------------------------------------ #
    # quota                                                              #
    # ------------------------------------------------------------------ #

    def _handle_quota_show(self, options):
        """Print the six ``daily_limit_*`` values stored on ``AppSettings``.

        With ``as_json`` set, writes ``{"ok": true, "quotas": {...}}``;
        otherwise one ``name = value`` line per quota.
        """
        from apps.core.models import AppSettings

        app = AppSettings.get()
        data = {
            "remix": app.daily_limit_remix,
            "remix_suggestions": app.daily_limit_remix_suggestions,
            "scale": app.daily_limit_scale,
            "tips": app.daily_limit_tips,
            "discover": app.daily_limit_discover,
            "timer": app.daily_limit_timer,
        }
        if options.get("as_json"):
            self.stdout.write(json.dumps({"ok": True, "quotas": data}))
            return
        for name, value in data.items():
            self.stdout.write(f"{name:<18} = {value}")

    def _handle_quota_set(self, options):
        """Store ``options["value"]`` as the daily limit for ``options["feature"]``.

        Negative values are rejected through ``self._error`` (``code=2``).
        ``feature`` must be one of the keys of the mapping below; any other
        value raises ``KeyError`` (presumably the argument parser restricts
        the choices -- confirm against the parser setup).
        """
        from apps.core.models import AppSettings

        value = options["value"]
        if value < 0:
            self._error("Quota value must be a non-negative integer.", options, code=2)
        feature = options["feature"]
        # CLI feature names use hyphens; the model fields use underscores.
        field = {
            "remix": "daily_limit_remix",
            "remix-suggestions": "daily_limit_remix_suggestions",
            "scale": "daily_limit_scale",
            "tips": "daily_limit_tips",
            "discover": "daily_limit_discover",
            "timer": "daily_limit_timer",
        }[feature]
        app = AppSettings.get()
        setattr(app, field, value)
        app.save()
        security_logger.warning("cookie_admin quota set %s=%d", feature, value)
        self._success(f"quota.{feature} = {value}", options, {feature: value})

    # ------------------------------------------------------------------ #
    # rename                                                             #
    # ------------------------------------------------------------------ #

    def _handle_rename(self, options):
        """Rename the profile identified by ``options["target"]``.

        The new name (``options["name"]``) is stripped, must be non-empty and
        must fit the ``Profile.name`` field's ``max_length``.

        Target resolution depends on ``settings.AUTH_MODE``:

        * ``"passkey"``: a numeric target is tried as a ``User`` primary key
          first, then the target is tried as a username; the profile is read
          from ``user.profile``.
        * anything else: the target must be numeric and is used as a
          ``Profile`` primary key.

        Validation failures are reported with ``code=2``, lookups that find
        nothing with ``code=1``.
        """
        from apps.profiles.models import Profile

        target = options["target"]
        new_name = (options["name"] or "").strip()
        if not new_name:
            self._error("--name must be a non-empty value.", options, code=2)
        max_len = Profile._meta.get_field("name").max_length
        # max_length is None for fields without a length limit; skip the check then.
        if max_len is not None and len(new_name) > max_len:
            self._error(f"--name exceeds max length {max_len}.", options, code=2)

        # isdecimal(), not isdigit(): isdigit() also accepts characters such as
        # "²" that int() rejects with ValueError.
        target_is_pk = target.isdecimal()

        if settings.AUTH_MODE == "passkey":
            user = None
            if target_is_pk:
                try:
                    user = User.objects.get(pk=int(target))
                except User.DoesNotExist:
                    # Not a user id; fall through and try it as a username.
                    pass
            if user is None:
                try:
                    user = User.objects.get(username=target)
                except User.DoesNotExist:
                    self._error(f"No user found for '{target}'.", options, code=1)
            profile = getattr(user, "profile", None)
            if profile is None:
                self._error(f"User '{target}' has no profile.", options, code=1)
        else:
            if not target_is_pk:
                self._error("home mode: positional arg must be a Profile id (integer).", options, code=2)
            try:
                profile = Profile.objects.get(pk=int(target))
            except Profile.DoesNotExist:
                self._error(f"No profile with id {target}.", options, code=1)

        old_name = profile.name
        profile.name = new_name
        profile.save(update_fields=["name"])
        security_logger.warning("cookie_admin rename profile_id=%d: %s → %s", profile.id, old_name, new_name)
        # Separate old and new names with the same arrow the log line uses;
        # without it the two names run together in the message.
        self._success(
            f"Profile renamed: {old_name} → {new_name} (profile_id={profile.id})",
            options,
            {"profile_id": profile.id, "old_name": old_name, "new_name": new_name},
        )

← Back to Dashboard