Coverage for apps / recipes / sources_api.py: 39%
145 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-14 19:13 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-14 19:13 +0000
1"""
2SearchSource management API endpoints.
3"""
5import asyncio
6from typing import List, Optional
8from django.utils import timezone
9from ninja import Router, Schema
11from .models import SearchSource
12from .services.search import RecipeSearch
14router = Router(tags=["sources"])
17# Schemas
20class SourceOut(Schema):
21 id: int
22 host: str
23 name: str
24 is_enabled: bool
25 search_url_template: str
26 result_selector: str
27 logo_url: str
28 last_validated_at: Optional[str] = None
29 consecutive_failures: int
30 needs_attention: bool
32 @staticmethod
33 def resolve_last_validated_at(obj):
34 if obj.last_validated_at:
35 return obj.last_validated_at.isoformat()
36 return None
39class SourceToggleOut(Schema):
40 id: int
41 is_enabled: bool
44class SourceUpdateIn(Schema):
45 result_selector: str
48class SourceUpdateOut(Schema):
49 id: int
50 result_selector: str
53class SourceTestOut(Schema):
54 success: bool
55 message: str
56 results_count: int
57 sample_results: List[str]
60class ErrorOut(Schema):
61 error: str
62 message: str
65class BulkToggleIn(Schema):
66 enable: bool
69class BulkToggleOut(Schema):
70 updated_count: int
71 is_enabled: bool
74# Endpoints
77@router.get("/", response=List[SourceOut])
78def list_sources(request):
79 """List all search sources with their status."""
80 sources = SearchSource.objects.all().order_by("name")
81 return list(sources)
84@router.get("/enabled-count/", response={200: dict})
85def enabled_count(request):
86 """Get count of enabled sources vs total."""
87 total = SearchSource.objects.count()
88 enabled = SearchSource.objects.filter(is_enabled=True).count()
89 return {
90 "enabled": enabled,
91 "total": total,
92 }
95@router.get("/{source_id}/", response={200: SourceOut, 404: ErrorOut})
96def get_source(request, source_id: int):
97 """Get a single search source by ID."""
98 try:
99 source = SearchSource.objects.get(id=source_id)
100 return source
101 except SearchSource.DoesNotExist:
102 return 404, {
103 "error": "not_found",
104 "message": f"Source {source_id} not found",
105 }
108@router.post("/{source_id}/toggle/", response={200: SourceToggleOut, 404: ErrorOut})
109def toggle_source(request, source_id: int):
110 """Toggle a source's enabled status."""
111 try:
112 source = SearchSource.objects.get(id=source_id)
113 source.is_enabled = not source.is_enabled
114 source.save()
115 return {
116 "id": source.id,
117 "is_enabled": source.is_enabled,
118 }
119 except SearchSource.DoesNotExist:
120 return 404, {
121 "error": "not_found",
122 "message": f"Source {source_id} not found",
123 }
126@router.post("/bulk-toggle/", response={200: BulkToggleOut})
127def bulk_toggle_sources(request, data: BulkToggleIn):
128 """Enable or disable all sources at once."""
129 updated = SearchSource.objects.all().update(is_enabled=data.enable)
130 return {
131 "updated_count": updated,
132 "is_enabled": data.enable,
133 }
136@router.put("/{source_id}/selector/", response={200: SourceUpdateOut, 404: ErrorOut})
137def update_selector(request, source_id: int, data: SourceUpdateIn):
138 """Update a source's CSS selector."""
139 try:
140 source = SearchSource.objects.get(id=source_id)
141 source.result_selector = data.result_selector
142 source.save()
143 return {
144 "id": source.id,
145 "result_selector": source.result_selector,
146 }
147 except SearchSource.DoesNotExist:
148 return 404, {
149 "error": "not_found",
150 "message": f"Source {source_id} not found",
151 }
154@router.post("/{source_id}/test/", response={200: SourceTestOut, 404: ErrorOut, 500: ErrorOut})
155async def test_source(request, source_id: int):
156 """Test a source by performing a sample search.
158 Uses "chicken" as a test query and checks if results are returned.
159 Updates the source's validation status based on the result.
160 """
161 from asgiref.sync import sync_to_async
163 try:
164 source = await sync_to_async(SearchSource.objects.get)(id=source_id)
165 except SearchSource.DoesNotExist:
166 return 404, {
167 "error": "not_found",
168 "message": f"Source {source_id} not found",
169 }
171 # Test with a common search query
172 test_query = "chicken"
173 search = RecipeSearch()
175 try:
176 # Search only this specific source
177 results = await search.search(
178 query=test_query,
179 sources=[source.host],
180 page=1,
181 per_page=5,
182 )
184 result_count = len(results.get("results", []))
185 sample_titles = [r.get("title", "")[:50] for r in results.get("results", [])[:3]]
187 # Update source validation status
188 if result_count > 0:
189 source.consecutive_failures = 0
190 source.needs_attention = False
191 source.last_validated_at = timezone.now()
192 await sync_to_async(source.save)()
194 return {
195 "success": True,
196 "message": f'Found {result_count} results for "{test_query}"',
197 "results_count": result_count,
198 "sample_results": sample_titles,
199 }
200 else:
201 source.consecutive_failures += 1
202 source.needs_attention = source.consecutive_failures >= 3
203 source.last_validated_at = timezone.now()
204 await sync_to_async(source.save)()
206 return {
207 "success": False,
208 "message": f'No results found for "{test_query}". The selector may need updating.',
209 "results_count": 0,
210 "sample_results": [],
211 }
213 except Exception as e:
214 # Update failure count
215 source.consecutive_failures += 1
216 source.needs_attention = source.consecutive_failures >= 3
217 source.last_validated_at = timezone.now()
218 await sync_to_async(source.save)()
220 return 500, {
221 "error": "test_failed",
222 "message": f"Test failed: {str(e)}",
223 }
226@router.post("/test-all/", response={200: dict})
227async def test_all_sources(request):
228 """Test all enabled sources and return summary.
230 This may take a while as it tests each source sequentially.
231 """
232 from asgiref.sync import sync_to_async
234 sources = await sync_to_async(list)(SearchSource.objects.filter(is_enabled=True))
236 results = {
237 "tested": 0,
238 "passed": 0,
239 "failed": 0,
240 "details": [],
241 }
243 search = RecipeSearch()
244 test_query = "chicken"
246 for source in sources:
247 try:
248 search_results = await search.search(
249 query=test_query,
250 sources=[source.host],
251 page=1,
252 per_page=3,
253 )
255 result_count = len(search_results.get("results", []))
256 success = result_count > 0
258 # Update source status
259 if success:
260 source.consecutive_failures = 0
261 source.needs_attention = False
262 else:
263 source.consecutive_failures += 1
264 source.needs_attention = source.consecutive_failures >= 3
266 source.last_validated_at = timezone.now()
267 await sync_to_async(source.save)()
269 results["tested"] += 1
270 if success:
271 results["passed"] += 1
272 else:
273 results["failed"] += 1
275 results["details"].append(
276 {
277 "id": source.id,
278 "name": source.name,
279 "host": source.host,
280 "success": success,
281 "results_count": result_count,
282 }
283 )
285 except Exception as e:
286 source.consecutive_failures += 1
287 source.needs_attention = source.consecutive_failures >= 3
288 source.last_validated_at = timezone.now()
289 await sync_to_async(source.save)()
291 results["tested"] += 1
292 results["failed"] += 1
293 results["details"].append(
294 {
295 "id": source.id,
296 "name": source.name,
297 "host": source.host,
298 "success": False,
299 "error": str(e),
300 }
301 )
303 return results