Coverage for apps/recipes/services/search.py: 89%
192 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-12 10:49 +0000
1"""
2Async multi-site recipe search service.
3"""
5import asyncio
6import logging
7from dataclasses import dataclass
8from typing import Optional
9from urllib.parse import quote_plus
11from asgiref.sync import sync_to_async
12from bs4 import BeautifulSoup
13from curl_cffi.requests import AsyncSession
14from django.utils import timezone
16from apps.core.validators import (
17 MAX_HTML_SIZE,
18 MAX_REDIRECT_HOPS,
19 check_content_size,
20 check_response_size,
21 validate_redirect_url,
22)
23from apps.recipes.services.fingerprint import (
24 get_fallback_profiles,
25 get_random_delay,
26 get_random_profile,
27)
29logger = logging.getLogger(__name__)
@dataclass
class SearchResult:
    """A single search result from a recipe site.

    Instances are produced by the per-site HTML parsers and later flattened
    into plain dicts (see RecipeSearch._deduplicate_and_convert) for
    filtering, ranking, and pagination.
    """

    # Absolute URL of the recipe page; used as the deduplication key.
    url: str
    # Recipe title scraped from the search-result element.
    title: str
    # Host name of the source site this result came from.
    host: str
    # Thumbnail/preview image URL; empty string when none was found.
    image_url: str = ""
    # Short description/teaser text; empty string when none was found.
    description: str = ""
    # Rating count from the source site, when the parser could extract one.
    rating_count: Optional[int] = None
class RecipeSearch:
    """
    Async recipe search service that queries multiple sites concurrently.

    Uses curl_cffi with browser impersonation to fetch search pages,
    then parses results using BeautifulSoup with site-specific selectors.

    Browser profiles are centralized in fingerprint.py for maintainability.

    Pipeline (see search()): load enabled sources -> fetch all search pages
    concurrently -> aggregate per-site results -> deduplicate by URL ->
    filter by query-term relevance -> rank -> paginate.
    """

    # Upper bound on simultaneous per-source fetches (semaphore size).
    MAX_CONCURRENT = 10
    # Per-request timeout in seconds; _fetch_url adds a 5s grace period on top.
    DEFAULT_TIMEOUT = 30

    def __init__(self):
        # Instance-level copy so the timeout can be tuned per instance
        # without touching the class default.
        self.timeout = self.DEFAULT_TIMEOUT

    async def search(
        self,
        query: str,
        sources: Optional[list[str]] = None,
        page: int = 1,
        per_page: int = 20,
    ) -> dict:
        """
        Search for recipes across multiple sites.

        Args:
            query: Search query string
            sources: Optional list of hosts to search (None = all enabled)
            page: Page number (1-indexed)
            per_page: Results per page

        Returns:
            dict with keys:
                - results: List of SearchResult dicts
                - total: Total result count
                - page: Current page
                - has_more: Whether more results exist
                - sites: Dict mapping host to result count
        """
        # 1) Resolve which sources to query; bail out early when none enabled.
        enabled_sources = await self._get_enabled_sources(sources)

        if not enabled_sources:
            return self._empty_response(page)

        # 2) Fetch every source's search page concurrently, then fold the
        #    per-source outcomes into one result list + per-site counts.
        results_by_source = await self._fetch_all_sources(enabled_sources, query)
        all_results, site_counts = await self._aggregate_results(
            enabled_sources,
            results_by_source,
        )

        # 3) Dedupe, drop clearly irrelevant hits, rank whatever is left.
        result_dicts = self._deduplicate_and_convert(all_results)
        result_dicts = self._filter_relevant(query, result_dicts)
        if result_dicts:
            result_dicts = self._apply_ai_ranking(query, result_dicts)

        return self._paginate(result_dicts, page, per_page, site_counts)

    async def _get_enabled_sources(self, sources: Optional[list[str]] = None) -> list:
        """Load enabled search sources, optionally filtered by host name."""
        # Imported lazily — presumably to avoid import-order issues with
        # Django app loading; confirm before moving to module level.
        from apps.recipes.models import SearchSource

        # The ORM is synchronous; wrap queryset evaluation so it can be
        # awaited from async code (list() forces evaluation inside the wrapper).
        get_sources = sync_to_async(lambda: list(SearchSource.objects.filter(is_enabled=True)))
        enabled = await get_sources()
        if sources:
            # Filter in Python rather than the DB; the enabled set is small.
            enabled = [s for s in enabled if s.host in sources]
        return enabled

    @staticmethod
    def _empty_response(page: int) -> dict:
        """Return an empty search response (same shape as _paginate's output)."""
        return {
            "results": [],
            "total": 0,
            "page": page,
            "has_more": False,
            "sites": {},
        }

    async def _fetch_all_sources(self, enabled_sources: list, query: str) -> list:
        """Search all sources concurrently and return raw results.

        Returns a list aligned with ``enabled_sources``: each entry is either
        a list of SearchResult or the Exception that source raised
        (``return_exceptions=True`` keeps one failing site from cancelling
        the rest).
        """
        semaphore = asyncio.Semaphore(self.MAX_CONCURRENT)
        # One randomized browser profile shared by the primary session.
        primary_profile = get_random_profile()

        async with AsyncSession(impersonate=primary_profile) as session:
            tasks = [self._search_source(session, semaphore, source, query) for source in enabled_sources]
            return await asyncio.gather(*tasks, return_exceptions=True)

    async def _aggregate_results(
        self,
        enabled_sources: list,
        results_by_source: list,
    ) -> tuple[list["SearchResult"], dict[str, int]]:
        """Aggregate per-source results, recording successes and failures."""
        all_results: list[SearchResult] = []
        site_counts: dict[str, int] = {}

        # gather() preserves task order, so zip pairs each source with its
        # own outcome (result list or Exception).
        for source, result in zip(enabled_sources, results_by_source):
            if isinstance(result, Exception):
                logger.warning(f"Search failed for {source.host}: {result}")
                await self._record_failure(source)
                continue

            site_counts[source.host] = len(result)
            all_results.extend(result)
            await self._record_success(source)

        return all_results, site_counts

    @staticmethod
    def _deduplicate_and_convert(results: list["SearchResult"]) -> list[dict]:
        """Deduplicate results by URL and convert to dict format for ranking.

        First occurrence wins, so input order (source order) is preserved.
        """
        seen_urls: set[str] = set()
        unique: list[dict] = []
        for r in results:
            if r.url not in seen_urls:
                seen_urls.add(r.url)
                unique.append(
                    {
                        "url": r.url,
                        "title": r.title,
                        "host": r.host,
                        "image_url": r.image_url,
                        "description": r.description,
                        "rating_count": r.rating_count,
                    }
                )
        return unique

    @staticmethod
    def _paginate(
        result_dicts: list[dict],
        page: int,
        per_page: int,
        site_counts: dict[str, int],
    ) -> dict:
        """Paginate results and build the final response dict."""
        total = len(result_dicts)
        # page is 1-indexed; slicing past the end safely yields [].
        start = (page - 1) * per_page
        end = start + per_page
        return {
            "results": result_dicts[start:end],
            "total": total,
            "page": page,
            "has_more": end < total,
            "sites": site_counts,
        }

    @staticmethod
    def _filter_relevant(query: str, results: list[dict]) -> list[dict]:
        """Filter results to those where at least one query term appears in the title.

        This prevents nonsensical queries (e.g., "xyznonexistent") from returning
        unrelated results that happen to be scraped from source search pages.
        """
        # Skip single-character tokens, which would match almost any title.
        query_terms = [t.lower() for t in query.split() if len(t) >= 2]
        if not query_terms:
            # No usable terms to filter on; pass everything through.
            return results

        filtered = []
        for result in results:
            title_lower = result["title"].lower()
            # Case-insensitive substring match on any term is enough here;
            # proper ranking happens later in _apply_ai_ranking.
            if any(term in title_lower for term in query_terms):
                filtered.append(result)
        return filtered

    @staticmethod
    def _apply_ai_ranking(query: str, results: list[dict]) -> list[dict]:
        """Rank search results by relevance using deterministic scoring."""
        # Imported lazily to keep the apps.ai dependency off this module's
        # import path until ranking is actually needed.
        from apps.ai.services.ranking import rank_results

        return rank_results(query, results)

    async def _search_source(
        self,
        session: AsyncSession,
        semaphore: asyncio.Semaphore,
        source,
        query: str,
    ) -> list[SearchResult]:
        """
        Search a single source for recipes.

        Uses randomized delays and retry with fallback browser profiles
        to avoid bot detection patterns.

        Raises the last fetch error if every attempt fails; the caller
        (_fetch_all_sources) collects it via gather(return_exceptions=True).
        """
        async with semaphore:
            # Jitter before the first request so concurrent sources don't
            # hit sites in lockstep.
            await asyncio.sleep(get_random_delay())

            search_url = source.search_url_template.replace("{query}", quote_plus(query))
            profiles_to_try = self._build_profile_list(session)

            last_error = None
            # At most 3 attempts: the session's own profile first (None
            # sentinel), then fallbacks, with linearly growing random delays.
            for i, profile in enumerate(profiles_to_try[:3]):
                if i > 0:
                    await asyncio.sleep(get_random_delay() * (i + 1))

                result, error = await self._try_fetch_and_parse(
                    session,
                    search_url,
                    profile,
                    source,
                )
                if result is not None:
                    return result
                last_error = error or last_error

            raise last_error or Exception("All retry attempts failed")

    async def _try_fetch_and_parse(self, session, url, profile, source):
        """Attempt a single fetch+parse. Returns (results, None) or (None, error).

        Non-retryable failures are raised instead of returned, which aborts
        the retry loop in _search_source immediately.
        """
        try:
            response = await self._fetch_with_profile(session, url, profile)
            if response.status_code == 200:
                return self._parse_search_results(
                    response.text,
                    source.host,
                    source.result_selector,
                    url,
                ), None

            error = Exception(f"HTTP {response.status_code}")
            if not self._should_retry_status(response.status_code):
                # Raised here so the generic handler below re-raises it
                # (its "HTTP <code>" text fails _should_retry_error too).
                raise error
            return None, error
        except asyncio.TimeoutError:
            # Timeouts are always treated as transient.
            return None, Exception("Request timed out")
        except Exception as e:
            if not self._should_retry_error(e):
                raise
            return None, e

    @staticmethod
    def _build_profile_list(session):
        """Build ordered list of browser profiles to try."""
        # NOTE(review): reads curl_cffi's private _impersonate attribute to
        # learn the session's current profile — may break on library upgrades.
        current = session._impersonate if hasattr(session, "_impersonate") else None
        profiles = [None]  # None = use session's existing profile
        profiles.extend(get_fallback_profiles(exclude=current))
        return profiles

    async def _fetch_with_profile(self, session: AsyncSession, url: str, profile):
        """Fetch a URL, using a new session with the given profile or the existing session."""
        if profile:
            # Fallback attempt: spin up a throwaway session impersonating
            # the alternate browser profile.
            async with AsyncSession(impersonate=profile) as retry_session:
                return await self._fetch_url(retry_session, url)
        return await self._fetch_url(session, url)

    @staticmethod
    def _should_retry_status(status_code: int) -> bool:
        """Return True if the HTTP status code is transient and worth retrying."""
        # 403/429 are typical bot-blocking responses that may succeed under a
        # different browser profile; 404 is included too — presumably some
        # sites serve it when blocking scrapers (TODO confirm). All 5xx are
        # treated as transient server errors.
        return status_code in (403, 404, 429) or status_code >= 500

    @staticmethod
    def _should_retry_error(error: Exception) -> bool:
        """Return True if the exception represents a transient error worth retrying."""
        error_str = str(error)
        # Anything that isn't an "HTTP <code>" error (network, TLS, etc.)
        # is considered transient.
        if "HTTP" not in error_str:
            return True
        # Otherwise retry only the known-transient status codes (substring
        # match against the "HTTP <code>" message built by the caller).
        return any(code in error_str for code in ("403", "404", "429", "500", "502", "503"))

    async def _fetch_url(self, session: AsyncSession, url: str):
        """Fetch a URL with timeout handling, redirect validation, and size limits.

        Redirects are followed manually (up to MAX_REDIRECT_HOPS) so every
        hop can be validated by validate_redirect_url before it is fetched.
        """
        from curl_cffi import CurlOpt

        current_url = url
        current_resolve: list[str] = []
        for _ in range(MAX_REDIRECT_HOPS):
            # Pin DNS for the validated redirect target via CurlOpt.RESOLVE —
            # presumably to stop the host re-resolving to a different address
            # between validation and fetch (TODO confirm with validators).
            curl_opts = {CurlOpt.RESOLVE: current_resolve} if current_resolve else {}
            async with AsyncSession(
                # NOTE(review): _impersonate is a private curl_cffi attribute;
                # falls back to "chrome" when it is absent.
                impersonate=session._impersonate if hasattr(session, "_impersonate") else "chrome",
                curl_options=curl_opts,
            ) as pin_session:
                # Outer wait_for adds a 5s grace beyond curl's own timeout so
                # a hung transfer cannot stall the whole search.
                response = await asyncio.wait_for(
                    pin_session.get(current_url, timeout=self.timeout, allow_redirects=False),
                    timeout=self.timeout + 5,
                )

            if response.status_code in (301, 302, 303, 307, 308):
                location = response.headers.get("location")
                if not location:
                    # Redirect status without a Location header: hand the
                    # response back as-is rather than guessing.
                    return response
                resolved = validate_redirect_url(location)
                current_url = location
                current_resolve = resolved.curl_resolve
                continue

            if response.status_code == 200:
                # Reject oversized pages before handing HTML to the parser.
                if not check_response_size(response, MAX_HTML_SIZE):
                    raise ValueError(f"Search response too large for {url}")
                check_content_size(response.content, MAX_HTML_SIZE)

            return response

        raise ValueError(f"Too many redirects (>{MAX_REDIRECT_HOPS}) for {url}")

    def _parse_search_results(
        self,
        html: str,
        host: str,
        selector: str,
        base_url: str,
    ) -> list[SearchResult]:
        """
        Parse search results from HTML.

        Uses the site-specific CSS selector if available,
        otherwise falls back to common patterns.
        """
        from apps.recipes.services.search_parsers import (
            extract_result_from_element,
            fallback_parse,
        )

        soup = BeautifulSoup(html, "html.parser")
        results = []

        # Try site-specific selector first
        if selector:
            elements = soup.select(selector)
            if elements:
                for el in elements[:20]:  # Limit per site
                    # extract_result_from_element may return None for
                    # elements it cannot turn into a usable result.
                    result = extract_result_from_element(el, host, base_url)
                    if result:
                        results.append(result)
                return results

        # Fallback: Look for common recipe link patterns
        results = fallback_parse(soup, host, base_url)
        return results[:20]  # Limit per site

    async def _record_failure(self, source) -> None:
        """Record a search failure for maintenance tracking."""

        @sync_to_async
        def update():
            source.consecutive_failures += 1
            # Two consecutive failures flags the source for attention.
            if source.consecutive_failures >= 2:
                source.needs_attention = True
            source.save(update_fields=["consecutive_failures", "needs_attention"])

        await update()

    async def _record_success(self, source) -> None:
        """Record a successful search."""

        @sync_to_async
        def update():
            # Any success clears the failure streak and the attention flag.
            source.consecutive_failures = 0
            source.needs_attention = False
            source.last_validated_at = timezone.now()
            source.save(
                update_fields=[
                    "consecutive_failures",
                    "needs_attention",
                    "last_validated_at",
                ]
            )

        await update()