Coverage for apps/recipes/services/source_health.py: 0%

34 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-02 13:22 +0000

1"""Shared source-health-check helpers used by both the HTTP handlers and the CLI. 

2 

3- `check_source(source)` runs a single source through a sample query and 

4 returns `{source_id, name, host, ok, status_code, message, results_count}`. 

5- `check_all_sources()` iterates every enabled source and returns a list of 

6 the same shape. 

7 

8Factored out of `apps/recipes/sources_api.py` so `python manage.py cookie_admin 

9sources test [--id|--all]` can reuse the exact same logic as the web endpoints. 

10""" 

11 

12from __future__ import annotations 

13 

14from typing import Any 

15 

16from asgiref.sync import sync_to_async 

17from django.utils import timezone 

18 

19from apps.recipes.models import SearchSource 

20from apps.recipes.services.search import RecipeSearch 

21 

# Sample search term that check_source() sends to a source; it is also
# embedded in the human-readable message returned for each check.
TEST_QUERY = "chicken"

23 

24 

async def _record_health(source: SearchSource, ok: bool) -> None:
    """Apply the outcome of one health check to *source* and persist it."""
    if ok:
        source.consecutive_failures = 0
        source.needs_attention = False
    else:
        source.consecutive_failures += 1
        # Three failed checks in a row flag the source for attention.
        source.needs_attention = source.consecutive_failures >= 3
    source.last_validated_at = timezone.now()
    # Model.save() is synchronous, so it is wrapped for use from async code.
    await sync_to_async(source.save)()


async def check_source(source: SearchSource) -> dict[str, Any]:
    """Run the sample query against one source; update its health metadata in DB.

    The source is searched for ``TEST_QUERY`` (page 1, 3 results per page),
    restricted to ``source.host``. The check counts as healthy only when at
    least one result comes back. The source's ``consecutive_failures``,
    ``needs_attention`` and ``last_validated_at`` fields are then updated and
    saved exactly once per call.

    Args:
        source: The source to test. It is mutated and saved.

    Returns:
        A plain dict safe to serialise to JSON or print, with the keys
        ``source_id``, ``name``, ``host``, ``ok``, ``status_code``,
        ``message`` and ``results_count``. ``status_code`` is ``200`` when
        the search call completed and ``None`` when it raised.

    Raises:
        Exception: Errors from constructing ``RecipeSearch`` or from saving
            the source are not handled here and propagate to the caller.
    """
    search = RecipeSearch()
    # Only the search and result count are guarded. A failing save must not be
    # reported as a broken source, and must not bump the failure counter twice.
    try:
        results = await search.search(
            query=TEST_QUERY,
            sources=[source.host],
            page=1,
            per_page=3,
        )
        result_count = len(results.get("results", []))
    except Exception as exc:
        # Best-effort boundary: any search error is reported as a failed check.
        result_count = 0
        status_code = None
        message = f"Test failed: {exc}"
    else:
        status_code = 200
        message = (
            f'Found {result_count} results for "{TEST_QUERY}"'
            if result_count > 0
            else f'No results for "{TEST_QUERY}" — selector may need updating'
        )

    ok = result_count > 0
    await _record_health(source, ok)

    return {
        "source_id": source.id,
        "name": source.name,
        "host": source.host,
        "ok": ok,
        "status_code": status_code,
        "message": message,
        "results_count": result_count,
    }

78 

79 

async def check_all_sources() -> list[dict[str, Any]]:
    """Check each enabled source one after another.

    Returns:
        One result dict per source with ``is_enabled=True``, each produced by
        ``check_source``, in the order the queryset yields the sources.
    """
    enabled_queryset = SearchSource.objects.filter(is_enabled=True)
    # The queryset is lazy; list() evaluates it inside the sync_to_async wrapper.
    enabled_sources = await sync_to_async(list)(enabled_queryset)
    # Awaiting inside the comprehension keeps the checks strictly sequential.
    return [await check_source(enabled) for enabled in enabled_sources]

← Back to Dashboard