Coverage for apps / ai / services / quota.py: 92%
85 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 13:22 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 13:22 +0000
1"""AI quota checking and tracking service.
3Uses Django's database cache backend to enforce per-profile, per-feature
4daily limits. Quotas only apply in passkey auth mode — home mode is unlimited.
5"""
7import logging
8from datetime import datetime, timedelta, UTC
10from django.conf import settings
11from django.core.cache import cache
13from apps.core.models import AppSettings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Every AI feature metered by a daily quota; get_usage() reports them in this order.
ALL_FEATURES = ("remix", "remix_suggestions", "scale", "tips", "discover", "timer")

# Maps each feature name to the AppSettings attribute that holds its daily
# limit. Every attribute follows the "daily_limit_<feature>" naming scheme.
FEATURE_LIMIT_FIELDS = {name: f"daily_limit_{name}" for name in ALL_FEATURES}
36def _cache_key(profile_id: int, feature: str) -> str:
37 """Build the daily cache key for a profile/feature pair."""
38 today = datetime.now(UTC).date()
39 return f"ai_quota:{profile_id}:{feature}:{today.isoformat()}"
42def _seconds_until_midnight_utc() -> int:
43 """Return seconds remaining until the next UTC midnight."""
44 now = datetime.now(UTC)
45 tomorrow = datetime(now.year, now.month, now.day, tzinfo=UTC) + timedelta(days=1)
46 return int((tomorrow - now).total_seconds())
49def _next_midnight_utc_iso() -> str:
50 """Return next UTC midnight as an ISO 8601 string."""
51 now = datetime.now(UTC)
52 tomorrow = datetime(now.year, now.month, now.day, tzinfo=UTC) + timedelta(days=1)
53 return tomorrow.isoformat()
def check_quota(profile, feature: str) -> tuple[bool, dict]:
    """Report whether *profile* may use *feature* right now, without consuming quota.

    Returns (allowed, info_dict). The info dict is empty when the call is
    allowed; when denied it carries remaining/limit/used/resets_at.

    Quotas are skipped entirely (always allowed) unless AUTH_MODE is
    "passkey", and for profiles flagged unlimited_ai. Raises ValueError for
    a feature that has no configured limit field.

    NOTE: This is a read-only check. For enforcement under concurrent load,
    use reserve_quota() instead of check_quota() + increment_quota().
    """
    quotas_enforced = getattr(settings, "AUTH_MODE", "home") == "passkey"
    if not quotas_enforced or profile.unlimited_ai:
        return (True, {})

    limit_field = FEATURE_LIMIT_FIELDS.get(feature)
    if limit_field is None:
        raise ValueError(f"Unknown quota feature: {feature}")

    daily_limit = getattr(AppSettings.get(), limit_field)
    consumed = cache.get(_cache_key(profile.pk, feature), 0)

    if consumed >= daily_limit:
        denial = {
            "remaining": 0,
            "limit": daily_limit,
            "used": consumed,
            "resets_at": _next_midnight_utc_iso(),
        }
        return (False, denial)

    return (True, {})
def reserve_quota(profile, feature: str) -> tuple[bool, dict]:
    """Reserve a quota slot BEFORE executing the AI operation.

    The daily counter is incremented first and rolled back if that pushes it
    past the limit, so callers never act on a stale read-only check.

    How atomic the increment is depends on the cache backend: Django only
    guarantees atomic incr()/decr() on backends with native support. On the
    others the generic implementation is a get followed by a set, which also
    re-saves the key with the backend's default timeout. The expiry is
    therefore re-pinned to the next UTC midnight after every incr()/decr(),
    so a short default timeout cannot reset the counter mid-day.

    Returns (allowed, info_dict). If allowed, the counter is already
    incremented. On AI failure, call release_quota() to roll back.

    Raises ValueError for an unknown feature (only reached in passkey mode
    for profiles without unlimited_ai).
    """
    if getattr(settings, "AUTH_MODE", "home") != "passkey":
        return (True, {})

    if profile.unlimited_ai:
        return (True, {})

    if feature not in FEATURE_LIMIT_FIELDS:
        raise ValueError(f"Unknown quota feature: {feature}")

    app = AppSettings.get()
    limit = getattr(app, FEATURE_LIMIT_FIELDS[feature])

    key = _cache_key(profile.pk, feature)
    ttl = _seconds_until_midnight_utc()

    # When the key doesn't exist yet, incr() raises ValueError; cache.add() is
    # the "create if absent" primitive, so concurrent first-time callers don't
    # race each other into overwriting the counter back to 1. If add() loses
    # the race to another worker, the key now exists and a retry of incr()
    # succeeds.
    try:
        new_count = cache.incr(key)
    except ValueError:
        if cache.add(key, 1, timeout=ttl):
            new_count = 1
        else:
            try:
                new_count = cache.incr(key)
            except ValueError:
                # Key vanished again between add() and incr(); start over.
                cache.set(key, 1, timeout=ttl)
                new_count = 1

    if new_count > limit:
        # Over limit — roll back the increment we just did.
        try:
            cache.decr(key)
        except ValueError:
            # Best effort: the key disappeared, so there is nothing to undo.
            logger.debug(
                "quota reserve: rollback found no cache key for profile=%s feature=%s",
                profile.pk,
                feature,
            )
        else:
            # decr() may have re-saved the key with the default timeout.
            cache.touch(key, timeout=ttl)
        return (
            False,
            {
                "remaining": 0,
                "limit": limit,
                "used": new_count - 1,
                "resets_at": _next_midnight_utc_iso(),
            },
        )

    # incr() may have re-saved the key with the default timeout.
    cache.touch(key, timeout=ttl)
    return (True, {})
def release_quota(profile, feature: str) -> None:
    """Roll back a reserved quota slot on AI operation failure (FR-009).

    Does nothing outside passkey mode or for unlimited_ai profiles, mirroring
    the cases in which reserve_quota() never touches the counter. A missing
    cache key is logged at debug level and otherwise ignored.
    """
    if getattr(settings, "AUTH_MODE", "home") != "passkey":
        return
    if profile.unlimited_ai:
        return

    key = _cache_key(profile.pk, feature)
    try:
        new_count = cache.decr(key)
    except ValueError:
        logger.debug(
            "quota release: cache key missing for profile=%s feature=%s (cache may have been flushed)",
            profile.pk,
            feature,
        )
        return

    ttl = _seconds_until_midnight_utc()
    if new_count < 0:
        # Defence-in-depth: a correctly-paired reserve/release cycle can never
        # drive the counter below zero, but an unpaired release (e.g. due to
        # a future bookkeeping regression) must not leave the counter negative
        # — a negative counter silently bypasses the `used >= limit` check.
        cache.set(key, 0, timeout=ttl)
    else:
        # On backends without a native decrement, Django's decr() re-saves the
        # key with the default timeout; restore the until-midnight expiry so
        # the counter neither expires early nor outlives the day.
        cache.touch(key, timeout=ttl)
def increment_quota(profile, feature: str) -> None:
    """Increment the daily counter after a successful AI operation.

    DEPRECATED: Use reserve_quota() / release_quota() instead for
    enforcement under concurrent load.

    The counter is created with cache.add() rather than cache.set(), so two
    first-time callers cannot overwrite each other's count back to 1. After
    an incr() the expiry is re-pinned to the next UTC midnight, because
    Django's generic incr() re-saves the key with the default timeout on
    backends without a native increment.
    """
    key = _cache_key(profile.pk, feature)
    ttl = _seconds_until_midnight_utc()

    try:
        cache.incr(key)
    except ValueError:
        # Key absent: create it only if nobody else has in the meantime.
        if cache.add(key, 1, timeout=ttl):
            return
        try:
            # Lost the creation race; the key exists now, so count on top of it.
            cache.incr(key)
        except ValueError:
            # Key vanished again between add() and incr(); start over.
            cache.set(key, 1, timeout=ttl)
            return

    cache.touch(key, timeout=ttl)
def get_usage(profile_id: int) -> dict:
    """Return {feature: count} for all features for today.

    Features with no counter in the cache are reported as 0. All counters are
    fetched with a single cache.get_many() call instead of one lookup per
    feature.
    """
    keys_by_feature = {feature: _cache_key(profile_id, feature) for feature in ALL_FEATURES}
    found = cache.get_many(list(keys_by_feature.values()))
    return {feature: found.get(key, 0) for feature, key in keys_by_feature.items()}