Coverage for apps / recipes / sources_api.py: 73%

123 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-02 13:22 +0000

1""" 

2SearchSource management API endpoints. 

3""" 

4 

5import asyncio 

6from typing import List, Optional 

7 

8from django.utils import timezone 

9from ninja import Router, Schema, Status 

10 

11from apps.core.auth import HomeOnlyAuth, SessionAuth 

12 

13from .models import SearchSource 

14from .services.search import RecipeSearch 

15 

16router = Router(tags=["sources"]) 

17 

18 

19# Schemas 

20 

21 

class SourceOut(Schema):
    # Serialized view of a SearchSource row, returned by the list and
    # single-source GET endpoints. A `#` comment is used instead of a class
    # docstring so the generated schema description is left untouched.
    id: int
    host: str
    name: str
    is_enabled: bool
    search_url_template: str
    result_selector: str
    logo_url: str
    # ISO-8601 text produced by the resolver below; None when no value is set.
    last_validated_at: Optional[str] = None
    consecutive_failures: int
    needs_attention: bool

    @staticmethod
    def resolve_last_validated_at(obj):
        # Render the stored timestamp via isoformat(); a falsy value
        # (e.g. None) is reported as None.
        stamp = obj.last_validated_at
        return stamp.isoformat() if stamp else None

39 

40 

class SourceToggleOut(Schema):
    # Response body of the single-source toggle endpoint: the source's id and
    # its enabled flag after the flip.
    id: int
    is_enabled: bool

44 

45 

class SourceUpdateIn(Schema):
    # Request body of the selector update endpoint; carries the new value to
    # store in the source's `result_selector` field.
    result_selector: str

48 

49 

class SourceUpdateOut(Schema):
    # Response body of the selector update endpoint: the source's id and the
    # selector value that was saved.
    id: int
    result_selector: str

53 

54 

class SourceTestOut(Schema):
    # Response body of the single-source test endpoint.
    success: bool  # True when the sample search returned at least one result
    message: str  # human-readable summary of the outcome
    results_count: int  # number of results returned by the sample search
    sample_results: List[str]  # up to three result titles, each cut to 50 chars

60 

61 

class ErrorOut(Schema):
    # Error payload shared by the 404 and 500 responses in this module.
    error: str  # short code; this module emits "not_found" and "test_failed"
    message: str  # human-readable explanation

65 

66 

class BulkToggleIn(Schema):
    # Request body of the bulk toggle endpoint.
    enable: bool  # enabled state to apply to every source

69 

70 

class BulkToggleOut(Schema):
    # Response body of the bulk toggle endpoint.
    updated_count: int  # row count returned by the queryset update
    is_enabled: bool  # echoes the state that was requested

74 

75 

76# Endpoints 

77 

78 

@router.get("/", response=List[SourceOut], auth=SessionAuth())
def list_sources(request):
    """Return every search source, ordered by name, with its status fields."""
    return list(SearchSource.objects.order_by("name"))

84 

85 

@router.get("/enabled-count/", response={200: dict}, auth=SessionAuth())
def enabled_count(request):
    """Report how many sources are enabled alongside the overall total.

    Returns a dict with the keys ``enabled`` and ``total``.
    """
    total_sources = SearchSource.objects.count()
    enabled_sources = SearchSource.objects.filter(is_enabled=True).count()
    return dict(enabled=enabled_sources, total=total_sources)

95 

96 

97# Static paths registered before parametrized `/{source_id}/*` patterns so that 

98# Django's URL resolver doesn't shadow them with the str-typed path segment. 

99 

100 

@router.post("/bulk-toggle/", response={200: BulkToggleOut}, auth=HomeOnlyAuth())
def bulk_toggle_sources(request, data: BulkToggleIn):
    """Set the enabled flag of every source to the requested value.

    Returns the row count reported by the queryset update together with the
    state that was applied.
    """
    target_state = data.enable
    rows_changed = SearchSource.objects.update(is_enabled=target_state)
    return {"updated_count": rows_changed, "is_enabled": target_state}

109 

110 

@router.post("/test-all/", response={200: dict}, auth=HomeOnlyAuth())
async def test_all_sources(request):
    """Test all enabled sources and return a summary.

    Each enabled source is searched in turn with the query "chicken"
    (sequentially, so this may take a while). A source passes when the search
    returns at least one result. After each test the source's
    ``consecutive_failures``, ``needs_attention`` and ``last_validated_at``
    fields are updated and saved.

    Returns:
        A dict with the counters ``tested``, ``passed`` and ``failed`` plus a
        ``details`` list holding one entry per source (``id``, ``name``,
        ``host``, ``success`` and either ``results_count`` or, when the search
        raised, ``error``).
    """
    from asgiref.sync import sync_to_async

    sources = await sync_to_async(list)(SearchSource.objects.filter(is_enabled=True))

    results = {
        "tested": 0,
        "passed": 0,
        "failed": 0,
        "details": [],
    }

    search = RecipeSearch()
    test_query = "chicken"

    # Only these columns are written back. The instances were loaded before
    # the (slow) searches started, so a full-row save would overwrite any
    # toggle/selector edit made in the meantime with stale values.
    # NOTE(review): assumes SearchSource has no auto_now field that relies on
    # a full save() -- confirm against the model.
    validation_fields = ["consecutive_failures", "needs_attention", "last_validated_at"]

    for source in sources:
        detail = {
            "id": source.id,
            "name": source.name,
            "host": source.host,
        }

        # Keep the try body limited to the search itself so that a failure
        # while saving is not misreported as a source failure (and the failure
        # counter is not bumped twice).
        try:
            search_results = await search.search(
                query=test_query,
                sources=[source.host],
                page=1,
                per_page=3,
            )
            result_count = len(search_results.get("results", []))
        except Exception as e:
            success = False
            detail["success"] = False
            detail["error"] = str(e)
        else:
            success = result_count > 0
            detail["success"] = success
            detail["results_count"] = result_count

        # Update source status
        if success:
            source.consecutive_failures = 0
            source.needs_attention = False
        else:
            source.consecutive_failures += 1
            # Flag the source once it has failed three times in a row.
            source.needs_attention = source.consecutive_failures >= 3

        source.last_validated_at = timezone.now()
        await sync_to_async(source.save)(update_fields=validation_fields)

        results["tested"] += 1
        results["passed" if success else "failed"] += 1
        results["details"].append(detail)

    return results

189 

190 

@router.get("/{source_id}/", response={200: SourceOut, 404: ErrorOut}, auth=SessionAuth())
def get_source(request, source_id: int):
    """Look up one search source by its ID.

    Returns the source, or a 404 ``not_found`` error payload when no source
    has that ID.
    """
    try:
        found = SearchSource.objects.get(id=source_id)
    except SearchSource.DoesNotExist:
        missing = {
            "error": "not_found",
            "message": f"Source {source_id} not found",
        }
        return Status(404, missing)
    return found

205 

206 

@router.post("/{source_id}/toggle/", response={200: SourceToggleOut, 404: ErrorOut}, auth=HomeOnlyAuth())
def toggle_source(request, source_id: int):
    """Flip a source's enabled flag and report the new state.

    Returns the source's id and updated ``is_enabled`` value, or a 404
    ``not_found`` error payload when no source has that ID.
    """
    try:
        target = SearchSource.objects.get(id=source_id)
    except SearchSource.DoesNotExist:
        missing = {
            "error": "not_found",
            "message": f"Source {source_id} not found",
        }
        return Status(404, missing)

    target.is_enabled = not target.is_enabled
    target.save()
    return {"id": target.id, "is_enabled": target.is_enabled}

226 

227 

@router.put("/{source_id}/selector/", response={200: SourceUpdateOut, 404: ErrorOut}, auth=HomeOnlyAuth())
def update_selector(request, source_id: int, data: SourceUpdateIn):
    """Replace a source's CSS result selector.

    Returns the source's id and the stored selector, or a 404 ``not_found``
    error payload when no source has that ID.
    """
    try:
        target = SearchSource.objects.get(id=source_id)
    except SearchSource.DoesNotExist:
        missing = {
            "error": "not_found",
            "message": f"Source {source_id} not found",
        }
        return Status(404, missing)

    target.result_selector = data.result_selector
    target.save()
    return {"id": target.id, "result_selector": target.result_selector}

247 

248 

@router.post("/{source_id}/test/", response={200: SourceTestOut, 404: ErrorOut, 500: ErrorOut}, auth=HomeOnlyAuth())
async def test_source(request, source_id: int):
    """Test a source by performing a sample search.

    Uses "chicken" as a test query and checks if results are returned.
    Updates the source's validation status based on the result: a run with
    at least one result resets ``consecutive_failures`` and clears
    ``needs_attention``; an empty or failing run increments the counter and
    sets ``needs_attention`` once it reaches three. ``last_validated_at`` is
    stamped in every case.

    Returns:
        A ``SourceTestOut``-shaped dict (200) for a completed search, a 404
        ``not_found`` payload when the source does not exist, or a 500
        ``test_failed`` payload when the search raised.
    """
    from asgiref.sync import sync_to_async

    try:
        source = await sync_to_async(SearchSource.objects.get)(id=source_id)
    except SearchSource.DoesNotExist:
        return Status(
            404,
            {
                "error": "not_found",
                "message": f"Source {source_id} not found",
            },
        )

    # Test with a common search query
    test_query = "chicken"
    search = RecipeSearch()

    failure = None
    result_count = 0
    sample_titles = []

    # Only the search and the reading of its payload are guarded; the status
    # bookkeeping below runs exactly once, so a failing save can no longer
    # bump the failure counter twice.
    try:
        # Search only this specific source
        results = await search.search(
            query=test_query,
            sources=[source.host],
            page=1,
            per_page=5,
        )
        hits = results.get("results", [])
        result_count = len(hits)
        # `or ""` covers results whose title is present but None, which would
        # otherwise raise on slicing and be recorded as a source failure.
        sample_titles = [(r.get("title") or "")[:50] for r in hits[:3]]
    except Exception as exc:
        failure = exc

    succeeded = failure is None and result_count > 0

    # Update source validation status
    if succeeded:
        source.consecutive_failures = 0
        source.needs_attention = False
    else:
        source.consecutive_failures += 1
        source.needs_attention = source.consecutive_failures >= 3
    source.last_validated_at = timezone.now()

    # Write back only the validation columns: the instance was loaded before
    # the network call, so a full-row save could overwrite a concurrent
    # toggle/selector edit with stale values.
    # NOTE(review): assumes SearchSource has no auto_now field that relies on
    # a full save() -- confirm against the model.
    await sync_to_async(source.save)(
        update_fields=["consecutive_failures", "needs_attention", "last_validated_at"]
    )

    if failure is not None:
        return Status(
            500,
            {
                "error": "test_failed",
                "message": f"Test failed: {str(failure)}",
            },
        )

    if not succeeded:
        return {
            "success": False,
            "message": f'No results found for "{test_query}". The selector may need updating.',
            "results_count": 0,
            "sample_results": [],
        }

    return {
        "success": True,
        "message": f'Found {result_count} results for "{test_query}"',
        "results_count": result_count,
        "sample_results": sample_titles,
    }

← Back to Dashboard