Coverage for apps / recipes / services / source_health.py: 0%
34 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 13:22 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 13:22 +0000
1"""Shared source-health-check helpers used by both the HTTP handlers and the CLI.
3- `check_source(source)` runs a single source through a sample query and
4 returns `{source_id, name, host, ok, status_code, message, results_count}`.
5- `check_all_sources()` iterates every enabled source and returns a list of
6 the same shape.
8Factored out of `apps/recipes/sources_api.py` so `python manage.py cookie_admin
9sources test [--id|--all]` can reuse the exact same logic as the web endpoints.
10"""
12from __future__ import annotations
14from typing import Any
16from asgiref.sync import sync_to_async
17from django.utils import timezone
19from apps.recipes.models import SearchSource
20from apps.recipes.services.search import RecipeSearch
22TEST_QUERY = "chicken"
async def check_source(source: SearchSource) -> dict[str, Any]:
    """Run the sample query against one source and record the outcome on it.

    Searches ``source.host`` for ``TEST_QUERY``. A run that yields at least
    one result resets ``consecutive_failures`` and clears ``needs_attention``.
    An empty result set, or any exception raised by the search, increments
    the counter and flags the source once the counter reaches three.
    ``last_validated_at`` is stamped and the source is saved exactly once
    per call.

    Args:
        source: The search source to probe. It is mutated and saved.

    Returns:
        A plain dict, safe to serialise to JSON or print, with the keys
        ``source_id``, ``name``, ``host``, ``ok``, ``status_code``,
        ``message`` and ``results_count``. ``status_code`` is 200 when the
        search completed and ``None`` when it raised.

    Raises:
        Exception: Whatever ``source.save`` raises. A persistence error is
            propagated rather than being reported as a failed health check.
    """
    search = RecipeSearch()

    # Only the search and the parsing of its payload count as "the test".
    # Keeping the DB write out of this try block stops a save error from
    # being mistaken for a source failure and bumping the counter twice.
    try:
        results = await search.search(
            query=TEST_QUERY,
            sources=[source.host],
            page=1,
            per_page=3,
        )
        result_count = len(results.get("results", []))
    except Exception as exc:
        result_count = 0
        ok = False
        status_code = None
        message = f"Test failed: {exc}"
    else:
        ok = result_count > 0
        status_code = 200
        message = (
            f'Found {result_count} results for "{TEST_QUERY}"'
            if ok
            else f'No results for "{TEST_QUERY}" — selector may need updating'
        )

    # Single bookkeeping path shared by the success, empty and error outcomes.
    if ok:
        source.consecutive_failures = 0
        source.needs_attention = False
    else:
        source.consecutive_failures += 1
        source.needs_attention = source.consecutive_failures >= 3
    source.last_validated_at = timezone.now()
    await sync_to_async(source.save)()

    return {
        "source_id": source.id,
        "name": source.name,
        "host": source.host,
        "ok": ok,
        "status_code": status_code,
        "message": message,
        "results_count": result_count,
    }
async def check_all_sources() -> list[dict[str, Any]]:
    """Probe each enabled source in turn and collect the per-source dicts.

    Sources are checked one at a time, each awaited before the next starts.
    The returned list follows the order in which the queryset yields them.
    """
    load_enabled = sync_to_async(list)
    enabled_sources = await load_enabled(SearchSource.objects.filter(is_enabled=True))
    # `await` inside the comprehension keeps the checks strictly sequential.
    return [await check_source(candidate) for candidate in enabled_sources]