Coverage for apps / ai / services / quota.py: 92%

85 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-02 13:22 +0000

1"""AI quota checking and tracking service. 

2 

3Uses Django's database cache backend to enforce per-profile, per-feature 

4daily limits. Quotas only apply in passkey auth mode — home mode is unlimited. 

5""" 

6 

7import logging 

8from datetime import datetime, timedelta, UTC 

9 

10from django.conf import settings 

11from django.core.cache import cache 

12 

13from apps.core.models import AppSettings 

14 

15logger = logging.getLogger(__name__) 

16 

# Every AI feature that is subject to a per-profile daily quota.
ALL_FEATURES = ("remix", "remix_suggestions", "scale", "tips", "discover", "timer")

# Maps each feature name to the name of the AppSettings attribute that
# check_quota() and reserve_quota() read to obtain that feature's daily limit.
# Every field follows the "daily_limit_<feature>" pattern, so the mapping is
# derived from ALL_FEATURES and the two cannot drift apart.
FEATURE_LIMIT_FIELDS = {name: f"daily_limit_{name}" for name in ALL_FEATURES}

34 

35 

36def _cache_key(profile_id: int, feature: str) -> str: 

37 """Build the daily cache key for a profile/feature pair.""" 

38 today = datetime.now(UTC).date() 

39 return f"ai_quota:{profile_id}:{feature}:{today.isoformat()}" 

40 

41 

42def _seconds_until_midnight_utc() -> int: 

43 """Return seconds remaining until the next UTC midnight.""" 

44 now = datetime.now(UTC) 

45 tomorrow = datetime(now.year, now.month, now.day, tzinfo=UTC) + timedelta(days=1) 

46 return int((tomorrow - now).total_seconds()) 

47 

48 

49def _next_midnight_utc_iso() -> str: 

50 """Return next UTC midnight as an ISO 8601 string.""" 

51 now = datetime.now(UTC) 

52 tomorrow = datetime(now.year, now.month, now.day, tzinfo=UTC) + timedelta(days=1) 

53 return tomorrow.isoformat() 

54 

55 

def check_quota(profile, feature: str) -> tuple[bool, dict]:
    """Report whether *profile* can use *feature* right now, without consuming quota.

    Only reads the usage counter; nothing is written to the cache. Outside
    passkey auth mode, and for profiles flagged ``unlimited_ai``, the answer
    is always "allowed".

    Returns:
        ``(allowed, info)``. ``info`` is ``{}`` when allowed; when denied it
        holds ``remaining``, ``limit``, ``used`` and ``resets_at``.

    Raises:
        ValueError: *feature* has no entry in ``FEATURE_LIMIT_FIELDS``. This
            is only checked after the auth-mode and unlimited shortcuts.

    NOTE: This read is separate from any later increment_quota() call. For
    enforcement under concurrent load use reserve_quota() instead.
    """
    quota_enforced = getattr(settings, "AUTH_MODE", "home") == "passkey"
    if not quota_enforced or profile.unlimited_ai:
        return (True, {})

    limit_attr = FEATURE_LIMIT_FIELDS.get(feature)
    if limit_attr is None:
        raise ValueError(f"Unknown quota feature: {feature}")

    daily_limit = getattr(AppSettings.get(), limit_attr)
    used_today = cache.get(_cache_key(profile.pk, feature), 0)

    exhausted = used_today >= daily_limit
    if not exhausted:
        return (True, {})

    denial = {
        "remaining": 0,
        "limit": daily_limit,
        "used": used_today,
        "resets_at": _next_midnight_utc_iso(),
    }
    return (False, denial)

93 

94 

def reserve_quota(profile, feature: str) -> tuple[bool, dict]:
    """Reserve one quota slot for *feature* BEFORE executing the AI operation.

    The counter is incremented first and compared with the limit afterwards,
    so the decision is based on the value returned by this request's own
    increment rather than on a separate earlier read.

    Outside passkey auth mode, and for profiles flagged ``unlimited_ai``,
    nothing is counted and the call is always allowed.

    NOTE(review): the concurrency guarantee depends on cache.incr() and
    cache.add() being atomic in the configured backend. Django documents
    incr()/decr() as not guaranteed to be atomic on every backend — confirm
    for the backend in use.

    Returns:
        ``(allowed, info)``. When allowed, ``info`` is ``{}`` and the counter
        already includes this request; call release_quota() if the AI
        operation then fails. When denied, the increment has been rolled back
        and ``info`` holds ``remaining``, ``limit``, ``used`` and ``resets_at``.

    Raises:
        ValueError: *feature* has no entry in ``FEATURE_LIMIT_FIELDS``. This
            is only checked after the auth-mode and unlimited shortcuts.
    """
    if getattr(settings, "AUTH_MODE", "home") != "passkey":
        return (True, {})

    if profile.unlimited_ai:
        return (True, {})

    if feature not in FEATURE_LIMIT_FIELDS:
        raise ValueError(f"Unknown quota feature: {feature}")

    app = AppSettings.get()
    limit_field = FEATURE_LIMIT_FIELDS[feature]
    limit = getattr(app, limit_field)

    key = _cache_key(profile.pk, feature)
    ttl = _seconds_until_midnight_utc()

    # incr() raises ValueError when the key does not exist yet. cache.add() is
    # the "create only if absent" primitive, so concurrent first-time callers
    # do not overwrite each other's counter back to 1. If add() loses the race
    # to another worker, the key now exists and retrying incr() succeeds.
    try:
        new_count = cache.incr(key)
    except ValueError:
        if cache.add(key, 1, timeout=ttl):
            new_count = 1
        else:
            try:
                new_count = cache.incr(key)
            except ValueError:
                # The key disappeared again between add() and the retry;
                # start a fresh counter as a last resort.
                cache.set(key, 1, timeout=ttl)
                new_count = 1

    if new_count > limit:
        # Over limit — undo the increment made above so that denied requests
        # do not inflate the counter.
        try:
            cache.decr(key)
        except ValueError:
            # The key vanished between the increment and the rollback, so
            # there is nothing left to undo. Logged, as release_quota() does
            # for the same situation, instead of being dropped silently.
            logger.debug(
                "quota reserve: cache key missing during rollback for profile=%s feature=%s",
                profile.pk,
                feature,
            )
        return (
            False,
            {
                "remaining": 0,
                "limit": limit,
                # new_count includes the increment that was just rolled back.
                "used": new_count - 1,
                "resets_at": _next_midnight_utc_iso(),
            },
        )

    return (True, {})

154 

155 

def release_quota(profile, feature: str) -> None:
    """Give back one reserved quota slot after the AI operation failed (FR-009).

    Does nothing outside passkey auth mode or for profiles flagged
    ``unlimited_ai``. A missing counter key is logged at debug level and
    otherwise ignored.
    """
    quota_enforced = getattr(settings, "AUTH_MODE", "home") == "passkey"
    if not quota_enforced or profile.unlimited_ai:
        return

    counter_key = _cache_key(profile.pk, feature)
    try:
        count_after = cache.decr(counter_key)
    except ValueError:
        logger.debug(
            "quota release: cache key missing for profile=%s feature=%s (cache may have been flushed)",
            profile.pk,
            feature,
        )
    else:
        if count_after < 0:
            # Safety net: properly paired reserve/release calls never take the
            # counter below zero. If an unpaired release does, reset it to 0,
            # because a negative counter would allow extra uses before the
            # limit comparison trips again.
            cache.set(counter_key, 0, timeout=_seconds_until_midnight_utc())

180 

181 

def increment_quota(profile, feature: str) -> None:
    """Add one to today's usage counter for *profile* and *feature*.

    DEPRECATED: Use reserve_quota() / release_quota() instead for
    enforcement under concurrent load.

    Unlike reserve_quota(), this does not look at the auth mode, the
    profile's ``unlimited_ai`` flag or the configured limit — it only counts.
    """
    key = _cache_key(profile.pk, feature)
    ttl = _seconds_until_midnight_utc()

    try:
        cache.incr(key)
    except ValueError:
        # The key does not exist yet. Create it with cache.add() ("create only
        # if absent") rather than cache.set(), so that concurrent first-time
        # callers cannot overwrite each other's counter back to 1.
        if cache.add(key, 1, timeout=ttl):
            return
        try:
            # add() lost the race: another caller created the key, so count
            # this call on top of theirs.
            cache.incr(key)
        except ValueError:
            # The key disappeared again between add() and the retry; start a
            # fresh counter as a last resort.
            cache.set(key, 1, timeout=ttl)

195 

196 

def get_usage(profile_id: int) -> dict:
    """Return today's usage count for every feature, keyed by feature name.

    A feature with no counter in the cache is reported as 0.
    """
    counts = (cache.get(_cache_key(profile_id, name), 0) for name in ALL_FEATURES)
    return dict(zip(ALL_FEATURES, counts))

← Back to Dashboard