Line data Source code
1 : /* queue.c - Message queue implementation */
2 :
3 : #include "queue.h"
4 : #include "pt_compat.h"
5 : #include "../../include/peertalk.h"
6 :
7 : /* Helper: Check if value is power of two */
8 111 : static int pt_is_power_of_two(uint16_t value)
9 : {
10 111 : return (value > 0) && ((value & (value - 1)) == 0);
11 : }
12 :
13 : /* ========================================================================
14 : * Queue Management
15 : * ======================================================================== */
16 :
17 111 : int pt_queue_init(struct pt_context *ctx, pt_queue *q, uint16_t capacity)
18 : {
19 : size_t alloc_size;
20 :
21 111 : if (!q) {
22 0 : return PT_ERR_INVALID_PARAM;
23 : }
24 :
25 : /* Validate capacity is power of two */
26 111 : if (!pt_is_power_of_two(capacity)) {
27 1 : if (ctx) {
28 1 : PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
29 : "Queue capacity must be power of 2, got %u", capacity);
30 : }
31 1 : return PT_ERR_INVALID_PARAM;
32 : }
33 :
34 : /* Allocate slots array */
35 110 : alloc_size = sizeof(pt_queue_slot) * capacity;
36 110 : q->slots = (pt_queue_slot *)pt_alloc_clear(alloc_size);
37 110 : if (!q->slots) {
38 0 : if (ctx) {
39 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_MEMORY,
40 : "Failed to allocate %lu bytes for %u queue slots",
41 : (unsigned long)alloc_size, capacity);
42 : }
43 0 : return PT_ERR_NO_MEMORY;
44 : }
45 :
46 : /* Initialize queue state */
47 110 : q->magic = PT_QUEUE_MAGIC;
48 110 : q->capacity = capacity;
49 110 : q->capacity_mask = capacity - 1; /* For fast wrap-around */
50 110 : q->write_idx = 0;
51 110 : q->read_idx = 0;
52 110 : q->count = 0;
53 110 : q->has_data = 0;
54 110 : q->reserved = 0;
55 :
56 : /* Initialize Phase 3 extensions (priority freelists, coalesce hash) */
57 110 : pt_queue_ext_init(q);
58 :
59 110 : if (ctx) {
60 102 : PT_CTX_INFO(ctx, PT_LOG_CAT_PROTOCOL,
61 : "Queue initialized: %u slots, %zu bytes",
62 : capacity, alloc_size);
63 : }
64 :
65 110 : return 0;
66 : }
67 :
68 64 : void pt_queue_free(pt_queue *q)
69 : {
70 64 : if (!q || !q->slots) {
71 0 : return;
72 : }
73 :
74 64 : pt_free(q->slots);
75 64 : q->slots = NULL;
76 64 : q->magic = 0;
77 64 : q->capacity = 0;
78 64 : q->capacity_mask = 0;
79 64 : q->write_idx = 0;
80 64 : q->read_idx = 0;
81 64 : q->count = 0;
82 64 : q->has_data = 0;
83 : }
84 :
85 1 : void pt_queue_reset(pt_queue *q)
86 : {
87 1 : if (!q || q->magic != PT_QUEUE_MAGIC) {
88 0 : return;
89 : }
90 :
91 1 : q->write_idx = 0;
92 1 : q->read_idx = 0;
93 1 : q->count = 0;
94 1 : q->has_data = 0;
95 : }
96 :
97 : /* ========================================================================
98 : * Push Operations
99 : * ======================================================================== */
100 :
101 954 : int pt_queue_push(struct pt_context *ctx, pt_queue *q,
102 : const void *data, uint16_t len,
103 : uint8_t priority, uint8_t flags)
104 : {
105 : pt_queue_slot *slot;
106 : uint8_t pressure;
107 : /* cppcheck-suppress variableScope ; static var must persist across calls for threshold tracking */
108 : static uint8_t last_pressure_level = 0; /* Track for logging thresholds */
109 :
110 954 : if (!q || q->magic != PT_QUEUE_MAGIC || !data) {
111 1 : return PT_ERR_INVALID_PARAM;
112 : }
113 :
114 953 : if (len > PT_QUEUE_SLOT_SIZE) {
115 0 : if (ctx) {
116 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
117 : "Message too large: %u bytes (max %u)",
118 : len, PT_QUEUE_SLOT_SIZE);
119 : }
120 0 : return PT_ERR_INVALID_PARAM;
121 : }
122 :
123 : /* Check if full */
124 953 : if (q->count >= q->capacity) {
125 2 : if (ctx) {
126 2 : PT_CTX_WARN(ctx, PT_LOG_CAT_PROTOCOL,
127 : "Queue full: %u/%u slots", q->count, q->capacity);
128 : }
129 2 : return PT_ERR_BUFFER_FULL;
130 : }
131 :
132 : /* Get slot and write data */
133 951 : slot = &q->slots[q->write_idx];
134 951 : slot->length = len;
135 951 : slot->priority = priority;
136 951 : slot->flags = PT_SLOT_USED | flags;
137 951 : slot->coalesce_key = PT_COALESCE_NONE; /* No coalescing for plain push */
138 951 : slot->next_slot = PT_SLOT_NONE;
139 951 : pt_memcpy(slot->data, data, len);
140 :
141 : /* Add to priority freelist (Phase 3 enhancement) */
142 : {
143 951 : pt_queue_ext *ext = &q->ext;
144 951 : uint16_t slot_idx = q->write_idx;
145 :
146 951 : if (ext->prio_tail[priority] != PT_SLOT_NONE) {
147 : /* List has items - append to tail */
148 906 : q->slots[ext->prio_tail[priority]].next_slot = slot_idx;
149 : } else {
150 : /* List empty - set head */
151 45 : ext->prio_head[priority] = slot_idx;
152 : }
153 951 : ext->prio_tail[priority] = slot_idx;
154 951 : ext->prio_count[priority]++;
155 : }
156 :
157 : /* Advance write index (power-of-two wrap-around) */
158 951 : q->write_idx = (q->write_idx + 1) & q->capacity_mask;
159 951 : q->count++;
160 951 : q->has_data = 1;
161 :
162 : /* Check backpressure and log at thresholds */
163 951 : pressure = pt_queue_pressure(q);
164 951 : if (ctx) {
165 : /* Log when crossing 80%, 90%, 95% thresholds */
166 951 : if (pressure >= 95 && last_pressure_level < 95) {
167 5 : PT_CTX_WARN(ctx, PT_LOG_CAT_PERF,
168 : "Queue pressure CRITICAL: %u%% (%u/%u slots)",
169 : pressure, q->count, q->capacity);
170 5 : last_pressure_level = 95;
171 946 : } else if (pressure >= 90 && last_pressure_level < 90) {
172 1 : PT_CTX_WARN(ctx, PT_LOG_CAT_PERF,
173 : "Queue pressure HIGH: %u%% (%u/%u slots)",
174 : pressure, q->count, q->capacity);
175 1 : last_pressure_level = 90;
176 945 : } else if (pressure >= 80 && last_pressure_level < 80) {
177 3 : PT_CTX_WARN(ctx, PT_LOG_CAT_PERF,
178 : "Queue pressure elevated: %u%% (%u/%u slots)",
179 : pressure, q->count, q->capacity);
180 3 : last_pressure_level = 80;
181 942 : } else if (pressure < 80 && last_pressure_level >= 80) {
182 : /* Reset threshold tracking when pressure drops */
183 5 : last_pressure_level = 0;
184 : }
185 : }
186 :
187 951 : return 0;
188 : }
189 :
190 1 : int pt_queue_push_isr(pt_queue *q, const void *data, uint16_t len)
191 : {
192 : pt_queue_slot *slot;
193 :
194 : /* ISR-SAFETY: No parameter validation logging, no backpressure logging */
195 1 : if (!q || q->magic != PT_QUEUE_MAGIC || !data) {
196 0 : return PT_ERR_INVALID_PARAM;
197 : }
198 :
199 1 : if (len > PT_QUEUE_SLOT_SIZE) {
200 0 : return PT_ERR_INVALID_PARAM;
201 : }
202 :
203 : /* Check if full */
204 1 : if (q->count >= q->capacity) {
205 0 : return PT_ERR_BUFFER_FULL;
206 : }
207 :
208 : /* Get slot and write data using ISR-safe memcpy */
209 1 : slot = &q->slots[q->write_idx];
210 1 : slot->length = len;
211 1 : slot->priority = 0;
212 1 : slot->flags = PT_SLOT_USED;
213 1 : pt_memcpy_isr(slot->data, data, len); /* ISR-safe: no Toolbox */
214 :
215 : /* Advance write index (power-of-two wrap-around) */
216 1 : q->write_idx = (q->write_idx + 1) & q->capacity_mask;
217 1 : q->count++;
218 1 : q->has_data = 1;
219 :
220 1 : return 0;
221 : }
222 :
223 0 : int pt_queue_push_isr_ot(pt_queue *q, const void *data, uint16_t len)
224 : {
225 : pt_queue_slot *slot;
226 :
227 : /* ISR-SAFETY: No logging, uses atomic operations for OT reentrancy */
228 0 : if (!q || q->magic != PT_QUEUE_MAGIC || !data) {
229 0 : return PT_ERR_INVALID_PARAM;
230 : }
231 :
232 0 : if (len > PT_QUEUE_SLOT_SIZE) {
233 0 : return PT_ERR_INVALID_PARAM;
234 : }
235 :
236 : /* Check if full */
237 0 : if (q->count >= q->capacity) {
238 0 : return PT_ERR_BUFFER_FULL;
239 : }
240 :
241 : /* Get slot and write metadata */
242 0 : slot = &q->slots[q->write_idx];
243 0 : slot->length = len;
244 0 : slot->priority = 0;
245 0 : slot->flags = PT_SLOT_USED; /* Set USED but NOT READY yet */
246 :
247 : /* Copy data using ISR-safe memcpy */
248 0 : pt_memcpy_isr(slot->data, data, len);
249 :
250 : /* OT REENTRANCY SAFETY: Set READY flag AFTER data copy completes.
251 : * If another notifier runs during copy, it won't read partial data.
252 : * On Classic Mac with OT, use atomic bit set (OTAtomicSetBit).
253 : * On POSIX, just set flag (no OT reentrancy). */
254 : #ifdef PT_PLATFORM_OPENTRANSPORT
255 : /* Atomic bit set for OT notifier reentrancy safety */
256 : /* Note: OTAtomicSetBit not available in POSIX builds */
257 : slot->flags |= PT_SLOT_READY;
258 : #else
259 0 : slot->flags |= PT_SLOT_READY;
260 : #endif
261 :
262 : /* Advance write index (power-of-two wrap-around) */
263 0 : q->write_idx = (q->write_idx + 1) & q->capacity_mask;
264 0 : q->count++;
265 0 : q->has_data = 1;
266 :
267 0 : return 0;
268 : }
269 :
270 : /* ========================================================================
271 : * Pop Operations
272 : * ======================================================================== */
273 :
274 12 : int pt_queue_pop(pt_queue *q, void *data, uint16_t *len)
275 : {
276 : pt_queue_slot *slot;
277 :
278 12 : if (!q || q->magic != PT_QUEUE_MAGIC || !data || !len) {
279 0 : return PT_ERR_INVALID_PARAM;
280 : }
281 :
282 : /* Check if empty */
283 12 : if (q->count == 0) {
284 1 : return PT_ERR_QUEUE_EMPTY;
285 : }
286 :
287 : /* Get front slot */
288 11 : slot = &q->slots[q->read_idx];
289 :
290 : #ifdef PT_PLATFORM_OPENTRANSPORT
291 : /* OT REENTRANCY SAFETY: Wait for READY flag before reading data.
292 : * If push_isr_ot is interrupted, we won't read partial data. */
293 : if (!(slot->flags & PT_SLOT_READY)) {
294 : return PT_ERR_QUEUE_EMPTY; /* Data not ready yet */
295 : }
296 : #endif
297 :
298 : /* Check slot is used */
299 11 : if (!(slot->flags & PT_SLOT_USED)) {
300 0 : return PT_ERR_QUEUE_EMPTY;
301 : }
302 :
303 : /* Copy data */
304 11 : *len = slot->length;
305 11 : pt_memcpy(data, slot->data, slot->length);
306 :
307 : /* Clear slot */
308 11 : slot->flags = 0;
309 11 : slot->length = 0;
310 :
311 : /* Advance read index (power-of-two wrap-around) */
312 11 : q->read_idx = (q->read_idx + 1) & q->capacity_mask;
313 11 : q->count--;
314 :
315 : /* Update has_data flag */
316 11 : if (q->count == 0) {
317 6 : q->has_data = 0;
318 : }
319 :
320 11 : return 0;
321 : }
322 :
323 1 : int pt_queue_peek(pt_queue *q, void **data, uint16_t *len)
324 : {
325 : pt_queue_slot *slot;
326 :
327 1 : if (!q || q->magic != PT_QUEUE_MAGIC || !data || !len) {
328 0 : return PT_ERR_INVALID_PARAM;
329 : }
330 :
331 : /* Check if empty */
332 1 : if (q->count == 0) {
333 0 : return PT_ERR_QUEUE_EMPTY;
334 : }
335 :
336 : /* Get front slot */
337 1 : slot = &q->slots[q->read_idx];
338 :
339 : #ifdef PT_PLATFORM_OPENTRANSPORT
340 : /* OT REENTRANCY SAFETY: Wait for READY flag */
341 : if (!(slot->flags & PT_SLOT_READY)) {
342 : return PT_ERR_QUEUE_EMPTY;
343 : }
344 : #endif
345 :
346 : /* Check slot is used */
347 1 : if (!(slot->flags & PT_SLOT_USED)) {
348 0 : return PT_ERR_QUEUE_EMPTY;
349 : }
350 :
351 : /* Return pointer to slot data (zero-copy) */
352 1 : *data = slot->data;
353 1 : *len = slot->length;
354 :
355 1 : return 0;
356 : }
357 :
358 1 : void pt_queue_consume(pt_queue *q)
359 : {
360 : pt_queue_slot *slot;
361 :
362 1 : if (!q || q->magic != PT_QUEUE_MAGIC || q->count == 0) {
363 0 : return;
364 : }
365 :
366 : /* Clear front slot */
367 1 : slot = &q->slots[q->read_idx];
368 1 : slot->flags = 0;
369 1 : slot->length = 0;
370 :
371 : /* Advance read index (power-of-two wrap-around) */
372 1 : q->read_idx = (q->read_idx + 1) & q->capacity_mask;
373 1 : q->count--;
374 :
375 : /* Update has_data flag */
376 1 : if (q->count == 0) {
377 1 : q->has_data = 0;
378 : }
379 : }
380 :
381 : /* ========================================================================
382 : * Queue Status
383 : * ======================================================================== */
384 :
385 7 : uint16_t pt_queue_count(pt_queue *q)
386 : {
387 7 : if (!q || q->magic != PT_QUEUE_MAGIC) {
388 0 : return 0;
389 : }
390 :
391 7 : return q->count;
392 : }
393 :
394 3 : uint16_t pt_queue_free_slots(pt_queue *q)
395 : {
396 3 : if (!q || q->magic != PT_QUEUE_MAGIC) {
397 0 : return 0;
398 : }
399 :
400 3 : return q->capacity - q->count;
401 : }
402 :
403 1845 : uint8_t pt_queue_pressure(pt_queue *q)
404 : {
405 : uint32_t pressure;
406 :
407 1845 : if (!q || q->magic != PT_QUEUE_MAGIC || q->capacity == 0) {
408 0 : return 0;
409 : }
410 :
411 : /* Calculate pressure as percentage (0-100)
412 : * Use uint32_t to prevent overflow when count > 655 */
413 1845 : pressure = ((uint32_t)q->count * 100) / q->capacity;
414 :
415 : /* Clamp to 100 */
416 1845 : if (pressure > 100) {
417 0 : pressure = 100;
418 : }
419 :
420 1845 : return (uint8_t)pressure;
421 : }
422 :
423 1 : int pt_queue_is_full(pt_queue *q)
424 : {
425 1 : if (!q || q->magic != PT_QUEUE_MAGIC) {
426 0 : return 0;
427 : }
428 :
429 1 : return (q->count >= q->capacity) ? 1 : 0;
430 : }
431 :
432 12 : int pt_queue_is_empty(pt_queue *q)
433 : {
434 12 : if (!q || q->magic != PT_QUEUE_MAGIC) {
435 0 : return 1;
436 : }
437 :
438 12 : return (q->count == 0) ? 1 : 0;
439 : }
440 :
441 : /* ========================================================================
442 : * Coalescing
443 : * ======================================================================== */
444 :
445 1 : int pt_queue_coalesce(pt_queue *q, const void *data, uint16_t len)
446 : {
447 : uint16_t i;
448 : uint16_t search_start;
449 : uint16_t search_count;
450 : /* cppcheck-suppress variableScope ; C89 style for Classic Mac compiler compatibility */
451 : pt_queue_slot *slot;
452 :
453 1 : if (!q || q->magic != PT_QUEUE_MAGIC || !data) {
454 0 : return -1;
455 : }
456 :
457 1 : if (len > PT_QUEUE_SLOT_SIZE) {
458 0 : return -1;
459 : }
460 :
461 : /* Search last 4 slots for coalescable message
462 : * Rationale: Position updates typically coalesce with very recent
463 : * messages. Searching entire queue causes cache misses on 68030
464 : * with 256-byte cache line. */
465 1 : search_count = (q->count < 4) ? q->count : 4;
466 1 : if (search_count == 0) {
467 0 : return -1; /* Queue empty */
468 : }
469 :
470 : /* Start from write_idx and search backwards */
471 1 : search_start = q->write_idx;
472 :
473 1 : for (i = 0; i < search_count; i++) {
474 : uint16_t idx;
475 :
476 : /* Calculate slot index searching backwards (with wrap-around) */
477 1 : if (search_start >= i + 1) {
478 1 : idx = search_start - (i + 1);
479 : } else {
480 0 : idx = q->capacity - ((i + 1) - search_start);
481 : }
482 :
483 1 : slot = &q->slots[idx];
484 :
485 : /* Check if slot is coalescable */
486 1 : if ((slot->flags & PT_SLOT_USED) &&
487 1 : (slot->flags & PT_SLOT_COALESCABLE)) {
488 : /* Found coalescable message - replace it */
489 1 : slot->length = len;
490 1 : pt_memcpy(slot->data, data, len);
491 1 : return 0;
492 : }
493 : }
494 :
495 0 : return -1; /* No coalescable message found */
496 : }
497 :
498 : /* ========================================================================
499 : * Phase 3: Priority & Coalescing Operations
500 : * ======================================================================== */
501 :
502 : /*
503 : * Initialize extended queue data structures
504 : *
505 : * Call this after pt_queue_init() from Phase 2 to set up
506 : * priority free-lists and coalesce hash table.
507 : */
508 118 : void pt_queue_ext_init(pt_queue *q) {
509 : pt_queue_ext *ext;
510 : uint16_t i;
511 : uint16_t max_slots; /* Loop invariant hoisted for optimization */
512 :
513 118 : if (!q) {
514 : /* Cannot log without context, but defend against NULL */
515 0 : return;
516 : }
517 :
518 : #ifdef PT_QUEUE_SLOT_EXPECTED_SIZE
519 : /* Runtime size check for C89/C99 (C11 uses _Static_assert) */
520 : if (sizeof(pt_queue_slot) != PT_QUEUE_SLOT_EXPECTED_SIZE) {
521 : /* Size mismatch - likely compiler padding issue.
522 : * Use #pragma pack on Classic Mac compilers if this fails. */
523 : return; /* Fail-safe: don't corrupt memory */
524 : }
525 : #endif
526 :
527 : /* Capacity validation - queue capacity must not exceed PT_QUEUE_MAX_SLOTS
528 : * because next_slot pointers are only initialized for slots 0..MAX_SLOTS-1 */
529 118 : if (q->capacity > PT_QUEUE_MAX_SLOTS) {
530 : /* Configuration error - slots beyond MAX_SLOTS have uninitialized next_slot.
531 : * Caller should use smaller capacity or increase PT_QUEUE_MAX_SLOTS.
532 : * NOTE: Cannot log here since we don't have context - caller should check. */
533 1 : return; /* Fail-safe: don't use queue with uninitialized slots */
534 : }
535 :
536 117 : ext = &q->ext;
537 :
538 : /* Initialize priority free-lists as empty */
539 585 : for (i = 0; i < PT_PRIO_COUNT; i++) {
540 468 : ext->prio_head[i] = PT_SLOT_NONE;
541 468 : ext->prio_tail[i] = PT_SLOT_NONE;
542 468 : ext->prio_count[i] = 0;
543 : }
544 :
545 : /* Initialize slot next_slot pointers (stored in slots themselves)
546 : * OPTIMIZATION: Hoist loop invariant min(capacity, MAX_SLOTS) */
547 117 : max_slots = (q->capacity < PT_QUEUE_MAX_SLOTS) ? q->capacity : PT_QUEUE_MAX_SLOTS;
548 1921 : for (i = 0; i < max_slots; i++) {
549 1804 : q->slots[i].next_slot = PT_SLOT_NONE;
550 : }
551 :
552 : /* Initialize coalesce hash table as empty */
553 3861 : for (i = 0; i < PT_COALESCE_HASH_SIZE; i++) {
554 3744 : ext->coalesce_hash[i] = PT_SLOT_NONE;
555 : }
556 :
557 : /* Initialize pending pop state */
558 117 : q->pending_pop_prio = 0;
559 117 : q->pending_pop_slot = PT_SLOT_NONE;
560 :
561 : /* Initialize ISR flags (for deferred logging from interrupt context) */
562 117 : q->isr_flags.queue_full = 0;
563 117 : q->isr_flags.coalesce_hit = 0;
564 117 : q->isr_flags.hash_collision = 0;
565 :
566 : /* NOTE: Logging should be done by caller who has context:
567 : * PT_LOG_DEBUG(ctx->log, PT_LOG_CAT_PROTOCOL, "Queue ext initialized: capacity=%u", q->capacity);
568 : */
569 : }
570 :
571 : /*
572 : * Pop highest priority message - O(1) using priority free-lists
573 : */
574 13 : int pt_queue_pop_priority(pt_queue *q, void *data, uint16_t *len) {
575 : pt_queue_ext *ext;
576 : pt_queue_slot *slot;
577 : uint16_t slot_idx;
578 : int prio;
579 :
580 13 : if (!q || q->count == 0)
581 1 : return -1;
582 :
583 12 : ext = &q->ext; /* Extended queue data */
584 :
585 : /* Find highest non-empty priority level */
586 32 : for (prio = PT_PRIO_CRITICAL; prio >= PT_PRIO_LOW; prio--) {
587 32 : if (ext->prio_head[prio] != PT_SLOT_NONE)
588 12 : break;
589 : }
590 :
591 12 : if (prio < PT_PRIO_LOW)
592 0 : return -1; /* All lists empty (shouldn't happen if count > 0) */
593 :
594 : /* Pop from head of this priority's list */
595 12 : slot_idx = ext->prio_head[prio];
596 12 : slot = &q->slots[slot_idx];
597 :
598 : /* Update list head (next_slot is IN the slot for traversal locality) */
599 12 : ext->prio_head[prio] = slot->next_slot;
600 12 : if (ext->prio_head[prio] == PT_SLOT_NONE)
601 10 : ext->prio_tail[prio] = PT_SLOT_NONE; /* List now empty */
602 12 : ext->prio_count[prio]--;
603 :
604 : /* Remove from coalesce hash if present */
605 12 : if (slot->coalesce_key != PT_COALESCE_NONE) {
606 1 : uint16_t hash_idx = PT_COALESCE_HASH(slot->coalesce_key);
607 1 : if (ext->coalesce_hash[hash_idx] == slot_idx)
608 1 : ext->coalesce_hash[hash_idx] = PT_SLOT_NONE;
609 : }
610 :
611 : /* Copy data out */
612 12 : if (data && len) {
613 12 : pt_memcpy(data, slot->data, slot->length);
614 12 : *len = slot->length;
615 : }
616 :
617 : /* Clear slot */
618 12 : slot->flags = 0;
619 12 : slot->length = 0;
620 12 : slot->next_slot = PT_SLOT_NONE;
621 12 : q->count--;
622 :
623 12 : if (q->count == 0)
624 4 : q->has_data = 0;
625 :
626 12 : return 0;
627 : }
628 :
629 : /*
630 : * Direct pop - returns pointer to slot data without copying (ZERO-COPY)
631 : */
632 346 : int pt_queue_pop_priority_direct(pt_queue *q, const void **data_out,
633 : uint16_t *len_out) {
634 : pt_queue_ext *ext;
635 : pt_queue_slot *slot;
636 : uint16_t slot_idx;
637 : int prio;
638 :
639 346 : if (!q || q->count == 0 || !data_out || !len_out)
640 320 : return -1;
641 :
642 26 : ext = &q->ext;
643 :
644 : /* Find highest non-empty priority level */
645 74 : for (prio = PT_PRIO_CRITICAL; prio >= PT_PRIO_LOW; prio--) {
646 74 : if (ext->prio_head[prio] != PT_SLOT_NONE)
647 26 : break;
648 : }
649 :
650 26 : if (prio < PT_PRIO_LOW)
651 0 : return -1;
652 :
653 : /* Return pointer to head slot's data */
654 26 : slot_idx = ext->prio_head[prio];
655 26 : slot = &q->slots[slot_idx];
656 :
657 26 : *data_out = slot->data;
658 26 : *len_out = slot->length;
659 :
660 : /* Store pending pop info for commit (use reserved field or add to ext) */
661 26 : q->pending_pop_prio = (uint8_t)prio;
662 26 : q->pending_pop_slot = slot_idx;
663 :
664 26 : return 0;
665 : }
666 :
667 : /*
668 : * Commit a direct pop - actually removes the slot from queue
669 : */
670 26 : void pt_queue_pop_priority_commit(pt_queue *q) {
671 : pt_queue_ext *ext;
672 : pt_queue_slot *slot;
673 : uint16_t slot_idx;
674 : int prio;
675 :
676 26 : if (!q)
677 0 : return;
678 :
679 26 : ext = &q->ext;
680 26 : prio = q->pending_pop_prio;
681 26 : slot_idx = q->pending_pop_slot;
682 26 : slot = &q->slots[slot_idx];
683 :
684 : /* Update list head */
685 26 : ext->prio_head[prio] = slot->next_slot;
686 26 : if (ext->prio_head[prio] == PT_SLOT_NONE)
687 18 : ext->prio_tail[prio] = PT_SLOT_NONE;
688 26 : ext->prio_count[prio]--;
689 :
690 : /* Remove from coalesce hash if present */
691 26 : if (slot->coalesce_key != PT_COALESCE_NONE) {
692 0 : uint16_t hash_idx = PT_COALESCE_HASH(slot->coalesce_key);
693 0 : if (ext->coalesce_hash[hash_idx] == slot_idx)
694 0 : ext->coalesce_hash[hash_idx] = PT_SLOT_NONE;
695 : }
696 :
697 : /* Clear slot */
698 26 : slot->flags = 0;
699 26 : slot->length = 0;
700 26 : slot->next_slot = PT_SLOT_NONE;
701 26 : q->count--;
702 :
703 26 : if (q->count == 0)
704 11 : q->has_data = 0;
705 : }
706 :
707 : /*
708 : * Rollback a direct pop - cancel without removing from queue
709 : *
710 : * The pending slot remains in the queue and will be returned again
711 : * on the next pt_queue_pop_priority_direct() call.
712 : */
713 0 : void pt_queue_pop_priority_rollback(pt_queue *q) {
714 0 : if (!q)
715 0 : return;
716 :
717 : /* Simply reset the pending pop state - slot stays in queue */
718 0 : q->pending_pop_slot = PT_SLOT_NONE;
719 0 : q->pending_pop_prio = 0;
720 : }
721 :
722 : /*
723 : * Push with coalescing - O(1) using hash table lookup
724 : */
725 91 : int pt_queue_push_coalesce(pt_queue *q, const void *data, uint16_t len,
726 : uint8_t priority, pt_coalesce_key key) {
727 : pt_queue_ext *ext;
728 : pt_queue_slot *slot;
729 : uint16_t slot_idx;
730 : uint16_t hash_idx;
731 :
732 91 : if (!q || !data || len == 0 || len > PT_QUEUE_SLOT_SIZE)
733 0 : return -1;
734 :
735 91 : ext = &q->ext;
736 :
737 : /* Check hash table for existing message with same key - O(1) */
738 91 : if (key != PT_COALESCE_NONE) {
739 17 : hash_idx = PT_COALESCE_HASH(key);
740 17 : slot_idx = ext->coalesce_hash[hash_idx];
741 :
742 17 : if (slot_idx != PT_SLOT_NONE) {
743 12 : slot = &q->slots[slot_idx];
744 : /* Verify it's actually our key (hash collision check) */
745 12 : if ((slot->flags & PT_SLOT_USED) && slot->coalesce_key == key) {
746 : /* Found - replace data in place */
747 : /* LOGGING: Caller can log "Coalesce hit: key=0x%04X" at DEBUG level */
748 10 : pt_memcpy(slot->data, data, len);
749 10 : slot->length = len;
750 : /* Note: priority and list position don't change on coalesce */
751 10 : slot->timestamp = pt_get_ticks(); /* Update timestamp */
752 10 : return 0; /* Coalesced */
753 : }
754 : /* Hash collision - different key at same bucket */
755 : /* LOGGING: Caller can log "Hash collision at bucket %u" at DEBUG level */
756 : }
757 : }
758 :
759 : /* No existing message - allocate new slot */
760 81 : if (q->count >= q->capacity)
761 0 : return -1; /* Full */
762 :
763 : /* Find free slot (use write_idx as starting point) */
764 81 : slot_idx = q->write_idx;
765 81 : slot = &q->slots[slot_idx];
766 :
767 : /* Fill slot */
768 81 : pt_memcpy(slot->data, data, len);
769 81 : slot->length = len;
770 81 : slot->priority = priority;
771 81 : slot->flags = PT_SLOT_USED;
772 81 : slot->coalesce_key = key;
773 81 : slot->timestamp = pt_get_ticks();
774 :
775 : /* Add to priority list (append to tail for FIFO within priority)
776 : * NOTE: next_slot is IN the slot (not ext) for traversal locality */
777 81 : slot->next_slot = PT_SLOT_NONE;
778 81 : if (ext->prio_tail[priority] == PT_SLOT_NONE) {
779 : /* List was empty */
780 15 : ext->prio_head[priority] = slot_idx;
781 : } else {
782 : /* Append to tail - update previous tail's next_slot */
783 66 : q->slots[ext->prio_tail[priority]].next_slot = slot_idx;
784 : }
785 81 : ext->prio_tail[priority] = slot_idx;
786 81 : ext->prio_count[priority]++;
787 :
788 : /* Add to coalesce hash table */
789 81 : if (key != PT_COALESCE_NONE) {
790 7 : hash_idx = PT_COALESCE_HASH(key);
791 7 : ext->coalesce_hash[hash_idx] = slot_idx;
792 : }
793 :
794 81 : q->write_idx = (q->write_idx + 1) & q->capacity_mask;
795 81 : q->count++;
796 81 : q->has_data = 1;
797 :
798 81 : return 0;
799 : }
800 :
801 : /*
802 : * ISR-safe push with coalescing (for MacTCP ASR) - O(1) hash lookup
803 : */
804 0 : int pt_queue_push_coalesce_isr(pt_queue *q, const void *data, uint16_t len,
805 : uint8_t priority, pt_coalesce_key key) {
806 : pt_queue_ext *ext;
807 : pt_queue_slot *slot;
808 : uint16_t slot_idx;
809 : uint16_t hash_idx;
810 :
811 0 : if (!q || !data || len == 0 || len > PT_QUEUE_SLOT_SIZE)
812 0 : return -1;
813 :
814 0 : ext = &q->ext;
815 :
816 : /* Check hash table for existing message with same key - O(1) */
817 0 : if (key != PT_COALESCE_NONE) {
818 0 : hash_idx = PT_COALESCE_HASH(key);
819 0 : slot_idx = ext->coalesce_hash[hash_idx];
820 :
821 0 : if (slot_idx != PT_SLOT_NONE) {
822 0 : slot = &q->slots[slot_idx];
823 0 : if ((slot->flags & PT_SLOT_USED) && slot->coalesce_key == key) {
824 : /* Found - replace using ISR-safe copy */
825 0 : pt_memcpy_isr(slot->data, data, len);
826 0 : slot->length = len;
827 : /* Don't update timestamp in ISR - no TickCount() call */
828 0 : q->isr_flags.coalesce_hit = 1; /* Signal main loop for logging */
829 0 : return 0; /* Coalesced */
830 : }
831 : /* Hash collision - set flag for logging */
832 0 : q->isr_flags.hash_collision = 1;
833 : }
834 : }
835 :
836 : /* No existing message - allocate new slot */
837 0 : if (q->count >= q->capacity) {
838 0 : q->isr_flags.queue_full = 1; /* Signal main loop for logging */
839 0 : return -1;
840 : }
841 :
842 0 : slot_idx = q->write_idx;
843 0 : slot = &q->slots[slot_idx];
844 :
845 : /* Fill slot using ISR-safe copy */
846 0 : pt_memcpy_isr(slot->data, data, len);
847 0 : slot->length = len;
848 0 : slot->priority = priority;
849 0 : slot->flags = PT_SLOT_USED;
850 0 : slot->coalesce_key = key;
851 0 : slot->timestamp = 0; /* No TickCount() in ISR */
852 :
853 : /* Add to priority list (next_slot is IN the slot for traversal locality) */
854 0 : slot->next_slot = PT_SLOT_NONE;
855 0 : if (ext->prio_tail[priority] == PT_SLOT_NONE) {
856 0 : ext->prio_head[priority] = slot_idx;
857 : } else {
858 0 : q->slots[ext->prio_tail[priority]].next_slot = slot_idx;
859 : }
860 0 : ext->prio_tail[priority] = slot_idx;
861 0 : ext->prio_count[priority]++;
862 :
863 : /* Add to coalesce hash table */
864 0 : if (key != PT_COALESCE_NONE) {
865 0 : hash_idx = PT_COALESCE_HASH(key);
866 0 : ext->coalesce_hash[hash_idx] = slot_idx;
867 : }
868 :
869 0 : q->write_idx = (q->write_idx + 1) & q->capacity_mask;
870 0 : q->count++;
871 0 : q->has_data = 1;
872 :
873 0 : return 0;
874 : }
875 :
876 : /*
877 : * Check and log ISR flags from main loop
878 : */
879 0 : void pt_check_queue_isr_flags(struct pt_context *ctx, pt_queue *q) {
880 0 : if (!ctx || !ctx->log || !q)
881 0 : return;
882 :
883 0 : if (q->isr_flags.queue_full) {
884 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_PROTOCOL, "Queue full during ISR");
885 0 : q->isr_flags.queue_full = 0;
886 : }
887 0 : if (q->isr_flags.coalesce_hit) {
888 0 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL, "Coalesce hit during ISR");
889 0 : q->isr_flags.coalesce_hit = 0;
890 : }
891 0 : if (q->isr_flags.hash_collision) {
892 0 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL, "Hash collision during ISR");
893 0 : q->isr_flags.hash_collision = 0;
894 : }
895 : }
896 :
897 : /* ========================================================================
898 : * Phase 3: Backpressure & Batch Operations
899 : * ======================================================================== */
900 :
901 : /*
902 : * Get current backpressure level.
903 : */
904 9 : pt_backpressure pt_queue_backpressure(pt_queue *q) {
905 : uint8_t pressure;
906 :
907 9 : if (!q)
908 0 : return PT_BACKPRESSURE_BLOCKING;
909 :
910 9 : pressure = pt_queue_pressure(q);
911 :
912 9 : if (pressure >= PT_PRESSURE_CRITICAL)
913 4 : return PT_BACKPRESSURE_BLOCKING;
914 5 : if (pressure >= PT_PRESSURE_HIGH)
915 1 : return PT_BACKPRESSURE_HEAVY;
916 4 : if (pressure >= PT_PRESSURE_MEDIUM)
917 2 : return PT_BACKPRESSURE_LIGHT;
918 :
919 2 : return PT_BACKPRESSURE_NONE;
920 : }
921 :
922 : /*
923 : * Try to push with backpressure awareness.
924 : */
925 2 : int pt_queue_try_push(pt_queue *q, const void *data, uint16_t len,
926 : uint8_t priority, pt_coalesce_key key,
927 : pt_backpressure *pressure_out) {
928 : pt_backpressure bp;
929 : int result;
930 :
931 2 : bp = pt_queue_backpressure(q);
932 :
933 2 : if (pressure_out)
934 2 : *pressure_out = bp;
935 :
936 : /* Apply backpressure policy */
937 2 : switch (bp) {
938 2 : case PT_BACKPRESSURE_BLOCKING:
939 2 : if (priority < PT_PRIO_CRITICAL)
940 1 : return -1; /* Drop - caller should log at WARN level */
941 1 : break;
942 :
943 0 : case PT_BACKPRESSURE_HEAVY:
944 0 : if (priority < PT_PRIO_HIGH)
945 0 : return -1; /* Drop - caller should log at WARN level */
946 0 : break;
947 :
948 0 : case PT_BACKPRESSURE_LIGHT:
949 : /* Allow but signal caller to slow down */
950 0 : break;
951 :
952 0 : case PT_BACKPRESSURE_NONE:
953 : /* All clear */
954 0 : break;
955 : }
956 :
957 1 : result = pt_queue_push_coalesce(q, data, len, priority, key);
958 :
959 : /* Update pressure after push */
960 1 : if (pressure_out)
961 1 : *pressure_out = pt_queue_backpressure(q);
962 :
963 1 : return result;
964 : }
|