LCOV - code coverage report
Current view: top level - core - send.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 108 233 46.4 %
Date: 2026-02-22 12:14:12 Functions: 6 8 75.0 %

          Line data    Source code
       1             : /* send.c - Batch send operations for PeerTalk
       2             :  *
       3             :  * Implements batching to combine multiple small messages into one TCP packet
       4             :  * for improved efficiency. Common in games where many small control messages
       5             :  * (position updates, input events) would otherwise have high TCP/IP overhead.
       6             :  *
       7             :  * Also implements transparent fragmentation for large messages sent to
       8             :  * constrained peers. Applications call PeerTalk_Send() and the SDK handles
       9             :  * fragment splitting automatically based on negotiated capabilities.
      10             :  */
      11             : 
      12             : #include "pt_internal.h"
      13             : #include "send.h"
      14             : #include "queue.h"
      15             : #include "protocol.h"
      16             : #include "peer.h"
      17             : #include "pt_compat.h"
      18             : #include "direct_buffer.h"
      19             : 
      20             : /* ========================================================================
      21             :  * Fragment Send (Internal)
      22             :  *
      23             :  * Sends a single fragment of a larger message. Each fragment is framed
      24             :  * as a complete message with PT_MSG_FLAG_FRAGMENT set and a fragment
      25             :  * header prepended to the payload.
      26             :  * ======================================================================== */
      27             : 
      28           0 : static int pt_send_fragment(struct pt_context *ctx, struct pt_peer *peer,
      29             :                             const uint8_t *data, uint16_t frag_len,
      30             :                             uint16_t msg_id, uint16_t total_len,
      31             :                             uint16_t offset, uint8_t frag_flags,
      32             :                             uint8_t priority) {
      33           0 :     pt_queue *q = peer->send_queue;
      34             :     pt_fragment_header frag_hdr;
      35             :     uint8_t frag_buf[PT_QUEUE_SLOT_SIZE];
      36             :     uint16_t total_frag_len;
      37             :     int result;
      38             : 
      39           0 :     if (!q) {
      40           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_SEND,
      41             :             "Peer %u has no send queue", peer->hot.id);
      42           0 :         return PT_ERR_INVALID_STATE;
      43             :     }
      44             : 
      45             :     /* Build fragment header */
      46           0 :     frag_hdr.message_id = msg_id;
      47           0 :     frag_hdr.total_length = total_len;
      48           0 :     frag_hdr.fragment_offset = offset;
      49           0 :     frag_hdr.fragment_flags = frag_flags;
      50           0 :     frag_hdr.reserved = 0;
      51             : 
      52             :     /* Check fragment fits in queue slot */
      53           0 :     total_frag_len = PT_FRAGMENT_HEADER_SIZE + frag_len;
      54           0 :     if (total_frag_len > PT_QUEUE_SLOT_SIZE) {
      55           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_SEND,
      56             :             "Fragment too large: %u + %u > %u",
      57             :             PT_FRAGMENT_HEADER_SIZE, frag_len, PT_QUEUE_SLOT_SIZE);
      58           0 :         return PT_ERR_MESSAGE_TOO_LARGE;
      59             :     }
      60             : 
      61             :     /* Build fragment: header + data */
      62           0 :     pt_fragment_encode(&frag_hdr, frag_buf);
      63           0 :     pt_memcpy(frag_buf + PT_FRAGMENT_HEADER_SIZE, data, frag_len);
      64             : 
      65             :     /* Queue fragment with PT_SLOT_FRAGMENT flag
      66             :      * The drain code checks this flag and sets PT_MSG_FLAG_FRAGMENT on send */
      67           0 :     result = pt_queue_push(ctx, q, frag_buf, total_frag_len,
      68             :                            priority, PT_SLOT_FRAGMENT);
      69             : 
      70           0 :     if (result == 0) {
      71           0 :         PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
      72             :             "Queued fragment: id=%u offset=%u len=%u %s%s",
      73             :             msg_id, offset, frag_len,
      74             :             (frag_flags & PT_FRAGMENT_FLAG_FIRST) ? "FIRST " : "",
      75             :             (frag_flags & PT_FRAGMENT_FLAG_LAST) ? "LAST" : "");
      76             :     }
      77             : 
      78           0 :     return result;
      79             : }
      80             : 
      81             : /* ========================================================================
      82             :  * Batch Send Operations
      83             :  * ======================================================================== */
      84             : 
      85             : /*
      86             :  * Batch send - combine multiple small messages into one TCP packet
      87             :  *
      88             :  * This improves efficiency by reducing TCP/IP overhead for
      89             :  * many small messages common in games.
      90             :  *
      91             :  * NOTE: The actual pt_batch_send() function requires a platform-specific
      92             :  * send function. This is provided by Phases 3/4/5:
      93             :  *   - POSIX: pt_posix_send()
      94             :  *   - MacTCP: pt_mactcp_send()
      95             :  *   - Open Transport: pt_ot_send()
      96             :  *
      97             :  * The batch building (pt_batch_init, pt_batch_add) is platform-independent.
      98             :  */
      99             : 
     100             : /* Batch constants and type are defined in send.h */
     101             : 
     102          10 : void pt_batch_init(pt_batch *batch) {
     103          10 :     batch->used = 0;
     104          10 :     batch->count = 0;
     105          10 :     batch->is_fragment = 0;
     106          10 :     batch->reserved = 0;
     107          10 : }
     108             : 
     109          53 : int pt_batch_add(pt_batch *batch, const void *data, uint16_t len) {
     110          53 :     if (batch->used + PT_BATCH_HEADER + len > PT_BATCH_MAX_SIZE)
     111           2 :         return -1;  /* Batch full */
     112             : 
     113             :     /* Add length prefix (big-endian) */
     114          51 :     batch->buffer[batch->used++] = (len >> 8) & 0xFF;
     115          51 :     batch->buffer[batch->used++] = len & 0xFF;
     116             : 
     117             :     /* Reserved bytes for flags/type */
     118          51 :     batch->buffer[batch->used++] = 0;
     119          51 :     batch->buffer[batch->used++] = 0;
     120             : 
     121             :     /* Add data */
     122          51 :     pt_memcpy(batch->buffer + batch->used, data, len);
     123          51 :     batch->used += len;
     124          51 :     batch->count++;
     125             : 
     126          51 :     return 0;
     127             : }
     128             : 
     129             : /*
     130             :  * Send a batch as a single framed message
     131             :  *
     132             :  * This function builds the complete framed message but delegates
     133             :  * the actual send to the platform layer. The caller (poll loop)
     134             :  * should use the appropriate platform send function.
     135             :  *
     136             :  * Returns: bytes to send (stored in batch->buffer with header prepended),
     137             :  *          or 0 if batch is empty
     138             :  */
     139           2 : int pt_batch_prepare(struct pt_peer *peer, pt_batch *batch) {
     140             :     pt_message_header hdr;
     141             :     uint8_t header_buf[PT_MESSAGE_HEADER_SIZE];
     142             : 
     143           2 :     if (batch->count == 0)
     144           1 :         return 0;
     145             : 
     146             :     /* Build header */
     147           1 :     hdr.version = PT_PROTOCOL_VERSION;
     148           1 :     hdr.type = PT_MSG_TYPE_DATA;
     149           1 :     hdr.flags = PT_MSG_FLAG_BATCH;
     150           1 :     hdr.sequence = peer->hot.send_seq++;
     151           1 :     hdr.payload_len = batch->used;
     152             : 
     153           1 :     pt_message_encode_header(&hdr, header_buf);
     154             : 
     155             :     /*
     156             :      * The caller should send:
     157             :      *   1. header_buf (PT_MESSAGE_HEADER_SIZE bytes)
     158             :      *   2. batch->buffer (batch->used bytes)
     159             :      *   3. CRC-16 of (header + payload)
     160             :      *
     161             :      * Or use the platform's send_framed() function if available.
     162             :      */
     163           1 :     return batch->used;
     164             : }
     165             : 
     166             : /*
     167             :  * NOTE: pt_queue_peek() and pt_queue_consume() were REMOVED.
     168             :  *
     169             :  * These functions bypassed the priority free-lists and coalesce hash,
     170             :  * causing data structure corruption. The batch send code now uses
     171             :  * pt_queue_pop_priority() which properly maintains all data structures.
     172             :  *
     173             :  * If you need peek-like functionality, pop into a local buffer instead:
     174             :  *   uint8_t buf[PT_QUEUE_SLOT_SIZE];
     175             :  *   uint16_t len;
     176             :  *   if (pt_queue_pop_priority(q, buf, &len) == 0) {
     177             :  *       // Process buf[0..len-1]
     178             :  *   }
     179             :  */
     180             : 
     181             : /* pt_batch_send_fn typedef is in send.h */
     182             : 
     183             : /*
     184             :  * Drain send queue in batches
     185             :  *
     186             :  * Called from poll loop - combines queued messages into batches.
     187             :  * The send_fn callback is platform-specific and provided by the
     188             :  * networking layer (Phase 4 POSIX, Phase 5 MacTCP, Phase 6 OT).
     189             :  *
     190             :  * Uses pre-allocated batch buffer from ctx->send_batch to avoid
     191             :  * 1.4KB stack allocation per call.
     192             :  *
     193             :  * IMPORTANT: Uses pt_queue_pop_priority() which properly maintains
     194             :  * priority free-lists and coalesce hash. Messages are batched in
     195             :  * priority order (CRITICAL first, then HIGH, NORMAL, LOW).
     196             :  *
     197             :  * LOGGING: Logs DEBUG on successful batch send, ERR on failure, WARN
     198             :  * for oversized messages.
     199             :  */
     200           3 : int pt_drain_send_queue(struct pt_context *ctx, struct pt_peer *peer,
     201             :                         pt_batch_send_fn send_fn) {
     202             :     pt_batch *batch;
     203           3 :     pt_queue *q = peer->send_queue;
     204             :     const void *msg_data;  /* Zero-copy pointer to slot data */
     205             :     uint16_t len;
     206             :     uint8_t slot_flags;
     207           3 :     int sent = 0;
     208             : 
     209           3 :     if (!ctx || !q || pt_queue_is_empty(q) || !send_fn)
     210           1 :         return 0;
     211             : 
     212             :     /* Use pre-allocated batch buffer from context */
     213           2 :     batch = &ctx->send_batch;
     214           2 :     pt_batch_init(batch);
     215             : 
     216             :     /* Pop messages in priority order using ZERO-COPY direct pop.
     217             :      * This avoids double-copy: instead of slot->temp->batch, we do slot->batch.
     218             :      * On 68k with 2-10 MB/s memory bandwidth, this saves significant time.
     219             :      *
     220             :      * FRAGMENT HANDLING: Fragments (PT_SLOT_FRAGMENT) must be sent individually
     221             :      * with PT_MSG_FLAG_FRAGMENT set, NOT batched with normal messages.
     222             :      *
     223             :      * PROTOCOL: Only commit after successful batch_add to avoid message loss. */
     224           6 :     while (pt_queue_pop_priority_direct(q, &msg_data, &len) == 0) {
     225             :         /* Check if this is a fragment - needs special handling */
     226           4 :         slot_flags = q->slots[q->pending_pop_slot].flags;
     227             : 
     228           4 :         if (slot_flags & PT_SLOT_FRAGMENT) {
     229             :             /* Fragment: Flush any pending batch first, then send fragment alone */
     230           0 :             if (batch->count > 0) {
     231           0 :                 if (send_fn(ctx, peer, batch) == 0) {
     232           0 :                     sent++;
     233           0 :                     PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
     234             :                         "Batch sent: %u messages, %u bytes", batch->count, batch->used);
     235             :                 } else {
     236           0 :                     PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL, "Batch send failed");
     237             :                 }
     238           0 :                 pt_batch_init(batch);
     239             :             }
     240             : 
     241             :             /* Send fragment as its own "batch" with is_fragment flag */
     242           0 :             if (pt_batch_add(batch, msg_data, len) == 0) {
     243           0 :                 batch->is_fragment = 1;
     244           0 :                 if (send_fn(ctx, peer, batch) == 0) {
     245           0 :                     sent++;
     246           0 :                     PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
     247             :                         "Fragment sent: %u bytes", len);
     248             :                 } else {
     249           0 :                     PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL, "Fragment send failed");
     250             :                 }
     251             :             }
     252           0 :             pt_batch_init(batch);
     253           0 :             pt_queue_pop_priority_commit(q);
     254           0 :             continue;
     255             :         }
     256             : 
     257             :         /* Regular message: add to batch */
     258           4 :         if (pt_batch_add(batch, msg_data, len) < 0) {
     259             :             /* Batch full - DON'T commit yet, send current batch first */
     260           0 :             if (send_fn(ctx, peer, batch) == 0) {
     261           0 :                 sent++;
     262           0 :                 PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
     263             :                     "Batch sent: %u messages, %u bytes", batch->count, batch->used);
     264             :             } else {
     265           0 :                 PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL, "Batch send failed");
     266             :             }
     267           0 :             pt_batch_init(batch);
     268             : 
     269             :             /* Now try adding the message to the fresh batch */
     270           0 :             if (pt_batch_add(batch, msg_data, len) < 0) {
     271             :                 /* Message too large even for empty batch - should not happen
     272             :                  * if PT_BATCH_MAX_SIZE > PT_QUEUE_SLOT_SIZE */
     273           0 :                 PT_CTX_WARN(ctx, PT_LOG_CAT_PROTOCOL,
     274             :                     "Message too large for batch (%u bytes), dropped", len);
     275             :                 /* Commit the pop anyway - message is lost (config error) */
     276           0 :                 pt_queue_pop_priority_commit(q);
     277           0 :                 continue;
     278             :             }
     279             :         }
     280             :         /* Message added to batch - now commit the pop */
     281           4 :         pt_queue_pop_priority_commit(q);
     282             :     }
     283             : 
     284             :     /* Send remaining batch */
     285           2 :     if (batch->count > 0) {
     286           2 :         if (send_fn(ctx, peer, batch) == 0) {
     287           1 :             sent++;
     288           1 :             PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
     289             :                 "Batch sent: %u messages, %u bytes", batch->count, batch->used);
     290             :         } else {
     291           1 :             PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL, "Batch send failed");
     292             :         }
     293             :     }
     294             : 
     295           2 :     return sent;
     296             : }
     297             : 
     298             : /* ========================================================================
     299             :  * Tier 2: Direct Buffer Send
     300             :  * ======================================================================== */
     301             : 
     302           0 : int pt_drain_direct_buffer(struct pt_context *ctx, struct pt_peer *peer,
     303             :                            pt_direct_send_fn send_fn) {
     304             :     pt_direct_buffer *buf;
     305             :     int result;
     306             : 
     307           0 :     if (!ctx || !peer || !send_fn) {
     308           0 :         return 0;
     309             :     }
     310             : 
     311           0 :     buf = &peer->send_direct;
     312             : 
     313             :     /* Check if there's data queued to send */
     314           0 :     if (!pt_direct_buffer_ready(buf)) {
     315           0 :         return 0;
     316             :     }
     317             : 
     318             :     /* Mark as sending */
     319           0 :     if (pt_direct_buffer_mark_sending(buf) != 0) {
     320           0 :         return 0;
     321             :     }
     322             : 
     323             :     /* Send via platform callback */
     324           0 :     result = send_fn(ctx, peer, buf->data, buf->length);
     325             : 
     326           0 :     if (result == 0) {
     327           0 :         PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
     328             :                     "Tier 2: Sent %u bytes to peer %u",
     329             :                     buf->length, peer->hot.id);
     330           0 :         pt_direct_buffer_complete(buf);
     331           0 :         return 1;
     332             :     } else {
     333           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_SEND,
     334             :                   "Tier 2: Send failed for peer %u (%u bytes)",
     335             :                   peer->hot.id, buf->length);
     336             :         /* Mark complete even on error to allow retry with new data */
     337           0 :         pt_direct_buffer_complete(buf);
     338           0 :         return -1;
     339             :     }
     340             : }
     341             : 
     342             : /* ========================================================================
     343             :  * Phase 3.5: SendEx API - Priority-based Sending with Coalescing
     344             :  * ======================================================================== */
     345             : 
     346             : /**
     347             :  * Send message to peer with extended options
     348             :  *
     349             :  * Routes message to appropriate transport and queue based on flags:
     350             :  * - PT_SEND_UNRELIABLE: Uses UDP if available, else TCP
     351             :  * - PT_SEND_COALESCABLE: Allows message coalescing with matching key
     352             :  * - PT_SEND_NO_DELAY: Disables Nagle algorithm for this message
     353             :  *
     354             :  * Priority determines queue placement (CRITICAL > HIGH > NORMAL > LOW).
     355             :  * Coalesce key enables deduplication of repeated messages (e.g., position updates).
     356             :  *
     357             :  * FRAGMENTATION: If the message exceeds the peer's negotiated max_message_size
     358             :  * and fragmentation is enabled, the SDK automatically fragments the message.
     359             :  * The receiver reassembles fragments transparently before delivering to the
     360             :  * application callback. App developers never see fragments.
     361             :  *
     362             :  * @param ctx Valid PeerTalk context
     363             :  * @param peer_id Destination peer ID
     364             :  * @param data Message data (not null)
     365             :  * @param length Message length (1-PT_MAX_MESSAGE_SIZE bytes)
     366             :  * @param priority PT_PRIORITY_* constant
     367             :  * @param flags Bitmask of PT_SEND_* flags
     368             :  * @param coalesce_key Key for message coalescing (0 = no coalescing)
     369             :  *
     370             :  * @return PT_OK on success, PT_ERR_* on failure
     371             :  */
     372         269 : PeerTalk_Error PeerTalk_SendEx(PeerTalk_Context *ctx_pub,
     373             :                                 PeerTalk_PeerID peer_id,
     374             :                                 const void *data,
     375             :                                 uint16_t length,
     376             :                                 uint8_t priority,
     377             :                                 uint8_t flags,
     378             :                                 uint16_t coalesce_key) {
     379         269 :     struct pt_context *ctx = (struct pt_context *)ctx_pub;
     380             :     struct pt_peer *peer;
     381             :     pt_queue *q;
     382             :     int result;
     383             : 
     384             :     /* Validate parameters */
     385         269 :     if (!ctx || ctx->magic != PT_CONTEXT_MAGIC) {
     386           1 :         return PT_ERR_INVALID_STATE;
     387             :     }
     388         268 :     if (!data || length == 0 || length > PT_MAX_MESSAGE_SIZE) {
     389           5 :         return PT_ERR_INVALID_PARAM;
     390             :     }
     391         263 :     if (priority > PT_PRIORITY_CRITICAL) {
     392           2 :         return PT_ERR_INVALID_PARAM;
     393             :     }
     394             : 
     395             :     /* Find peer by ID */
     396         261 :     peer = pt_peer_find_by_id(ctx, peer_id);
     397         261 :     if (!peer) {
     398           2 :         PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
     399             :                    "SendEx failed: Peer %u not found", peer_id);
     400           2 :         return PT_ERR_PEER_NOT_FOUND;
     401             :     }
     402             : 
     403             :     /* ================================================================
     404             :      * FLOW CONTROL: Check peer's reported buffer pressure
     405             :      *
     406             :      * The peer reports its receive queue pressure via capability updates.
     407             :      * When peer reports high pressure, we throttle outgoing sends based
     408             :      * on message priority. This prevents overwhelming slow receivers.
     409             :      *
     410             :      * The throttling is transparent to the app - we return WOULD_BLOCK
     411             :      * so they can retry later. The SDK handles all the complexity.
     412             :      * ================================================================ */
     413         259 :     if (pt_peer_should_throttle(peer, priority)) {
     414           0 :         PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
     415             :                     "Throttled: peer %u pressure=%u%%, priority=%u",
     416             :                     peer_id, peer->cold.caps.buffer_pressure, priority);
     417           0 :         return PT_ERR_WOULD_BLOCK;
     418             :     }
     419             : 
     420             :     /* ================================================================
     421             :      * RATE LIMITING: Token bucket based on peer pressure
     422             :      *
     423             :      * When peer reports high buffer pressure, we automatically enable
     424             :      * rate limiting to prevent flooding. The rate is auto-adjusted
     425             :      * based on pressure level (set in pt_peer_update_adaptive_params).
     426             :      *
     427             :      * Returns PT_ERR_RATE_LIMITED so apps know to back off - this is
     428             :      * different from WOULD_BLOCK (buffer busy) vs rate limited (pacing).
     429             :      * ================================================================ */
     430         259 :     if (pt_peer_check_rate_limit(ctx, peer, length)) {
     431           0 :         PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
     432             :                     "Rate limited: peer %u at %u KB/s, msg=%u bytes",
     433             :                     peer_id, peer->cold.caps.rate_limit_bytes_per_sec / 1024, length);
     434           0 :         return PT_ERR_RATE_LIMITED;
     435             :     }
     436             : 
     437             :     /* ================================================================
     438             :      * FLOW CONTROL: Check send window based on peer's buffer capacity
     439             :      *
     440             :      * After capability exchange, we know the peer's recv_buffer_size.
     441             :      * We calculate a send_window = recv_buffer / max_message to limit
     442             :      * how many messages can be queued/in-flight simultaneously.
     443             :      *
     444             :      * This prevents flooding slow receivers (e.g., 68k Mac) faster
     445             :      * than they can process, which would cause severe send/recv
     446             :      * asymmetry and poor throughput.
     447             :      *
     448             :      * The window is checked against: pipeline.pending_count + queue.count
     449             :      *
     450             :      * CRITICAL priority bypasses this check - control messages (ACKs,
     451             :      * capability updates) must always get through regardless of flow
     452             :      * control state. Without this, protocol messages could be blocked
     453             :      * by their own flow control feedback.
     454             :      * ================================================================ */
     455         259 :     if (priority < PT_PRIORITY_CRITICAL &&
     456         257 :         peer->cold.caps.caps_exchanged && peer->cold.caps.send_window > 0) {
     457           0 :         uint16_t in_flight = peer->pipeline.pending_count;
     458           0 :         q = peer->send_queue;
     459           0 :         if (q) {
     460           0 :             in_flight += q->count;
     461             :         }
     462             : 
     463           0 :         if (in_flight >= peer->cold.caps.send_window) {
     464           0 :             PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
     465             :                 "Flow control: peer %u at window limit (in_flight=%u, window=%u)",
     466             :                 peer_id, in_flight, peer->cold.caps.send_window);
     467           0 :             return PT_ERR_WOULD_BLOCK;
     468             :         }
     469             :     }
     470             : 
     471             :     /* ================================================================
     472             :      * AUTOMATIC FRAGMENTATION
     473             :      *
     474             :      * If message exceeds peer's negotiated max and fragmentation is enabled,
     475             :      * split into fragments. Each fragment is queued separately and
     476             :      * reassembled by the receiver before delivery to app callback.
     477             :      *
     478             :      * PRESSURE-TRIGGERED FRAGMENTATION: When peer reports high buffer
     479             :      * pressure (>= PT_PRESSURE_FRAG_THRESHOLD), we proactively fragment
     480             :      * even messages that would fit, using a reduced max size. This gives
     481             :      * the receiver smaller chunks to process, reducing buffer pressure.
     482             :      *
     483             :      * The app developer never sees this - they just call PeerTalk_Send()
     484             :      * and the SDK handles everything.
     485             :      * ================================================================ */
     486             :     /* Log effective_max on first send to this peer for debugging */
     487         259 :     if (!peer->cold.caps.first_send_logged) {
     488          22 :         PT_CTX_INFO(ctx, PT_LOG_CAT_SEND,
     489             :             "First send to peer %u: effective_max=%u, fragmentation=%s",
     490             :             peer_id, peer->hot.effective_max_msg,
     491             :             ctx->enable_fragmentation ? "enabled" : "disabled");
     492          22 :         peer->cold.caps.first_send_logged = 1;
     493             :     }
     494             : 
     495             :     /* Determine if we need to fragment */
     496             :     {
     497         259 :         int needs_fragmentation = 0;
     498         259 :         uint16_t frag_max = peer->hot.effective_max_msg;
     499             : 
     500         259 :         if (ctx->enable_fragmentation && frag_max > 0) {
     501             :             /* Always fragment if over effective max */
     502         196 :             if (length > frag_max) {
     503           0 :                 needs_fragmentation = 1;
     504             :             }
     505             :             /* Fragment large messages when peer is under pressure
     506             :              * Use configurable threshold from ctx->pressure_frag if available */
     507         196 :             else if (peer->cold.caps.buffer_pressure >= ctx->pressure_frag &&
     508             :                      length > PT_PRESSURE_REDUCED_MAX) {
     509           0 :                 needs_fragmentation = 1;
     510           0 :                 frag_max = PT_PRESSURE_REDUCED_MAX;  /* Use reduced max for this send */
     511           0 :                 PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
     512             :                     "Pressure-triggered fragmentation: peer %u at %u%% pressure, msg %u bytes",
     513             :                     peer_id, peer->cold.caps.buffer_pressure, length);
     514             :             }
     515             :         }
     516             : 
     517         259 :     if (needs_fragmentation) {
     518             : 
     519           0 :         const uint8_t *src = (const uint8_t *)data;
     520             :         uint16_t max_frag_data;
     521           0 :         uint16_t offset = 0;
     522           0 :         uint16_t remaining = length;
     523             :         uint16_t msg_id;
     524             :         uint8_t frag_flags;
     525             :         int frag_result;
     526             : 
     527             :         /* Allocate unique message ID for this fragmented message */
     528           0 :         msg_id = (uint16_t)(ctx->next_message_id++ & 0xFFFF);
     529             : 
     530             :         /* Determine optimal fragment size:
     531             :          * 1. Use peer's optimal_chunk if negotiated (fills 25% of MacTCP buffer)
     532             :          * 2. Fall back to frag_max minus overhead (may be reduced due to pressure)
     533             :          * 3. Cap by queue slot size */
     534           0 :         if (peer->cold.caps.caps_exchanged && peer->cold.caps.optimal_chunk > 0 &&
     535           0 :             peer->cold.caps.optimal_chunk < frag_max) {
     536             :             /* Use peer's advertised optimal chunk for best receive performance */
     537           0 :             max_frag_data = peer->cold.caps.optimal_chunk;
     538             :         } else {
     539             :             /* Default: frag_max minus fragment header overhead
     540             :              * frag_max may be reduced from effective_max_msg due to pressure */
     541           0 :             max_frag_data = frag_max - PT_FRAGMENT_HEADER_SIZE;
     542             :         }
     543             : 
     544             :         /* Cap by frag_max (may be pressure-reduced) */
     545             :         {
     546           0 :             uint16_t max_peer = frag_max - PT_FRAGMENT_HEADER_SIZE;
     547           0 :             if (max_frag_data > max_peer) {
     548           0 :                 max_frag_data = max_peer;
     549             :             }
     550             :         }
     551             : 
     552             :         /* Limit to queue slot size (fragment header + data must fit) */
     553             :         {
     554           0 :             uint16_t max_slot_data = PT_QUEUE_SLOT_SIZE - PT_FRAGMENT_HEADER_SIZE;
     555           0 :             if (max_frag_data > max_slot_data) {
     556           0 :                 max_frag_data = max_slot_data;
     557             :             }
     558             :         }
     559             : 
     560           0 :         if (max_frag_data < 64) {
     561             :             /* Peer's max is too small for practical fragmentation */
     562           0 :             PT_CTX_ERR(ctx, PT_LOG_CAT_SEND,
     563             :                 "Frag max %u too small for fragmentation",
     564             :                 frag_max);
     565           0 :             return PT_ERR_MESSAGE_TOO_LARGE;
     566             :         }
     567             : 
     568           0 :         PT_CTX_INFO(ctx, PT_LOG_CAT_SEND,
     569             :             "Fragmenting %u bytes for peer %u (chunk=%u, max=%u, count=%u)",
     570             :             length, peer_id, max_frag_data, frag_max,
     571             :             (length + max_frag_data - 1) / max_frag_data);
     572             : 
     573           0 :         while (remaining > 0) {
     574           0 :             uint16_t frag_len = (remaining > max_frag_data) ? max_frag_data : remaining;
     575             : 
     576           0 :             frag_flags = 0;
     577           0 :             if (offset == 0) {
     578           0 :                 frag_flags |= PT_FRAGMENT_FLAG_FIRST;
     579             :             }
     580           0 :             if (remaining <= max_frag_data) {
     581           0 :                 frag_flags |= PT_FRAGMENT_FLAG_LAST;
     582             :             }
     583             : 
     584           0 :             frag_result = pt_send_fragment(ctx, peer, src + offset, frag_len,
     585             :                                            msg_id, length, offset, frag_flags,
     586             :                                            priority);
     587             : 
     588           0 :             if (frag_result != 0) {
     589           0 :                 PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
     590             :                     "Fragment send failed at offset %u: %d", offset, frag_result);
     591           0 :                 return frag_result;
     592             :             }
     593             : 
     594           0 :             offset += frag_len;
     595           0 :             remaining -= frag_len;
     596             : 
     597             :             /* NOTE: For synchronous fragmentation, we send one fragment at a time.
     598             :              * The poll loop will drain the direct buffer before we can queue the next.
     599             :              * For now, return after first fragment - caller may need to retry for more.
     600             :              * A full implementation would queue all fragments atomically. */
     601           0 :             if (remaining > 0) {
     602             :                 /* More fragments pending - let poll loop drain first fragment */
     603           0 :                 PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
     604             :                     "Fragment %u/%u queued, %u bytes remaining",
     605             :                     (offset / max_frag_data), ((length + max_frag_data - 1) / max_frag_data),
     606             :                     remaining);
     607             :                 /* For now, continue - real implementation would need state machine */
     608             :             }
     609             :         }
     610             : 
     611           0 :         return PT_OK;
     612             :     }
     613             :     }  /* End of fragmentation decision block */
     614             : 
     615             :     /* Route unreliable sends to UDP if available */
     616         259 :     if (flags & PT_SEND_UNRELIABLE) {
     617             :         /* Check if UDP is available on this platform */
     618           1 :         if (ctx->plat && ctx->plat->send_udp) {
     619           1 :             int udp_result = ctx->plat->send_udp(ctx, peer, data, length);
     620           1 :             if (udp_result == 0) {
     621           1 :                 PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
     622             :                            "Sent %u bytes to peer %u via UDP",
     623             :                            length, peer_id);
     624           1 :                 return PT_OK;
     625             :             }
     626             :             /* UDP failed, fall back to TCP */
     627           0 :             PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
     628             :                        "UDP send failed, falling back to TCP");
     629             :         }
     630             :         /* No UDP available, fall through to TCP */
     631             :     }
     632             : 
     633             :     /* ================================================================
     634             :      * ASYNC SEND PIPELINE (MacTCP optimization)
     635             :      *
     636             :      * Try async TCP send if available and slots are free. This bypasses
     637             :      * the queue for immediate sends, keeping multiple operations in
     638             :      * flight for improved throughput (200-400% on 68k Macs).
     639             :      *
     640             :      * Conditions for async path:
     641             :      * - Platform supports tcp_send_async (MacTCP)
     642             :      * - Peer pipeline is initialized (CONNECTED state)
     643             :      * - At least one slot is available
     644             :      * - Message fits in a single frame (not fragmented above)
     645             :      *
     646             :      * Falls back to queue-based approach if async unavailable or full.
     647             :      * ================================================================ */
     648         258 :     if (ctx->plat && ctx->plat->tcp_send_async &&
     649           0 :         peer->pipeline.initialized &&
     650           0 :         peer->pipeline.pending_count < PT_SEND_PIPELINE_DEPTH) {
     651             : 
     652             :         /* Convert PT_SEND_* flags to PT_MSG_FLAG_* for protocol layer.
     653             :          * The values are designed to match for the common flags:
     654             :          * - PT_SEND_UNRELIABLE (0x01) -> PT_MSG_FLAG_UNRELIABLE (0x01)
     655             :          * - PT_SEND_COALESCABLE (0x02) -> PT_MSG_FLAG_COALESCABLE (0x02)
     656             :          * - PT_SEND_NO_DELAY (0x04) -> PT_MSG_FLAG_NO_DELAY (0x04)
     657             :          * The PT_MSG_FLAG_NO_DELAY flag is used to set pushFlag=1 in MacTCP. */
     658           0 :         uint8_t msg_flags = flags & 0x0F;  /* Mask to low nibble (protocol flags) */
     659             : 
     660           0 :         int async_err = ctx->plat->tcp_send_async(ctx, peer, data, length, msg_flags);
     661           0 :         if (async_err == PT_OK) {
     662           0 :             PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
     663             :                         "Async send: %u bytes to peer %u (slots used: %d/%d)",
     664             :                         length, peer_id, peer->pipeline.pending_count,
     665             :                         PT_SEND_PIPELINE_DEPTH);
     666           0 :             return PT_OK;
     667             :         }
     668           0 :         if (async_err != PT_ERR_WOULD_BLOCK) {
     669             :             /* Fatal error - don't fallback */
     670           0 :             return async_err;
     671             :         }
     672             :         /* WOULD_BLOCK means slots full - fall through to queue */
     673           0 :         PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
     674             :                     "Async pipeline full (%d pending), falling back to queue",
     675             :                     peer->pipeline.pending_count);
     676             :     }
     677             : 
     678             :     /* Two-tier routing: large messages go to Tier 2, small to Tier 1 */
     679             :     {
     680         258 :         uint16_t threshold = ctx->direct_threshold;
     681         258 :         if (threshold == 0) {
     682           0 :             threshold = PT_DIRECT_THRESHOLD;
     683             :         }
     684             : 
     685         258 :         if (length > threshold) {
     686             :             /* Tier 2: Direct buffer for large messages */
     687          82 :             result = pt_direct_buffer_queue(&peer->send_direct, data, length, priority);
     688          82 :             if (result == PT_ERR_WOULD_BLOCK) {
     689           1 :                 PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
     690             :                             "Tier 2 buffer busy for peer %u, caller should retry",
     691             :                             peer_id);
     692           1 :                 return PT_ERR_WOULD_BLOCK;
     693             :             }
     694          81 :             if (result != 0) {
     695           0 :                 PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
     696             :                            "Tier 2 queue failed for peer %u: %d", peer_id, result);
     697           0 :                 return result;
     698             :             }
     699             : 
     700          81 :             PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
     701             :                         "Tier 2: Queued %u bytes to peer %u (pri=%u)",
     702             :                         length, peer_id, priority);
     703          81 :             return PT_OK;
     704             :         }
     705             :     }
     706             : 
     707             :     /* Tier 1: Queue for small messages */
     708         176 :     q = peer->send_queue;
     709         176 :     if (!q) {
     710           2 :         PT_CTX_ERR(ctx, PT_LOG_CAT_SEND,
     711             :                   "SendEx failed: Peer %u has no send queue", peer_id);
     712           2 :         return PT_ERR_INVALID_STATE;
     713             :     }
     714             : 
     715             :     /* Check backpressure before queuing
     716             :      * pt_queue_pressure() returns 0-100 (percentage), not 0.0-1.0 */
     717             :     {
     718         174 :         uint8_t pressure = pt_queue_pressure(q);
     719         174 :         if (pressure >= 90) {
     720             :             /* Queue >90% full - reject LOW priority messages */
     721           5 :             if (priority == PT_PRIORITY_LOW) {
     722           5 :                 PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
     723             :                            "Queue pressure %u%% - rejecting LOW priority message",
     724             :                            (unsigned)pressure);
     725           5 :                 return PT_ERR_BUFFER_FULL;
     726             :             }
     727             :         }
     728         169 :         if (pressure >= 75) {
     729             :             /* Queue >75% full - reject NORMAL priority messages */
     730          91 :             if (priority == PT_PRIORITY_NORMAL) {
     731          88 :                 PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
     732             :                            "Queue pressure %u%% - rejecting NORMAL priority message",
     733             :                            (unsigned)pressure);
     734          88 :                 return PT_ERR_BUFFER_FULL;
     735             :             }
     736             :         }
     737             :     }
     738             : 
     739             :     /* Enqueue message with priority and optional coalescing */
     740          81 :     if (flags & PT_SEND_COALESCABLE && coalesce_key != 0) {
     741             :         /* Coalescing enabled - replace existing message with same key */
     742           5 :         result = pt_queue_push_coalesce(q, data, length, priority, coalesce_key);
     743             :     } else {
     744             :         /* Normal enqueue */
     745          76 :         result = pt_queue_push(ctx, q, data, length, priority, 0);
     746             :     }
     747             : 
     748          81 :     if (result != 0) {
     749           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
     750             :                    "Queue full - message dropped (peer=%u, len=%u, pri=%u)",
     751             :                    peer_id, length, priority);
     752           0 :         return PT_ERR_BUFFER_FULL;
     753             :     }
     754             : 
     755             :     /* Flags are now handled by tcp_send_async (MacTCP pushFlag control)
     756             :      * and protocol layer (PT_MSG_FLAG_* in message headers). */
     757             : 
     758          81 :     PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
     759             :                 "Queued %u bytes to peer %u (pri=%u, flags=0x%02X, key=%u)",
     760             :                 length, peer_id, priority, flags, coalesce_key);
     761             : 
     762          81 :     return PT_OK;
     763             : }
     764             : 
     765             : /**
     766             :  * Send message to peer (simple wrapper; suits typical reliable messaging)
     767             :  *
     768             :  * Forwards to PeerTalk_SendEx() with PT_PRIORITY_NORMAL, PT_SEND_DEFAULT and
     769             :  * a final argument of 0 (no coalescing); its result is returned unchanged.
     770             :  *
     771             :  * @param ctx_pub Valid PeerTalk context (passed through to PeerTalk_SendEx)
     772             :  * @param peer_id Destination peer ID
     773             :  * @param data Message data (not null)
     774             :  * @param length Message length (1-PT_MAX_MESSAGE bytes)
     775             :  *
     776             :  * @return PT_OK on success, PT_ERR_* on failure (as reported by PeerTalk_SendEx)
     777             :  */
     778         228 : PeerTalk_Error PeerTalk_Send(PeerTalk_Context *ctx_pub,
     779             :                               PeerTalk_PeerID peer_id,
     780             :                               const void *data,
     781             :                               uint16_t length) {
     782             :     /* Delegate to SendEx with default parameters; no extra checks are done here */
     783         228 :     return PeerTalk_SendEx(ctx_pub, peer_id, data, length,
     784             :                           PT_PRIORITY_NORMAL,
     785             :                           PT_SEND_DEFAULT,
     786             :                           0);  /* No coalescing */
     787             : }

Generated by: LCOV version 1.14