Coverage for apps / recipes / sources_api.py: 73%

123 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-12 10:49 +0000

1""" 

2SearchSource management API endpoints. 

3""" 

4 

5import asyncio 

6from typing import List, Optional 

7 

8from django.utils import timezone 

9from ninja import Router, Schema, Status 

10 

11from apps.core.auth import AdminAuth, SessionAuth 

12 

13from .models import SearchSource 

14from .services.search import RecipeSearch 

15 

16router = Router(tags=["sources"]) 

17 

18 

19# Schemas 

20 

21 

22class SourceOut(Schema): 

23 id: int 

24 host: str 

25 name: str 

26 is_enabled: bool 

27 search_url_template: str 

28 result_selector: str 

29 logo_url: str 

30 last_validated_at: Optional[str] = None 

31 consecutive_failures: int 

32 needs_attention: bool 

33 

34 @staticmethod 

35 def resolve_last_validated_at(obj): 

36 if obj.last_validated_at: 

37 return obj.last_validated_at.isoformat() 

38 return None 

39 

40 

41class SourceToggleOut(Schema): 

42 id: int 

43 is_enabled: bool 

44 

45 

46class SourceUpdateIn(Schema): 

47 result_selector: str 

48 

49 

50class SourceUpdateOut(Schema): 

51 id: int 

52 result_selector: str 

53 

54 

55class SourceTestOut(Schema): 

56 success: bool 

57 message: str 

58 results_count: int 

59 sample_results: List[str] 

60 

61 

62class ErrorOut(Schema): 

63 error: str 

64 message: str 

65 

66 

67class BulkToggleIn(Schema): 

68 enable: bool 

69 

70 

71class BulkToggleOut(Schema): 

72 updated_count: int 

73 is_enabled: bool 

74 

75 

76# Endpoints 

77 

78 

79@router.get("/", response=List[SourceOut], auth=SessionAuth()) 

80def list_sources(request): 

81 """List all search sources with their status.""" 

82 sources = SearchSource.objects.all().order_by("name") 

83 return list(sources) 

84 

85 

86@router.get("/enabled-count/", response={200: dict}, auth=SessionAuth()) 

87def enabled_count(request): 

88 """Get count of enabled sources vs total.""" 

89 total = SearchSource.objects.count() 

90 enabled = SearchSource.objects.filter(is_enabled=True).count() 

91 return { 

92 "enabled": enabled, 

93 "total": total, 

94 } 

95 

96 

97@router.get("/{source_id}/", response={200: SourceOut, 404: ErrorOut}, auth=SessionAuth()) 

98def get_source(request, source_id: int): 

99 """Get a single search source by ID.""" 

100 try: 

101 source = SearchSource.objects.get(id=source_id) 

102 return source 

103 except SearchSource.DoesNotExist: 

104 return Status( 

105 404, 

106 { 

107 "error": "not_found", 

108 "message": f"Source {source_id} not found", 

109 }, 

110 ) 

111 

112 

113@router.post("/{source_id}/toggle/", response={200: SourceToggleOut, 404: ErrorOut}, auth=AdminAuth()) 

114def toggle_source(request, source_id: int): 

115 """Toggle a source's enabled status.""" 

116 try: 

117 source = SearchSource.objects.get(id=source_id) 

118 source.is_enabled = not source.is_enabled 

119 source.save() 

120 return { 

121 "id": source.id, 

122 "is_enabled": source.is_enabled, 

123 } 

124 except SearchSource.DoesNotExist: 

125 return Status( 

126 404, 

127 { 

128 "error": "not_found", 

129 "message": f"Source {source_id} not found", 

130 }, 

131 ) 

132 

133 

134@router.post("/bulk-toggle/", response={200: BulkToggleOut}, auth=AdminAuth()) 

135def bulk_toggle_sources(request, data: BulkToggleIn): 

136 """Enable or disable all sources at once.""" 

137 updated = SearchSource.objects.all().update(is_enabled=data.enable) 

138 return { 

139 "updated_count": updated, 

140 "is_enabled": data.enable, 

141 } 

142 

143 

144@router.put("/{source_id}/selector/", response={200: SourceUpdateOut, 404: ErrorOut}, auth=AdminAuth()) 

145def update_selector(request, source_id: int, data: SourceUpdateIn): 

146 """Update a source's CSS selector.""" 

147 try: 

148 source = SearchSource.objects.get(id=source_id) 

149 source.result_selector = data.result_selector 

150 source.save() 

151 return { 

152 "id": source.id, 

153 "result_selector": source.result_selector, 

154 } 

155 except SearchSource.DoesNotExist: 

156 return Status( 

157 404, 

158 { 

159 "error": "not_found", 

160 "message": f"Source {source_id} not found", 

161 }, 

162 ) 

163 

164 

165@router.post("/{source_id}/test/", response={200: SourceTestOut, 404: ErrorOut, 500: ErrorOut}, auth=AdminAuth()) 

166async def test_source(request, source_id: int): 

167 """Test a source by performing a sample search. 

168 

169 Uses "chicken" as a test query and checks if results are returned. 

170 Updates the source's validation status based on the result. 

171 """ 

172 from asgiref.sync import sync_to_async 

173 

174 try: 

175 source = await sync_to_async(SearchSource.objects.get)(id=source_id) 

176 except SearchSource.DoesNotExist: 

177 return Status( 

178 404, 

179 { 

180 "error": "not_found", 

181 "message": f"Source {source_id} not found", 

182 }, 

183 ) 

184 

185 # Test with a common search query 

186 test_query = "chicken" 

187 search = RecipeSearch() 

188 

189 try: 

190 # Search only this specific source 

191 results = await search.search( 

192 query=test_query, 

193 sources=[source.host], 

194 page=1, 

195 per_page=5, 

196 ) 

197 

198 result_count = len(results.get("results", [])) 

199 sample_titles = [r.get("title", "")[:50] for r in results.get("results", [])[:3]] 

200 

201 # Update source validation status 

202 if result_count > 0: 

203 source.consecutive_failures = 0 

204 source.needs_attention = False 

205 source.last_validated_at = timezone.now() 

206 await sync_to_async(source.save)() 

207 

208 return { 

209 "success": True, 

210 "message": f'Found {result_count} results for "{test_query}"', 

211 "results_count": result_count, 

212 "sample_results": sample_titles, 

213 } 

214 else: 

215 source.consecutive_failures += 1 

216 source.needs_attention = source.consecutive_failures >= 3 

217 source.last_validated_at = timezone.now() 

218 await sync_to_async(source.save)() 

219 

220 return { 

221 "success": False, 

222 "message": f'No results found for "{test_query}". The selector may need updating.', 

223 "results_count": 0, 

224 "sample_results": [], 

225 } 

226 

227 except Exception as e: 

228 # Update failure count 

229 source.consecutive_failures += 1 

230 source.needs_attention = source.consecutive_failures >= 3 

231 source.last_validated_at = timezone.now() 

232 await sync_to_async(source.save)() 

233 

234 return Status( 

235 500, 

236 { 

237 "error": "test_failed", 

238 "message": f"Test failed: {str(e)}", 

239 }, 

240 ) 

241 

242 

243@router.post("/test-all/", response={200: dict}, auth=AdminAuth()) 

244async def test_all_sources(request): 

245 """Test all enabled sources and return summary. 

246 

247 This may take a while as it tests each source sequentially. 

248 """ 

249 from asgiref.sync import sync_to_async 

250 

251 sources = await sync_to_async(list)(SearchSource.objects.filter(is_enabled=True)) 

252 

253 results = { 

254 "tested": 0, 

255 "passed": 0, 

256 "failed": 0, 

257 "details": [], 

258 } 

259 

260 search = RecipeSearch() 

261 test_query = "chicken" 

262 

263 for source in sources: 

264 try: 

265 search_results = await search.search( 

266 query=test_query, 

267 sources=[source.host], 

268 page=1, 

269 per_page=3, 

270 ) 

271 

272 result_count = len(search_results.get("results", [])) 

273 success = result_count > 0 

274 

275 # Update source status 

276 if success: 

277 source.consecutive_failures = 0 

278 source.needs_attention = False 

279 else: 

280 source.consecutive_failures += 1 

281 source.needs_attention = source.consecutive_failures >= 3 

282 

283 source.last_validated_at = timezone.now() 

284 await sync_to_async(source.save)() 

285 

286 results["tested"] += 1 

287 if success: 

288 results["passed"] += 1 

289 else: 

290 results["failed"] += 1 

291 

292 results["details"].append( 

293 { 

294 "id": source.id, 

295 "name": source.name, 

296 "host": source.host, 

297 "success": success, 

298 "results_count": result_count, 

299 } 

300 ) 

301 

302 except Exception as e: 

303 source.consecutive_failures += 1 

304 source.needs_attention = source.consecutive_failures >= 3 

305 source.last_validated_at = timezone.now() 

306 await sync_to_async(source.save)() 

307 

308 results["tested"] += 1 

309 results["failed"] += 1 

310 results["details"].append( 

311 { 

312 "id": source.id, 

313 "name": source.name, 

314 "host": source.host, 

315 "success": False, 

316 "error": str(e), 

317 } 

318 ) 

319 

320 return results 

← Back to Dashboard