Coverage for apps / core / management / commands / _cookie_admin_resources.py: 69%
143 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 13:22 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 13:22 +0000
1"""Resource-oriented handlers for `cookie_admin`: sources, quota, rename.
3Split out of `cookie_admin.py` to stay under the 500-line quality gate.
4Methods assume `self` is a `Command` instance.
5"""
7from __future__ import annotations
9import json
10import logging
12from django.conf import settings
13from django.contrib.auth.models import User
15security_logger = logging.getLogger("security")
class ResourcesMixin:
    """Subcommand handlers for `cookie_admin`: sources, quota and rename.

    Intended to be mixed into the management `Command`. Every handler takes
    the parsed ``options`` dict and uses ``self.stdout``, ``self._error`` and
    ``self._success`` supplied by the host class.

    NOTE(review): the handlers carry on straight after ``self._error(...)``
    (e.g. they use ``source`` right after the "not found" branch), so they
    assume ``_error`` never returns — confirm against ``Command._error``.
    """

    # ------------------------------------------------------------------ #
    # sources                                                            #
    # ------------------------------------------------------------------ #

    def _handle_sources_list(self, options):
        """Print all search sources ordered by name.

        With ``options["attention"]`` only sources flagged ``needs_attention``
        are listed. With ``options["as_json"]`` a single JSON document
        ``{"ok": true, "sources": [...]}`` is written; otherwise one text
        line per source.
        """
        from apps.recipes.models import SearchSource

        qs = SearchSource.objects.order_by("name")
        if options.get("attention"):
            qs = qs.filter(needs_attention=True)
        rows = [
            {
                "id": s.id,
                "name": s.name,
                "host": s.host,
                "url": s.search_url_template,
                "enabled": s.is_enabled,
                "needs_attention": s.needs_attention,
                "selector": s.result_selector,
            }
            for s in qs
        ]
        if options.get("as_json"):
            self.stdout.write(json.dumps({"ok": True, "sources": rows}))
            return
        for r in rows:
            self.stdout.write(
                f"{r['id']:>3} enabled={r['enabled']} attention={r['needs_attention']} {r['name']} {r['host']}"
            )

    def _handle_sources_toggle(self, options):
        """Flip ``is_enabled`` on the source given by ``options["source_id"]``.

        Reports an error (code 1) when the source does not exist; otherwise
        saves only the ``is_enabled`` column and logs to the security logger.
        """
        from apps.recipes.models import SearchSource

        sid = options["source_id"]
        try:
            source = SearchSource.objects.get(id=sid)
        except SearchSource.DoesNotExist:
            self._error(f"Source {sid} not found.", options, code=1)
        source.is_enabled = not source.is_enabled
        source.save(update_fields=["is_enabled"])
        security_logger.warning("cookie_admin sources toggle %d: %s", sid, source.is_enabled)
        self._success(
            f"Source {sid} ({source.name}): enabled={source.is_enabled}",
            options,
            {"source_id": sid, "enabled": source.is_enabled},
        )

    def _handle_sources_toggle_all(self, options):
        """Set ``is_enabled`` on every source to ``bool(options.get("enable"))``.

        Uses a single bulk ``update()`` and reports the number of rows touched.
        """
        from apps.recipes.models import SearchSource

        value = bool(options.get("enable"))
        count = SearchSource.objects.all().update(is_enabled=value)
        security_logger.warning("cookie_admin sources toggle-all: enabled=%s count=%d", value, count)
        self._success(
            f"Set enabled={value} for {count} sources.",
            options,
            {"enabled": value, "count": count},
        )

    def _handle_sources_set_selector(self, options):
        """Replace ``result_selector`` on the source ``options["source_id"]``.

        The selector is stripped of surrounding whitespace; a missing or
        blank selector is rejected with code 2, an unknown source with code 1.
        """
        from apps.recipes.models import SearchSource

        sid = options["source_id"]
        # Tolerate a missing (None) selector the same way rename treats
        # --name, so the user gets the validation error, not AttributeError.
        selector = (options["selector"] or "").strip()
        if not selector:
            self._error("--selector must be a non-empty string.", options, code=2)
        try:
            source = SearchSource.objects.get(id=sid)
        except SearchSource.DoesNotExist:
            self._error(f"Source {sid} not found.", options, code=1)
        source.result_selector = selector
        source.save(update_fields=["result_selector"])
        security_logger.warning("cookie_admin sources set-selector %d", sid)
        self._success(
            f"Source {sid} ({source.name}): selector updated.",
            options,
            {"source_id": sid, "selector": selector},
        )

    def _handle_sources_test(self, options):
        """Run the health check for one source or, with ``test_all``, for all.

        Each result is read as a mapping with ``ok``, ``source_id``, ``name``
        and ``message`` keys. Output is a JSON document with a summary when
        ``options["as_json"]`` is set, otherwise one line per result plus a
        totals line.
        """
        import asyncio

        from apps.recipes.models import SearchSource
        from apps.recipes.services.source_health import check_all_sources, check_source

        if options.get("test_all"):
            results = asyncio.run(check_all_sources())
        else:
            sid = options["source_id"]
            try:
                source = SearchSource.objects.get(id=sid)
            except SearchSource.DoesNotExist:
                self._error(f"Source {sid} not found.", options, code=1)
            results = [asyncio.run(check_source(source))]

        ok_count = sum(1 for r in results if r["ok"])
        fail_count = len(results) - ok_count
        if options.get("as_json"):
            # Top-level "ok" reports that the command ran; per-source
            # outcomes are in "results" and "summary".
            self.stdout.write(
                json.dumps({"ok": True, "results": results, "summary": {"ok": ok_count, "failed": fail_count}})
            )
            return
        for r in results:
            marker = "[OK] " if r["ok"] else "[FAIL]"
            self.stdout.write(f"{marker} Source {r['source_id']} ({r['name']}) — {r['message']}")
        self.stdout.write(f"{ok_count} ok / {fail_count} failed")

    def _handle_sources_repair(self, options):
        """Ask the AI selector service to repair one source's selector.

        Fails with code 2 when ``AppSettings.get().openrouter_api_key`` is
        falsy, with code 1 when the source is unknown or ``repair_selector``
        raises. The service result is passed through in the success payload.
        """
        from apps.ai.services.selector import repair_selector
        from apps.core.models import AppSettings
        from apps.recipes.models import SearchSource

        # NOTE(review): only the AppSettings value is checked here although
        # the message also mentions the OPENROUTER_API_KEY env var — confirm
        # whether AppSettings already falls back to the environment.
        if not AppSettings.get().openrouter_api_key:
            self._error(
                "sources repair requires OPENROUTER_API_KEY or AppSettings.openrouter_api_key to be set.",
                options,
                code=2,
            )
        sid = options["source_id"]
        try:
            source = SearchSource.objects.get(id=sid)
        except SearchSource.DoesNotExist:
            self._error(f"Source {sid} not found.", options, code=1)
        try:
            result = repair_selector(source_id=sid, html_sample=None, auto_update=False)
        except Exception as exc:
            # Command-line boundary: any service failure becomes a reported error.
            self._error(f"repair failed: {exc}", options, code=1)
        security_logger.warning("cookie_admin sources repair %d", sid)
        # NOTE(review): called with auto_update=False, yet the message says
        # "repaired" — verify whether the selector is actually persisted.
        self._success(
            f"Source {sid} ({source.name}): selector repaired.",
            options,
            {"source_id": sid, "result": result},
        )

    # ------------------------------------------------------------------ #
    # quota                                                              #
    # ------------------------------------------------------------------ #

    def _handle_quota_show(self, options):
        """Print the six ``daily_limit_*`` values stored on ``AppSettings``.

        JSON output is ``{"ok": true, "quotas": {...}}``; text output is one
        ``name = value`` line per quota.
        """
        from apps.core.models import AppSettings

        app = AppSettings.get()
        data = {
            "remix": app.daily_limit_remix,
            "remix_suggestions": app.daily_limit_remix_suggestions,
            "scale": app.daily_limit_scale,
            "tips": app.daily_limit_tips,
            "discover": app.daily_limit_discover,
            "timer": app.daily_limit_timer,
        }
        if options.get("as_json"):
            self.stdout.write(json.dumps({"ok": True, "quotas": data}))
            return
        for name, value in data.items():
            self.stdout.write(f"{name:<18} = {value}")

    def _handle_quota_set(self, options):
        """Store ``options["value"]`` as the daily limit for ``options["feature"]``.

        Negative values and unknown feature names are rejected with code 2.
        """
        from apps.core.models import AppSettings

        value = options["value"]
        if value < 0:
            self._error("Quota value must be a non-negative integer.", options, code=2)
        feature = options["feature"]
        fields = {
            "remix": "daily_limit_remix",
            "remix-suggestions": "daily_limit_remix_suggestions",
            "scale": "daily_limit_scale",
            "tips": "daily_limit_tips",
            "discover": "daily_limit_discover",
            "timer": "daily_limit_timer",
        }
        field = fields.get(feature)
        if field is None:
            # Report an unknown feature like every other validation failure
            # instead of leaking a KeyError traceback.
            self._error(
                f"Unknown quota feature '{feature}'. Choose from: {', '.join(fields)}.",
                options,
                code=2,
            )
        app = AppSettings.get()
        setattr(app, field, value)
        app.save()
        security_logger.warning("cookie_admin quota set %s=%d", feature, value)
        self._success(f"quota.{feature} = {value}", options, {feature: value})

    # ------------------------------------------------------------------ #
    # rename                                                             #
    # ------------------------------------------------------------------ #

    def _handle_rename(self, options):
        """Rename the profile identified by ``options["target"]``.

        When ``settings.AUTH_MODE == "passkey"`` the target is resolved as a
        ``User`` — by primary key first if it is numeric, then by username —
        and that user's ``profile`` is renamed. In any other mode the target
        must be a numeric ``Profile`` id. The new name is stripped and must
        be non-empty and within the ``Profile.name`` max length.
        """
        from apps.profiles.models import Profile

        target = options["target"]
        new_name = (options["name"] or "").strip()
        if not new_name:
            self._error("--name must be a non-empty value.", options, code=2)
        max_len = Profile._meta.get_field("name").max_length
        # max_length is None for fields without a limit; skip the check then.
        if max_len is not None and len(new_name) > max_len:
            self._error(f"--name exceeds max length {max_len}.", options, code=2)

        # isdecimal() rather than isdigit(): isdigit() is also true for
        # characters such as "²" that int() rejects with ValueError.
        is_numeric = target.isdecimal()

        if settings.AUTH_MODE == "passkey":
            user = None
            if is_numeric:
                try:
                    user = User.objects.get(pk=int(target))
                except User.DoesNotExist:
                    # Deliberate fall-through: a numeric target may still be
                    # a username, which is tried next.
                    pass
            if user is None:
                try:
                    user = User.objects.get(username=target)
                except User.DoesNotExist:
                    self._error(f"No user found for '{target}'.", options, code=1)
            profile = getattr(user, "profile", None)
            if profile is None:
                self._error(f"User '{target}' has no profile.", options, code=1)
        else:
            if not is_numeric:
                self._error("home mode: positional arg must be a Profile id (integer).", options, code=2)
            try:
                profile = Profile.objects.get(pk=int(target))
            except Profile.DoesNotExist:
                self._error(f"No profile with id {target}.", options, code=1)

        old_name = profile.name
        profile.name = new_name
        profile.save(update_fields=["name"])
        security_logger.warning("cookie_admin rename profile_id=%d: %s → %s", profile.id, old_name, new_name)
        self._success(
            f"Profile renamed: {old_name} → {new_name} (profile_id={profile.id})",
            options,
            {"profile_id": profile.id, "old_name": old_name, "new_name": new_name},
        )