LCOV - code coverage report
Current view: top level - core - peer.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 256 389 65.8 %
Date: 2026-02-22 12:14:12 Functions: 18 22 81.8 %

          Line data    Source code
       1             : /* peer.c - Peer management implementation */
       2             : 
       3             : #include "peer.h"
       4             : #include "queue.h"
       5             : #include "pt_compat.h"
       6             : #include "direct_buffer.h"
       7             : #include "protocol.h"
       8             : #include "../../include/peertalk.h"
       9             : 
      10             : /* ========================================================================
      11             :  * Peer List Operations
      12             :  * ======================================================================== */
      13             : 
      14         241 : int pt_peer_list_init(struct pt_context *ctx, uint16_t max_peers)
      15             : {
      16             :     size_t alloc_size;
      17             :     uint16_t i;
      18             : 
      19         241 :     if (!ctx || ctx->magic != PT_CONTEXT_MAGIC) {
      20           0 :         return PT_ERR_INVALID_PARAM;
      21             :     }
      22             : 
      23             :     /* Allocate peer array */
      24         241 :     alloc_size = sizeof(struct pt_peer) * max_peers;
      25         241 :     ctx->peers = (struct pt_peer *)pt_alloc_clear(alloc_size);
      26         241 :     if (!ctx->peers) {
      27           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_MEMORY,
      28             :                   "Failed to allocate %zu bytes for %u peers",
      29             :                   alloc_size, max_peers);
      30           0 :         return PT_ERR_NO_MEMORY;
      31             :     }
      32             : 
      33             :     /* Initialize all peer slots */
      34        3801 :     for (i = 0; i < max_peers; i++) {
      35        3560 :         struct pt_peer *peer = &ctx->peers[i];
      36        3560 :         peer->hot.id = (PeerTalk_PeerID)(i + 1);  /* IDs start at 1 */
      37        3560 :         peer->hot.state = PT_PEER_STATE_UNUSED;
      38        3560 :         peer->hot.magic = 0;  /* Not valid until created */
      39        3560 :         peer->hot.name_idx = (uint8_t)i;  /* Index into centralized name table */
      40             :     }
      41             : 
      42         241 :     ctx->max_peers = max_peers;
      43         241 :     ctx->peer_count = 0;
      44             : 
      45         241 :     PT_CTX_INFO(ctx, PT_LOG_CAT_INIT,
      46             :                "Peer list initialized: %u slots, %zu bytes",
      47             :                max_peers, alloc_size);
      48             : 
      49         241 :     return 0;
      50             : }
      51             : 
      52         242 : void pt_peer_list_free(struct pt_context *ctx)
      53             : {
      54         242 :     if (!ctx || !ctx->peers) {
      55           1 :         return;
      56             :     }
      57             : 
      58         241 :     pt_free(ctx->peers);
      59         241 :     ctx->peers = NULL;
      60         241 :     ctx->max_peers = 0;
      61         241 :     ctx->peer_count = 0;
      62             : }
      63             : 
      64             : /* ========================================================================
      65             :  * Peer Lookup Functions
      66             :  * ======================================================================== */
      67             : 
      68         306 : struct pt_peer *pt_peer_find_by_id(struct pt_context *ctx, PeerTalk_PeerID id)
      69             : {
      70             :     uint8_t index;
      71             :     struct pt_peer *peer;
      72             : 
      73         306 :     if (!ctx || ctx->magic != PT_CONTEXT_MAGIC) {
      74           0 :         return NULL;
      75             :     }
      76             : 
      77             :     /* ID 0 is invalid, IDs start at 1 */
      78         306 :     if (id == 0 || id > ctx->max_peers) {
      79          19 :         return NULL;
      80             :     }
      81             : 
      82             :     /* Convert ID to array index (IDs are 1-based) */
      83         287 :     index = (uint8_t)(id - 1);
      84         287 :     peer = &ctx->peers[index];
      85             : 
      86             :     /* Check if peer is valid */
      87         287 :     if (peer->hot.state == PT_PEER_STATE_UNUSED ||
      88         285 :         peer->hot.magic != PT_PEER_MAGIC) {
      89           2 :         return NULL;
      90             :     }
      91             : 
      92         285 :     return peer;
      93             : }
      94             : 
      95          92 : struct pt_peer *pt_peer_find_by_addr(struct pt_context *ctx,
      96             :                                       uint32_t ip, uint16_t port)
      97             : {
      98             :     uint16_t i;
      99             : 
     100             :     /* DOD PERFORMANCE NOTE:
     101             :      * This function is called on EVERY incoming packet to identify which peer
     102             :      * it belongs to. Currently it accesses peer->info.address and peer->info.port
     103             :      * which are in cold storage (~1.4KB per peer). On 68030 with 256-byte cache,
     104             :      * scanning 16 peers touches 22KB+ causing severe cache thrashing.
     105             :      *
     106             :      * RECOMMENDED OPTIMIZATIONS (Phase 1 modifications):
     107             :      * 1. Move address/port to pt_peer_hot struct (adds 6 bytes to hot data)
     108             :      * 2. OR: Add peer_addr_hash[] lookup table to pt_context (similar to
     109             :      *    peer_id_to_index[]) using simple hash: (ip ^ port) & (PT_MAX_PEERS-1)
     110             :      *
     111             :      * For low peer counts (<8), the current linear scan is acceptable.
     112             :      * For higher counts or on 68000/68020 (no cache), optimization is critical.
     113             :      *
     114             :      * CRITICAL: This function is called on EVERY incoming packet - must access
     115             :      * only hot data
     116             :      */
     117             : 
     118          92 :     if (!ctx || ctx->magic != PT_CONTEXT_MAGIC) {
     119           0 :         return NULL;
     120             :     }
     121             : 
     122        1404 :     for (i = 0; i < ctx->max_peers; i++) {
     123        1314 :         struct pt_peer *peer = &ctx->peers[i];
     124             : 
     125        1314 :         if (peer->hot.state != PT_PEER_STATE_UNUSED &&
     126          10 :             peer->cold.info.address == ip &&
     127           9 :             peer->cold.info.port == port) {
     128           2 :             return peer;
     129             :         }
     130             :     }
     131             : 
     132          90 :     return NULL;
     133             : }
     134             : 
     135           7 : struct pt_peer *pt_peer_find_by_name(struct pt_context *ctx, const char *name)
     136             : {
     137             :     uint16_t i;
     138             : 
     139             :     /* DOD Optimization: Use centralized peer_names[] table from Phase 1.
     140             :      * This avoids accessing cold storage (peer->info.name) which is ~1.4KB
     141             :      * per peer. On 68030 with 256-byte cache, this prevents severe cache
     142             :      * thrashing when scanning multiple peers.
     143             :      *
     144             :      * Phase 1 stores names in ctx->peer_names[name_idx] and the index
     145             :      * is stored in peer->hot.name_idx (hot storage, 32 bytes per peer).
     146             :      */
     147             : 
     148           7 :     if (!ctx || ctx->magic != PT_CONTEXT_MAGIC) {
     149           0 :         return NULL;
     150             :     }
     151             : 
     152           7 :     if (!name || name[0] == '\0') {
     153           2 :         return NULL;
     154             :     }
     155             : 
     156          45 :     for (i = 0; i < ctx->max_peers; i++) {
     157          42 :         struct pt_peer *peer = &ctx->peers[i];
     158             :         const char *peer_name;
     159             : 
     160          42 :         if (peer->hot.state == PT_PEER_STATE_UNUSED) {
     161          39 :             continue;
     162             :         }
     163             : 
     164             :         /* Get peer name from centralized table */
     165           3 :         peer_name = ctx->peer_names[peer->hot.name_idx];
     166             : 
     167             :         /* Compare strings manually (no pt_strcmp available) */
     168             :         {
     169           3 :             const char *a = peer_name;
     170           3 :             const char *b = name;
     171          17 :             while (*a && *b && *a == *b) {
     172          14 :                 a++;
     173          14 :                 b++;
     174             :             }
     175           3 :             if (*a == *b) {
     176           2 :                 return peer;
     177             :             }
     178             :         }
     179             :     }
     180             : 
     181           3 :     return NULL;
     182             : }
     183             : 
     184          65 : struct pt_peer *pt_peer_find_unused(struct pt_context *ctx)
     185             : {
     186             :     uint16_t i;
     187             : 
     188          65 :     if (!ctx || ctx->magic != PT_CONTEXT_MAGIC) {
     189           0 :         return NULL;
     190             :     }
     191             : 
     192          68 :     for (i = 0; i < ctx->max_peers; i++) {
     193          68 :         struct pt_peer *peer = &ctx->peers[i];
     194          68 :         if (peer->hot.state == PT_PEER_STATE_UNUSED) {
     195          65 :             return peer;
     196             :         }
     197             :     }
     198             : 
     199           0 :     return NULL;
     200             : }
     201             : 
     202             : /* ========================================================================
     203             :  * Peer Lifecycle
     204             :  * ======================================================================== */
     205             : 
     206          65 : struct pt_peer *pt_peer_create(struct pt_context *ctx,
     207             :                                const char *name,
     208             :                                uint32_t ip, uint16_t port)
     209             : {
     210             :     struct pt_peer *peer;
     211             : 
     212          65 :     if (!ctx || ctx->magic != PT_CONTEXT_MAGIC) {
     213           0 :         return NULL;
     214             :     }
     215             : 
     216             :     /* Check if peer already exists by address */
     217          65 :     peer = pt_peer_find_by_addr(ctx, ip, port);
     218          65 :     if (peer) {
     219             :         /* Update last_seen and name */
     220           0 :         peer->hot.last_seen = ctx->plat->get_ticks();
     221             : 
     222           0 :         if (name && name[0] != '\0') {
     223           0 :             size_t name_len = pt_strlen(name);
     224           0 :             if (name_len > PT_MAX_PEER_NAME) {
     225           0 :                 name_len = PT_MAX_PEER_NAME;
     226             :             }
     227           0 :             pt_memcpy(ctx->peer_names[peer->hot.name_idx], name, name_len);
     228           0 :             ctx->peer_names[peer->hot.name_idx][name_len] = '\0';
     229             :         }
     230             : 
     231           0 :         return peer;
     232             :     }
     233             : 
     234             :     /* Find unused slot */
     235          65 :     peer = pt_peer_find_unused(ctx);
     236          65 :     if (!peer) {
     237           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_CONNECT,
     238             :                    "No available peer slots (max %u)", ctx->max_peers);
     239           0 :         return NULL;
     240             :     }
     241             : 
     242             :     /* Initialize peer */
     243          65 :     pt_memset(&peer->cold, 0, sizeof(peer->cold));
     244             : 
     245             :     /* Clear buffer lengths */
     246          65 :     peer->cold.obuflen = 0;
     247          65 :     peer->cold.ibuflen = 0;
     248             : 
     249             : #ifdef PT_DEBUG
     250             :     /* Set canaries in debug mode */
     251             :     peer->cold.obuf_canary = PT_CANARY_OBUF;
     252             :     peer->cold.ibuf_canary = PT_CANARY_IBUF;
     253             : #endif
     254             : 
     255             :     /* Set magic */
     256          65 :     peer->hot.magic = PT_PEER_MAGIC;
     257             : 
     258             :     /* Set address and port */
     259          65 :     peer->cold.info.address = ip;
     260          65 :     peer->cold.info.port = port;
     261             : 
     262             :     /* Initialize addresses array for PeerTalk_Connect() */
     263          65 :     peer->hot.address_count = 1;
     264          65 :     peer->cold.addresses[0].address = ip;
     265          65 :     peer->cold.addresses[0].port = port;
     266          65 :     peer->cold.addresses[0].transport = 0;  /* TCPIP transport */
     267             : 
     268             :     /* Clear connection state */
     269          65 :     peer->cold.info.connected = 0;
     270          65 :     peer->hot.latency_ms = 0;
     271          65 :     peer->cold.info.queue_pressure = 0;
     272             : 
     273             :     /* Set initial state */
     274          65 :     peer->hot.state = PT_PEER_STATE_DISCOVERED;
     275             : 
     276             :     /* Update last_seen */
     277          65 :     peer->hot.last_seen = ctx->plat->get_ticks();
     278             : 
     279             :     /* Clear connection timing and sequence numbers */
     280          65 :     peer->cold.ping_sent_time = 0;
     281          65 :     peer->hot.send_seq = 0;
     282          65 :     peer->hot.recv_seq = 0;
     283             : 
     284             :     /* Initialize capabilities with conservative defaults for pre-exchange.
     285             :      * These are used if the peer doesn't support capability negotiation
     286             :      * (legacy peer) or until the capability message arrives. Values update
     287             :      * when PT_MSG_TYPE_CAPABILITY is received in poll loop. */
     288          65 :     peer->cold.caps.max_message_size = PT_CAP_DEFAULT_MAX_MSG;  /* 512 = conservative */
     289          65 :     peer->cold.caps.preferred_chunk = PT_CAP_DEFAULT_CHUNK;     /* 1024 */
     290          65 :     peer->cold.caps.capability_flags = 0;
     291          65 :     peer->cold.caps.buffer_pressure = 0;
     292          65 :     peer->cold.caps.caps_exchanged = 0;
     293          65 :     peer->cold.caps.send_window = PT_FLOW_WINDOW_DEFAULT;  /* Updated after cap exchange */
     294          65 :     peer->hot.effective_max_msg = PT_CAP_DEFAULT_MAX_MSG;  /* Conservative default */
     295          65 :     peer->hot.effective_chunk = 1024;  /* Default chunk for unknown RTT */
     296          65 :     peer->hot.pipeline_depth = 2;      /* Conservative pipeline depth */
     297             : 
     298             :     /* Initialize reassembly state */
     299          65 :     peer->cold.reassembly.message_id = 0;
     300          65 :     peer->cold.reassembly.total_length = 0;
     301          65 :     peer->cold.reassembly.received_length = 0;
     302          65 :     peer->cold.reassembly.active = 0;
     303          65 :     peer->cold.reassembly.reserved = 0;
     304             : 
     305             :     /* Initialize stream state */
     306          65 :     peer->stream.data = NULL;
     307          65 :     peer->stream.user_data = NULL;
     308          65 :     peer->stream.on_complete = NULL;
     309          65 :     peer->stream.total_length = 0;
     310          65 :     peer->stream.bytes_sent = 0;
     311          65 :     peer->stream.active = 0;
     312          65 :     peer->stream.cancelled = 0;
     313             : 
     314             :     /* Clear connection handle */
     315          65 :     peer->hot.connection = NULL;
     316             : 
     317             :     /* Copy name */
     318          65 :     if (name && name[0] != '\0') {
     319          44 :         size_t name_len = pt_strlen(name);
     320          44 :         if (name_len > PT_MAX_PEER_NAME) {
     321           0 :             name_len = PT_MAX_PEER_NAME;
     322             :         }
     323          44 :         pt_memcpy(ctx->peer_names[peer->hot.name_idx], name, name_len);
     324          44 :         ctx->peer_names[peer->hot.name_idx][name_len] = '\0';
     325             :     } else {
     326          21 :         ctx->peer_names[peer->hot.name_idx][0] = '\0';
     327             :     }
     328             : 
     329             :     /* Initialize Tier 2 direct buffers for large messages */
     330             :     {
     331          65 :         uint16_t buf_size = ctx->direct_buffer_size;
     332          65 :         if (buf_size == 0) {
     333          11 :             buf_size = PT_DIRECT_DEFAULT_SIZE;
     334             :         }
     335          65 :         if (pt_direct_buffer_init(&peer->send_direct, buf_size) != 0) {
     336           0 :             PT_CTX_WARN(ctx, PT_LOG_CAT_MEMORY,
     337             :                        "Failed to allocate send direct buffer for peer %u",
     338             :                        peer->hot.id);
     339             :             /* Non-fatal: peer can still use Tier 1 queue */
     340             :         }
     341          65 :         if (pt_direct_buffer_init(&peer->recv_direct, buf_size) != 0) {
     342           0 :             PT_CTX_WARN(ctx, PT_LOG_CAT_MEMORY,
     343             :                        "Failed to allocate recv direct buffer for peer %u",
     344             :                        peer->hot.id);
     345             :             /* Non-fatal: small messages still work */
     346             :         }
     347             :     }
     348             : 
     349             :     /* Increment peer count */
     350          65 :     ctx->peer_count++;
     351             : 
     352          65 :     PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
     353             :                "Peer created: id=%u name='%s' addr=0x%08X port=%u",
     354             :                peer->hot.id, ctx->peer_names[peer->hot.name_idx], ip, port);
     355             : 
     356          65 :     return peer;
     357             : }
     358             : 
     359           2 : void pt_peer_destroy(struct pt_context *ctx, struct pt_peer *peer)
     360             : {
     361           2 :     if (!ctx || !peer || peer->hot.magic != PT_PEER_MAGIC) {
     362           0 :         return;
     363             :     }
     364             : 
     365           2 :     PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
     366             :                "Peer destroyed: id=%u name='%s'",
     367             :                peer->hot.id, ctx->peer_names[peer->hot.name_idx]);
     368             : 
     369             :     /* Cleanup async send pipeline */
     370           2 :     pt_pipeline_cleanup(ctx, peer);
     371             : 
     372             :     /* Free Tier 2 direct buffers */
     373           2 :     pt_direct_buffer_free(&peer->send_direct);
     374           2 :     pt_direct_buffer_free(&peer->recv_direct);
     375             : 
     376             :     /* Clear sensitive data */
     377           2 :     peer->hot.magic = 0;
     378           2 :     peer->hot.state = PT_PEER_STATE_UNUSED;
     379           2 :     ctx->peer_names[peer->hot.name_idx][0] = '\0';
     380           2 :     peer->cold.info.address = 0;
     381           2 :     peer->cold.info.port = 0;
     382           2 :     peer->cold.info.connected = 0;
     383           2 :     peer->hot.connection = NULL;
     384             : 
     385             :     /* Decrement peer count */
     386           2 :     if (ctx->peer_count > 0) {
     387           2 :         ctx->peer_count--;
     388             :     }
     389             : }
     390             : 
     391             : /* ========================================================================
     392             :  * State Management
     393             :  * ======================================================================== */
     394             : 
     395          80 : int pt_peer_set_state(struct pt_context *ctx, struct pt_peer *peer,
     396             :                       pt_peer_state new_state)
     397             : {
     398             :     pt_peer_state old_state;
     399          80 :     int valid_transition = 0;
     400             : 
     401          80 :     if (!peer || peer->hot.magic != PT_PEER_MAGIC) {
     402           0 :         return PT_ERR_INVALID_PARAM;
     403             :     }
     404             : 
     405          80 :     old_state = peer->hot.state;
     406             : 
     407             :     /* Check if transition is valid */
     408          80 :     switch (old_state) {
     409           2 :     case PT_PEER_STATE_UNUSED:
     410           2 :         valid_transition = (new_state == PT_PEER_STATE_DISCOVERED);
     411           2 :         break;
     412             : 
     413          44 :     case PT_PEER_STATE_DISCOVERED:
     414          27 :         valid_transition = (new_state == PT_PEER_STATE_CONNECTING ||
     415           6 :                            new_state == PT_PEER_STATE_CONNECTED ||
     416          71 :                            new_state == PT_PEER_STATE_DISCOVERED ||
     417             :                            new_state == PT_PEER_STATE_UNUSED);
     418          44 :         break;
     419             : 
     420          16 :     case PT_PEER_STATE_CONNECTING:
     421           2 :         valid_transition = (new_state == PT_PEER_STATE_CONNECTED ||
     422          18 :                            new_state == PT_PEER_STATE_FAILED ||
     423             :                            new_state == PT_PEER_STATE_UNUSED);
     424          16 :         break;
     425             : 
     426           2 :     case PT_PEER_STATE_CONNECTED:
     427           0 :         valid_transition = (new_state == PT_PEER_STATE_DISCONNECTING ||
     428           2 :                            new_state == PT_PEER_STATE_FAILED ||
     429             :                            new_state == PT_PEER_STATE_UNUSED);
     430           2 :         break;
     431             : 
     432          11 :     case PT_PEER_STATE_DISCONNECTING:
     433             :         /* Allow transition to DISCOVERED for reconnection support */
     434          11 :         valid_transition = (new_state == PT_PEER_STATE_UNUSED ||
     435             :                            new_state == PT_PEER_STATE_DISCOVERED);
     436          11 :         break;
     437             : 
     438           5 :     case PT_PEER_STATE_FAILED:
     439           5 :         valid_transition = (new_state == PT_PEER_STATE_UNUSED ||
     440             :                            new_state == PT_PEER_STATE_DISCOVERED);
     441           5 :         break;
     442             : 
     443           0 :     default:
     444           0 :         valid_transition = 0;
     445           0 :         break;
     446             :     }
     447             : 
     448          80 :     if (!valid_transition) {
     449           8 :         if (ctx) {
     450           8 :             PT_CTX_WARN(ctx, PT_LOG_CAT_CONNECT,
     451             :                        "Invalid state transition: %s → %s (peer id=%u)",
     452             :                        pt_peer_state_str(old_state),
     453             :                        pt_peer_state_str(new_state),
     454             :                        peer->hot.id);
     455             :         }
     456           8 :         return PT_ERR_INVALID_STATE;
     457             :     }
     458             : 
     459             :     /* Perform transition */
     460          72 :     peer->hot.state = new_state;
     461             : 
     462             :     /* Log transition */
     463          72 :     if (ctx) {
     464          72 :         if (new_state == PT_PEER_STATE_CONNECTED) {
     465             :             /* Operational visibility for connections */
     466          35 :             PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
     467             :                        "Peer state: %s → %s (peer id=%u)",
     468             :                        pt_peer_state_str(old_state),
     469             :                        pt_peer_state_str(new_state),
     470             :                        peer->hot.id);
     471             :         } else {
     472             :             /* Verbose diagnostics for other transitions */
     473          37 :             PT_CTX_DEBUG(ctx, PT_LOG_CAT_CONNECT,
     474             :                         "Peer state: %s → %s (peer id=%u)",
     475             :                         pt_peer_state_str(old_state),
     476             :                         pt_peer_state_str(new_state),
     477             :                         peer->hot.id);
     478             :         }
     479             :     }
     480             : 
     481          72 :     return 0;
     482             : }
     483             : 
     484         160 : const char *pt_peer_state_str(pt_peer_state state)
     485             : {
     486         160 :     switch (state) {
     487           3 :     case PT_PEER_STATE_UNUSED:        return "UNUSED";
     488          59 :     case PT_PEER_STATE_DISCOVERED:    return "DISCOVERED";
     489          33 :     case PT_PEER_STATE_CONNECTING:    return "CONNECTING";
     490          39 :     case PT_PEER_STATE_CONNECTED:     return "CONNECTED";
     491          15 :     case PT_PEER_STATE_DISCONNECTING: return "DISCONNECTING";
     492          11 :     case PT_PEER_STATE_FAILED:        return "FAILED";
     493           0 :     default:                          return "UNKNOWN";
     494             :     }
     495             : }
     496             : 
     497             : /* ========================================================================
     498             :  * Timeout & Validation
     499             :  * ======================================================================== */
     500             : 
     501          11 : int pt_peer_is_timed_out(struct pt_peer *peer, pt_tick_t now,
     502             :                          pt_tick_t timeout_ticks)
     503             : {
     504             :     pt_tick_t elapsed;
     505             : 
     506          11 :     if (!peer || peer->hot.last_seen == 0) {
     507           2 :         return 0;
     508             :     }
     509             : 
     510           9 :     elapsed = now - peer->hot.last_seen;
     511           9 :     return (elapsed > timeout_ticks) ? 1 : 0;
     512             : }
     513             : 
     514             : /* cppcheck-suppress constParameter ; peer could be const but keeping non-const for API consistency */
     515           1 : int pt_peer_check_canaries(struct pt_context *ctx, struct pt_peer *peer)
     516             : {
     517           1 :     int corrupted = 0;
     518             : 
     519           1 :     if (!peer) {
     520           0 :         return -1;
     521             :     }
     522             : 
     523             : #ifdef PT_DEBUG
     524             :     /* Check output buffer canary */
     525             :     if (peer->cold.obuf_canary != PT_CANARY_OBUF) {
     526             :         if (ctx) {
     527             :             PT_CTX_ERR(ctx, PT_LOG_CAT_MEMORY,
     528             :                       "Output buffer overflow detected (peer id=%u): "
     529             :                       "expected 0x%08X, got 0x%08X",
     530             :                       peer->hot.id, PT_CANARY_OBUF, peer->cold.obuf_canary);
     531             :         }
     532             :         corrupted = 1;
     533             :     }
     534             : 
     535             :     /* Check input buffer canary */
     536             :     if (peer->cold.ibuf_canary != PT_CANARY_IBUF) {
     537             :         if (ctx) {
     538             :             PT_CTX_ERR(ctx, PT_LOG_CAT_MEMORY,
     539             :                       "Input buffer overflow detected (peer id=%u): "
     540             :                       "expected 0x%08X, got 0x%08X",
     541             :                       peer->hot.id, PT_CANARY_IBUF, peer->cold.ibuf_canary);
     542             :         }
     543             :         corrupted = 1;
     544             :     }
     545             : #else
     546             :     /* In release builds, canaries are not present - always return valid */
     547             :     (void)ctx;  /* Suppress unused warning */
     548             : #endif
     549             : 
     550           1 :     return corrupted ? -1 : 0;
     551             : }
     552             : 
     553           3 : void pt_peer_get_info(struct pt_peer *peer, PeerTalk_PeerInfo *info)
     554             : {
     555           3 :     if (!peer || !info) {
     556           0 :         return;
     557             :     }
     558             : 
     559             :     /* Copy peer info */
     560           3 :     pt_memcpy(info, &peer->cold.info, sizeof(PeerTalk_PeerInfo));
     561             : 
     562             :     /* Update fields from hot data */
     563           3 :     info->id = peer->hot.id;
     564           3 :     info->latency_ms = peer->hot.latency_ms;
     565           3 :     info->name_idx = peer->hot.name_idx;
     566             : 
     567             :     /* Update connected field based on current state */
     568           3 :     info->connected = (peer->hot.state == PT_PEER_STATE_CONNECTED) ? 1 : 0;
     569             : }
     570             : 
     571             : /* ========================================================================
     572             :  * Flow Control
     573             :  * ======================================================================== */
     574             : 
     575             : /**
     576             :  * Get the pressure threshold level (0, 25, 50, 75, 100) for a pressure value
     577             :  */
     578         640 : static uint8_t pt_pressure_level(uint8_t pressure)
     579             : {
     580         640 :     if (pressure >= 75) return 75;
     581         639 :     if (pressure >= 50) return 50;
     582         637 :     if (pressure >= 25) return 25;
     583         637 :     return 0;
     584             : }
     585             : 
     586         320 : int pt_peer_check_pressure_update(struct pt_context *ctx, struct pt_peer *peer)
     587             : {
     588             :     uint8_t send_pressure;
     589             :     uint8_t recv_pressure;
     590             :     uint8_t current_pressure;
     591             :     uint8_t current_level;
     592             :     uint8_t last_level;
     593             : 
     594         320 :     if (!ctx || !peer || peer->hot.magic != PT_PEER_MAGIC) {
     595           0 :         return 0;
     596             :     }
     597             : 
     598             :     /* Only check for connected peers */
     599         320 :     if (peer->hot.state != PT_PEER_STATE_CONNECTED) {
     600           0 :         return 0;
     601             :     }
     602             : 
     603             :     /* Get pressure from BOTH queues - report the worse one.
     604             :      *
     605             :      * This captures the actual constraint regardless of where it is:
     606             :      * - High send_pressure: "I can't transmit fast enough" (echo backlog)
     607             :      * - High recv_pressure: "I can't receive fast enough" (processing backlog)
     608             :      *
     609             :      * On MacTCP, recv uses zero-copy so recv_queue is often empty.
     610             :      * The real bottleneck shows up in send_queue when echoing back.
     611             :      * By reporting MAX, we capture whichever is the actual constraint.
     612             :      */
     613         320 :     send_pressure = peer->send_queue ? pt_queue_pressure(peer->send_queue) : 0;
     614         320 :     recv_pressure = peer->recv_queue ? pt_queue_pressure(peer->recv_queue) : 0;
     615         320 :     current_pressure = (send_pressure > recv_pressure) ? send_pressure : recv_pressure;
     616             : 
     617             :     /* Quantize to threshold levels for hysteresis */
     618         320 :     current_level = pt_pressure_level(current_pressure);
     619         320 :     last_level = pt_pressure_level(peer->cold.caps.last_reported_pressure);
     620             : 
     621             :     /* Check if we crossed a threshold */
     622         320 :     if (current_level != last_level) {
     623           2 :         PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
     624             :             "Pressure threshold crossed for peer %u: %u%% -> %u%% (level %u -> %u)",
     625             :             peer->hot.id, peer->cold.caps.last_reported_pressure,
     626             :             current_pressure, last_level, current_level);
     627             : 
     628             :         /* Mark update pending - poll loop will send capability message */
     629           2 :         peer->cold.caps.pressure_update_pending = 1;
     630           2 :         peer->cold.caps.last_reported_pressure = current_pressure;
     631             : 
     632           2 :         return 1;
     633             :     }
     634             : 
     635         318 :     return 0;
     636             : }
     637             : 
     638         279 : int pt_peer_should_throttle(struct pt_peer *peer, uint8_t priority)
     639             : {
     640             :     /* NOTE: This version uses hardcoded thresholds for backward compatibility.
     641             :      * The context-aware version pt_peer_should_throttle_ctx() uses configurable
     642             :      * thresholds from ctx->pressure_*. The send path uses this simpler version
     643             :      * which relies on the PT_PRESSURE_* constants. */
     644             :     uint8_t peer_pressure;
     645             : 
     646         279 :     if (!peer || peer->hot.magic != PT_PEER_MAGIC) {
     647           0 :         return 0;  /* Don't throttle on invalid peer */
     648             :     }
     649             : 
     650             :     /* Get peer's reported buffer pressure */
     651         279 :     peer_pressure = peer->cold.caps.buffer_pressure;
     652             : 
     653             :     /* Decision thresholds based on peer's reported pressure:
     654             :      *
     655             :      * 0-49:  No throttle (send normally)
     656             :      * 50-84: Light throttle (skip LOW priority)
     657             :      * 85-94: Heavy throttle (skip NORMAL and LOW)
     658             :      * 95+:   Blocking (only CRITICAL passes)
     659             :      *
     660             :      * This implements sender-side flow control based on receiver feedback.
     661             :      * The receiver reports its queue pressure via capability updates,
     662             :      * and we back off accordingly to prevent overwhelming it.
     663             :      */
     664             : 
     665         279 :     if (peer_pressure >= PT_PRESSURE_CRITICAL) {
     666             :         /* Blocking: only CRITICAL priority passes */
     667           7 :         if (priority < PT_PRIORITY_CRITICAL) {
     668           4 :             return 1;  /* Throttle */
     669             :         }
     670         272 :     } else if (peer_pressure >= PT_PRESSURE_HIGH) {
     671             :         /* Heavy throttle: skip NORMAL and LOW */
     672           5 :         if (priority < PT_PRIORITY_HIGH) {
     673           3 :             return 1;  /* Throttle */
     674             :         }
     675         267 :     } else if (peer_pressure >= PT_PRESSURE_MEDIUM) {
     676             :         /* Light throttle: skip LOW priority */
     677           5 :         if (priority < PT_PRIORITY_NORMAL) {
     678           2 :             return 1;  /* Throttle */
     679             :         }
     680             :     }
     681             : 
     682         270 :     return 0;  /* Don't throttle - send normally */
     683             : }
     684             : 
     685         264 : int pt_peer_check_rate_limit(struct pt_context *ctx, struct pt_peer *peer,
     686             :                               uint16_t bytes)
     687             : {
     688             :     pt_tick_t now, elapsed;
     689             :     uint32_t tokens_to_add;
     690             : 
     691         264 :     if (!peer || peer->hot.magic != PT_PEER_MAGIC) {
     692           1 :         return 0;  /* Don't rate limit invalid peer */
     693             :     }
     694             : 
     695             :     /* No rate limit if disabled */
     696         263 :     if (peer->cold.caps.rate_limit_bytes_per_sec == 0) {
     697         260 :         return 0;
     698             :     }
     699             : 
     700             :     /* Get current time */
     701           3 :     if (ctx && ctx->plat && ctx->plat->get_ticks) {
     702           2 :         now = ctx->plat->get_ticks();
     703             :     } else {
     704           1 :         return 0;  /* Can't rate limit without timer */
     705             :     }
     706             : 
     707             :     /* Refill tokens based on elapsed time
     708             :      * tokens_to_add = elapsed_ms * bytes_per_sec / 1000
     709             :      * Avoid overflow by doing division first for large values */
     710           2 :     elapsed = now - peer->cold.caps.rate_last_update;
     711           2 :     if (elapsed > 0) {
     712             :         /* Calculate tokens to add (milliseconds * bytes/sec / 1000)
     713             :          * Use 64-bit intermediate to avoid overflow */
     714           0 :         uint32_t rate = peer->cold.caps.rate_limit_bytes_per_sec;
     715           0 :         tokens_to_add = (uint32_t)(((uint32_t)elapsed * rate) / 1000);
     716             : 
     717           0 :         peer->cold.caps.rate_bucket_tokens += tokens_to_add;
     718             : 
     719             :         /* Cap at bucket max */
     720           0 :         if (peer->cold.caps.rate_bucket_tokens > peer->cold.caps.rate_bucket_max) {
     721           0 :             peer->cold.caps.rate_bucket_tokens = peer->cold.caps.rate_bucket_max;
     722             :         }
     723             : 
     724           0 :         peer->cold.caps.rate_last_update = now;
     725             :     }
     726             : 
     727             :     /* Check if we have enough tokens */
     728           2 :     if (peer->cold.caps.rate_bucket_tokens < bytes) {
     729           1 :         return 1;  /* Rate limited - not enough tokens */
     730             :     }
     731             : 
     732             :     /* Consume tokens */
     733           1 :     peer->cold.caps.rate_bucket_tokens -= bytes;
     734           1 :     return 0;  /* OK to send */
     735             : }
     736             : 
     737             : /* ========================================================================== */
     738             : /* Async Send Pipeline Management                                             */
     739             : /* ========================================================================== */
     740             : 
     741             : /**
     742             :  * Calculate buffer size for a pipeline slot
     743             :  */
     744           0 : static size_t pt_pipeline_buf_size(void)
     745             : {
     746           0 :     return (size_t)PT_PIPELINE_MAX_PAYLOAD + PT_MESSAGE_HEADER_SIZE + 4;
     747             : }
     748             : 
     749           0 : int pt_pipeline_init(struct pt_context *ctx, struct pt_peer *peer)
     750             : {
     751             :     int i;
     752             :     size_t buf_size;
     753             : 
     754           0 :     if (!ctx || !peer || peer->hot.magic != PT_PEER_MAGIC) {
     755           0 :         return PT_ERR_INVALID_PARAM;
     756             :     }
     757             : 
     758             :     /* Already initialized? */
     759           0 :     if (peer->pipeline.initialized) {
     760           0 :         return PT_OK;
     761             :     }
     762             : 
     763           0 :     buf_size = pt_pipeline_buf_size();
     764             : 
     765             :     /* Initialize slot metadata */
     766           0 :     for (i = 0; i < PT_SEND_PIPELINE_DEPTH; i++) {
     767           0 :         peer->pipeline.slots[i].in_use = 0;
     768           0 :         peer->pipeline.slots[i].completed = 0;
     769           0 :         peer->pipeline.slots[i].platform_data = NULL;
     770             : 
     771             :         /* Initialize WDS sentinel */
     772           0 :         peer->pipeline.slots[i].wds[1].length = 0;
     773           0 :         peer->pipeline.slots[i].wds[1].ptr = NULL;
     774             : 
     775             : #ifdef PT_LOWMEM
     776             :         /* Lazy allocation: defer buffer allocation until first use.
     777             :          * This saves ~2KB per peer on Mac SE (4MB RAM) when peers
     778             :          * aren't actively sending data. */
     779             :         peer->pipeline.slots[i].buffer = NULL;
     780             :         peer->pipeline.slots[i].buffer_size = (uint16_t)buf_size;  /* Remember target size */
     781             : #else
     782             :         /* Eager allocation: allocate all buffers upfront.
     783             :          * Better for high-throughput scenarios. */
     784           0 :         peer->pipeline.slots[i].buffer = (uint8_t *)pt_alloc(buf_size);
     785           0 :         if (!peer->pipeline.slots[i].buffer) {
     786           0 :             PT_CTX_WARN(ctx, PT_LOG_CAT_MEMORY,
     787             :                 "Pipeline init failed: slot %d alloc (%zu bytes)", i, buf_size);
     788           0 :             pt_pipeline_cleanup(ctx, peer);
     789           0 :             return PT_ERR_NO_MEMORY;
     790             :         }
     791           0 :         peer->pipeline.slots[i].buffer_size = (uint16_t)buf_size;
     792             : #endif
     793             :     }
     794             : 
     795           0 :     peer->pipeline.pending_count = 0;
     796           0 :     peer->pipeline.next_slot = 0;
     797           0 :     peer->pipeline.initialized = 1;
     798             : 
     799             : #ifdef PT_LOWMEM
     800             :     PT_CTX_DEBUG(ctx, PT_LOG_CAT_MEMORY,
     801             :         "Pipeline init (lazy): peer=%u depth=%d buf_size=%zu",
     802             :         peer->hot.id, PT_SEND_PIPELINE_DEPTH, buf_size);
     803             : #else
     804           0 :     PT_CTX_DEBUG(ctx, PT_LOG_CAT_MEMORY,
     805             :         "Pipeline init: peer=%u depth=%d buf_size=%zu",
     806             :         peer->hot.id, PT_SEND_PIPELINE_DEPTH, buf_size);
     807             : #endif
     808             : 
     809           0 :     return PT_OK;
     810             : }
     811             : 
     812           0 : pt_send_slot *pt_pipeline_get_slot(struct pt_context *ctx, struct pt_peer *peer)
     813             : {
     814             :     int i;
     815             : 
     816           0 :     if (!peer || !peer->pipeline.initialized) {
     817           0 :         return NULL;
     818             :     }
     819             : 
     820             :     /* Find a free slot */
     821           0 :     for (i = 0; i < PT_SEND_PIPELINE_DEPTH; i++) {
     822           0 :         pt_send_slot *slot = &peer->pipeline.slots[i];
     823             : 
     824           0 :         if (!slot->in_use) {
     825             : #ifdef PT_LOWMEM
     826             :             /* Lazy allocation: allocate buffer on first use */
     827             :             if (!slot->buffer) {
     828             :                 size_t buf_size = pt_pipeline_buf_size();
     829             :                 slot->buffer = (uint8_t *)pt_alloc(buf_size);
     830             :                 if (!slot->buffer) {
     831             :                     PT_CTX_WARN(ctx, PT_LOG_CAT_MEMORY,
     832             :                         "Pipeline lazy alloc failed: slot %d (%zu bytes)",
     833             :                         i, buf_size);
     834             :                     return NULL;  /* Allocation failed */
     835             :                 }
     836             :                 PT_CTX_DEBUG(ctx, PT_LOG_CAT_MEMORY,
     837             :                     "Pipeline lazy alloc: peer=%u slot=%d", peer->hot.id, i);
     838             :             }
     839             : #else
     840             :             (void)ctx;  /* Unused in non-lowmem builds */
     841             : #endif
     842           0 :             return slot;
     843             :         }
     844             :     }
     845             : 
     846           0 :     return NULL;  /* All slots busy */
     847             : }
     848             : 
     849           2 : void pt_pipeline_cleanup(struct pt_context *ctx, struct pt_peer *peer)
     850             : {
     851             :     int i;
     852             :     uint8_t pending;
     853             : 
     854           2 :     if (!peer) {
     855           0 :         return;
     856             :     }
     857             : 
     858           2 :     pending = peer->pipeline.pending_count;
     859             : 
     860             :     /* Free buffers */
     861          10 :     for (i = 0; i < PT_SEND_PIPELINE_DEPTH; i++) {
     862           8 :         if (peer->pipeline.slots[i].buffer) {
     863           0 :             pt_free(peer->pipeline.slots[i].buffer);
     864           0 :             peer->pipeline.slots[i].buffer = NULL;
     865             :         }
     866             :         /* Note: platform_data cleanup is platform's responsibility */
     867           8 :         peer->pipeline.slots[i].in_use = 0;
     868           8 :         peer->pipeline.slots[i].completed = 0;
     869             :     }
     870             : 
     871           2 :     peer->pipeline.pending_count = 0;
     872           2 :     peer->pipeline.next_slot = 0;
     873           2 :     peer->pipeline.initialized = 0;
     874             : 
     875           2 :     if (ctx && pending > 0) {
     876           0 :         PT_CTX_DEBUG(ctx, PT_LOG_CAT_MEMORY,
     877             :             "Pipeline cleanup: peer=%u pending=%u",
     878             :             peer->hot.id, pending);
     879             :     }
     880             : }
     881             : 
     882             : /* ========================================================================== */
     883             : /* Adaptive Performance Tuning                                                */
     884             : /* ========================================================================== */
     885             : 
     886           0 : void pt_peer_update_adaptive_params(struct pt_context *ctx, struct pt_peer *peer)
     887             : {
     888             :     uint16_t rtt;
     889             :     uint16_t new_chunk;
     890             :     uint16_t peer_optimal;
     891             :     uint8_t new_pipeline;
     892             :     uint16_t new_window;
     893             :     uint32_t new_rate_limit;
     894             :     uint8_t peer_pressure;
     895             : 
     896           0 :     if (!peer || peer->hot.magic != PT_PEER_MAGIC) {
     897           0 :         return;
     898             :     }
     899             : 
     900           0 :     rtt = peer->hot.latency_ms;
     901           0 :     peer_optimal = peer->cold.caps.optimal_chunk;
     902           0 :     peer_pressure = peer->cold.caps.buffer_pressure;
     903             : 
     904             :     /* Tuning logic based on measured RTT
     905             :      *
     906             :      * Larger chunks reduce per-message overhead but increase latency.
     907             :      * Smaller chunks are better for slow/lossy links.
     908             :      * Pipeline depth controls how many messages in flight.
     909             :      */
     910           0 :     if (rtt < 50) {
     911             :         /* Fast LAN - maximize throughput */
     912           0 :         new_chunk = 4096;
     913           0 :         new_pipeline = 4;
     914           0 :         new_window = 6;   /* Larger window for better utilization */
     915           0 :     } else if (rtt < 100) {
     916             :         /* Good connection */
     917           0 :         new_chunk = 2048;
     918           0 :         new_pipeline = 3;
     919           0 :         new_window = 4;   /* Normal */
     920           0 :     } else if (rtt < 200) {
     921             :         /* Moderate latency */
     922           0 :         new_chunk = 1024;
     923           0 :         new_pipeline = 2;
     924           0 :         new_window = 3;   /* Moderate latency */
     925             :     } else {
     926             :         /* Slow/lossy - minimize in-flight data */
     927           0 :         new_chunk = 512;
     928           0 :         new_pipeline = 1;
     929           0 :         new_window = 2;   /* High latency - smaller window */
     930             :     }
     931             : 
     932             :     /* Clamp window to protocol limits */
     933           0 :     if (new_window < PT_FLOW_WINDOW_MIN) new_window = PT_FLOW_WINDOW_MIN;
     934           0 :     if (new_window > PT_FLOW_WINDOW_MAX) new_window = PT_FLOW_WINDOW_MAX;
     935             : 
     936             :     /* Incorporate peer's optimal_chunk from capability exchange.
     937             :      * The peer advertises their 25% threshold - the ideal chunk size
     938             :      * that triggers efficient receive completion on their end.
     939             :      * Use the smaller of RTT-based and peer-optimal to be safe. */
     940           0 :     if (peer_optimal > 0 && peer_optimal < new_chunk) {
     941           0 :         uint16_t rtt_based_chunk = new_chunk;  /* Save RTT-based value for logging */
     942           0 :         new_chunk = peer_optimal;
     943           0 :         if (ctx) {
     944           0 :             PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
     945             :                 "Using peer %u optimal_chunk=%u (smaller than RTT-based %u)",
     946             :                 peer->hot.id, peer_optimal, rtt_based_chunk);
     947             :         }
     948             :     }
     949             : 
     950             :     /* ================================================================
     951             :      * AUTO RATE LIMITING: Adjust rate based on peer's reported pressure
     952             :      *
     953             :      * When peer reports high buffer pressure, we automatically enable
     954             :      * rate limiting to prevent overwhelming them. This replaces manual
     955             :      * rate limiting in application code (e.g., perf_partner's sleep).
     956             :      *
     957             :      * Uses configurable thresholds from ctx->pressure_* if available,
     958             :      * otherwise falls back to PT_PRESSURE_* constants.
     959             :      * ================================================================ */
     960             :     {
     961           0 :         uint8_t thresh_high = ctx ? ctx->pressure_high : PT_PRESSURE_HIGH;
     962           0 :         uint8_t thresh_med = ctx ? ctx->pressure_medium : PT_PRESSURE_MEDIUM;
     963             : 
     964           0 :         if (peer_pressure >= thresh_high) {
     965             :             /* High pressure: aggressive rate limiting */
     966           0 :             new_rate_limit = 50 * 1024;  /* 50 KB/s */
     967           0 :         } else if (peer_pressure >= thresh_med) {
     968             :             /* Medium pressure: moderate rate limiting */
     969           0 :             new_rate_limit = 100 * 1024; /* 100 KB/s */
     970             :         } else {
     971             :             /* Low/no pressure: unlimited */
     972           0 :             new_rate_limit = 0;
     973             :         }
     974             :     }
     975             : 
     976             :     /* Update rate limit if changed */
     977           0 :     if (peer->cold.caps.rate_limit_bytes_per_sec != new_rate_limit) {
     978           0 :         peer->cold.caps.rate_limit_bytes_per_sec = new_rate_limit;
     979             : 
     980             :         /* Initialize/reset token bucket when rate limit changes */
     981           0 :         if (new_rate_limit > 0) {
     982             :             /* Bucket max = 2x rate (allows small bursts) */
     983           0 :             peer->cold.caps.rate_bucket_max = new_rate_limit * 2;
     984             :             /* Start with full bucket */
     985           0 :             peer->cold.caps.rate_bucket_tokens = peer->cold.caps.rate_bucket_max;
     986             :             /* Initialize timer */
     987           0 :             if (ctx && ctx->plat && ctx->plat->get_ticks) {
     988           0 :                 peer->cold.caps.rate_last_update = ctx->plat->get_ticks();
     989             :             }
     990             :         }
     991             : 
     992           0 :         if (ctx) {
     993           0 :             if (new_rate_limit > 0) {
     994           0 :                 PT_CTX_INFO(ctx, PT_LOG_CAT_PROTOCOL,
     995             :                     "Rate limit enabled for peer %u: %u KB/s (pressure=%u%%)",
     996             :                     peer->hot.id, new_rate_limit / 1024, peer_pressure);
     997             :             } else {
     998           0 :                 PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
     999             :                     "Rate limit disabled for peer %u (pressure=%u%%)",
    1000             :                     peer->hot.id, peer_pressure);
    1001             :             }
    1002             :         }
    1003             :     }
    1004             : 
    1005             :     /* Update send window if changed */
    1006           0 :     if (peer->cold.caps.send_window != new_window) {
    1007           0 :         if (ctx) {
    1008           0 :             PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
    1009             :                 "Adaptive window for peer %u: %u -> %u (RTT=%ums)",
    1010             :                 peer->hot.id, peer->cold.caps.send_window, new_window, rtt);
    1011             :         }
    1012           0 :         peer->cold.caps.send_window = new_window;
    1013             :     }
    1014             : 
    1015             :     /* Only log chunk/pipeline if they actually changed */
    1016           0 :     if (peer->hot.effective_chunk != new_chunk ||
    1017           0 :         peer->hot.pipeline_depth != new_pipeline) {
    1018             : 
    1019           0 :         peer->hot.effective_chunk = new_chunk;
    1020           0 :         peer->hot.pipeline_depth = new_pipeline;
    1021             : 
    1022           0 :         if (ctx) {
    1023           0 :             PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
    1024             :                 "Adaptive tuning for peer %u: RTT=%ums chunk=%u pipeline=%u peer_optimal=%u",
    1025             :                 peer->hot.id, rtt, new_chunk, new_pipeline, peer_optimal);
    1026             :         }
    1027             :     }
    1028             : }

Generated by: LCOV version 1.14