Coverage for apps / recipes / sources_api.py: 39%
145 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 00:40 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 00:40 +0000
1"""
2SearchSource management API endpoints.
3"""
5import asyncio
6from typing import List, Optional
8from django.utils import timezone
9from ninja import Router, Schema
11from .models import SearchSource
12from .services.search import RecipeSearch
# Router that every endpoint in this module registers on; all of its
# operations are tagged 'sources'.
router = Router(tags=['sources'])
17# Schemas
# Serialized form of a SearchSource, used as the response schema of the
# list and single-source endpoints in this module.
class SourceOut(Schema):
    id: int
    host: str
    name: str
    is_enabled: bool
    search_url_template: str
    result_selector: str
    logo_url: str
    # Emitted as a string produced by the resolver below; None when unset.
    last_validated_at: Optional[str] = None
    consecutive_failures: int
    needs_attention: bool

    @staticmethod
    def resolve_last_validated_at(obj):
        # Render the stored value via isoformat(); a falsy value becomes None.
        validated = obj.last_validated_at
        return validated.isoformat() if validated else None
# Response body of the toggle endpoint: the source's id together with its
# is_enabled flag after the toggle was applied.
class SourceToggleOut(Schema):
    id: int
    is_enabled: bool
# Request body of the selector-update endpoint: the new CSS selector to
# store on the source.
class SourceUpdateIn(Schema):
    result_selector: str
# Response body of the selector-update endpoint: the source's id and the
# selector value that was saved.
class SourceUpdateOut(Schema):
    id: int
    result_selector: str
# Response body of the single-source test endpoint.
class SourceTestOut(Schema):
    success: bool  # True when the sample search returned at least one result
    message: str  # human-readable summary of the outcome
    results_count: int  # number of results the sample search returned
    sample_results: List[str]  # up to three result titles, each cut to 50 chars
# Error payload shared by the 404 and 500 responses in this module.
class ErrorOut(Schema):
    error: str  # short machine-readable code, e.g. 'not_found' or 'test_failed'
    message: str  # human-readable description of the problem
# Request body of the bulk-toggle endpoint: the is_enabled value to apply
# to every source.
class BulkToggleIn(Schema):
    enable: bool
# Response body of the bulk-toggle endpoint.
class BulkToggleOut(Schema):
    updated_count: int  # value returned by the queryset update() call
    is_enabled: bool  # the flag value that was applied
73# Endpoints
@router.get('/', response=List[SourceOut])
def list_sources(request):
    """List all search sources with their status."""
    # Evaluate the queryset here so the response is a concrete list,
    # ordered alphabetically by name.
    return [source for source in SearchSource.objects.order_by('name')]
@router.get('/enabled-count/', response={200: dict})
def enabled_count(request):
    """Get count of enabled sources vs total."""
    all_sources = SearchSource.objects
    # Two COUNT queries, issued in the same order as before: total, then enabled.
    total_sources = all_sources.count()
    enabled_sources = all_sources.filter(is_enabled=True).count()
    return dict(enabled=enabled_sources, total=total_sources)
@router.get('/{source_id}/', response={200: SourceOut, 404: ErrorOut})
def get_source(request, source_id: int):
    """Get a single search source by ID."""
    try:
        found = SearchSource.objects.get(id=source_id)
    except SearchSource.DoesNotExist:
        # Unknown id: answer with the module's standard 404 payload.
        not_found = {
            'error': 'not_found',
            'message': f'Source {source_id} not found',
        }
        return 404, not_found
    else:
        return found
@router.post('/{source_id}/toggle/', response={200: SourceToggleOut, 404: ErrorOut})
def toggle_source(request, source_id: int):
    """Toggle a source's enabled status."""
    try:
        target = SearchSource.objects.get(id=source_id)
    except SearchSource.DoesNotExist:
        # Guard clause: nothing to toggle for an unknown id.
        return 404, {
            'error': 'not_found',
            'message': f'Source {source_id} not found',
        }

    # Flip the flag, persist it, and echo the new state back to the caller.
    flipped = not target.is_enabled
    target.is_enabled = flipped
    target.save()
    return {'id': target.id, 'is_enabled': target.is_enabled}
@router.post('/bulk-toggle/', response={200: BulkToggleOut})
def bulk_toggle_sources(request, data: BulkToggleIn):
    """Enable or disable all sources at once."""
    desired_state = data.enable
    # One UPDATE over every row; update() reports how many rows it matched.
    affected_rows = SearchSource.objects.update(is_enabled=desired_state)
    return dict(updated_count=affected_rows, is_enabled=desired_state)
@router.put('/{source_id}/selector/', response={200: SourceUpdateOut, 404: ErrorOut})
def update_selector(request, source_id: int, data: SourceUpdateIn):
    """Update a source's CSS selector."""
    try:
        target = SearchSource.objects.get(id=source_id)
    except SearchSource.DoesNotExist:
        # Guard clause: unknown id gets the standard 404 payload.
        return 404, {
            'error': 'not_found',
            'message': f'Source {source_id} not found',
        }

    # Store the submitted selector and report back what was saved.
    target.result_selector = data.result_selector
    target.save()
    return {'id': target.id, 'result_selector': target.result_selector}
@router.post('/{source_id}/test/', response={200: SourceTestOut, 404: ErrorOut, 500: ErrorOut})
async def test_source(request, source_id: int):
    """Test a source by performing a sample search.

    Uses "chicken" as a test query and checks if results are returned.
    Updates the source's validation status based on the result.
    """
    # Returns:
    #   200 + SourceTestOut fields when the search ran (success True/False),
    #   404 + ErrorOut fields when the source id is unknown,
    #   500 + ErrorOut fields when the search or result parsing raised.
    from asgiref.sync import sync_to_async

    try:
        source = await sync_to_async(SearchSource.objects.get)(id=source_id)
    except SearchSource.DoesNotExist:
        return 404, {
            'error': 'not_found',
            'message': f'Source {source_id} not found',
        }

    # Test with a common search query
    test_query = 'chicken'
    search = RecipeSearch()

    # Only the search and the parsing of its result are guarded.  The status
    # update below is deliberately outside the try so that a database error
    # cannot re-enter the handler and count the same failure twice.
    error_message = None
    result_count = 0
    sample_titles = []
    try:
        # Search only this specific source
        results = await search.search(
            query=test_query,
            sources=[source.host],
            page=1,
            per_page=5,
        )
        hits = results.get('results', [])
        result_count = len(hits)
        # A result whose title is missing or None must not fail the test.
        sample_titles = [(hit.get('title') or '')[:50] for hit in hits[:3]]
    except Exception as exc:  # boundary: any scraper error is reported as 500
        error_message = str(exc)
        result_count = 0
        sample_titles = []

    succeeded = error_message is None and result_count > 0

    # Single place where the validation status is updated.
    if succeeded:
        source.consecutive_failures = 0
        source.needs_attention = False
    else:
        source.consecutive_failures += 1
        source.needs_attention = source.consecutive_failures >= 3
    source.last_validated_at = timezone.now()
    # Write only the validation columns: the row was loaded before a slow
    # network call, so a full save() could overwrite a concurrent toggle or
    # selector edit with stale values.
    # NOTE(review): if SearchSource defines auto_now fields, add them here.
    await sync_to_async(source.save)(
        update_fields=['consecutive_failures', 'needs_attention', 'last_validated_at'],
    )

    if error_message is not None:
        return 500, {
            'error': 'test_failed',
            'message': f'Test failed: {error_message}',
        }

    if succeeded:
        return {
            'success': True,
            'message': f'Found {result_count} results for "{test_query}"',
            'results_count': result_count,
            'sample_results': sample_titles,
        }

    return {
        'success': False,
        'message': f'No results found for "{test_query}". The selector may need updating.',
        'results_count': 0,
        'sample_results': [],
    }
@router.post('/test-all/', response={200: dict})
async def test_all_sources(request):
    """Test all enabled sources and return summary.

    This may take a while as it tests each source sequentially.
    """
    # Returns a dict with the counters 'tested', 'passed', 'failed' and a
    # 'details' list holding one entry per source (id, name, host, success,
    # plus either 'results_count' or 'error').
    from asgiref.sync import sync_to_async

    sources = await sync_to_async(list)(
        SearchSource.objects.filter(is_enabled=True)
    )

    results = {
        'tested': 0,
        'passed': 0,
        'failed': 0,
        'details': [],
    }

    search = RecipeSearch()
    test_query = 'chicken'

    for source in sources:
        detail = {
            'id': source.id,
            'name': source.name,
            'host': source.host,
        }

        # Guard only the search and the parsing of its result.  Saving the
        # status happens afterwards, so a database error cannot re-enter the
        # handler and increment the failure counter a second time.
        try:
            search_results = await search.search(
                query=test_query,
                sources=[source.host],
                page=1,
                per_page=3,
            )
            result_count = len(search_results.get('results', []))
        except Exception as exc:  # boundary: one broken source must not stop the run
            success = False
            detail['success'] = False
            detail['error'] = str(exc)
        else:
            success = result_count > 0
            detail['success'] = success
            detail['results_count'] = result_count

        # Update source status
        if success:
            source.consecutive_failures = 0
            source.needs_attention = False
        else:
            source.consecutive_failures += 1
            source.needs_attention = source.consecutive_failures >= 3
        source.last_validated_at = timezone.now()
        # All rows were loaded before the loop started, so by now they may be
        # stale.  Write only the validation columns so a toggle or selector
        # edit made while this run was in progress is not overwritten.
        # NOTE(review): if SearchSource defines auto_now fields, add them here.
        await sync_to_async(source.save)(
            update_fields=['consecutive_failures', 'needs_attention', 'last_validated_at'],
        )

        results['tested'] += 1
        results['passed' if success else 'failed'] += 1
        results['details'].append(detail)

    return results