Coverage for apps / recipes / sources_api.py: 73%
123 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-12 10:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-12 10:49 +0000
1"""
2SearchSource management API endpoints.
3"""
5import asyncio
6from typing import List, Optional
8from django.utils import timezone
9from ninja import Router, Schema, Status
11from apps.core.auth import AdminAuth, SessionAuth
13from .models import SearchSource
14from .services.search import RecipeSearch
16router = Router(tags=["sources"])
19# Schemas
22class SourceOut(Schema):
23 id: int
24 host: str
25 name: str
26 is_enabled: bool
27 search_url_template: str
28 result_selector: str
29 logo_url: str
30 last_validated_at: Optional[str] = None
31 consecutive_failures: int
32 needs_attention: bool
34 @staticmethod
35 def resolve_last_validated_at(obj):
36 if obj.last_validated_at:
37 return obj.last_validated_at.isoformat()
38 return None
41class SourceToggleOut(Schema):
42 id: int
43 is_enabled: bool
46class SourceUpdateIn(Schema):
47 result_selector: str
50class SourceUpdateOut(Schema):
51 id: int
52 result_selector: str
55class SourceTestOut(Schema):
56 success: bool
57 message: str
58 results_count: int
59 sample_results: List[str]
62class ErrorOut(Schema):
63 error: str
64 message: str
67class BulkToggleIn(Schema):
68 enable: bool
71class BulkToggleOut(Schema):
72 updated_count: int
73 is_enabled: bool
76# Endpoints
79@router.get("/", response=List[SourceOut], auth=SessionAuth())
80def list_sources(request):
81 """List all search sources with their status."""
82 sources = SearchSource.objects.all().order_by("name")
83 return list(sources)
86@router.get("/enabled-count/", response={200: dict}, auth=SessionAuth())
87def enabled_count(request):
88 """Get count of enabled sources vs total."""
89 total = SearchSource.objects.count()
90 enabled = SearchSource.objects.filter(is_enabled=True).count()
91 return {
92 "enabled": enabled,
93 "total": total,
94 }
97@router.get("/{source_id}/", response={200: SourceOut, 404: ErrorOut}, auth=SessionAuth())
98def get_source(request, source_id: int):
99 """Get a single search source by ID."""
100 try:
101 source = SearchSource.objects.get(id=source_id)
102 return source
103 except SearchSource.DoesNotExist:
104 return Status(
105 404,
106 {
107 "error": "not_found",
108 "message": f"Source {source_id} not found",
109 },
110 )
113@router.post("/{source_id}/toggle/", response={200: SourceToggleOut, 404: ErrorOut}, auth=AdminAuth())
114def toggle_source(request, source_id: int):
115 """Toggle a source's enabled status."""
116 try:
117 source = SearchSource.objects.get(id=source_id)
118 source.is_enabled = not source.is_enabled
119 source.save()
120 return {
121 "id": source.id,
122 "is_enabled": source.is_enabled,
123 }
124 except SearchSource.DoesNotExist:
125 return Status(
126 404,
127 {
128 "error": "not_found",
129 "message": f"Source {source_id} not found",
130 },
131 )
134@router.post("/bulk-toggle/", response={200: BulkToggleOut}, auth=AdminAuth())
135def bulk_toggle_sources(request, data: BulkToggleIn):
136 """Enable or disable all sources at once."""
137 updated = SearchSource.objects.all().update(is_enabled=data.enable)
138 return {
139 "updated_count": updated,
140 "is_enabled": data.enable,
141 }
144@router.put("/{source_id}/selector/", response={200: SourceUpdateOut, 404: ErrorOut}, auth=AdminAuth())
145def update_selector(request, source_id: int, data: SourceUpdateIn):
146 """Update a source's CSS selector."""
147 try:
148 source = SearchSource.objects.get(id=source_id)
149 source.result_selector = data.result_selector
150 source.save()
151 return {
152 "id": source.id,
153 "result_selector": source.result_selector,
154 }
155 except SearchSource.DoesNotExist:
156 return Status(
157 404,
158 {
159 "error": "not_found",
160 "message": f"Source {source_id} not found",
161 },
162 )
165@router.post("/{source_id}/test/", response={200: SourceTestOut, 404: ErrorOut, 500: ErrorOut}, auth=AdminAuth())
166async def test_source(request, source_id: int):
167 """Test a source by performing a sample search.
169 Uses "chicken" as a test query and checks if results are returned.
170 Updates the source's validation status based on the result.
171 """
172 from asgiref.sync import sync_to_async
174 try:
175 source = await sync_to_async(SearchSource.objects.get)(id=source_id)
176 except SearchSource.DoesNotExist:
177 return Status(
178 404,
179 {
180 "error": "not_found",
181 "message": f"Source {source_id} not found",
182 },
183 )
185 # Test with a common search query
186 test_query = "chicken"
187 search = RecipeSearch()
189 try:
190 # Search only this specific source
191 results = await search.search(
192 query=test_query,
193 sources=[source.host],
194 page=1,
195 per_page=5,
196 )
198 result_count = len(results.get("results", []))
199 sample_titles = [r.get("title", "")[:50] for r in results.get("results", [])[:3]]
201 # Update source validation status
202 if result_count > 0:
203 source.consecutive_failures = 0
204 source.needs_attention = False
205 source.last_validated_at = timezone.now()
206 await sync_to_async(source.save)()
208 return {
209 "success": True,
210 "message": f'Found {result_count} results for "{test_query}"',
211 "results_count": result_count,
212 "sample_results": sample_titles,
213 }
214 else:
215 source.consecutive_failures += 1
216 source.needs_attention = source.consecutive_failures >= 3
217 source.last_validated_at = timezone.now()
218 await sync_to_async(source.save)()
220 return {
221 "success": False,
222 "message": f'No results found for "{test_query}". The selector may need updating.',
223 "results_count": 0,
224 "sample_results": [],
225 }
227 except Exception as e:
228 # Update failure count
229 source.consecutive_failures += 1
230 source.needs_attention = source.consecutive_failures >= 3
231 source.last_validated_at = timezone.now()
232 await sync_to_async(source.save)()
234 return Status(
235 500,
236 {
237 "error": "test_failed",
238 "message": f"Test failed: {str(e)}",
239 },
240 )
243@router.post("/test-all/", response={200: dict}, auth=AdminAuth())
244async def test_all_sources(request):
245 """Test all enabled sources and return summary.
247 This may take a while as it tests each source sequentially.
248 """
249 from asgiref.sync import sync_to_async
251 sources = await sync_to_async(list)(SearchSource.objects.filter(is_enabled=True))
253 results = {
254 "tested": 0,
255 "passed": 0,
256 "failed": 0,
257 "details": [],
258 }
260 search = RecipeSearch()
261 test_query = "chicken"
263 for source in sources:
264 try:
265 search_results = await search.search(
266 query=test_query,
267 sources=[source.host],
268 page=1,
269 per_page=3,
270 )
272 result_count = len(search_results.get("results", []))
273 success = result_count > 0
275 # Update source status
276 if success:
277 source.consecutive_failures = 0
278 source.needs_attention = False
279 else:
280 source.consecutive_failures += 1
281 source.needs_attention = source.consecutive_failures >= 3
283 source.last_validated_at = timezone.now()
284 await sync_to_async(source.save)()
286 results["tested"] += 1
287 if success:
288 results["passed"] += 1
289 else:
290 results["failed"] += 1
292 results["details"].append(
293 {
294 "id": source.id,
295 "name": source.name,
296 "host": source.host,
297 "success": success,
298 "results_count": result_count,
299 }
300 )
302 except Exception as e:
303 source.consecutive_failures += 1
304 source.needs_attention = source.consecutive_failures >= 3
305 source.last_validated_at = timezone.now()
306 await sync_to_async(source.save)()
308 results["tested"] += 1
309 results["failed"] += 1
310 results["details"].append(
311 {
312 "id": source.id,
313 "name": source.name,
314 "host": source.host,
315 "success": False,
316 "error": str(e),
317 }
318 )
320 return results