LCOV - code coverage report
Current view: top level - core - queue.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 293 413 70.9 %
Date: 2026-02-22 12:14:12 Functions: 22 26 84.6 %

          Line data    Source code
       1             : /* queue.c - Message queue implementation */
       2             : 
       3             : #include "queue.h"
       4             : #include "pt_compat.h"
       5             : #include "../../include/peertalk.h"
       6             : 
       7             : /* Helper: Check if value is power of two */
       8         111 : static int pt_is_power_of_two(uint16_t value)
       9             : {
      10         111 :     return (value > 0) && ((value & (value - 1)) == 0);
      11             : }
      12             : 
      13             : /* ========================================================================
      14             :  * Queue Management
      15             :  * ======================================================================== */
      16             : 
      17         111 : int pt_queue_init(struct pt_context *ctx, pt_queue *q, uint16_t capacity)
      18             : {
      19             :     size_t alloc_size;
      20             : 
      21         111 :     if (!q) {
      22           0 :         return PT_ERR_INVALID_PARAM;
      23             :     }
      24             : 
      25             :     /* Validate capacity is power of two */
      26         111 :     if (!pt_is_power_of_two(capacity)) {
      27           1 :         if (ctx) {
      28           1 :             PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
      29             :                       "Queue capacity must be power of 2, got %u", capacity);
      30             :         }
      31           1 :         return PT_ERR_INVALID_PARAM;
      32             :     }
      33             : 
      34             :     /* Allocate slots array */
      35         110 :     alloc_size = sizeof(pt_queue_slot) * capacity;
      36         110 :     q->slots = (pt_queue_slot *)pt_alloc_clear(alloc_size);
      37         110 :     if (!q->slots) {
      38           0 :         if (ctx) {
      39           0 :             PT_CTX_ERR(ctx, PT_LOG_CAT_MEMORY,
      40             :                       "Failed to allocate %lu bytes for %u queue slots",
      41             :                       (unsigned long)alloc_size, capacity);
      42             :         }
      43           0 :         return PT_ERR_NO_MEMORY;
      44             :     }
      45             : 
      46             :     /* Initialize queue state */
      47         110 :     q->magic = PT_QUEUE_MAGIC;
      48         110 :     q->capacity = capacity;
      49         110 :     q->capacity_mask = capacity - 1;  /* For fast wrap-around */
      50         110 :     q->write_idx = 0;
      51         110 :     q->read_idx = 0;
      52         110 :     q->count = 0;
      53         110 :     q->has_data = 0;
      54         110 :     q->reserved = 0;
      55             : 
      56             :     /* Initialize Phase 3 extensions (priority freelists, coalesce hash) */
      57         110 :     pt_queue_ext_init(q);
      58             : 
      59         110 :     if (ctx) {
      60         102 :         PT_CTX_INFO(ctx, PT_LOG_CAT_PROTOCOL,
      61             :                    "Queue initialized: %u slots, %zu bytes",
      62             :                    capacity, alloc_size);
      63             :     }
      64             : 
      65         110 :     return 0;
      66             : }
      67             : 
      68          64 : void pt_queue_free(pt_queue *q)
      69             : {
      70          64 :     if (!q || !q->slots) {
      71           0 :         return;
      72             :     }
      73             : 
      74          64 :     pt_free(q->slots);
      75          64 :     q->slots = NULL;
      76          64 :     q->magic = 0;
      77          64 :     q->capacity = 0;
      78          64 :     q->capacity_mask = 0;
      79          64 :     q->write_idx = 0;
      80          64 :     q->read_idx = 0;
      81          64 :     q->count = 0;
      82          64 :     q->has_data = 0;
      83             : }
      84             : 
      85           1 : void pt_queue_reset(pt_queue *q)
      86             : {
      87           1 :     if (!q || q->magic != PT_QUEUE_MAGIC) {
      88           0 :         return;
      89             :     }
      90             : 
      91           1 :     q->write_idx = 0;
      92           1 :     q->read_idx = 0;
      93           1 :     q->count = 0;
      94           1 :     q->has_data = 0;
      95             : }
      96             : 
      97             : /* ========================================================================
      98             :  * Push Operations
      99             :  * ======================================================================== */
     100             : 
     101         954 : int pt_queue_push(struct pt_context *ctx, pt_queue *q,
     102             :                   const void *data, uint16_t len,
     103             :                   uint8_t priority, uint8_t flags)
     104             : {
     105             :     pt_queue_slot *slot;
     106             :     uint8_t pressure;
     107             :     /* cppcheck-suppress variableScope ; static var must persist across calls for threshold tracking */
     108             :     static uint8_t last_pressure_level = 0;  /* Track for logging thresholds */
     109             : 
     110         954 :     if (!q || q->magic != PT_QUEUE_MAGIC || !data) {
     111           1 :         return PT_ERR_INVALID_PARAM;
     112             :     }
     113             : 
     114         953 :     if (len > PT_QUEUE_SLOT_SIZE) {
     115           0 :         if (ctx) {
     116           0 :             PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
     117             :                       "Message too large: %u bytes (max %u)",
     118             :                       len, PT_QUEUE_SLOT_SIZE);
     119             :         }
     120           0 :         return PT_ERR_INVALID_PARAM;
     121             :     }
     122             : 
     123             :     /* Check if full */
     124         953 :     if (q->count >= q->capacity) {
     125           2 :         if (ctx) {
     126           2 :             PT_CTX_WARN(ctx, PT_LOG_CAT_PROTOCOL,
     127             :                        "Queue full: %u/%u slots", q->count, q->capacity);
     128             :         }
     129           2 :         return PT_ERR_BUFFER_FULL;
     130             :     }
     131             : 
     132             :     /* Get slot and write data */
     133         951 :     slot = &q->slots[q->write_idx];
     134         951 :     slot->length = len;
     135         951 :     slot->priority = priority;
     136         951 :     slot->flags = PT_SLOT_USED | flags;
     137         951 :     slot->coalesce_key = PT_COALESCE_NONE;  /* No coalescing for plain push */
     138         951 :     slot->next_slot = PT_SLOT_NONE;
     139         951 :     pt_memcpy(slot->data, data, len);
     140             : 
     141             :     /* Add to priority freelist (Phase 3 enhancement) */
     142             :     {
     143         951 :         pt_queue_ext *ext = &q->ext;
     144         951 :         uint16_t slot_idx = q->write_idx;
     145             : 
     146         951 :         if (ext->prio_tail[priority] != PT_SLOT_NONE) {
     147             :             /* List has items - append to tail */
     148         906 :             q->slots[ext->prio_tail[priority]].next_slot = slot_idx;
     149             :         } else {
     150             :             /* List empty - set head */
     151          45 :             ext->prio_head[priority] = slot_idx;
     152             :         }
     153         951 :         ext->prio_tail[priority] = slot_idx;
     154         951 :         ext->prio_count[priority]++;
     155             :     }
     156             : 
     157             :     /* Advance write index (power-of-two wrap-around) */
     158         951 :     q->write_idx = (q->write_idx + 1) & q->capacity_mask;
     159         951 :     q->count++;
     160         951 :     q->has_data = 1;
     161             : 
     162             :     /* Check backpressure and log at thresholds */
     163         951 :     pressure = pt_queue_pressure(q);
     164         951 :     if (ctx) {
     165             :         /* Log when crossing 80%, 90%, 95% thresholds */
     166         951 :         if (pressure >= 95 && last_pressure_level < 95) {
     167           5 :             PT_CTX_WARN(ctx, PT_LOG_CAT_PERF,
     168             :                        "Queue pressure CRITICAL: %u%% (%u/%u slots)",
     169             :                        pressure, q->count, q->capacity);
     170           5 :             last_pressure_level = 95;
     171         946 :         } else if (pressure >= 90 && last_pressure_level < 90) {
     172           1 :             PT_CTX_WARN(ctx, PT_LOG_CAT_PERF,
     173             :                        "Queue pressure HIGH: %u%% (%u/%u slots)",
     174             :                        pressure, q->count, q->capacity);
     175           1 :             last_pressure_level = 90;
     176         945 :         } else if (pressure >= 80 && last_pressure_level < 80) {
     177           3 :             PT_CTX_WARN(ctx, PT_LOG_CAT_PERF,
     178             :                        "Queue pressure elevated: %u%% (%u/%u slots)",
     179             :                        pressure, q->count, q->capacity);
     180           3 :             last_pressure_level = 80;
     181         942 :         } else if (pressure < 80 && last_pressure_level >= 80) {
     182             :             /* Reset threshold tracking when pressure drops */
     183           5 :             last_pressure_level = 0;
     184             :         }
     185             :     }
     186             : 
     187         951 :     return 0;
     188             : }
     189             : 
     190           1 : int pt_queue_push_isr(pt_queue *q, const void *data, uint16_t len)
     191             : {
     192             :     pt_queue_slot *slot;
     193             : 
     194             :     /* ISR-SAFETY: No parameter validation logging, no backpressure logging */
     195           1 :     if (!q || q->magic != PT_QUEUE_MAGIC || !data) {
     196           0 :         return PT_ERR_INVALID_PARAM;
     197             :     }
     198             : 
     199           1 :     if (len > PT_QUEUE_SLOT_SIZE) {
     200           0 :         return PT_ERR_INVALID_PARAM;
     201             :     }
     202             : 
     203             :     /* Check if full */
     204           1 :     if (q->count >= q->capacity) {
     205           0 :         return PT_ERR_BUFFER_FULL;
     206             :     }
     207             : 
     208             :     /* Get slot and write data using ISR-safe memcpy */
     209           1 :     slot = &q->slots[q->write_idx];
     210           1 :     slot->length = len;
     211           1 :     slot->priority = 0;
     212           1 :     slot->flags = PT_SLOT_USED;
     213           1 :     pt_memcpy_isr(slot->data, data, len);  /* ISR-safe: no Toolbox */
     214             : 
     215             :     /* Advance write index (power-of-two wrap-around) */
     216           1 :     q->write_idx = (q->write_idx + 1) & q->capacity_mask;
     217           1 :     q->count++;
     218           1 :     q->has_data = 1;
     219             : 
     220           1 :     return 0;
     221             : }
     222             : 
     223           0 : int pt_queue_push_isr_ot(pt_queue *q, const void *data, uint16_t len)
     224             : {
     225             :     pt_queue_slot *slot;
     226             : 
     227             :     /* ISR-SAFETY: No logging, uses atomic operations for OT reentrancy */
     228           0 :     if (!q || q->magic != PT_QUEUE_MAGIC || !data) {
     229           0 :         return PT_ERR_INVALID_PARAM;
     230             :     }
     231             : 
     232           0 :     if (len > PT_QUEUE_SLOT_SIZE) {
     233           0 :         return PT_ERR_INVALID_PARAM;
     234             :     }
     235             : 
     236             :     /* Check if full */
     237           0 :     if (q->count >= q->capacity) {
     238           0 :         return PT_ERR_BUFFER_FULL;
     239             :     }
     240             : 
     241             :     /* Get slot and write metadata */
     242           0 :     slot = &q->slots[q->write_idx];
     243           0 :     slot->length = len;
     244           0 :     slot->priority = 0;
     245           0 :     slot->flags = PT_SLOT_USED;  /* Set USED but NOT READY yet */
     246             : 
     247             :     /* Copy data using ISR-safe memcpy */
     248           0 :     pt_memcpy_isr(slot->data, data, len);
     249             : 
     250             :     /* OT REENTRANCY SAFETY: Set READY flag AFTER data copy completes.
     251             :      * If another notifier runs during copy, it won't read partial data.
     252             :      * On Classic Mac with OT, use atomic bit set (OTAtomicSetBit).
     253             :      * On POSIX, just set flag (no OT reentrancy). */
     254             : #ifdef PT_PLATFORM_OPENTRANSPORT
     255             :     /* Atomic bit set for OT notifier reentrancy safety */
     256             :     /* Note: OTAtomicSetBit not available in POSIX builds */
     257             :     slot->flags |= PT_SLOT_READY;
     258             : #else
     259           0 :     slot->flags |= PT_SLOT_READY;
     260             : #endif
     261             : 
     262             :     /* Advance write index (power-of-two wrap-around) */
     263           0 :     q->write_idx = (q->write_idx + 1) & q->capacity_mask;
     264           0 :     q->count++;
     265           0 :     q->has_data = 1;
     266             : 
     267           0 :     return 0;
     268             : }
     269             : 
     270             : /* ========================================================================
     271             :  * Pop Operations
     272             :  * ======================================================================== */
     273             : 
     274          12 : int pt_queue_pop(pt_queue *q, void *data, uint16_t *len)
     275             : {
     276             :     pt_queue_slot *slot;
     277             : 
     278          12 :     if (!q || q->magic != PT_QUEUE_MAGIC || !data || !len) {
     279           0 :         return PT_ERR_INVALID_PARAM;
     280             :     }
     281             : 
     282             :     /* Check if empty */
     283          12 :     if (q->count == 0) {
     284           1 :         return PT_ERR_QUEUE_EMPTY;
     285             :     }
     286             : 
     287             :     /* Get front slot */
     288          11 :     slot = &q->slots[q->read_idx];
     289             : 
     290             : #ifdef PT_PLATFORM_OPENTRANSPORT
     291             :     /* OT REENTRANCY SAFETY: Wait for READY flag before reading data.
     292             :      * If push_isr_ot is interrupted, we won't read partial data. */
     293             :     if (!(slot->flags & PT_SLOT_READY)) {
     294             :         return PT_ERR_QUEUE_EMPTY;  /* Data not ready yet */
     295             :     }
     296             : #endif
     297             : 
     298             :     /* Check slot is used */
     299          11 :     if (!(slot->flags & PT_SLOT_USED)) {
     300           0 :         return PT_ERR_QUEUE_EMPTY;
     301             :     }
     302             : 
     303             :     /* Copy data */
     304          11 :     *len = slot->length;
     305          11 :     pt_memcpy(data, slot->data, slot->length);
     306             : 
     307             :     /* Clear slot */
     308          11 :     slot->flags = 0;
     309          11 :     slot->length = 0;
     310             : 
     311             :     /* Advance read index (power-of-two wrap-around) */
     312          11 :     q->read_idx = (q->read_idx + 1) & q->capacity_mask;
     313          11 :     q->count--;
     314             : 
     315             :     /* Update has_data flag */
     316          11 :     if (q->count == 0) {
     317           6 :         q->has_data = 0;
     318             :     }
     319             : 
     320          11 :     return 0;
     321             : }
     322             : 
     323           1 : int pt_queue_peek(pt_queue *q, void **data, uint16_t *len)
     324             : {
     325             :     pt_queue_slot *slot;
     326             : 
     327           1 :     if (!q || q->magic != PT_QUEUE_MAGIC || !data || !len) {
     328           0 :         return PT_ERR_INVALID_PARAM;
     329             :     }
     330             : 
     331             :     /* Check if empty */
     332           1 :     if (q->count == 0) {
     333           0 :         return PT_ERR_QUEUE_EMPTY;
     334             :     }
     335             : 
     336             :     /* Get front slot */
     337           1 :     slot = &q->slots[q->read_idx];
     338             : 
     339             : #ifdef PT_PLATFORM_OPENTRANSPORT
     340             :     /* OT REENTRANCY SAFETY: Wait for READY flag */
     341             :     if (!(slot->flags & PT_SLOT_READY)) {
     342             :         return PT_ERR_QUEUE_EMPTY;
     343             :     }
     344             : #endif
     345             : 
     346             :     /* Check slot is used */
     347           1 :     if (!(slot->flags & PT_SLOT_USED)) {
     348           0 :         return PT_ERR_QUEUE_EMPTY;
     349             :     }
     350             : 
     351             :     /* Return pointer to slot data (zero-copy) */
     352           1 :     *data = slot->data;
     353           1 :     *len = slot->length;
     354             : 
     355           1 :     return 0;
     356             : }
     357             : 
     358           1 : void pt_queue_consume(pt_queue *q)
     359             : {
     360             :     pt_queue_slot *slot;
     361             : 
     362           1 :     if (!q || q->magic != PT_QUEUE_MAGIC || q->count == 0) {
     363           0 :         return;
     364             :     }
     365             : 
     366             :     /* Clear front slot */
     367           1 :     slot = &q->slots[q->read_idx];
     368           1 :     slot->flags = 0;
     369           1 :     slot->length = 0;
     370             : 
     371             :     /* Advance read index (power-of-two wrap-around) */
     372           1 :     q->read_idx = (q->read_idx + 1) & q->capacity_mask;
     373           1 :     q->count--;
     374             : 
     375             :     /* Update has_data flag */
     376           1 :     if (q->count == 0) {
     377           1 :         q->has_data = 0;
     378             :     }
     379             : }
     380             : 
     381             : /* ========================================================================
     382             :  * Queue Status
     383             :  * ======================================================================== */
     384             : 
     385           7 : uint16_t pt_queue_count(pt_queue *q)
     386             : {
     387           7 :     if (!q || q->magic != PT_QUEUE_MAGIC) {
     388           0 :         return 0;
     389             :     }
     390             : 
     391           7 :     return q->count;
     392             : }
     393             : 
     394           3 : uint16_t pt_queue_free_slots(pt_queue *q)
     395             : {
     396           3 :     if (!q || q->magic != PT_QUEUE_MAGIC) {
     397           0 :         return 0;
     398             :     }
     399             : 
     400           3 :     return q->capacity - q->count;
     401             : }
     402             : 
     403        1845 : uint8_t pt_queue_pressure(pt_queue *q)
     404             : {
     405             :     uint32_t pressure;
     406             : 
     407        1845 :     if (!q || q->magic != PT_QUEUE_MAGIC || q->capacity == 0) {
     408           0 :         return 0;
     409             :     }
     410             : 
     411             :     /* Calculate pressure as percentage (0-100)
     412             :      * Use uint32_t to prevent overflow when count > 655 */
     413        1845 :     pressure = ((uint32_t)q->count * 100) / q->capacity;
     414             : 
     415             :     /* Clamp to 100 */
     416        1845 :     if (pressure > 100) {
     417           0 :         pressure = 100;
     418             :     }
     419             : 
     420        1845 :     return (uint8_t)pressure;
     421             : }
     422             : 
     423           1 : int pt_queue_is_full(pt_queue *q)
     424             : {
     425           1 :     if (!q || q->magic != PT_QUEUE_MAGIC) {
     426           0 :         return 0;
     427             :     }
     428             : 
     429           1 :     return (q->count >= q->capacity) ? 1 : 0;
     430             : }
     431             : 
     432          12 : int pt_queue_is_empty(pt_queue *q)
     433             : {
     434          12 :     if (!q || q->magic != PT_QUEUE_MAGIC) {
     435           0 :         return 1;
     436             :     }
     437             : 
     438          12 :     return (q->count == 0) ? 1 : 0;
     439             : }
     440             : 
     441             : /* ========================================================================
     442             :  * Coalescing
     443             :  * ======================================================================== */
     444             : 
     445           1 : int pt_queue_coalesce(pt_queue *q, const void *data, uint16_t len)
     446             : {
     447             :     uint16_t i;
     448             :     uint16_t search_start;
     449             :     uint16_t search_count;
     450             :     /* cppcheck-suppress variableScope ; C89 style for Classic Mac compiler compatibility */
     451             :     pt_queue_slot *slot;
     452             : 
     453           1 :     if (!q || q->magic != PT_QUEUE_MAGIC || !data) {
     454           0 :         return -1;
     455             :     }
     456             : 
     457           1 :     if (len > PT_QUEUE_SLOT_SIZE) {
     458           0 :         return -1;
     459             :     }
     460             : 
     461             :     /* Search last 4 slots for coalescable message
     462             :      * Rationale: Position updates typically coalesce with very recent
     463             :      * messages. Searching entire queue causes cache misses on 68030
     464             :      * with 256-byte cache line. */
     465           1 :     search_count = (q->count < 4) ? q->count : 4;
     466           1 :     if (search_count == 0) {
     467           0 :         return -1;  /* Queue empty */
     468             :     }
     469             : 
     470             :     /* Start from write_idx and search backwards */
     471           1 :     search_start = q->write_idx;
     472             : 
     473           1 :     for (i = 0; i < search_count; i++) {
     474             :         uint16_t idx;
     475             : 
     476             :         /* Calculate slot index searching backwards (with wrap-around) */
     477           1 :         if (search_start >= i + 1) {
     478           1 :             idx = search_start - (i + 1);
     479             :         } else {
     480           0 :             idx = q->capacity - ((i + 1) - search_start);
     481             :         }
     482             : 
     483           1 :         slot = &q->slots[idx];
     484             : 
     485             :         /* Check if slot is coalescable */
     486           1 :         if ((slot->flags & PT_SLOT_USED) &&
     487           1 :             (slot->flags & PT_SLOT_COALESCABLE)) {
     488             :             /* Found coalescable message - replace it */
     489           1 :             slot->length = len;
     490           1 :             pt_memcpy(slot->data, data, len);
     491           1 :             return 0;
     492             :         }
     493             :     }
     494             : 
     495           0 :     return -1;  /* No coalescable message found */
     496             : }
     497             : 
     498             : /* ========================================================================
     499             :  * Phase 3: Priority & Coalescing Operations
     500             :  * ======================================================================== */
     501             : 
     502             : /*
     503             :  * Initialize extended queue data structures
     504             :  *
     505             :  * Call this after pt_queue_init() from Phase 2 to set up
     506             :  * priority free-lists and coalesce hash table.
     507             :  */
     508         118 : void pt_queue_ext_init(pt_queue *q) {
     509             :     pt_queue_ext *ext;
     510             :     uint16_t i;
     511             :     uint16_t max_slots;  /* Loop invariant hoisted for optimization */
     512             : 
     513         118 :     if (!q) {
     514             :         /* Cannot log without context, but defend against NULL */
     515           0 :         return;
     516             :     }
     517             : 
     518             : #ifdef PT_QUEUE_SLOT_EXPECTED_SIZE
     519             :     /* Runtime size check for C89/C99 (C11 uses _Static_assert) */
     520             :     if (sizeof(pt_queue_slot) != PT_QUEUE_SLOT_EXPECTED_SIZE) {
     521             :         /* Size mismatch - likely compiler padding issue.
     522             :          * Use #pragma pack on Classic Mac compilers if this fails. */
     523             :         return;  /* Fail-safe: don't corrupt memory */
     524             :     }
     525             : #endif
     526             : 
     527             :     /* Capacity validation - queue capacity must not exceed PT_QUEUE_MAX_SLOTS
     528             :      * because next_slot pointers are only initialized for slots 0..MAX_SLOTS-1 */
     529         118 :     if (q->capacity > PT_QUEUE_MAX_SLOTS) {
     530             :         /* Configuration error - slots beyond MAX_SLOTS have uninitialized next_slot.
     531             :          * Caller should use smaller capacity or increase PT_QUEUE_MAX_SLOTS.
     532             :          * NOTE: Cannot log here since we don't have context - caller should check. */
     533           1 :         return;  /* Fail-safe: don't use queue with uninitialized slots */
     534             :     }
     535             : 
     536         117 :     ext = &q->ext;
     537             : 
     538             :     /* Initialize priority free-lists as empty */
     539         585 :     for (i = 0; i < PT_PRIO_COUNT; i++) {
     540         468 :         ext->prio_head[i] = PT_SLOT_NONE;
     541         468 :         ext->prio_tail[i] = PT_SLOT_NONE;
     542         468 :         ext->prio_count[i] = 0;
     543             :     }
     544             : 
     545             :     /* Initialize slot next_slot pointers (stored in slots themselves)
     546             :      * OPTIMIZATION: Hoist loop invariant min(capacity, MAX_SLOTS) */
     547         117 :     max_slots = (q->capacity < PT_QUEUE_MAX_SLOTS) ? q->capacity : PT_QUEUE_MAX_SLOTS;
     548        1921 :     for (i = 0; i < max_slots; i++) {
     549        1804 :         q->slots[i].next_slot = PT_SLOT_NONE;
     550             :     }
     551             : 
     552             :     /* Initialize coalesce hash table as empty */
     553        3861 :     for (i = 0; i < PT_COALESCE_HASH_SIZE; i++) {
     554        3744 :         ext->coalesce_hash[i] = PT_SLOT_NONE;
     555             :     }
     556             : 
     557             :     /* Initialize pending pop state */
     558         117 :     q->pending_pop_prio = 0;
     559         117 :     q->pending_pop_slot = PT_SLOT_NONE;
     560             : 
     561             :     /* Initialize ISR flags (for deferred logging from interrupt context) */
     562         117 :     q->isr_flags.queue_full = 0;
     563         117 :     q->isr_flags.coalesce_hit = 0;
     564         117 :     q->isr_flags.hash_collision = 0;
     565             : 
     566             :     /* NOTE: Logging should be done by caller who has context:
     567             :      * PT_LOG_DEBUG(ctx->log, PT_LOG_CAT_PROTOCOL, "Queue ext initialized: capacity=%u", q->capacity);
     568             :      */
     569             : }
     570             : 
     571             : /*
     572             :  * Pop highest priority message - O(1) using priority free-lists
     573             :  */
     574          13 : int pt_queue_pop_priority(pt_queue *q, void *data, uint16_t *len) {
     575             :     pt_queue_ext *ext;
     576             :     pt_queue_slot *slot;
     577             :     uint16_t slot_idx;
     578             :     int prio;
     579             : 
     580          13 :     if (!q || q->count == 0)
     581           1 :         return -1;
     582             : 
     583          12 :     ext = &q->ext;  /* Extended queue data */
     584             : 
     585             :     /* Find highest non-empty priority level */
     586          32 :     for (prio = PT_PRIO_CRITICAL; prio >= PT_PRIO_LOW; prio--) {
     587          32 :         if (ext->prio_head[prio] != PT_SLOT_NONE)
     588          12 :             break;
     589             :     }
     590             : 
     591          12 :     if (prio < PT_PRIO_LOW)
     592           0 :         return -1;  /* All lists empty (shouldn't happen if count > 0) */
     593             : 
     594             :     /* Pop from head of this priority's list */
     595          12 :     slot_idx = ext->prio_head[prio];
     596          12 :     slot = &q->slots[slot_idx];
     597             : 
     598             :     /* Update list head (next_slot is IN the slot for traversal locality) */
     599          12 :     ext->prio_head[prio] = slot->next_slot;
     600          12 :     if (ext->prio_head[prio] == PT_SLOT_NONE)
     601          10 :         ext->prio_tail[prio] = PT_SLOT_NONE;  /* List now empty */
     602          12 :     ext->prio_count[prio]--;
     603             : 
     604             :     /* Remove from coalesce hash if present */
     605          12 :     if (slot->coalesce_key != PT_COALESCE_NONE) {
     606           1 :         uint16_t hash_idx = PT_COALESCE_HASH(slot->coalesce_key);
     607           1 :         if (ext->coalesce_hash[hash_idx] == slot_idx)
     608           1 :             ext->coalesce_hash[hash_idx] = PT_SLOT_NONE;
     609             :     }
     610             : 
     611             :     /* Copy data out */
     612          12 :     if (data && len) {
     613          12 :         pt_memcpy(data, slot->data, slot->length);
     614          12 :         *len = slot->length;
     615             :     }
     616             : 
     617             :     /* Clear slot */
     618          12 :     slot->flags = 0;
     619          12 :     slot->length = 0;
     620          12 :     slot->next_slot = PT_SLOT_NONE;
     621          12 :     q->count--;
     622             : 
     623          12 :     if (q->count == 0)
     624           4 :         q->has_data = 0;
     625             : 
     626          12 :     return 0;
     627             : }
     628             : 
     629             : /*
     630             :  * Direct pop - returns pointer to slot data without copying (ZERO-COPY)
     631             :  */
     632         346 : int pt_queue_pop_priority_direct(pt_queue *q, const void **data_out,
     633             :                                   uint16_t *len_out) {
     634             :     pt_queue_ext *ext;
     635             :     pt_queue_slot *slot;
     636             :     uint16_t slot_idx;
     637             :     int prio;
     638             : 
     639         346 :     if (!q || q->count == 0 || !data_out || !len_out)
     640         320 :         return -1;
     641             : 
     642          26 :     ext = &q->ext;
     643             : 
     644             :     /* Find highest non-empty priority level */
     645          74 :     for (prio = PT_PRIO_CRITICAL; prio >= PT_PRIO_LOW; prio--) {
     646          74 :         if (ext->prio_head[prio] != PT_SLOT_NONE)
     647          26 :             break;
     648             :     }
     649             : 
     650          26 :     if (prio < PT_PRIO_LOW)
     651           0 :         return -1;
     652             : 
     653             :     /* Return pointer to head slot's data */
     654          26 :     slot_idx = ext->prio_head[prio];
     655          26 :     slot = &q->slots[slot_idx];
     656             : 
     657          26 :     *data_out = slot->data;
     658          26 :     *len_out = slot->length;
     659             : 
     660             :     /* Store pending pop info for commit (use reserved field or add to ext) */
     661          26 :     q->pending_pop_prio = (uint8_t)prio;
     662          26 :     q->pending_pop_slot = slot_idx;
     663             : 
     664          26 :     return 0;
     665             : }
     666             : 
     667             : /*
     668             :  * Commit a direct pop - actually removes the slot from queue
     669             :  */
     670          26 : void pt_queue_pop_priority_commit(pt_queue *q) {
     671             :     pt_queue_ext *ext;
     672             :     pt_queue_slot *slot;
     673             :     uint16_t slot_idx;
     674             :     int prio;
     675             : 
     676          26 :     if (!q)
     677           0 :         return;
     678             : 
     679          26 :     ext = &q->ext;
     680          26 :     prio = q->pending_pop_prio;
     681          26 :     slot_idx = q->pending_pop_slot;
     682          26 :     slot = &q->slots[slot_idx];
     683             : 
     684             :     /* Update list head */
     685          26 :     ext->prio_head[prio] = slot->next_slot;
     686          26 :     if (ext->prio_head[prio] == PT_SLOT_NONE)
     687          18 :         ext->prio_tail[prio] = PT_SLOT_NONE;
     688          26 :     ext->prio_count[prio]--;
     689             : 
     690             :     /* Remove from coalesce hash if present */
     691          26 :     if (slot->coalesce_key != PT_COALESCE_NONE) {
     692           0 :         uint16_t hash_idx = PT_COALESCE_HASH(slot->coalesce_key);
     693           0 :         if (ext->coalesce_hash[hash_idx] == slot_idx)
     694           0 :             ext->coalesce_hash[hash_idx] = PT_SLOT_NONE;
     695             :     }
     696             : 
     697             :     /* Clear slot */
     698          26 :     slot->flags = 0;
     699          26 :     slot->length = 0;
     700          26 :     slot->next_slot = PT_SLOT_NONE;
     701          26 :     q->count--;
     702             : 
     703          26 :     if (q->count == 0)
     704          11 :         q->has_data = 0;
     705             : }
     706             : 
     707             : /*
     708             :  * Rollback a direct pop - cancel without removing from queue
     709             :  *
     710             :  * The pending slot remains in the queue and will be returned again
     711             :  * on the next pt_queue_pop_priority_direct() call.
     712             :  */
     713           0 : void pt_queue_pop_priority_rollback(pt_queue *q) {
     714           0 :     if (!q)
     715           0 :         return;
     716             : 
     717             :     /* Simply reset the pending pop state - slot stays in queue */
     718           0 :     q->pending_pop_slot = PT_SLOT_NONE;
     719           0 :     q->pending_pop_prio = 0;
     720             : }
     721             : 
     722             : /*
     723             :  * Push with coalescing - O(1) using hash table lookup
     724             :  */
     725          91 : int pt_queue_push_coalesce(pt_queue *q, const void *data, uint16_t len,
     726             :                             uint8_t priority, pt_coalesce_key key) {
     727             :     pt_queue_ext *ext;
     728             :     pt_queue_slot *slot;
     729             :     uint16_t slot_idx;
     730             :     uint16_t hash_idx;
     731             : 
     732          91 :     if (!q || !data || len == 0 || len > PT_QUEUE_SLOT_SIZE)
     733           0 :         return -1;
     734             : 
     735          91 :     ext = &q->ext;
     736             : 
     737             :     /* Check hash table for existing message with same key - O(1) */
     738          91 :     if (key != PT_COALESCE_NONE) {
     739          17 :         hash_idx = PT_COALESCE_HASH(key);
     740          17 :         slot_idx = ext->coalesce_hash[hash_idx];
     741             : 
     742          17 :         if (slot_idx != PT_SLOT_NONE) {
     743          12 :             slot = &q->slots[slot_idx];
     744             :             /* Verify it's actually our key (hash collision check) */
     745          12 :             if ((slot->flags & PT_SLOT_USED) && slot->coalesce_key == key) {
     746             :                 /* Found - replace data in place */
     747             :                 /* LOGGING: Caller can log "Coalesce hit: key=0x%04X" at DEBUG level */
     748          10 :                 pt_memcpy(slot->data, data, len);
     749          10 :                 slot->length = len;
     750             :                 /* Note: priority and list position don't change on coalesce */
     751          10 :                 slot->timestamp = pt_get_ticks();  /* Update timestamp */
     752          10 :                 return 0;  /* Coalesced */
     753             :             }
     754             :             /* Hash collision - different key at same bucket */
     755             :             /* LOGGING: Caller can log "Hash collision at bucket %u" at DEBUG level */
     756             :         }
     757             :     }
     758             : 
     759             :     /* No existing message - allocate new slot */
     760          81 :     if (q->count >= q->capacity)
     761           0 :         return -1;  /* Full */
     762             : 
     763             :     /* Find free slot (use write_idx as starting point) */
     764          81 :     slot_idx = q->write_idx;
     765          81 :     slot = &q->slots[slot_idx];
     766             : 
     767             :     /* Fill slot */
     768          81 :     pt_memcpy(slot->data, data, len);
     769          81 :     slot->length = len;
     770          81 :     slot->priority = priority;
     771          81 :     slot->flags = PT_SLOT_USED;
     772          81 :     slot->coalesce_key = key;
     773          81 :     slot->timestamp = pt_get_ticks();
     774             : 
     775             :     /* Add to priority list (append to tail for FIFO within priority)
     776             :      * NOTE: next_slot is IN the slot (not ext) for traversal locality */
     777          81 :     slot->next_slot = PT_SLOT_NONE;
     778          81 :     if (ext->prio_tail[priority] == PT_SLOT_NONE) {
     779             :         /* List was empty */
     780          15 :         ext->prio_head[priority] = slot_idx;
     781             :     } else {
     782             :         /* Append to tail - update previous tail's next_slot */
     783          66 :         q->slots[ext->prio_tail[priority]].next_slot = slot_idx;
     784             :     }
     785          81 :     ext->prio_tail[priority] = slot_idx;
     786          81 :     ext->prio_count[priority]++;
     787             : 
     788             :     /* Add to coalesce hash table */
     789          81 :     if (key != PT_COALESCE_NONE) {
     790           7 :         hash_idx = PT_COALESCE_HASH(key);
     791           7 :         ext->coalesce_hash[hash_idx] = slot_idx;
     792             :     }
     793             : 
     794          81 :     q->write_idx = (q->write_idx + 1) & q->capacity_mask;
     795          81 :     q->count++;
     796          81 :     q->has_data = 1;
     797             : 
     798          81 :     return 0;
     799             : }
     800             : 
     801             : /*
     802             :  * ISR-safe push with coalescing (for MacTCP ASR) - O(1) hash lookup
     803             :  */
     804           0 : int pt_queue_push_coalesce_isr(pt_queue *q, const void *data, uint16_t len,
     805             :                                 uint8_t priority, pt_coalesce_key key) {
     806             :     pt_queue_ext *ext;
     807             :     pt_queue_slot *slot;
     808             :     uint16_t slot_idx;
     809             :     uint16_t hash_idx;
     810             : 
     811           0 :     if (!q || !data || len == 0 || len > PT_QUEUE_SLOT_SIZE)
     812           0 :         return -1;
     813             : 
     814           0 :     ext = &q->ext;
     815             : 
     816             :     /* Check hash table for existing message with same key - O(1) */
     817           0 :     if (key != PT_COALESCE_NONE) {
     818           0 :         hash_idx = PT_COALESCE_HASH(key);
     819           0 :         slot_idx = ext->coalesce_hash[hash_idx];
     820             : 
     821           0 :         if (slot_idx != PT_SLOT_NONE) {
     822           0 :             slot = &q->slots[slot_idx];
     823           0 :             if ((slot->flags & PT_SLOT_USED) && slot->coalesce_key == key) {
     824             :                 /* Found - replace using ISR-safe copy */
     825           0 :                 pt_memcpy_isr(slot->data, data, len);
     826           0 :                 slot->length = len;
     827             :                 /* Don't update timestamp in ISR - no TickCount() call */
     828           0 :                 q->isr_flags.coalesce_hit = 1;  /* Signal main loop for logging */
     829           0 :                 return 0;  /* Coalesced */
     830             :             }
     831             :             /* Hash collision - set flag for logging */
     832           0 :             q->isr_flags.hash_collision = 1;
     833             :         }
     834             :     }
     835             : 
     836             :     /* No existing message - allocate new slot */
     837           0 :     if (q->count >= q->capacity) {
     838           0 :         q->isr_flags.queue_full = 1;  /* Signal main loop for logging */
     839           0 :         return -1;
     840             :     }
     841             : 
     842           0 :     slot_idx = q->write_idx;
     843           0 :     slot = &q->slots[slot_idx];
     844             : 
     845             :     /* Fill slot using ISR-safe copy */
     846           0 :     pt_memcpy_isr(slot->data, data, len);
     847           0 :     slot->length = len;
     848           0 :     slot->priority = priority;
     849           0 :     slot->flags = PT_SLOT_USED;
     850           0 :     slot->coalesce_key = key;
     851           0 :     slot->timestamp = 0;  /* No TickCount() in ISR */
     852             : 
     853             :     /* Add to priority list (next_slot is IN the slot for traversal locality) */
     854           0 :     slot->next_slot = PT_SLOT_NONE;
     855           0 :     if (ext->prio_tail[priority] == PT_SLOT_NONE) {
     856           0 :         ext->prio_head[priority] = slot_idx;
     857             :     } else {
     858           0 :         q->slots[ext->prio_tail[priority]].next_slot = slot_idx;
     859             :     }
     860           0 :     ext->prio_tail[priority] = slot_idx;
     861           0 :     ext->prio_count[priority]++;
     862             : 
     863             :     /* Add to coalesce hash table */
     864           0 :     if (key != PT_COALESCE_NONE) {
     865           0 :         hash_idx = PT_COALESCE_HASH(key);
     866           0 :         ext->coalesce_hash[hash_idx] = slot_idx;
     867             :     }
     868             : 
     869           0 :     q->write_idx = (q->write_idx + 1) & q->capacity_mask;
     870           0 :     q->count++;
     871           0 :     q->has_data = 1;
     872             : 
     873           0 :     return 0;
     874             : }
     875             : 
     876             : /*
     877             :  * Check and log ISR flags from main loop
     878             :  */
     879           0 : void pt_check_queue_isr_flags(struct pt_context *ctx, pt_queue *q) {
     880           0 :     if (!ctx || !ctx->log || !q)
     881           0 :         return;
     882             : 
     883           0 :     if (q->isr_flags.queue_full) {
     884           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_PROTOCOL, "Queue full during ISR");
     885           0 :         q->isr_flags.queue_full = 0;
     886             :     }
     887           0 :     if (q->isr_flags.coalesce_hit) {
     888           0 :         PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL, "Coalesce hit during ISR");
     889           0 :         q->isr_flags.coalesce_hit = 0;
     890             :     }
     891           0 :     if (q->isr_flags.hash_collision) {
     892           0 :         PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL, "Hash collision during ISR");
     893           0 :         q->isr_flags.hash_collision = 0;
     894             :     }
     895             : }
     896             : 
     897             : /* ========================================================================
     898             :  * Phase 3: Backpressure & Batch Operations
     899             :  * ======================================================================== */
     900             : 
     901             : /*
     902             :  * Get current backpressure level.
     903             :  */
     904           9 : pt_backpressure pt_queue_backpressure(pt_queue *q) {
     905             :     uint8_t pressure;
     906             : 
     907           9 :     if (!q)
     908           0 :         return PT_BACKPRESSURE_BLOCKING;
     909             : 
     910           9 :     pressure = pt_queue_pressure(q);
     911             : 
     912           9 :     if (pressure >= PT_PRESSURE_CRITICAL)
     913           4 :         return PT_BACKPRESSURE_BLOCKING;
     914           5 :     if (pressure >= PT_PRESSURE_HIGH)
     915           1 :         return PT_BACKPRESSURE_HEAVY;
     916           4 :     if (pressure >= PT_PRESSURE_MEDIUM)
     917           2 :         return PT_BACKPRESSURE_LIGHT;
     918             : 
     919           2 :     return PT_BACKPRESSURE_NONE;
     920             : }
     921             : 
     922             : /*
     923             :  * Try to push with backpressure awareness.
     924             :  */
     925           2 : int pt_queue_try_push(pt_queue *q, const void *data, uint16_t len,
     926             :                       uint8_t priority, pt_coalesce_key key,
     927             :                       pt_backpressure *pressure_out) {
     928             :     pt_backpressure bp;
     929             :     int result;
     930             : 
     931           2 :     bp = pt_queue_backpressure(q);
     932             : 
     933           2 :     if (pressure_out)
     934           2 :         *pressure_out = bp;
     935             : 
     936             :     /* Apply backpressure policy */
     937           2 :     switch (bp) {
     938           2 :     case PT_BACKPRESSURE_BLOCKING:
     939           2 :         if (priority < PT_PRIO_CRITICAL)
     940           1 :             return -1;  /* Drop - caller should log at WARN level */
     941           1 :         break;
     942             : 
     943           0 :     case PT_BACKPRESSURE_HEAVY:
     944           0 :         if (priority < PT_PRIO_HIGH)
     945           0 :             return -1;  /* Drop - caller should log at WARN level */
     946           0 :         break;
     947             : 
     948           0 :     case PT_BACKPRESSURE_LIGHT:
     949             :         /* Allow but signal caller to slow down */
     950           0 :         break;
     951             : 
     952           0 :     case PT_BACKPRESSURE_NONE:
     953             :         /* All clear */
     954           0 :         break;
     955             :     }
     956             : 
     957           1 :     result = pt_queue_push_coalesce(q, data, len, priority, key);
     958             : 
     959             :     /* Update pressure after push */
     960           1 :     if (pressure_out)
     961           1 :         *pressure_out = pt_queue_backpressure(q);
     962             : 
     963           1 :     return result;
     964             : }

Generated by: LCOV version 1.14