Coverage for apps/recipes/services/search.py: 89%

192 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-12 10:49 +0000

1""" 

2Async multi-site recipe search service. 

3""" 

4 

5import asyncio 

6import logging 

7from dataclasses import dataclass 

8from typing import Optional 

9from urllib.parse import quote_plus 

10 

11from asgiref.sync import sync_to_async 

12from bs4 import BeautifulSoup 

13from curl_cffi.requests import AsyncSession 

14from django.utils import timezone 

15 

16from apps.core.validators import ( 

17 MAX_HTML_SIZE, 

18 MAX_REDIRECT_HOPS, 

19 check_content_size, 

20 check_response_size, 

21 validate_redirect_url, 

22) 

23from apps.recipes.services.fingerprint import ( 

24 get_fallback_profiles, 

25 get_random_delay, 

26 get_random_profile, 

27) 

28 

29logger = logging.getLogger(__name__) 

30 

31 

32@dataclass 

33class SearchResult: 

34 """A single search result from a recipe site.""" 

35 

36 url: str 

37 title: str 

38 host: str 

39 image_url: str = "" 

40 description: str = "" 

41 rating_count: Optional[int] = None 

42 

43 

class RecipeSearch:
    """
    Async recipe search service that queries multiple sites concurrently.

    Uses curl_cffi with browser impersonation to fetch search pages,
    then parses results using BeautifulSoup with site-specific selectors.

    Browser profiles are centralized in fingerprint.py for maintainability.
    """

    # Upper bound on simultaneous per-source fetches (asyncio.Semaphore size).
    MAX_CONCURRENT = 10
    # Per-request timeout in seconds; _fetch_url allows a 5s grace on top.
    DEFAULT_TIMEOUT = 30

    def __init__(self):
        self.timeout = self.DEFAULT_TIMEOUT

    async def search(
        self,
        query: str,
        sources: Optional[list[str]] = None,
        page: int = 1,
        per_page: int = 20,
    ) -> dict:
        """
        Search for recipes across multiple sites.

        Args:
            query: Search query string
            sources: Optional list of hosts to search (None = all enabled)
            page: Page number (1-indexed)
            per_page: Results per page

        Returns:
            dict with keys:
                - results: List of SearchResult dicts
                - total: Total result count
                - page: Current page
                - has_more: Whether more results exist
                - sites: Dict mapping host to result count
        """
        enabled_sources = await self._get_enabled_sources(sources)

        if not enabled_sources:
            return self._empty_response(page)

        # Fetch all sources concurrently, then fold successes/failures into
        # a flat result list plus per-site counts.
        results_by_source = await self._fetch_all_sources(enabled_sources, query)
        all_results, site_counts = await self._aggregate_results(
            enabled_sources,
            results_by_source,
        )

        result_dicts = self._deduplicate_and_convert(all_results)
        result_dicts = self._filter_relevant(query, result_dicts)
        # Ranking is skipped for empty result sets.
        if result_dicts:
            result_dicts = self._apply_ai_ranking(query, result_dicts)

        return self._paginate(result_dicts, page, per_page, site_counts)

    async def _get_enabled_sources(self, sources: Optional[list[str]] = None) -> list:
        """Load enabled search sources, optionally filtered by host name."""
        # Imported lazily to avoid import cycles at module load time.
        from apps.recipes.models import SearchSource

        get_sources = sync_to_async(lambda: list(SearchSource.objects.filter(is_enabled=True)))
        enabled = await get_sources()
        if sources:
            # Filter in Python rather than the DB so unknown hosts are
            # silently ignored instead of raising.
            enabled = [s for s in enabled if s.host in sources]
        return enabled

    @staticmethod
    def _empty_response(page: int) -> dict:
        """Return an empty search response."""
        return {
            "results": [],
            "total": 0,
            "page": page,
            "has_more": False,
            "sites": {},
        }

    async def _fetch_all_sources(self, enabled_sources: list, query: str) -> list:
        """Search all sources concurrently and return raw results.

        Returns a list aligned with ``enabled_sources``; entries are either
        lists of SearchResult or Exception instances (return_exceptions=True).
        """
        semaphore = asyncio.Semaphore(self.MAX_CONCURRENT)
        primary_profile = get_random_profile()

        async with AsyncSession(impersonate=primary_profile) as session:
            tasks = [self._search_source(session, semaphore, source, query) for source in enabled_sources]
            return await asyncio.gather(*tasks, return_exceptions=True)

    async def _aggregate_results(
        self,
        enabled_sources: list,
        results_by_source: list,
    ) -> tuple[list["SearchResult"], dict[str, int]]:
        """Aggregate per-source results, recording successes and failures."""
        all_results: list[SearchResult] = []
        site_counts: dict[str, int] = {}

        # results_by_source is positionally aligned with enabled_sources
        # (asyncio.gather preserves task order).
        for source, result in zip(enabled_sources, results_by_source):
            if isinstance(result, Exception):
                logger.warning(f"Search failed for {source.host}: {result}")
                await self._record_failure(source)
                continue

            site_counts[source.host] = len(result)
            all_results.extend(result)
            await self._record_success(source)

        return all_results, site_counts

    @staticmethod
    def _deduplicate_and_convert(results: list["SearchResult"]) -> list[dict]:
        """Deduplicate results by URL and convert to dict format for ranking.

        First occurrence wins; later duplicates of the same URL are dropped.
        """
        seen_urls: set[str] = set()
        unique: list[dict] = []
        for r in results:
            if r.url not in seen_urls:
                seen_urls.add(r.url)
                unique.append(
                    {
                        "url": r.url,
                        "title": r.title,
                        "host": r.host,
                        "image_url": r.image_url,
                        "description": r.description,
                        "rating_count": r.rating_count,
                    }
                )
        return unique

    @staticmethod
    def _paginate(
        result_dicts: list[dict],
        page: int,
        per_page: int,
        site_counts: dict[str, int],
    ) -> dict:
        """Paginate results and build the final response dict.

        ``page`` is 1-indexed; an out-of-range page yields an empty slice
        with has_more False.
        """
        total = len(result_dicts)
        start = (page - 1) * per_page
        end = start + per_page
        return {
            "results": result_dicts[start:end],
            "total": total,
            "page": page,
            "has_more": end < total,
            "sites": site_counts,
        }

    @staticmethod
    def _filter_relevant(query: str, results: list[dict]) -> list[dict]:
        """Filter results to those where at least one query term appears in the title.

        This prevents nonsensical queries (e.g., "xyznonexistent") from returning
        unrelated results that happen to be scraped from source search pages.
        """
        # Terms shorter than 2 chars are too noisy to match on.
        query_terms = [t.lower() for t in query.split() if len(t) >= 2]
        if not query_terms:
            # No usable terms (e.g. single-char query): keep everything.
            return results

        filtered = []
        for result in results:
            title_lower = result["title"].lower()
            if any(term in title_lower for term in query_terms):
                filtered.append(result)
        return filtered

    @staticmethod
    def _apply_ai_ranking(query: str, results: list[dict]) -> list[dict]:
        """Rank search results by relevance using deterministic scoring."""
        # Imported lazily to keep the ai app optional at import time.
        from apps.ai.services.ranking import rank_results

        return rank_results(query, results)

    async def _search_source(
        self,
        session: AsyncSession,
        semaphore: asyncio.Semaphore,
        source,
        query: str,
    ) -> list[SearchResult]:
        """
        Search a single source for recipes.

        Uses randomized delays and retry with fallback browser profiles
        to avoid bot detection patterns.

        Raises the last fetch error if every profile attempt fails.
        """
        async with semaphore:
            # Jittered delay before each source to avoid burst patterns.
            await asyncio.sleep(get_random_delay())
            search_url = source.search_url_template.replace("{query}", quote_plus(query))
            profiles_to_try = self._build_profile_list(session)

            last_error = None
            # At most 3 attempts: session's own profile, then fallbacks.
            for i, profile in enumerate(profiles_to_try[:3]):
                if i > 0:
                    # Back off progressively between retries.
                    await asyncio.sleep(get_random_delay() * (i + 1))
                result, error = await self._try_fetch_and_parse(
                    session,
                    search_url,
                    profile,
                    source,
                )
                if result is not None:
                    return result
                last_error = error or last_error

            raise last_error or Exception("All retry attempts failed")

    async def _try_fetch_and_parse(self, session, url, profile, source):
        """Attempt a single fetch+parse. Returns (results, None) or (None, error).

        Non-retryable failures are raised immediately; retryable ones are
        returned as (None, error) so the caller can try another profile.
        """
        try:
            response = await self._fetch_with_profile(session, url, profile)
            if response.status_code == 200:
                return self._parse_search_results(
                    response.text,
                    source.host,
                    source.result_selector,
                    url,
                ), None
            error = Exception(f"HTTP {response.status_code}")
            if not self._should_retry_status(response.status_code):
                raise error
            return None, error
        except asyncio.TimeoutError:
            return None, Exception("Request timed out")
        except Exception as e:
            if not self._should_retry_error(e):
                raise
            return None, e

    @staticmethod
    def _build_profile_list(session):
        """Build ordered list of browser profiles to try."""
        # NOTE: reads curl_cffi's private _impersonate attribute — may break
        # across curl_cffi versions; hasattr guard keeps it best-effort.
        current = session._impersonate if hasattr(session, "_impersonate") else None
        profiles = [None]  # None = use session's existing profile
        profiles.extend(get_fallback_profiles(exclude=current))
        return profiles

    async def _fetch_with_profile(self, session: AsyncSession, url: str, profile):
        """Fetch a URL, using a new session with the given profile or the existing session."""
        if profile:
            async with AsyncSession(impersonate=profile) as retry_session:
                return await self._fetch_url(retry_session, url)
        return await self._fetch_url(session, url)

    @staticmethod
    def _should_retry_status(status_code: int) -> bool:
        """Return True if the HTTP status code is transient and worth retrying."""
        # NOTE(review): 404 is treated as retryable alongside 403/429 —
        # presumably some sites return it when bot-blocking; confirm before
        # narrowing this list.
        return status_code in (403, 404, 429) or status_code >= 500

    @staticmethod
    def _should_retry_error(error: Exception) -> bool:
        """Return True if the exception represents a transient error worth retrying."""
        error_str = str(error)
        # Non-HTTP errors (timeouts, connection resets) are always retried.
        if "HTTP" not in error_str:
            return True
        return any(code in error_str for code in ("403", "404", "429", "500", "502", "503"))

    async def _fetch_url(self, session: AsyncSession, url: str):
        """Fetch a URL with timeout handling, redirect validation, and size limits.

        Redirects are followed manually (allow_redirects=False) so each hop's
        target can be validated and DNS-pinned via CurlOpt.RESOLVE.

        Raises:
            ValueError: on oversized responses or too many redirect hops.
        """
        from curl_cffi import CurlOpt
        from urllib.parse import urljoin

        current_url = url
        current_resolve: list[str] = []
        for _ in range(MAX_REDIRECT_HOPS):
            # Pin DNS resolution for the redirect target when provided by
            # the previous hop's validation.
            curl_opts = {CurlOpt.RESOLVE: current_resolve} if current_resolve else {}
            async with AsyncSession(
                impersonate=session._impersonate if hasattr(session, "_impersonate") else "chrome",
                curl_options=curl_opts,
            ) as pin_session:
                # Outer wait_for guards against the library timeout not firing.
                response = await asyncio.wait_for(
                    pin_session.get(current_url, timeout=self.timeout, allow_redirects=False),
                    timeout=self.timeout + 5,
                )

                if response.status_code in (301, 302, 303, 307, 308):
                    location = response.headers.get("location")
                    if not location:
                        # Redirect without a target: surface it to the caller.
                        return response
                    # BUGFIX: a Location header may be a relative reference
                    # (RFC 9110 §10.2.2). Resolve it against the current URL
                    # before validating/fetching; absolute locations pass
                    # through urljoin unchanged.
                    location = urljoin(current_url, location)
                    resolved = validate_redirect_url(location)
                    current_url = location
                    current_resolve = resolved.curl_resolve
                    continue

                if response.status_code == 200:
                    if not check_response_size(response, MAX_HTML_SIZE):
                        raise ValueError(f"Search response too large for {url}")
                    check_content_size(response.content, MAX_HTML_SIZE)

                # Non-redirect, non-200 responses are returned for the caller
                # to classify via _should_retry_status.
                return response

        raise ValueError(f"Too many redirects (>{MAX_REDIRECT_HOPS}) for {url}")

    def _parse_search_results(
        self,
        html: str,
        host: str,
        selector: str,
        base_url: str,
    ) -> list[SearchResult]:
        """
        Parse search results from HTML.

        Uses the site-specific CSS selector if available,
        otherwise falls back to common patterns.
        """
        # Imported lazily to keep parser dependencies out of module import.
        from apps.recipes.services.search_parsers import (
            extract_result_from_element,
            fallback_parse,
        )

        soup = BeautifulSoup(html, "html.parser")
        results = []

        # Try site-specific selector first
        if selector:
            elements = soup.select(selector)
            if elements:
                for el in elements[:20]:  # Limit per site
                    result = extract_result_from_element(el, host, base_url)
                    if result:
                        results.append(result)
                return results

        # Fallback: Look for common recipe link patterns
        results = fallback_parse(soup, host, base_url)
        return results[:20]  # Limit per site

    async def _record_failure(self, source) -> None:
        """Record a search failure for maintenance tracking.

        Two consecutive failures flag the source for human attention.
        """

        @sync_to_async
        def update():
            source.consecutive_failures += 1
            if source.consecutive_failures >= 2:
                source.needs_attention = True
            source.save(update_fields=["consecutive_failures", "needs_attention"])

        await update()

    async def _record_success(self, source) -> None:
        """Record a successful search, clearing any failure flags."""

        @sync_to_async
        def update():
            source.consecutive_failures = 0
            source.needs_attention = False
            source.last_validated_at = timezone.now()
            source.save(
                update_fields=[
                    "consecutive_failures",
                    "needs_attention",
                    "last_validated_at",
                ]
            )

        await update()

← Back to Dashboard