Coverage for apps / recipes / sources_api.py: 73%
123 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 13:22 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 13:22 +0000
1"""
2SearchSource management API endpoints.
3"""
5import asyncio
6from typing import List, Optional
8from django.utils import timezone
9from ninja import Router, Schema, Status
11from apps.core.auth import HomeOnlyAuth, SessionAuth
13from .models import SearchSource
14from .services.search import RecipeSearch
16router = Router(tags=["sources"])
19# Schemas
class SourceOut(Schema):
    # Serialized form of a SearchSource, used as the response schema by the
    # list and single-source GET endpoints in this module.
    id: int
    host: str
    name: str
    is_enabled: bool
    search_url_template: str
    result_selector: str
    logo_url: str
    # Produced by resolve_last_validated_at below; None when never validated.
    last_validated_at: Optional[str] = None
    consecutive_failures: int
    needs_attention: bool

    @staticmethod
    def resolve_last_validated_at(obj):
        """Return ``obj.last_validated_at.isoformat()``, or None when it is unset."""
        validated_at = obj.last_validated_at
        return validated_at.isoformat() if validated_at else None
class SourceToggleOut(Schema):
    # Response body of the per-source toggle endpoint: the source's ID and
    # its enabled flag after the toggle.
    id: int
    is_enabled: bool
class SourceUpdateIn(Schema):
    # Request body of the selector-update endpoint: the new CSS selector.
    result_selector: str
class SourceUpdateOut(Schema):
    # Response body of the selector-update endpoint: the source's ID and the
    # selector that was stored.
    id: int
    result_selector: str
class SourceTestOut(Schema):
    # Response body of the single-source test endpoint.
    success: bool
    message: str
    results_count: int
    # Titles of the first few hits, truncated by the endpoint.
    sample_results: List[str]
class ErrorOut(Schema):
    # Error payload used for the 404/500 responses in this module:
    # a short machine-readable code plus a human-readable message.
    error: str
    message: str
class BulkToggleIn(Schema):
    # Request body of the bulk-toggle endpoint: the enabled state to apply
    # to every source.
    enable: bool
class BulkToggleOut(Schema):
    # Response body of the bulk-toggle endpoint: the row count reported by
    # the update and the state that was applied.
    updated_count: int
    is_enabled: bool
76# Endpoints
@router.get("/", response=List[SourceOut], auth=SessionAuth())
def list_sources(request):
    """Return every search source, ordered by name."""
    ordered_sources = SearchSource.objects.order_by("name")
    # Materialize the queryset so a plain list is handed back to the router.
    return list(ordered_sources)
@router.get("/enabled-count/", response={200: dict}, auth=SessionAuth())
def enabled_count(request):
    """Report how many search sources are enabled out of the total."""
    all_sources = SearchSource.objects
    total_sources = all_sources.count()
    enabled_sources = all_sources.filter(is_enabled=True).count()
    return dict(enabled=enabled_sources, total=total_sources)
97# Static paths registered before parametrized `/{source_id}/*` patterns so that
98# Django's URL resolver doesn't shadow them with the str-typed path segment.
@router.post("/bulk-toggle/", response={200: BulkToggleOut}, auth=HomeOnlyAuth())
def bulk_toggle_sources(request, data: BulkToggleIn):
    """Set the enabled flag of every source to ``data.enable`` in one update."""
    target_state = data.enable
    rows_changed = SearchSource.objects.update(is_enabled=target_state)
    return dict(updated_count=rows_changed, is_enabled=target_state)
@router.post("/test-all/", response={200: dict}, auth=HomeOnlyAuth())
async def test_all_sources(request):
    """Test all enabled sources and return a summary.

    Runs the sample query "chicken" against each enabled source, one source
    at a time, so this may take a while. A source passes when the search
    returns at least one result; an exception raised while searching counts
    as a failure. After each test the source's ``consecutive_failures``,
    ``needs_attention`` and ``last_validated_at`` are updated and saved.

    Returns:
        A dict with ``tested``, ``passed`` and ``failed`` counters plus a
        ``details`` list with one entry per source (``id``, ``name``,
        ``host``, ``success`` and either ``results_count`` or ``error``).
    """
    from asgiref.sync import sync_to_async

    sources = await sync_to_async(list)(SearchSource.objects.filter(is_enabled=True))

    results = {
        "tested": 0,
        "passed": 0,
        "failed": 0,
        "details": [],
    }

    search = RecipeSearch()
    test_query = "chicken"

    # The rows were loaded once, before a potentially long run. Writing back
    # only the validation columns keeps a toggle or selector edit made in the
    # meantime from being overwritten with the stale values held in memory.
    # NOTE(review): update_fields forces an UPDATE, so a row deleted mid-run
    # raises instead of being re-inserted; if SearchSource has auto_now
    # fields, add them to this list -- confirm against the model.
    validation_fields = ["consecutive_failures", "needs_attention", "last_validated_at"]

    for source in sources:
        detail = {
            "id": source.id,
            "name": source.name,
            "host": source.host,
        }

        # Only the search itself is guarded: a failing save must not be
        # mistaken for a failing source and counted a second time.
        try:
            search_results = await search.search(
                query=test_query,
                sources=[source.host],
                page=1,
                per_page=3,
            )
            result_count = len(search_results.get("results", []))
        except Exception as e:
            success = False
            detail["success"] = False
            detail["error"] = str(e)
        else:
            success = result_count > 0
            detail["success"] = success
            detail["results_count"] = result_count

        # Update source status
        if success:
            source.consecutive_failures = 0
            source.needs_attention = False
        else:
            source.consecutive_failures += 1
            source.needs_attention = source.consecutive_failures >= 3
        source.last_validated_at = timezone.now()
        await sync_to_async(source.save)(update_fields=validation_fields)

        results["tested"] += 1
        results["passed" if success else "failed"] += 1
        results["details"].append(detail)

    return results
@router.get("/{source_id}/", response={200: SourceOut, 404: ErrorOut}, auth=SessionAuth())
def get_source(request, source_id: int):
    """Return the search source with the given ID, or a 404 error payload."""
    try:
        return SearchSource.objects.get(id=source_id)
    except SearchSource.DoesNotExist:
        not_found = {
            "error": "not_found",
            "message": f"Source {source_id} not found",
        }
        return Status(404, not_found)
@router.post(
    "/{source_id}/toggle/",
    response={200: SourceToggleOut, 404: ErrorOut},
    auth=HomeOnlyAuth(),
)
def toggle_source(request, source_id: int):
    """Flip a source's ``is_enabled`` flag and return the new state.

    Responds with a 404 error payload when no source has the given ID.
    """
    try:
        source = SearchSource.objects.get(id=source_id)
    except SearchSource.DoesNotExist:
        not_found = {
            "error": "not_found",
            "message": f"Source {source_id} not found",
        }
        return Status(404, not_found)

    source.is_enabled = not source.is_enabled
    source.save()
    return dict(id=source.id, is_enabled=source.is_enabled)
@router.put(
    "/{source_id}/selector/",
    response={200: SourceUpdateOut, 404: ErrorOut},
    auth=HomeOnlyAuth(),
)
def update_selector(request, source_id: int, data: SourceUpdateIn):
    """Store a new CSS result selector on a source and echo it back.

    Responds with a 404 error payload when no source has the given ID.
    """
    try:
        source = SearchSource.objects.get(id=source_id)
    except SearchSource.DoesNotExist:
        not_found = {
            "error": "not_found",
            "message": f"Source {source_id} not found",
        }
        return Status(404, not_found)

    source.result_selector = data.result_selector
    source.save()
    return dict(id=source.id, result_selector=source.result_selector)
@router.post(
    "/{source_id}/test/",
    response={200: SourceTestOut, 404: ErrorOut, 500: ErrorOut},
    auth=HomeOnlyAuth(),
)
async def test_source(request, source_id: int):
    """Test a source by performing a sample search.

    Uses "chicken" as a test query and checks if results are returned.
    Updates the source's validation status based on the result.

    Returns:
        * 404 with an error payload when no source has the given ID.
        * 200 with ``success=True`` and up to three sample titles (each cut
          to 50 characters) when the search returned results.
        * 200 with ``success=False`` when the search returned nothing.
        * 500 with an error payload when the search raised an exception.
    """
    from asgiref.sync import sync_to_async

    try:
        source = await sync_to_async(SearchSource.objects.get)(id=source_id)
    except SearchSource.DoesNotExist:
        return Status(
            404,
            {
                "error": "not_found",
                "message": f"Source {source_id} not found",
            },
        )

    # Test with a common search query
    test_query = "chicken"
    search = RecipeSearch()

    error = None
    result_count = 0
    sample_titles = []
    # Only the search and the reading of its result are guarded: a failing
    # save must not be mistaken for a failing source and counted twice.
    try:
        # Search only this specific source
        results = await search.search(
            query=test_query,
            sources=[source.host],
            page=1,
            per_page=5,
        )
        hits = results.get("results", [])
        result_count = len(hits)
        # `or ""` guards against a hit whose title is present but None
        # (assumption about the search payload -- TODO confirm), which would
        # otherwise raise and flag a working source as failed.
        sample_titles = [(r.get("title") or "")[:50] for r in hits[:3]]
    except Exception as e:
        # `e` is unbound once the except block ends, so keep a reference.
        error = e

    success = error is None and result_count > 0

    # Update source validation status
    if success:
        source.consecutive_failures = 0
        source.needs_attention = False
    else:
        source.consecutive_failures += 1
        source.needs_attention = source.consecutive_failures >= 3
    source.last_validated_at = timezone.now()
    # Write back only the validation columns so a toggle or selector edit made
    # while the search was running is not overwritten with stale values.
    # NOTE(review): update_fields forces an UPDATE; if SearchSource has
    # auto_now fields, add them to this list -- confirm against the model.
    await sync_to_async(source.save)(
        update_fields=["consecutive_failures", "needs_attention", "last_validated_at"]
    )

    if error is not None:
        return Status(
            500,
            {
                "error": "test_failed",
                "message": f"Test failed: {str(error)}",
            },
        )

    if success:
        return {
            "success": True,
            "message": f'Found {result_count} results for "{test_query}"',
            "results_count": result_count,
            "sample_results": sample_titles,
        }

    return {
        "success": False,
        "message": f'No results found for "{test_query}". The selector may need updating.',
        "results_count": 0,
        "sample_results": [],
    }