LCOV - code coverage report
Current view: top level - posix - net_posix.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 788 1321 59.7 %
Date: 2026-02-22 12:14:12 Functions: 40 44 90.9 %

          Line data    Source code
       1             : /*
       2             :  * PeerTalk POSIX Networking Implementation
       3             :  *
       4             :  * Session 4.1: UDP Discovery
       5             :  * - Non-blocking UDP broadcast for peer discovery
       6             :  * - Local IP detection with fallback strategies
       7             :  * - Discovery packet handling (ANNOUNCE, QUERY, GOODBYE)
       8             :  */
       9             : 
      10             : #include "net_posix.h"
      11             : #include "protocol.h"
      12             : #include "peer.h"
      13             : #include "queue.h"
      14             : #include "direct_buffer.h"
      15             : #include "stream.h"
      16             : #include "pt_compat.h"
      17             : #include <sys/socket.h>
      18             : #include <netinet/in.h>
      19             : #include <netinet/tcp.h>
      20             : #include <arpa/inet.h>
      21             : #include <fcntl.h>
      22             : #include <unistd.h>
      23             : #include <errno.h>
      24             : #include <string.h>
      25             : #include <ifaddrs.h>
      26             : #include <stdlib.h>
      27             : #include <poll.h>
      28             : #include <sys/uio.h>
      29             : 
      30             : /* ========================================================================== */
      31             : /* Port Configuration                                                         */
      32             : /* ========================================================================== */
      33             : 
      34             : #define DEFAULT_DISCOVERY_PORT 7353
      35             : #define DEFAULT_TCP_PORT 7354
      36             : #define DEFAULT_UDP_PORT 7355
      37             : 
      38             : /* Port accessor macros - use config if set, otherwise defaults */
      39             : #define DISCOVERY_PORT(ctx) \
      40             :     ((ctx)->config.discovery_port > 0 ? (ctx)->config.discovery_port : DEFAULT_DISCOVERY_PORT)
      41             : #define TCP_PORT(ctx) \
      42             :     ((ctx)->config.tcp_port > 0 ? (ctx)->config.tcp_port : DEFAULT_TCP_PORT)
      43             : #define UDP_PORT(ctx) \
      44             :     ((ctx)->config.udp_port > 0 ? (ctx)->config.udp_port : DEFAULT_UDP_PORT)
      45             : 
      46             : /* ========================================================================== */
      47             : /* Helper Functions                                                           */
      48             : /* ========================================================================== */
      49             : 
      50             : /**
      51             :  * Set socket to non-blocking mode
      52             :  *
      53             :  * Returns: 0 on success, -1 on failure
      54             :  */
      55         308 : static int set_nonblocking(struct pt_context *ctx, int fd) {
      56         308 :     int flags = fcntl(fd, F_GETFL, 0);
      57         308 :     if (flags < 0) {
      58           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_NETWORK,
      59             :                    "Failed to get socket flags: %s", strerror(errno));
      60           0 :         return -1;
      61             :     }
      62             : 
      63         308 :     if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
      64           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_NETWORK,
      65             :                    "Failed to set non-blocking: %s", strerror(errno));
      66           0 :         return -1;
      67             :     }
      68             : 
      69         308 :     return 0;
      70             : }
      71             : 
      72             : /**
      73             :  * Enable SO_BROADCAST on socket
      74             :  *
      75             :  * Returns: 0 on success, -1 on failure
      76             :  */
      77          19 : static int set_broadcast(struct pt_context *ctx, int fd) {
      78          19 :     int opt = 1;
      79          19 :     if (setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &opt, sizeof(opt)) < 0) {
      80           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_NETWORK,
      81             :                    "Failed to enable broadcast: %s", strerror(errno));
      82           0 :         return -1;
      83             :     }
      84          19 :     return 0;
      85             : }
      86             : 
      87             : /**
      88             :  * Enable SO_REUSEADDR on socket
      89             :  *
      90             :  * Allows fast restart without "Address already in use" error.
      91             :  *
      92             :  * Returns: 0 on success, -1 on failure
      93             :  */
      94         276 : static int set_reuseaddr(struct pt_context *ctx, int fd) {
      95         276 :     int opt = 1;
      96         276 :     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
      97           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_NETWORK,
      98             :                    "Failed to set SO_REUSEADDR: %s", strerror(errno));
      99           0 :         return -1;
     100             :     }
     101         276 :     return 0;
     102             : }
     103             : 
     104             : /**
     105             :  * Get local IP address using three-tier fallback strategy
     106             :  *
     107             :  * 1. Try getifaddrs() - works on air-gapped networks and normal LANs
     108             :  * 2. Fall back to "connect to 8.8.8.8" trick - works in containers with NAT
     109             :  * 3. Return loopback (127.0.0.1) as last resort
     110             :  *
     111             :  * Returns: Local IP in host byte order
     112             :  */
     113         230 : static uint32_t get_local_ip(struct pt_context *ctx) {
     114             :     struct ifaddrs *ifaddr, *ifa;
     115         230 :     uint32_t local_ip = 0;
     116             : 
     117             :     /* Strategy 1: getifaddrs() - preferred method */
     118         230 :     if (getifaddrs(&ifaddr) == 0) {
     119         920 :         for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) {
     120         920 :             if (ifa->ifa_addr == NULL)
     121           0 :                 continue;
     122             : 
     123         920 :             if (ifa->ifa_addr->sa_family == AF_INET) {
     124         460 :                 struct sockaddr_in *addr = (struct sockaddr_in *)ifa->ifa_addr;
     125         460 :                 uint32_t ip = ntohl(addr->sin_addr.s_addr);
     126             : 
     127             :                 /* Skip loopback */
     128         460 :                 if ((ip >> 24) == 127)
     129         230 :                     continue;
     130             : 
     131             :                 /* Found valid interface */
     132         230 :                 local_ip = ip;
     133         230 :                 PT_CTX_INFO(ctx, PT_LOG_CAT_NETWORK,
     134             :                             "Local IP detected via getifaddrs: %u.%u.%u.%u",
     135             :                             (ip >> 24) & 0xFF, (ip >> 16) & 0xFF,
     136             :                             (ip >> 8) & 0xFF, ip & 0xFF);
     137         230 :                 break;
     138             :             }
     139             :         }
     140         230 :         freeifaddrs(ifaddr);
     141             :     }
     142             : 
     143             :     /* Strategy 2: Connect to 8.8.8.8 (Google DNS) - works in containers */
     144         230 :     if (local_ip == 0) {
     145           0 :         int sock = socket(AF_INET, SOCK_DGRAM, 0);
     146           0 :         if (sock >= 0) {
     147             :             struct sockaddr_in addr;
     148           0 :             pt_memset(&addr, 0, sizeof(addr));
     149           0 :             addr.sin_family = AF_INET;
     150           0 :             addr.sin_addr.s_addr = htonl(0x08080808);  /* 8.8.8.8 */
     151           0 :             addr.sin_port = htons(53);
     152             : 
     153           0 :             if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) == 0) {
     154             :                 struct sockaddr_in local_addr;
     155           0 :                 socklen_t len = sizeof(local_addr);
     156           0 :                 if (getsockname(sock, (struct sockaddr *)&local_addr, &len) == 0) {
     157           0 :                     local_ip = ntohl(local_addr.sin_addr.s_addr);
     158           0 :                     PT_CTX_INFO(ctx, PT_LOG_CAT_NETWORK,
     159             :                                 "Local IP detected via 8.8.8.8 trick: %u.%u.%u.%u",
     160             :                                 (local_ip >> 24) & 0xFF, (local_ip >> 16) & 0xFF,
     161             :                                 (local_ip >> 8) & 0xFF, local_ip & 0xFF);
     162             :                 }
     163             :             }
     164           0 :             close(sock);
     165             :         }
     166             :     }
     167             : 
     168             :     /* Strategy 3: Fallback to loopback */
     169         230 :     if (local_ip == 0) {
     170           0 :         local_ip = 0x7F000001;  /* 127.0.0.1 */
     171           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK,
     172             :                    "Could not detect local IP, using loopback 127.0.0.1");
     173             :     }
     174             : 
     175         230 :     return local_ip;
     176             : }
     177             : 
     178             : /* ========================================================================== */
     179             : /* Platform Size/Init                                                         */
     180             : /* ========================================================================== */
     181             : 
     182         230 : size_t pt_posix_extra_size(void) {
     183         230 :     return sizeof(pt_posix_data);
     184             : }
     185             : 
     186         230 : int pt_posix_net_init(struct pt_context *ctx) {
     187             :     pt_posix_data *pd;
     188             :     size_t i;
     189             : 
     190         230 :     if (!ctx) {
     191           0 :         return -1;
     192             :     }
     193             : 
     194         230 :     pd = pt_posix_get(ctx);
     195             : 
     196             :     /* Initialize HOT fields */
     197         230 :     pd->max_fd = -1;
     198         230 :     pd->active_count = 0;
     199         230 :     pd->fd_dirty = 1;  /* Initial build needed */
     200         230 :     pd->batch_count = 0;
     201         230 :     pd->last_announce = 0;
     202         230 :     pd->local_ip = get_local_ip(ctx);
     203             : 
     204             :     /* Clear active peers tracking */
     205         230 :     pt_memset(pd->active_peers, 0, sizeof(pd->active_peers));
     206         230 :     pt_memset(pd->active_position, 0xFF, sizeof(pd->active_position));  /* -1 */
     207             : 
     208             :     /* Initialize WARM fields - sockets to -1 */
     209         230 :     pd->discovery_sock = -1;
     210         230 :     pd->listen_sock = -1;
     211         230 :     pd->udp_msg_sock = -1;
     212         230 :     pd->broadcast_addr = INADDR_BROADCAST;  /* 255.255.255.255 */
     213         230 :     pd->discovery_port = DISCOVERY_PORT(ctx);
     214         230 :     pd->listen_port = TCP_PORT(ctx);
     215         230 :     pd->udp_msg_port = UDP_PORT(ctx);
     216             : 
     217             :     /* Initialize fd_set cache */
     218        3910 :     FD_ZERO(&pd->cached_read_fds);
     219        3910 :     FD_ZERO(&pd->cached_write_fds);
     220             : 
     221             :     /* Initialize COLD fields - TCP sockets */
     222        3910 :     for (i = 0; i < PT_MAX_PEERS; i++) {
     223        3680 :         pd->tcp_socks[i] = -1;
     224             :     }
     225             : 
     226             :     /* CRITICAL: Allocate recv_bufs separately for cache efficiency */
     227         230 :     pd->recv_bufs = (pt_recv_buffer *)pt_alloc_clear(
     228             :         sizeof(pt_recv_buffer) * PT_MAX_PEERS);
     229         230 :     if (!pd->recv_bufs) {
     230           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_MEMORY,
     231             :                    "Failed to allocate receive buffers");
     232           0 :         return -1;
     233             :     }
     234             : 
     235             :     /* Initialize receive buffer states */
     236        3910 :     for (i = 0; i < PT_MAX_PEERS; i++) {
     237        3680 :         pd->recv_bufs[i].hot.state = PT_RECV_HEADER;
     238        3680 :         pd->recv_bufs[i].hot.bytes_needed = PT_COMPACT_HEADER_SIZE;
     239        3680 :         pd->recv_bufs[i].hot.bytes_received = 0;
     240             :     }
     241             : 
     242             :     /* Initialize UDP messaging socket (Session 4.4) */
     243         230 :     if (pt_posix_udp_init(ctx) < 0) {
     244           0 :         pt_free(pd->recv_bufs);
     245           0 :         pd->recv_bufs = NULL;
     246           0 :         return -1;
     247             :     }
     248             : 
     249         230 :     PT_CTX_INFO(ctx, PT_LOG_CAT_NETWORK,
     250             :                 "POSIX networking initialized (discovery=%u, tcp=%u, udp=%u)",
     251             :                 pd->discovery_port, pd->listen_port, pd->udp_msg_port);
     252             : 
     253         230 :     return 0;
     254             : }
     255             : 
     256         230 : void pt_posix_net_shutdown(struct pt_context *ctx) {
     257             :     pt_posix_data *pd;
     258             :     size_t i;
     259             : 
     260         230 :     if (!ctx) {
     261           0 :         return;
     262             :     }
     263             : 
     264         230 :     pd = pt_posix_get(ctx);
     265             : 
     266             :     /* Close server sockets */
     267         230 :     if (pd->discovery_sock >= 0) {
     268           4 :         close(pd->discovery_sock);
     269           4 :         pd->discovery_sock = -1;
     270             :     }
     271             : 
     272         230 :     if (pd->listen_sock >= 0) {
     273           4 :         close(pd->listen_sock);
     274           4 :         pd->listen_sock = -1;
     275             :     }
     276             : 
     277         230 :     if (pd->udp_msg_sock >= 0) {
     278         230 :         close(pd->udp_msg_sock);
     279         230 :         pd->udp_msg_sock = -1;
     280             :     }
     281             : 
     282             :     /* Close peer TCP sockets */
     283        3910 :     for (i = 0; i < PT_MAX_PEERS; i++) {
     284        3680 :         if (pd->tcp_socks[i] >= 0) {
     285          23 :             close(pd->tcp_socks[i]);
     286          23 :             pd->tcp_socks[i] = -1;
     287             :         }
     288             :     }
     289             : 
     290             :     /* Free receive buffers */
     291         230 :     if (pd->recv_bufs) {
     292         230 :         pt_free(pd->recv_bufs);
     293         230 :         pd->recv_bufs = NULL;
     294             :     }
     295             : 
     296         230 :     PT_CTX_INFO(ctx, PT_LOG_CAT_NETWORK, "POSIX networking shut down");
     297             : }
     298             : 
     299             : /* ========================================================================== */
     300             : /* Queue Lifecycle Helpers (Session 4.3.5)                                   */
     301             : /* ========================================================================== */
     302             : 
     303             : /**
     304             :  * Allocate and initialize a peer queue
     305             :  *
     306             :  * @param ctx PeerTalk context
     307             :  * @return Allocated queue or NULL on failure
     308             :  */
     309          64 : static pt_queue *pt_alloc_peer_queue(struct pt_context *ctx) {
     310             :     pt_queue *q;
     311             :     int result;
     312             : 
     313          64 :     q = (pt_queue *)pt_alloc(sizeof(pt_queue));
     314          64 :     if (!q) {
     315           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_MEMORY,
     316             :             "Failed to allocate queue: out of memory");
     317           0 :         return NULL;
     318             :     }
     319             : 
     320          64 :     result = pt_queue_init(ctx, q, 16);
     321          64 :     if (result != 0) {
     322           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_MEMORY,
     323             :             "Failed to initialize queue: error %d", result);
     324           0 :         pt_free(q);
     325           0 :         return NULL;
     326             :     }
     327             : 
     328             :     /* Phase 3 extensions initialized automatically by pt_queue_init() */
     329             : 
     330          64 :     return q;
     331             : }
     332             : 
     333             : /**
     334             :  * Free a peer queue
     335             :  *
     336             :  * @param q Queue to free (can be NULL)
     337             :  */
     338          24 : static void pt_free_peer_queue(pt_queue *q) {
     339          24 :     if (q) {
     340          18 :         pt_queue_free(q);
     341          18 :         pt_free(q);
     342             :     }
     343          24 : }
     344             : 
     345             : /* ========================================================================== */
     346             : /* Discovery                                                                  */
     347             : /* ========================================================================== */
     348             : 
     349          19 : int pt_posix_discovery_start(struct pt_context *ctx) {
     350             :     pt_posix_data *pd;
     351             :     int sock;
     352             :     struct sockaddr_in addr;
     353             : 
     354          19 :     if (!ctx) {
     355           0 :         return -1;
     356             :     }
     357             : 
     358          19 :     pd = pt_posix_get(ctx);
     359             : 
     360             :     /* Create UDP socket */
     361          19 :     sock = socket(AF_INET, SOCK_DGRAM, 0);
     362          19 :     if (sock < 0) {
     363           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_NETWORK,
     364             :                    "Failed to create discovery socket: %s", strerror(errno));
     365           0 :         return -1;
     366             :     }
     367             : 
     368             :     /* Set non-blocking mode */
     369          19 :     if (set_nonblocking(ctx, sock) < 0) {
     370           0 :         close(sock);
     371           0 :         return -1;
     372             :     }
     373             : 
     374             :     /* Enable SO_BROADCAST */
     375          19 :     if (set_broadcast(ctx, sock) < 0) {
     376           0 :         close(sock);
     377           0 :         return -1;
     378             :     }
     379             : 
     380             :     /* Enable SO_REUSEADDR for fast restart */
     381          19 :     if (set_reuseaddr(ctx, sock) < 0) {
     382           0 :         close(sock);
     383           0 :         return -1;
     384             :     }
     385             : 
     386             :     /* Bind to INADDR_ANY on discovery port */
     387          19 :     pt_memset(&addr, 0, sizeof(addr));
     388          19 :     addr.sin_family = AF_INET;
     389          19 :     addr.sin_addr.s_addr = INADDR_ANY;
     390          19 :     addr.sin_port = htons(pd->discovery_port);
     391             : 
     392          19 :     if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
     393           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_NETWORK,
     394             :                    "Failed to bind discovery socket to port %u: %s",
     395             :                    pd->discovery_port, strerror(errno));
     396           0 :         close(sock);
     397           0 :         return -1;
     398             :     }
     399             : 
     400          19 :     pd->discovery_sock = sock;
     401             : 
     402             :     /* Update max_fd for select() */
     403          19 :     if (sock > pd->max_fd) {
     404          17 :         pd->max_fd = sock;
     405             :     }
     406          19 :     pd->fd_dirty = 1;  /* Rebuild fd_sets */
     407             : 
     408          19 :     PT_CTX_INFO(ctx, PT_LOG_CAT_DISCOVERY,
     409             :                 "Discovery started on UDP port %u", pd->discovery_port);
     410             : 
     411             :     /* Send initial announcement */
     412          19 :     pt_posix_discovery_send(ctx, PT_DISC_TYPE_ANNOUNCE);
     413          19 :     pd->last_announce = ctx->plat->get_ticks();
     414             : 
     415          19 :     return 0;
     416             : }
     417             : 
     418          16 : void pt_posix_discovery_stop(struct pt_context *ctx) {
     419             :     pt_posix_data *pd;
     420             : 
     421          16 :     if (!ctx) {
     422           0 :         return;
     423             :     }
     424             : 
     425          16 :     pd = pt_posix_get(ctx);
     426             : 
     427          16 :     if (pd->discovery_sock >= 0) {
     428             :         /* Send goodbye before closing */
     429          15 :         pt_posix_discovery_send(ctx, PT_DISC_TYPE_GOODBYE);
     430             : 
     431          15 :         close(pd->discovery_sock);
     432          15 :         pd->discovery_sock = -1;
     433          15 :         pd->fd_dirty = 1;
     434             : 
     435          15 :         PT_CTX_INFO(ctx, PT_LOG_CAT_DISCOVERY, "Discovery stopped");
     436             :     }
     437             : }
     438             : 
     439          39 : int pt_posix_discovery_send(struct pt_context *ctx, uint8_t type) {
     440             :     pt_posix_data *pd;
     441             :     pt_discovery_packet pkt;
     442             :     uint8_t buf[PT_DISCOVERY_MAX_SIZE];
     443             :     ssize_t encoded_len;
     444             :     struct sockaddr_in dest;
     445             :     ssize_t sent;
     446             : 
     447          39 :     if (!ctx) {
     448           0 :         return -1;
     449             :     }
     450             : 
     451          39 :     pd = pt_posix_get(ctx);
     452             : 
     453          39 :     if (pd->discovery_sock < 0) {
     454           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_DISCOVERY,
     455             :                    "Discovery socket not initialized");
     456           0 :         return -1;
     457             :     }
     458             : 
     459             :     /* Build discovery packet */
     460          39 :     pt_memset(&pkt, 0, sizeof(pkt));
     461          39 :     pkt.version = PT_PROTOCOL_VERSION;
     462          39 :     pkt.type = type;
     463          39 :     pkt.flags = 0;
     464          39 :     pkt.sender_port = pd->listen_port;
     465          39 :     pkt.transports = PT_TRANSPORT_TCP | PT_TRANSPORT_UDP;  /* POSIX supports both */
     466             : 
     467             :     /* Copy local name */
     468          39 :     if (ctx->config.local_name[0] != '\0') {
     469          39 :         pt_strncpy(pkt.name, ctx->config.local_name, PT_MAX_PEER_NAME);
     470             :     } else {
     471           0 :         pt_strncpy(pkt.name, "PeerTalk", PT_MAX_PEER_NAME);
     472             :     }
     473             : 
     474             :     /* Set name length */
     475          39 :     pkt.name_len = pt_strlen(pkt.name);
     476             : 
     477             :     /* Encode packet */
     478          39 :     encoded_len = pt_discovery_encode(&pkt, buf, sizeof(buf));
     479          39 :     if (encoded_len < 0) {
     480           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_DISCOVERY,
     481             :                    "Failed to encode discovery packet");
     482           0 :         return -1;
     483             :     }
     484             : 
     485             :     /* Send to broadcast address */
     486          39 :     pt_memset(&dest, 0, sizeof(dest));
     487          39 :     dest.sin_family = AF_INET;
     488          39 :     dest.sin_addr.s_addr = htonl(pd->broadcast_addr);
     489          39 :     dest.sin_port = htons(pd->discovery_port);
     490             : 
     491          39 :     sent = sendto(pd->discovery_sock, buf, encoded_len, 0,
     492             :                   (struct sockaddr *)&dest, sizeof(dest));
     493             : 
     494          39 :     if (sent < 0) {
     495           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_DISCOVERY,
     496             :                    "Discovery send failed: %s", strerror(errno));
     497           0 :         return -1;
     498             :     }
     499             : 
     500          39 :     if (sent != encoded_len) {
     501           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_DISCOVERY,
     502             :                    "Discovery send incomplete: %zd/%zd bytes", sent, encoded_len);
     503           0 :         return -1;
     504             :     }
     505             : 
     506             :     /* Update statistics */
     507          39 :     ctx->global_stats.discovery_packets_sent++;
     508             : 
     509          39 :     PT_CTX_INFO(ctx, PT_LOG_CAT_DISCOVERY,
     510             :                 "Discovery %s sent to %u.%u.%u.%u:%u (%zd bytes)",
     511             :                 type == PT_DISC_TYPE_ANNOUNCE ? "ANNOUNCE" :
     512             :                 type == PT_DISC_TYPE_QUERY ? "QUERY" : "GOODBYE",
     513             :                 (pd->broadcast_addr >> 24) & 0xFF,
     514             :                 (pd->broadcast_addr >> 16) & 0xFF,
     515             :                 (pd->broadcast_addr >> 8) & 0xFF,
     516             :                 pd->broadcast_addr & 0xFF,
     517             :                 pd->discovery_port,
     518             :                 sent);
     519             : 
     520          39 :     return 0;
     521             : }
     522             : 
     523          29 : int pt_posix_discovery_poll(struct pt_context *ctx) {
     524             :     pt_posix_data *pd;
     525             :     uint8_t buf[PT_DISCOVERY_MAX_SIZE];
     526             :     struct sockaddr_in from_addr;
     527          29 :     socklen_t from_len = sizeof(from_addr);
     528             :     ssize_t ret;
     529             :     uint32_t sender_ip;
     530             :     pt_discovery_packet pkt;
     531             :     struct pt_peer *peer;
     532             : 
     533          29 :     if (!ctx) {
     534           0 :         return -1;
     535             :     }
     536             : 
     537          29 :     pd = pt_posix_get(ctx);
     538             : 
     539          29 :     if (pd->discovery_sock < 0) {
     540           0 :         return 0;  /* Not initialized yet */
     541             :     }
     542             : 
     543             :     /* Non-blocking receive */
     544          29 :     ret = recvfrom(pd->discovery_sock, buf, sizeof(buf), 0,
     545             :                    (struct sockaddr *)&from_addr, &from_len);
     546             : 
     547          29 :     if (ret < 0) {
     548           0 :         if (errno == EWOULDBLOCK || errno == EAGAIN) {
     549           0 :             return 0;  /* No data available - not an error */
     550             :         }
     551           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_DISCOVERY,
     552             :                    "Discovery recv error: %s", strerror(errno));
     553           0 :         return -1;
     554             :     }
     555             : 
     556             :     /* Extract sender IP */
     557          29 :     sender_ip = ntohl(from_addr.sin_addr.s_addr);
     558             : 
     559             :     /* CRITICAL: Ignore broadcasts from our own IP */
     560          29 :     if (sender_ip == pd->local_ip) {
     561          21 :         PT_CTX_DEBUG(ctx, PT_LOG_CAT_DISCOVERY,
     562             :                     "Ignoring packet from our own IP %u.%u.%u.%u",
     563             :                     (sender_ip >> 24) & 0xFF, (sender_ip >> 16) & 0xFF,
     564             :                     (sender_ip >> 8) & 0xFF, sender_ip & 0xFF);
     565          21 :         return 0;  /* Our own broadcast, ignore */
     566             :     }
     567             : 
     568             :     /* Decode packet */
     569           8 :     if (pt_discovery_decode(ctx, buf, ret, &pkt) < 0) {
     570           8 :         PT_CTX_WARN(ctx, PT_LOG_CAT_DISCOVERY,
     571             :                    "Failed to decode discovery packet from %u.%u.%u.%u",
     572             :                    (sender_ip >> 24) & 0xFF, (sender_ip >> 16) & 0xFF,
     573             :                    (sender_ip >> 8) & 0xFF, sender_ip & 0xFF);
     574           8 :         return 0;  /* Ignore invalid packets */
     575             :     }
     576             : 
     577             :     /* Update statistics */
     578           0 :     ctx->global_stats.discovery_packets_received++;
     579             : 
     580           0 :     PT_CTX_INFO(ctx, PT_LOG_CAT_DISCOVERY,
     581             :                 "Discovery %s received from %u.%u.%u.%u:%u (%s)",
     582             :                 pkt.type == PT_DISC_TYPE_ANNOUNCE ? "ANNOUNCE" :
     583             :                 pkt.type == PT_DISC_TYPE_QUERY ? "QUERY" : "GOODBYE",
     584             :                 (sender_ip >> 24) & 0xFF, (sender_ip >> 16) & 0xFF,
     585             :                 (sender_ip >> 8) & 0xFF, sender_ip & 0xFF,
     586             :                 pkt.sender_port, pkt.name);
     587             : 
     588             :     /* Handle packet type */
     589           0 :     PT_CTX_INFO(ctx, PT_LOG_CAT_DISCOVERY,
     590             :                "Processing packet type %u", pkt.type);
     591           0 :     switch (pkt.type) {
     592           0 :     case PT_DISC_TYPE_ANNOUNCE:
     593           0 :         PT_CTX_INFO(ctx, PT_LOG_CAT_DISCOVERY,
     594             :                    "Handling ANNOUNCE packet");
     595             :         /* Find or create peer */
     596           0 :         peer = pt_peer_find_by_addr(ctx, sender_ip, pkt.sender_port);
     597           0 :         PT_CTX_INFO(ctx, PT_LOG_CAT_DISCOVERY,
     598             :                    "pt_peer_find_by_addr returned: %p", (void*)peer);
     599           0 :         if (!peer) {
     600             :             /* Create new peer */
     601           0 :             PT_CTX_INFO(ctx, PT_LOG_CAT_DISCOVERY,
     602             :                        "Creating new peer: %s at %u.%u.%u.%u:%u",
     603             :                        pkt.name,
     604             :                        (sender_ip >> 24) & 0xFF, (sender_ip >> 16) & 0xFF,
     605             :                        (sender_ip >> 8) & 0xFF, sender_ip & 0xFF,
     606             :                        pkt.sender_port);
     607           0 :             peer = pt_peer_create(ctx, pkt.name, sender_ip, pkt.sender_port);
     608           0 :             if (!peer) {
     609           0 :                 PT_CTX_WARN(ctx, PT_LOG_CAT_DISCOVERY,
     610             :                            "Failed to create peer for %s", pkt.name);
     611           0 :                 return 0;
     612             :             }
     613             : 
     614           0 :             PT_CTX_INFO(ctx, PT_LOG_CAT_DISCOVERY,
     615             :                        "Peer created successfully, firing callback");
     616             : 
     617             :             /* Fire on_peer_discovered callback */
     618           0 :             if (ctx->callbacks.on_peer_discovered) {
     619             :                 PeerTalk_PeerInfo info;
     620           0 :                 pt_peer_get_info(peer, &info);
     621           0 :                 PT_CTX_INFO(ctx, PT_LOG_CAT_DISCOVERY,
     622             :                            "Calling on_peer_discovered callback");
     623           0 :                 ctx->callbacks.on_peer_discovered((PeerTalk_Context *)ctx,
     624             :                                                   &info, ctx->callbacks.user_data);
     625             :             } else {
     626           0 :                 PT_CTX_WARN(ctx, PT_LOG_CAT_DISCOVERY,
     627             :                            "No on_peer_discovered callback registered!");
     628             :             }
     629             :         } else {
     630             :             /* Update existing peer */
     631           0 :             peer->hot.last_seen = ctx->plat->get_ticks();
     632             :         }
     633           0 :         break;
     634             : 
     635           0 :     case PT_DISC_TYPE_QUERY:
     636             :         /* Respond with ANNOUNCE */
     637           0 :         pt_posix_discovery_send(ctx, PT_DISC_TYPE_ANNOUNCE);
     638           0 :         break;
     639             : 
     640           0 :     case PT_DISC_TYPE_GOODBYE:
     641             :         /* Remove peer */
     642           0 :         peer = pt_peer_find_by_addr(ctx, sender_ip, pkt.sender_port);
     643           0 :         if (peer) {
     644           0 :             PeerTalk_PeerID peer_id = peer->hot.id;
     645             : 
     646             :             /* Fire on_peer_lost callback before destroying */
     647           0 :             if (ctx->callbacks.on_peer_lost) {
     648           0 :                 ctx->callbacks.on_peer_lost((PeerTalk_Context *)ctx,
     649             :                                            peer_id, ctx->callbacks.user_data);
     650             :             }
     651             : 
     652           0 :             pt_peer_destroy(ctx, peer);
     653             :         }
     654           0 :         break;
     655             : 
     656           0 :     default:
     657           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_DISCOVERY,
     658             :                    "Unknown discovery packet type: %u", pkt.type);
     659           0 :         break;
     660             :     }
     661             : 
     662           0 :     return 1;  /* Packet processed */
     663             : }
     664             : 
     665             : /* ========================================================================== */
     666             : /* Active Peer Tracking                                                      */
     667             : /* ========================================================================== */
     668             : 
     669             : /**
     670             :  * Add peer to active tracking list (O(1))
     671             :  *
     672             :  * Used when accepting connection or initiating connect.
     673             :  * Marks fd_sets dirty for rebuild.
     674             :  */
     675          32 : static void pt_posix_add_active_peer(pt_posix_data *pd, uint8_t peer_idx) {
     676             :     /* Skip if already in list */
     677          32 :     if (pd->active_position[peer_idx] != 0xFF) {
     678           0 :         return;
     679             :     }
     680             : 
     681             :     /* Add to end of active list */
     682          32 :     pd->active_peers[pd->active_count] = peer_idx;
     683          32 :     pd->active_position[peer_idx] = pd->active_count;
     684          32 :     pd->active_count++;
     685          32 :     pd->fd_dirty = 1;
     686             : }
     687             : 
     688             : /**
     689             :  * Remove peer from active tracking list (O(1) swap-back)
     690             :  *
     691             :  * Used when closing connection. Swaps last element into removed position.
     692             :  * Marks fd_sets dirty for rebuild.
     693             :  */
     694           9 : static void pt_posix_remove_active_peer(pt_posix_data *pd, uint8_t peer_idx) {
     695           9 :     uint8_t pos = pd->active_position[peer_idx];
     696             : 
     697             :     /* Not in list */
     698           9 :     if (pos == 0xFF) {
     699           0 :         return;
     700             :     }
     701             : 
     702             :     /* Swap last element into this position */
     703           9 :     if (pos < pd->active_count - 1) {
     704           1 :         uint8_t last_idx = pd->active_peers[pd->active_count - 1];
     705           1 :         pd->active_peers[pos] = last_idx;
     706           1 :         pd->active_position[last_idx] = pos;
     707             :     }
     708             : 
     709             :     /* Clear removed peer's position */
     710           9 :     pd->active_position[peer_idx] = 0xFF;
     711           9 :     pd->active_count--;
     712           9 :     pd->fd_dirty = 1;
     713             : }
     714             : 
     715             : /**
     716             :  * Rebuild cached fd_sets when connections change (Session 4.6)
     717             :  *
     718             :  * Only called when pd->fd_dirty is set. Rebuilds read and write fd_sets
     719             :  * from scratch by adding server sockets and iterating active peers.
     720             :  *
     721             :  * For connecting peers, also adds socket to write_fds to detect completion.
     722             :  */
     723          87 : static void pt_posix_rebuild_fd_sets(struct pt_context *ctx) {
     724          87 :     pt_posix_data *pd = pt_posix_get(ctx);
     725             : 
     726             :     /* Clear both fd_sets */
     727        1479 :     FD_ZERO(&pd->cached_read_fds);
     728        1479 :     FD_ZERO(&pd->cached_write_fds);
     729          87 :     pd->max_fd = -1;
     730             : 
     731             :     /* Add server sockets to read set */
     732          87 :     if (pd->discovery_sock >= 0) {
     733          18 :         FD_SET(pd->discovery_sock, &pd->cached_read_fds);
     734          18 :         if (pd->discovery_sock > pd->max_fd)
     735          18 :             pd->max_fd = pd->discovery_sock;
     736             :     }
     737             : 
     738          87 :     if (pd->listen_sock >= 0) {
     739          49 :         FD_SET(pd->listen_sock, &pd->cached_read_fds);
     740          49 :         if (pd->listen_sock > pd->max_fd)
     741          49 :             pd->max_fd = pd->listen_sock;
     742             :     }
     743             : 
     744          87 :     if (pd->udp_msg_sock >= 0) {
     745          87 :         FD_SET(pd->udp_msg_sock, &pd->cached_read_fds);
     746          87 :         if (pd->udp_msg_sock > pd->max_fd)
     747          23 :             pd->max_fd = pd->udp_msg_sock;
     748             :     }
     749             : 
     750             :     /* Iterate active peers only (not all PT_MAX_PEERS slots) */
     751         126 :     for (uint8_t i = 0; i < pd->active_count; i++) {
     752          39 :         uint8_t peer_idx = pd->active_peers[i];
     753          39 :         int sock = pd->tcp_socks[peer_idx];
     754             : 
     755          39 :         if (sock >= 0) {
     756          39 :             struct pt_peer *peer = &ctx->peers[peer_idx];
     757             : 
     758             :             /* All active sockets go in read set */
     759          39 :             FD_SET(sock, &pd->cached_read_fds);
     760             : 
     761             :             /* Connecting sockets also go in write set */
     762          39 :             if (peer->hot.state == PT_PEER_CONNECTING) {
     763          11 :                 FD_SET(sock, &pd->cached_write_fds);
     764             :             }
     765             : 
     766          39 :             if (sock > pd->max_fd)
     767          39 :                 pd->max_fd = sock;
     768             :         }
     769             :     }
     770             : 
     771             :     /* Clear dirty flag */
     772          87 :     pd->fd_dirty = 0;
     773          87 : }
     774             : 
     775             : /* ========================================================================== */
     776             : /* TCP Server                                                                 */
     777             : /* ========================================================================== */
     778             : 
     779          27 : int pt_posix_listen_start(struct pt_context *ctx) {
     780          27 :     pt_posix_data *pd = pt_posix_get(ctx);
     781             :     struct sockaddr_in addr;
     782             :     int sock;
     783             :     uint16_t port;
     784             : 
     785          27 :     port = ctx->config.tcp_port > 0 ?
     786             :            ctx->config.tcp_port : TCP_PORT(ctx);
     787             : 
     788          27 :     sock = socket(AF_INET, SOCK_STREAM, 0);
     789          27 :     if (sock < 0) {
     790           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_CONNECT,
     791             :             "Failed to create listen socket: %s", strerror(errno));
     792           0 :         return -1;
     793             :     }
     794             : 
     795          27 :     if (set_nonblocking(ctx, sock) < 0) {
     796             :         /* Error already logged by helper */
     797           0 :         close(sock);
     798           0 :         return -1;
     799             :     }
     800             : 
     801          27 :     if (set_reuseaddr(ctx, sock) < 0) {
     802             :         /* Error already logged by helper */
     803           0 :         close(sock);
     804           0 :         return -1;
     805             :     }
     806             : 
     807          27 :     pt_memset(&addr, 0, sizeof(addr));
     808          27 :     addr.sin_family = AF_INET;
     809          27 :     addr.sin_addr.s_addr = INADDR_ANY;
     810          27 :     addr.sin_port = htons(port);
     811             : 
     812          27 :     if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
     813           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_CONNECT,
     814             :             "Failed to bind listen socket: %s", strerror(errno));
     815           0 :         close(sock);
     816           0 :         return -1;
     817             :     }
     818             : 
     819          27 :     if (listen(sock, 8) < 0) {
     820           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_CONNECT,
     821             :             "Listen failed: %s", strerror(errno));
     822           0 :         close(sock);
     823           0 :         return -1;
     824             :     }
     825             : 
     826          27 :     pd->listen_sock = sock;
     827          27 :     pd->listen_port = port;
     828             : 
     829          27 :     if (sock > pd->max_fd)
     830          25 :         pd->max_fd = sock;
     831             : 
     832          27 :     PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
     833             :         "Listening on port %u", port);
     834             : 
     835          27 :     return 0;
     836             : }
     837             : 
     838          25 : void pt_posix_listen_stop(struct pt_context *ctx) {
     839          25 :     pt_posix_data *pd = pt_posix_get(ctx);
     840             : 
     841          25 :     if (pd->listen_sock >= 0) {
     842          23 :         close(pd->listen_sock);
     843          23 :         pd->listen_sock = -1;
     844          23 :         PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT, "Listen stopped");
     845             :     }
     846          25 : }
     847             : 
     848          40 : int pt_posix_listen_poll(struct pt_context *ctx) {
     849          40 :     pt_posix_data *pd = pt_posix_get(ctx);
     850             :     struct sockaddr_in addr;
     851          40 :     socklen_t addr_len = sizeof(addr);
     852             :     int client_sock;
     853             :     uint32_t client_ip;
     854             :     struct pt_peer *peer;
     855             : 
     856          40 :     if (pd->listen_sock < 0)
     857           0 :         return 0;
     858             : 
     859             :     /* Non-blocking accept */
     860          40 :     client_sock = accept(pd->listen_sock, (struct sockaddr *)&addr, &addr_len);
     861          40 :     if (client_sock < 0) {
     862          19 :         if (errno == EWOULDBLOCK || errno == EAGAIN)
     863          19 :             return 0;
     864           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_CONNECT,
     865             :             "Accept error: %s", strerror(errno));
     866           0 :         return -1;
     867             :     }
     868             : 
     869          21 :     set_nonblocking(ctx, client_sock);
     870             : 
     871             :     /* Disable Nagle - ensure immediate send for Classic Mac receivers */
     872             :     {
     873          21 :         int flag = 1;
     874          21 :         setsockopt(client_sock, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
     875             :     }
     876             : 
     877          21 :     client_ip = ntohl(addr.sin_addr.s_addr);
     878             : 
     879          21 :     PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
     880             :         "Incoming connection from 0x%08X", client_ip);
     881             : 
     882             :     /* Find or create peer */
     883          21 :     peer = pt_peer_find_by_addr(ctx, client_ip, 0);
     884          21 :     if (!peer) {
     885             :         /* Unknown peer - create with empty name */
     886          21 :         peer = pt_peer_create(ctx, "", client_ip, ntohs(addr.sin_port));
     887             :     }
     888             : 
     889          21 :     if (!peer) {
     890           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_CONNECT,
     891             :             "No peer slot for incoming connection");
     892           0 :         close(client_sock);
     893           0 :         return 0;
     894             :     }
     895             : 
     896             :     /* Store socket and reset receive state */
     897          21 :     int peer_idx = peer->hot.id - 1;
     898          21 :     pd->tcp_socks[peer_idx] = client_sock;
     899          21 :     pd->recv_bufs[peer_idx].hot.state = PT_RECV_HEADER;
     900          21 :     pd->recv_bufs[peer_idx].hot.bytes_needed = PT_COMPACT_HEADER_SIZE;
     901          21 :     pd->recv_bufs[peer_idx].hot.bytes_received = 0;
     902             : 
     903             :     /* Add to active peers list and mark fd_sets dirty */
     904          21 :     pt_posix_add_active_peer(pd, peer_idx);
     905             : 
     906             :     /* Allocate send and receive queues */
     907          21 :     peer->send_queue = pt_alloc_peer_queue(ctx);
     908          21 :     peer->recv_queue = pt_alloc_peer_queue(ctx);
     909             : 
     910          21 :     if (!peer->send_queue || !peer->recv_queue) {
     911             :         /* Allocation failed - clean up and reject connection */
     912           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_MEMORY,
     913             :             "Failed to allocate queues for peer %u, rejecting connection",
     914             :             peer->hot.id);
     915             : 
     916             :         /* Free any partially allocated queues */
     917           0 :         pt_free_peer_queue(peer->send_queue);
     918           0 :         pt_free_peer_queue(peer->recv_queue);
     919           0 :         peer->send_queue = NULL;
     920           0 :         peer->recv_queue = NULL;
     921             : 
     922             :         /* Close socket and clean up peer */
     923           0 :         close(client_sock);
     924           0 :         pd->tcp_socks[peer_idx] = -1;
     925           0 :         pt_posix_remove_active_peer(pd, peer_idx);
     926           0 :         return 0;
     927             :     }
     928             : 
     929             :     /* Update state with logging */
     930          21 :     pt_peer_set_state(ctx, peer, PT_PEER_CONNECTED);
     931          21 :     peer->hot.last_seen = ctx->plat->get_ticks();
     932             : 
     933          21 :     PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
     934             :         "Accepted connection from peer %u at 0x%08X (assigned to slot %u)",
     935             :         peer->hot.id, client_ip, peer_idx);
     936             : 
     937             :     /* Fire callback */
     938          21 :     if (ctx->callbacks.on_peer_connected) {
     939          14 :         ctx->callbacks.on_peer_connected((PeerTalk_Context *)ctx,
     940          14 :                                          peer->hot.id, ctx->callbacks.user_data);
     941             :     }
     942             : 
     943             :     /* Send capabilities for negotiation */
     944          21 :     pt_posix_send_capability(ctx, peer);
     945             : 
     946          21 :     return 1;
     947             : }
     948             : 
     949             : /* ========================================================================== */
     950             : /* TCP Client                                                                 */
     951             : /* ========================================================================== */
     952             : 
     953          12 : int pt_posix_connect(struct pt_context *ctx, struct pt_peer *peer) {
     954          12 :     pt_posix_data *pd = pt_posix_get(ctx);
     955             :     struct sockaddr_in addr;
     956             :     int sock;
     957             :     int result;
     958             :     uint32_t peer_ip;
     959             :     uint16_t peer_port;
     960             : 
     961          12 :     if (!peer || peer->hot.magic != PT_PEER_MAGIC)
     962           0 :         return PT_ERR_INVALID_PARAM;
     963             : 
     964          12 :     if (peer->hot.state != PT_PEER_DISCOVERED)
     965           1 :         return PT_ERR_INVALID_STATE;
     966             : 
     967             :     /* Get peer address from first address entry */
     968          11 :     if (peer->hot.address_count == 0)
     969           0 :         return PT_ERR_INVALID_STATE;
     970             : 
     971          11 :     peer_ip = peer->cold.addresses[0].address;
     972          11 :     peer_port = peer->cold.addresses[0].port;
     973             : 
     974          11 :     sock = socket(AF_INET, SOCK_STREAM, 0);
     975          11 :     if (sock < 0) {
     976           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_CONNECT,
     977             :             "Failed to create socket: %s", strerror(errno));
     978           0 :         return PT_ERR_NETWORK;
     979             :     }
     980             : 
     981          11 :     set_nonblocking(ctx, sock);
     982             : 
     983             :     /* Disable Nagle's algorithm - send immediately rather than coalescing.
     984             :      * This ensures TCP PSH flag is set promptly, triggering immediate
     985             :      * TCPNoCopyRcv completion on Classic Mac receivers. */
     986             :     {
     987          11 :         int flag = 1;
     988          11 :         setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
     989             :     }
     990             : 
     991          11 :     pt_memset(&addr, 0, sizeof(addr));
     992          11 :     addr.sin_family = AF_INET;
     993          11 :     addr.sin_addr.s_addr = htonl(peer_ip);
     994          11 :     addr.sin_port = htons(peer_port);
     995             : 
     996          11 :     PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
     997             :         "Connecting to peer %u (%s) at 0x%08X:%u",
     998             :         peer->hot.id, peer->cold.name, peer_ip, peer_port);
     999             : 
    1000          11 :     result = connect(sock, (struct sockaddr *)&addr, sizeof(addr));
    1001             : 
    1002          11 :     if (result < 0) {
    1003          11 :         if (errno == EINPROGRESS) {
    1004             :             /* Connection in progress - this is expected for non-blocking */
    1005          11 :             int peer_idx = peer->hot.id - 1;
    1006          11 :             pd->tcp_socks[peer_idx] = sock;
    1007          11 :             pd->recv_bufs[peer_idx].hot.state = PT_RECV_HEADER;
    1008          11 :             pd->recv_bufs[peer_idx].hot.bytes_needed = PT_COMPACT_HEADER_SIZE;
    1009          11 :             pd->recv_bufs[peer_idx].hot.bytes_received = 0;
    1010             : 
    1011             :             /* Add to active peers list and mark fd_sets dirty */
    1012          11 :             pt_posix_add_active_peer(pd, peer_idx);
    1013             : 
    1014          11 :             pt_peer_set_state(ctx, peer, PT_PEER_CONNECTING);
    1015          11 :             peer->cold.ping_sent_time = ctx->plat->get_ticks();
    1016          11 :             return PT_OK;
    1017             :         }
    1018             : 
    1019           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_CONNECT,
    1020             :             "Connect failed to peer %u (%s) at 0x%08X:%u: %s",
    1021             :             peer->hot.id, peer->cold.name, peer_ip, peer_port, strerror(errno));
    1022           0 :         close(sock);
    1023           0 :         pt_peer_set_state(ctx, peer, PT_PEER_FAILED);
    1024           0 :         return PT_ERR_NETWORK;
    1025             :     }
    1026             : 
    1027             :     /* Immediate connection (unlikely but possible on localhost) */
    1028           0 :     int peer_idx = peer->hot.id - 1;
    1029           0 :     pd->tcp_socks[peer_idx] = sock;
    1030           0 :     pd->recv_bufs[peer_idx].hot.state = PT_RECV_HEADER;
    1031           0 :     pd->recv_bufs[peer_idx].hot.bytes_needed = PT_COMPACT_HEADER_SIZE;
    1032           0 :     pd->recv_bufs[peer_idx].hot.bytes_received = 0;
    1033             : 
    1034             :     /* Add to active peers list and mark fd_sets dirty */
    1035           0 :     pt_posix_add_active_peer(pd, peer_idx);
    1036             : 
    1037             :     /* Allocate send and receive queues */
    1038           0 :     peer->send_queue = pt_alloc_peer_queue(ctx);
    1039           0 :     peer->recv_queue = pt_alloc_peer_queue(ctx);
    1040             : 
    1041           0 :     if (!peer->send_queue || !peer->recv_queue) {
    1042             :         /* Allocation failed - clean up */
    1043           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_MEMORY,
    1044             :             "Failed to allocate queues for peer %u",
    1045             :             peer->hot.id);
    1046             : 
    1047           0 :         pt_free_peer_queue(peer->send_queue);
    1048           0 :         pt_free_peer_queue(peer->recv_queue);
    1049           0 :         peer->send_queue = NULL;
    1050           0 :         peer->recv_queue = NULL;
    1051             : 
    1052           0 :         close(sock);
    1053           0 :         pd->tcp_socks[peer_idx] = -1;
    1054           0 :         pt_posix_remove_active_peer(pd, peer_idx);
    1055           0 :         pt_peer_set_state(ctx, peer, PT_PEER_FAILED);
    1056           0 :         return PT_ERR_NO_MEMORY;
    1057             :     }
    1058             : 
    1059           0 :     pt_peer_set_state(ctx, peer, PT_PEER_CONNECTED);
    1060           0 :     peer->hot.last_seen = ctx->plat->get_ticks();
    1061             : 
    1062           0 :     PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
    1063             :         "Connected to peer %u (%s) - immediate connect on localhost",
    1064             :         peer->hot.id, peer->cold.name);
    1065             : 
    1066           0 :     if (ctx->callbacks.on_peer_connected) {
    1067           0 :         ctx->callbacks.on_peer_connected((PeerTalk_Context *)ctx,
    1068           0 :                                          peer->hot.id, ctx->callbacks.user_data);
    1069             :     }
    1070             : 
    1071             :     /* Send capabilities for negotiation */
    1072           0 :     pt_posix_send_capability(ctx, peer);
    1073             : 
    1074           0 :     return PT_OK;
    1075             : }
    1076             : 
    1077           3 : int pt_posix_disconnect(struct pt_context *ctx, struct pt_peer *peer) {
    1078           3 :     pt_posix_data *pd = pt_posix_get(ctx);
    1079             :     int peer_idx;
    1080             :     int sock;
    1081             : 
    1082           3 :     if (!peer || peer->hot.magic != PT_PEER_MAGIC)
    1083           0 :         return PT_ERR_INVALID_PARAM;
    1084             : 
    1085           3 :     peer_idx = peer->hot.id - 1;
    1086           3 :     sock = pd->tcp_socks[peer_idx];
    1087             : 
    1088           3 :     if (sock >= 0) {
    1089           0 :         PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
    1090             :             "Disconnecting peer %u (%s)", peer->hot.id, peer->cold.name);
    1091             : 
    1092             :         /* Send disconnect message if connected */
    1093           0 :         if (peer->hot.state == PT_PEER_CONNECTED) {
    1094             :             pt_message_header hdr;
    1095             :             uint8_t buf[PT_MESSAGE_HEADER_SIZE + 2];
    1096             : 
    1097           0 :             hdr.version = PT_PROTOCOL_VERSION;
    1098           0 :             hdr.type = PT_MSG_TYPE_DISCONNECT;
    1099           0 :             hdr.flags = 0;
    1100           0 :             hdr.sequence = peer->hot.send_seq++;
    1101           0 :             hdr.payload_len = 0;
    1102             : 
    1103           0 :             pt_message_encode_header(&hdr, buf);
    1104             :             /* Add CRC for empty payload */
    1105           0 :             uint16_t crc = pt_crc16(buf, PT_MESSAGE_HEADER_SIZE);
    1106           0 :             buf[PT_MESSAGE_HEADER_SIZE] = (crc >> 8) & 0xFF;
    1107           0 :             buf[PT_MESSAGE_HEADER_SIZE + 1] = crc & 0xFF;
    1108             : 
    1109           0 :             send(sock, buf, sizeof(buf), 0);
    1110             :         }
    1111             : 
    1112           0 :         close(sock);
    1113           0 :         pd->tcp_socks[peer_idx] = -1;
    1114             : 
    1115             :         /* Remove from active peers list */
    1116           0 :         pt_posix_remove_active_peer(pd, peer_idx);
    1117             :     }
    1118             : 
    1119           3 :     pt_peer_set_state(ctx, peer, PT_PEER_DISCONNECTING);
    1120             : 
    1121           3 :     if (ctx->callbacks.on_peer_disconnected) {
    1122           0 :         ctx->callbacks.on_peer_disconnected((PeerTalk_Context *)ctx,
    1123           0 :                                             peer->hot.id, PT_OK,
    1124             :                                             ctx->callbacks.user_data);
    1125             :     }
    1126             : 
    1127             :     /* Free queues */
    1128           3 :     pt_free_peer_queue(peer->send_queue);
    1129           3 :     pt_free_peer_queue(peer->recv_queue);
    1130           3 :     peer->send_queue = NULL;
    1131           3 :     peer->recv_queue = NULL;
    1132             : 
    1133             :     /* Back to DISCOVERED so reconnection is possible */
    1134           3 :     pt_peer_set_state(ctx, peer, PT_PEER_DISCOVERED);
    1135             : 
    1136           3 :     return PT_OK;
    1137             : }
    1138             : 
    1139             : /* ========================================================================== */
    1140             : /* TCP Message I/O (Session 4.3)                                             */
    1141             : /* ========================================================================== */
    1142             : 
    1143             : /**
    1144             :  * Send framed message to a connected peer
    1145             :  *
    1146             :  * Uses writev() for atomic transmission of header + payload + CRC in a single
    1147             :  * syscall. This is more efficient than multiple send() calls and avoids TCP
    1148             :  * Nagle algorithm issues.
    1149             :  *
    1150             :  * @param ctx PeerTalk context
    1151             :  * @param peer Target peer (must be in CONNECTED state)
    1152             :  * @param data Message payload
    1153             :  * @param len Payload length (max PT_MAX_MESSAGE_SIZE)
    1154             :  * @return PT_OK on success, error code on failure
    1155             :  */
    1156             : /**
    1157             :  * Send data message to peer with specified flags
    1158             :  *
    1159             :  * Internal function that allows setting message flags (e.g., for fragments).
    1160             :  * Regular data messages use flags=0, fragments use PT_MSG_FLAG_FRAGMENT.
    1161             :  */
    1162          21 : static int pt_posix_send_with_flags(struct pt_context *ctx, struct pt_peer *peer,
    1163             :                                     const void *data, size_t len, uint8_t msg_flags) {
    1164          21 :     pt_posix_data *pd = pt_posix_get(ctx);
    1165             :     pt_message_header hdr;
    1166             :     pt_compact_header compact_hdr;
    1167             :     uint8_t header_buf[PT_MESSAGE_HEADER_SIZE];  /* Large enough for either format */
    1168             :     uint8_t crc_buf[2];
    1169             :     struct iovec iov[3];
    1170             :     ssize_t total_len;
    1171             :     ssize_t sent;
    1172             :     int peer_idx;
    1173             :     int sock;
    1174             :     uint16_t crc;
    1175             :     int use_compact;
    1176             :     uint16_t header_size;
    1177             : 
    1178             :     /* Validation */
    1179          21 :     if (!peer || peer->hot.magic != PT_PEER_MAGIC) {
    1180           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL, "Invalid peer");
    1181           0 :         return PT_ERR_INVALID_PARAM;
    1182             :     }
    1183             : 
    1184          21 :     if (peer->hot.state != PT_PEER_CONNECTED) {
    1185           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_PROTOCOL,
    1186             :             "Cannot send to peer %u: not connected (state=%d)",
    1187             :             peer->hot.id, peer->hot.state);
    1188           0 :         return PT_ERR_INVALID_STATE;
    1189             :     }
    1190             : 
    1191          21 :     if (len > PT_MAX_MESSAGE_SIZE) {
    1192           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
    1193             :             "Message too large: %zu bytes (max %d)",
    1194             :             len, PT_MAX_MESSAGE_SIZE);
    1195           0 :         return PT_ERR_MESSAGE_TOO_LARGE;
    1196             :     }
    1197             : 
    1198          21 :     peer_idx = peer->hot.id - 1;
    1199          21 :     sock = pd->tcp_socks[peer_idx];
    1200             : 
    1201          21 :     if (sock < 0) {
    1202           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL, "Invalid socket for peer %u", peer->hot.id);
    1203           0 :         return PT_ERR_INVALID_STATE;
    1204             :     }
    1205             : 
    1206             :     /* Check if socket is writable BEFORE attempting send.
    1207             :      * This avoids starting a partial write that requires blocking to complete.
    1208             :      * If socket buffer is full, return WOULD_BLOCK so caller can retry later.
    1209             :      */
    1210             :     {
    1211             :         struct pollfd pfd;
    1212          21 :         pfd.fd = sock;
    1213          21 :         pfd.events = POLLOUT;
    1214          21 :         pfd.revents = 0;
    1215          21 :         if (poll(&pfd, 1, 0) <= 0 || !(pfd.revents & POLLOUT)) {
    1216             :             /* Socket not ready for writing - avoid partial write */
    1217           0 :             return PT_ERR_WOULD_BLOCK;
    1218             :         }
    1219             :     }
    1220             : 
    1221             :     /* Check if compact headers are negotiated with this peer.
    1222             :      * IMPORTANT: Don't use compact headers for fragments because
    1223             :      * PT_MSG_FLAG_FRAGMENT (0x10) doesn't fit in the 4-bit flags field.
    1224             :      * The compact header format only supports flags 0x00-0x0F. */
    1225          40 :     use_compact = peer->cold.caps.compact_mode &&
    1226          19 :                   !(msg_flags & PT_MSG_FLAG_FRAGMENT);
    1227             : 
    1228          21 :     if (use_compact) {
    1229             :         /* Encode compact header (4 bytes, no CRC) */
    1230          19 :         compact_hdr.type = PT_MSG_TYPE_DATA;
    1231          19 :         compact_hdr.flags = msg_flags;
    1232          19 :         compact_hdr.payload_len = (uint16_t)len;
    1233          19 :         pt_message_encode_compact(&compact_hdr, header_buf);
    1234          19 :         header_size = PT_COMPACT_HEADER_SIZE;
    1235             : 
    1236             :         /* Prepare iovec for atomic send - no CRC */
    1237          19 :         iov[0].iov_base = header_buf;
    1238          19 :         iov[0].iov_len = header_size;
    1239          19 :         iov[1].iov_base = (void *)data;
    1240          19 :         iov[1].iov_len = len;
    1241             : 
    1242          19 :         total_len = header_size + len;
    1243             :     } else {
    1244             :         /* Encode full header (10 bytes + 2 CRC) */
    1245           2 :         hdr.version = PT_PROTOCOL_VERSION;
    1246           2 :         hdr.type = PT_MSG_TYPE_DATA;
    1247           2 :         hdr.flags = msg_flags;
    1248           2 :         hdr.sequence = peer->hot.send_seq++;
    1249           2 :         hdr.payload_len = (uint16_t)len;
    1250           2 :         pt_message_encode_header(&hdr, header_buf);
    1251           2 :         header_size = PT_MESSAGE_HEADER_SIZE;
    1252             : 
    1253             :         /* Calculate CRC over header + payload */
    1254           2 :         crc = pt_crc16(header_buf, header_size);
    1255           2 :         crc = pt_crc16_update(crc, data, len);
    1256           2 :         crc_buf[0] = (crc >> 8) & 0xFF;
    1257           2 :         crc_buf[1] = crc & 0xFF;
    1258             : 
    1259             :         /* Prepare iovec for atomic send */
    1260           2 :         iov[0].iov_base = header_buf;
    1261           2 :         iov[0].iov_len = header_size;
    1262           2 :         iov[1].iov_base = (void *)data;
    1263           2 :         iov[1].iov_len = len;
    1264           2 :         iov[2].iov_base = crc_buf;
    1265           2 :         iov[2].iov_len = 2;
    1266             : 
    1267           2 :         total_len = header_size + len + 2;
    1268             :     }
    1269             : 
    1270             :     /* Send with writev - handles partial writes */
    1271          21 :     sent = writev(sock, iov, use_compact ? 2 : 3);
    1272             : 
    1273          21 :     if (sent < 0) {
    1274           0 :         if (errno == EWOULDBLOCK || errno == EAGAIN) {
    1275           0 :             PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
    1276             :                 "Send would block for peer %u", peer->hot.id);
    1277           0 :             return PT_ERR_WOULD_BLOCK;
    1278             :         }
    1279           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
    1280             :             "Send failed for peer %u: %s", peer->hot.id, strerror(errno));
    1281           0 :         return PT_ERR_NETWORK;
    1282             :     }
    1283             : 
    1284             :     /* Handle partial writes by continuing with write() */
    1285          21 :     if (sent < total_len) {
    1286             :         /* Build combined buffer for remaining data */
    1287             :         uint8_t *sendbuf;
    1288           0 :         size_t buflen = (size_t)total_len;
    1289           0 :         size_t offset = (size_t)sent;
    1290           0 :         int max_retries = 20;  /* 200ms max block (down from 1s) */
    1291           0 :         int retry_count = 0;
    1292             : 
    1293           0 :         sendbuf = (uint8_t *)malloc(buflen);
    1294           0 :         if (!sendbuf) {
    1295           0 :             PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL, "Failed to allocate send buffer");
    1296           0 :             return PT_ERR_NO_MEMORY;
    1297             :         }
    1298             : 
    1299             :         /* Copy header + payload (+ crc if not compact) into buffer */
    1300           0 :         memcpy(sendbuf, header_buf, header_size);
    1301           0 :         memcpy(sendbuf + header_size, data, len);
    1302           0 :         if (!use_compact) {
    1303           0 :             memcpy(sendbuf + header_size + len, crc_buf, 2);
    1304             :         }
    1305             : 
    1306             :         /* Continue sending from where we left off */
    1307           0 :         while (offset < buflen && retry_count < max_retries) {
    1308           0 :             ssize_t n = write(sock, sendbuf + offset, buflen - offset);
    1309           0 :             if (n < 0) {
    1310           0 :                 if (errno == EWOULDBLOCK || errno == EAGAIN) {
    1311             :                     struct timeval tv;
    1312             :                     fd_set wfds;
    1313           0 :                     FD_ZERO(&wfds);
    1314           0 :                     FD_SET(sock, &wfds);
    1315           0 :                     tv.tv_sec = 0;
    1316           0 :                     tv.tv_usec = 10000;  /* 10ms */
    1317           0 :                     select(sock + 1, NULL, &wfds, NULL, &tv);
    1318           0 :                     retry_count++;
    1319           0 :                     continue;
    1320             :                 }
    1321           0 :                 free(sendbuf);
    1322           0 :                 PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
    1323             :                     "Send failed for peer %u: %s", peer->hot.id, strerror(errno));
    1324           0 :                 return PT_ERR_NETWORK;
    1325             :             }
    1326           0 :             offset += (size_t)n;
    1327           0 :             PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
    1328             :                 "Partial send to peer %u: %zu/%zu bytes, continuing...",
    1329             :                 peer->hot.id, offset, buflen);
    1330             :         }
    1331             : 
    1332           0 :         free(sendbuf);
    1333             : 
    1334           0 :         if (offset < buflen) {
    1335           0 :             PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
    1336             :                 "Send incomplete after %d retries: %zu/%zu bytes",
    1337             :                 retry_count, offset, buflen);
    1338           0 :             return PT_ERR_NETWORK;
    1339             :         }
    1340             : 
    1341           0 :         sent = (ssize_t)offset;
    1342             :     }
    1343             : 
    1344             :     /* Update statistics */
    1345          21 :     peer->cold.stats.bytes_sent += sent;
    1346          21 :     peer->cold.stats.messages_sent++;
    1347          21 :     ctx->global_stats.total_bytes_sent += sent;
    1348          21 :     ctx->global_stats.total_messages_sent++;
    1349             : 
    1350          21 :     PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
    1351             :         "Sent %zd bytes to peer %u (seq=%u, flags=0x%02X)",
    1352             :         sent, peer->hot.id, hdr.sequence, msg_flags);
    1353             : 
    1354          21 :     return PT_OK;
    1355             : }
    1356             : 
    /**
     * Send an ordinary (non-fragment) data message to a peer
     *
     * Public entry point that forwards to pt_posix_send_with_flags() with no
     * message flags set and returns its result unchanged.
     *
     * @param ctx PeerTalk context
     * @param peer Target peer
     * @param data Message payload
     * @param len Payload length in bytes
     * @return Result of pt_posix_send_with_flags()
     */
    int pt_posix_send(struct pt_context *ctx, struct pt_peer *peer,
                      const void *data, size_t len) {
        const uint8_t no_flags = 0;

        return pt_posix_send_with_flags(ctx, peer, data, len, no_flags);
    }
    1366             : 
    1367             : /**
    1368             :  * Send control message (PING, PONG, DISCONNECT)
    1369             :  *
    1370             :  * Control messages have zero-length payload and use sequence=0 to distinguish
    1371             :  * from user data messages.
    1372             :  *
    1373             :  * @param ctx PeerTalk context
    1374             :  * @param peer Target peer
    1375             :  * @param msg_type PT_MSG_TYPE_PING, PT_MSG_TYPE_PONG, or PT_MSG_TYPE_DISCONNECT
    1376             :  * @return PT_OK on success, error code on failure
    1377             :  */
    1378           0 : int pt_posix_send_control(struct pt_context *ctx, struct pt_peer *peer,
    1379             :                            uint8_t msg_type) {
    1380           0 :     pt_posix_data *pd = pt_posix_get(ctx);
    1381             :     pt_message_header hdr;
    1382             :     uint8_t buf[PT_MESSAGE_HEADER_SIZE + 2];
    1383             :     uint16_t crc;
    1384             :     ssize_t sent;
    1385             :     int peer_idx;
    1386             :     int sock;
    1387             : 
    1388           0 :     if (!peer || peer->hot.magic != PT_PEER_MAGIC)
    1389           0 :         return PT_ERR_INVALID_PARAM;
    1390             : 
    1391           0 :     if (peer->hot.state != PT_PEER_CONNECTED)
    1392           0 :         return PT_ERR_INVALID_STATE;
    1393             : 
    1394           0 :     peer_idx = peer->hot.id - 1;
    1395           0 :     sock = pd->tcp_socks[peer_idx];
    1396             : 
    1397           0 :     if (sock < 0)
    1398           0 :         return PT_ERR_INVALID_STATE;
    1399             : 
    1400             :     /* Encode header (sequence=0 for control messages) */
    1401           0 :     hdr.version = PT_PROTOCOL_VERSION;
    1402           0 :     hdr.type = msg_type;
    1403           0 :     hdr.flags = 0;
    1404           0 :     hdr.sequence = 0;  /* Control messages use seq=0 intentionally */
    1405           0 :     hdr.payload_len = 0;
    1406           0 :     pt_message_encode_header(&hdr, buf);
    1407             : 
    1408             :     /* Calculate CRC */
    1409           0 :     crc = pt_crc16(buf, PT_MESSAGE_HEADER_SIZE);
    1410           0 :     buf[PT_MESSAGE_HEADER_SIZE] = (crc >> 8) & 0xFF;
    1411           0 :     buf[PT_MESSAGE_HEADER_SIZE + 1] = crc & 0xFF;
    1412             : 
    1413             :     /* Send */
    1414           0 :     sent = send(sock, buf, sizeof(buf), 0);
    1415             : 
    1416           0 :     if (sent < 0) {
    1417           0 :         if (errno == EWOULDBLOCK || errno == EAGAIN)
    1418           0 :             return PT_ERR_WOULD_BLOCK;
    1419           0 :         return PT_ERR_NETWORK;
    1420             :     }
    1421             : 
    1422           0 :     PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
    1423             :         "Sent control message type=%u to peer %u", msg_type, peer->hot.id);
    1424             : 
    1425           0 :     return PT_OK;
    1426             : }
    1427             : 
    1428             : /**
    1429             :  * Send capability message to peer
    1430             :  *
    1431             :  * Called after connection is established to exchange capability information.
    1432             :  * This enables automatic fragmentation for constrained peers.
    1433             :  *
    1434             :  * @param ctx PeerTalk context
    1435             :  * @param peer Target peer
    1436             :  * @return PT_OK on success, error code on failure
    1437             :  */
    1438          32 : int pt_posix_send_capability(struct pt_context *ctx, struct pt_peer *peer) {
    1439          32 :     pt_posix_data *pd = pt_posix_get(ctx);
    1440             :     pt_message_header hdr;
    1441             :     pt_capability_msg caps;
    1442             :     uint8_t header_buf[PT_MESSAGE_HEADER_SIZE];
    1443             :     uint8_t payload_buf[32];  /* Capability TLV max ~15 bytes */
    1444             :     uint8_t crc_buf[2];
    1445             :     struct iovec iov[3];
    1446             :     uint16_t crc;
    1447             :     ssize_t sent;
    1448             :     int payload_len;
    1449             :     int peer_idx;
    1450             :     int sock;
    1451             : 
    1452          32 :     if (!peer || peer->hot.magic != PT_PEER_MAGIC)
    1453           0 :         return PT_ERR_INVALID_PARAM;
    1454             : 
    1455          32 :     if (peer->hot.state != PT_PEER_CONNECTED)
    1456           0 :         return PT_ERR_INVALID_STATE;
    1457             : 
    1458          32 :     peer_idx = peer->hot.id - 1;
    1459          32 :     sock = pd->tcp_socks[peer_idx];
    1460             : 
    1461          32 :     if (sock < 0)
    1462           0 :         return PT_ERR_INVALID_STATE;
    1463             : 
    1464             :     /* Rate-limit capability sends to prevent flooding the peer's receive buffer.
    1465             :      *
    1466             :      * When the peer (e.g., a Classic Mac) is under heavy receive load, sending
    1467             :      * many capability messages in rapid succession causes small TCP segments that
    1468             :      * pile up in an already-congested ibuf. This causes "dropping incomplete
    1469             :      * reassembly" errors and can crash MacTCP.
    1470             :      *
    1471             :      * We enforce PT_CAP_MIN_INTERVAL_MS between sends. If rate-limited, we keep
    1472             :      * pressure_update_pending=1 so the next poll retries. The first send
    1473             :      * (cap_last_sent=0) is always allowed.
    1474             :      */
    1475             :     {
    1476          32 :         pt_tick_t now_ms = ctx->plat->get_ticks();
    1477          32 :         if (peer->cold.caps.cap_last_sent != 0 &&
    1478           0 :             (now_ms - peer->cold.caps.cap_last_sent) < PT_CAP_MIN_INTERVAL_MS) {
    1479           0 :             peer->cold.caps.pressure_update_pending = 1;  /* Retry later */
    1480           0 :             return PT_OK;
    1481             :         }
    1482          32 :         peer->cold.caps.cap_last_sent = now_ms;
    1483             :     }
    1484             : 
    1485             :     /* Fill in our capabilities */
    1486          32 :     caps.max_message_size = ctx->local_max_message;
    1487          32 :     caps.preferred_chunk = ctx->local_preferred_chunk;
    1488          32 :     caps.capability_flags = ctx->local_capability_flags;
    1489             : 
    1490             :     /* POSIX: Report standard socket buffer sizes.
    1491             :      * POSIX doesn't have MacTCP's 25% threshold issue, so we report
    1492             :      * typical defaults. The optimal_chunk is our preferred chunk since
    1493             :      * there's no completion threshold to optimize for. */
    1494          32 :     caps.recv_buffer_size = 65535;  /* Typical socket buffer size */
    1495          32 :     caps.optimal_chunk = ctx->local_preferred_chunk;
    1496             : 
    1497             :     /* Calculate current buffer pressure from BOTH queues - report the worse one.
    1498             :      * This captures the actual constraint regardless of where it is:
    1499             :      * - High send_pressure: "I can't transmit fast enough"
    1500             :      * - High recv_pressure: "I can't receive fast enough"
    1501             :      */
    1502             :     {
    1503          32 :         uint8_t send_pressure = peer->send_queue ? pt_queue_pressure(peer->send_queue) : 0;
    1504          32 :         uint8_t recv_pressure = peer->recv_queue ? pt_queue_pressure(peer->recv_queue) : 0;
    1505          32 :         caps.buffer_pressure = (send_pressure > recv_pressure) ? send_pressure : recv_pressure;
    1506             :     }
    1507          32 :     caps.reserved = 0;
    1508             : 
    1509             :     /* Track what we reported for flow control threshold detection */
    1510          32 :     peer->cold.caps.last_reported_pressure = caps.buffer_pressure;
    1511          32 :     peer->cold.caps.pressure_update_pending = 0;
    1512             : 
    1513             :     /* Encode capability TLV payload */
    1514          32 :     payload_len = pt_capability_encode(&caps, payload_buf, sizeof(payload_buf));
    1515          32 :     if (payload_len < 0) {
    1516           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
    1517             :             "Failed to encode capabilities for peer %u", peer->hot.id);
    1518           0 :         return PT_ERR_INTERNAL;
    1519             :     }
    1520             : 
    1521             :     /* Encode header */
    1522          32 :     hdr.version = PT_PROTOCOL_VERSION;
    1523          32 :     hdr.type = PT_MSG_TYPE_CAPABILITY;
    1524          32 :     hdr.flags = 0;
    1525          32 :     hdr.sequence = 0;  /* Capability messages use seq=0 */
    1526          32 :     hdr.payload_len = (uint16_t)payload_len;
    1527          32 :     pt_message_encode_header(&hdr, header_buf);
    1528             : 
    1529             :     /* Calculate CRC over header + payload */
    1530          32 :     crc = pt_crc16(header_buf, PT_MESSAGE_HEADER_SIZE);
    1531          32 :     crc = pt_crc16_update(crc, payload_buf, (size_t)payload_len);
    1532          32 :     crc_buf[0] = (crc >> 8) & 0xFF;
    1533          32 :     crc_buf[1] = crc & 0xFF;
    1534             : 
    1535             :     /* Prepare iovec for atomic send */
    1536          32 :     iov[0].iov_base = header_buf;
    1537          32 :     iov[0].iov_len = PT_MESSAGE_HEADER_SIZE;
    1538          32 :     iov[1].iov_base = payload_buf;
    1539          32 :     iov[1].iov_len = (size_t)payload_len;
    1540          32 :     iov[2].iov_base = crc_buf;
    1541          32 :     iov[2].iov_len = 2;
    1542             : 
    1543             :     /* Send with writev */
    1544          32 :     sent = writev(sock, iov, 3);
    1545             : 
    1546          32 :     if (sent < 0) {
    1547           0 :         if (errno == EWOULDBLOCK || errno == EAGAIN)
    1548           0 :             return PT_ERR_WOULD_BLOCK;
    1549           0 :         return PT_ERR_NETWORK;
    1550             :     }
    1551             : 
    1552          32 :     PT_CTX_INFO(ctx, PT_LOG_CAT_PROTOCOL,
    1553             :         "Sent capabilities to peer %u: max=%u chunk=%u",
    1554             :         peer->hot.id, caps.max_message_size, caps.preferred_chunk);
    1555             : 
    1556          32 :     return PT_OK;
    1557             : }
    1558             : 
    1559             : /* ========================================================================== */
    1560             : /* Statistics Helpers (Session 4.5)                                          */
    1561             : /* ========================================================================== */
    1562             : 
    /**
     * Map a latency measurement to a connection quality score
     *
     * Bands are tuned for LAN use; the first band whose upper bound exceeds the
     * latency wins:
     * - < 5ms: 100 (excellent - typical wired LAN)
     * - < 10ms: 90 (very good - good WiFi or loaded LAN)
     * - < 20ms: 75 (good - congested WiFi or slower network)
     * - < 50ms: 50 (fair - problematic connection for LAN)
     * - >= 50ms: 25 (poor - very bad for LAN, investigate)
     *
     * @param latency_ms Latency in milliseconds
     * @return Quality score: one of 100, 90, 75, 50 or 25
     */
    static uint8_t calculate_quality(uint16_t latency_ms) {
        /* Exclusive upper bound of each band, in ascending order, with its score */
        static const struct {
            uint16_t below_ms;
            uint8_t score;
        } bands[] = {
            { 5, 100 },
            { 10, 90 },
            { 20, 75 },
            { 50, 50 },
        };
        size_t i;

        for (i = 0; i < sizeof(bands) / sizeof(bands[0]); i++) {
            if (latency_ms < bands[i].below_ms) {
                return bands[i].score;
            }
        }

        /* Slower than every band */
        return 25;
    }
    1588             : 
    1589             : /**
    1590             :  * Update peer latency from PONG response
    1591             :  *
    1592             :  * Called when PONG message is received. Calculates RTT, updates rolling average,
    1593             :  * and calculates connection quality score.
    1594             :  */
    1595           0 : static void update_peer_latency(struct pt_context *ctx, struct pt_peer *peer) {
    1596             :     pt_tick_t now;
    1597             :     uint16_t rtt;
    1598             : 
    1599           0 :     if (peer->cold.ping_sent_time == 0) {
    1600           0 :         return;  /* No pending ping */
    1601             :     }
    1602             : 
    1603           0 :     now = ctx->plat->get_ticks();
    1604           0 :     rtt = (uint16_t)(now - peer->cold.ping_sent_time);
    1605           0 :     peer->cold.ping_sent_time = 0;
    1606             : 
    1607             :     /* Rolling average: new = (old * 3 + sample) / 4 */
    1608           0 :     if (peer->cold.stats.latency_ms == 0) {
    1609           0 :         peer->cold.stats.latency_ms = rtt;
    1610             :     } else {
    1611           0 :         peer->cold.stats.latency_ms = (peer->cold.stats.latency_ms * 3 + rtt) / 4;
    1612             :     }
    1613             : 
    1614             :     /* Update hot latency for adaptive tuning */
    1615           0 :     peer->hot.latency_ms = peer->cold.stats.latency_ms;
    1616             : 
    1617             :     /* Update adaptive chunk size and pipeline depth based on RTT */
    1618           0 :     pt_peer_update_adaptive_params(ctx, peer);
    1619             : 
    1620             :     /* Calculate quality */
    1621           0 :     peer->cold.stats.quality = calculate_quality(peer->cold.stats.latency_ms);
    1622             : 
    1623           0 :     PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
    1624             :                  "Peer %u latency: %u ms (quality: %u%%)",
    1625             :                  peer->hot.id, peer->cold.stats.latency_ms, peer->cold.stats.quality);
    1626             : }
    1627             : 
    1628             : /* ========================================================================== */
    1629             : /* TCP Receive State Machine (Session 4.3)                                   */
    1630             : /* ========================================================================== */
    1631             : 
    1632             : /**
    1633             :  * Reset receive buffer to initial state
    1634             :  *
    1635             :  * @param buf Receive buffer to reset
    1636             :  */
    1637          41 : static void pt_recv_reset(pt_recv_buffer *buf) {
    1638          41 :     buf->hot.state = PT_RECV_HEADER;
    1639          41 :     buf->hot.bytes_needed = PT_COMPACT_HEADER_SIZE;  /* Start with minimum for format detection */
    1640          41 :     buf->hot.bytes_received = 0;
    1641          41 :     buf->hot.is_compact = 0;
    1642          41 : }
    1643             : 
    1644             : /**
    1645             :  * Receive header bytes
    1646             :  *
    1647             :  * Accumulates header bytes until complete, then transitions to payload or CRC state.
    1648             :  *
    1649             :  * @param ctx PeerTalk context
    1650             :  * @param peer Target peer
    1651             :  * @param buf Receive buffer
    1652             :  * @param sock TCP socket
    1653             :  * @return 1 = header complete, 0 = waiting for more data, -1 = error
    1654             :  */
    1655          80 : static int pt_recv_header(struct pt_context *ctx, struct pt_peer *peer,
    1656             :                            pt_recv_buffer *buf, int sock) {
    1657             :     ssize_t ret;
    1658             :     size_t remaining;
    1659             : 
    1660          80 :     remaining = buf->hot.bytes_needed - buf->hot.bytes_received;
    1661             : 
    1662          80 :     ret = recv(sock, buf->cold.header_buf + buf->hot.bytes_received,
    1663             :                remaining, 0);
    1664             : 
    1665          80 :     if (ret < 0) {
    1666           7 :         if (errno == EWOULDBLOCK || errno == EAGAIN)
    1667           0 :             return 0;  /* No data yet */
    1668           7 :         PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
    1669             :             "Header recv error for peer %u: %s", peer->hot.id, strerror(errno));
    1670           7 :         return -1;
    1671             :     }
    1672             : 
    1673          73 :     if (ret == 0) {
    1674           0 :         PT_CTX_INFO(ctx, PT_LOG_CAT_PROTOCOL,
    1675             :             "Peer %u closed connection during header", peer->hot.id);
    1676           0 :         return -1;
    1677             :     }
    1678             : 
    1679          73 :     buf->hot.bytes_received += ret;
    1680             : 
    1681          73 :     if (buf->hot.bytes_received >= buf->hot.bytes_needed) {
    1682             :         /* Check if we've received enough to detect header format */
    1683          73 :         if (buf->hot.bytes_needed == PT_COMPACT_HEADER_SIZE) {
    1684             :             /* First 4 bytes received - detect format */
    1685          45 :             PT_CTX_INFO(ctx, PT_LOG_CAT_PROTOCOL,
    1686             :                 "Header bytes [%02X %02X %02X %02X], received=%u",
    1687             :                 buf->cold.header_buf[0], buf->cold.header_buf[1],
    1688             :                 buf->cold.header_buf[2], buf->cold.header_buf[3],
    1689             :                 buf->hot.bytes_received);
    1690             : 
    1691          45 :             if (pt_message_is_compact(buf->cold.header_buf, buf->hot.bytes_received)) {
    1692             :                 /* Compact header (4 bytes, no CRC) */
    1693             :                 pt_compact_header compact_hdr;
    1694          14 :                 if (pt_message_decode_compact(buf->cold.header_buf,
    1695          14 :                                               buf->hot.bytes_received,
    1696             :                                               &compact_hdr) != PT_OK) {
    1697           0 :                     PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
    1698             :                         "Invalid compact header from peer %u", peer->hot.id);
    1699           0 :                     return -1;
    1700             :                 }
    1701             : 
    1702             :                 /* Convert to standard header struct */
    1703          14 :                 buf->cold.hdr.version = PT_PROTOCOL_VERSION;
    1704          14 :                 buf->cold.hdr.type = compact_hdr.type;
    1705          14 :                 buf->cold.hdr.flags = compact_hdr.flags;
    1706          14 :                 buf->cold.hdr.sequence = 0;  /* Compact has no sequence */
    1707          14 :                 buf->cold.hdr.payload_len = compact_hdr.payload_len;
    1708          14 :                 buf->hot.is_compact = 1;
    1709             : 
    1710             :                 /* Validate payload size */
    1711          14 :                 if (buf->cold.hdr.payload_len > PT_MAX_MESSAGE_SIZE) {
    1712           0 :                     PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
    1713             :                         "Payload too large from peer %u: %u bytes (max %d)",
    1714             :                         peer->hot.id, buf->cold.hdr.payload_len, PT_MAX_MESSAGE_SIZE);
    1715           0 :                     return -1;
    1716             :                 }
    1717             : 
    1718             :                 /* Transition to payload (skip CRC for compact) */
    1719          14 :                 if (buf->cold.hdr.payload_len > 0) {
    1720          14 :                     buf->hot.state = PT_RECV_PAYLOAD;
    1721          14 :                     buf->hot.bytes_needed = buf->cold.hdr.payload_len;
    1722          14 :                     buf->hot.bytes_received = 0;
    1723             :                 } else {
    1724             :                     /* No payload and no CRC - message complete */
    1725           0 :                     return 2;  /* Signal message complete (no CRC needed) */
    1726             :                 }
    1727             : 
    1728          14 :                 return 1;  /* Header complete */
    1729             :             } else {
    1730             :                 /* Full header - need 6 more bytes total (10 bytes header) */
    1731          31 :                 buf->hot.bytes_needed = PT_MESSAGE_HEADER_SIZE;
    1732          31 :                 buf->hot.is_compact = 0;
    1733             : 
    1734             :                 /* Check if we already received enough bytes in first recv.
    1735             :                  * This can happen when all header bytes arrive together.
    1736             :                  * If so, fall through to decode. Otherwise continue receiving. */
    1737          31 :                 if (buf->hot.bytes_received < PT_MESSAGE_HEADER_SIZE) {
    1738          31 :                     return 0;  /* Continue receiving more header bytes */
    1739             :                 }
    1740             :                 /* Fall through to decode full header */
    1741             :             }
    1742             :         }
    1743             : 
    1744             :         /* Full header (10 bytes) - decode it */
    1745          28 :         if (pt_message_decode_header(ctx, buf->cold.header_buf,
    1746             :                                       PT_MESSAGE_HEADER_SIZE,
    1747             :                                       &buf->cold.hdr) != PT_OK) {
    1748           1 :             PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
    1749             :                 "Invalid message header from peer %u", peer->hot.id);
    1750           1 :             return -1;
    1751             :         }
    1752             : 
    1753             :         /* Validate payload size */
    1754          27 :         if (buf->cold.hdr.payload_len > PT_MAX_MESSAGE_SIZE) {
    1755           0 :             PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
    1756             :                 "Payload too large from peer %u: %u bytes (max %d)",
    1757             :                 peer->hot.id, buf->cold.hdr.payload_len, PT_MAX_MESSAGE_SIZE);
    1758           0 :             return -1;
    1759             :         }
    1760             : 
    1761             :         /* Transition to next state */
    1762          27 :         if (buf->cold.hdr.payload_len > 0) {
    1763          27 :             buf->hot.state = PT_RECV_PAYLOAD;
    1764          27 :             buf->hot.bytes_needed = buf->cold.hdr.payload_len;
    1765          27 :             buf->hot.bytes_received = 0;
    1766             :         } else {
    1767             :             /* No payload - go straight to CRC */
    1768           0 :             buf->hot.state = PT_RECV_CRC;
    1769           0 :             buf->hot.bytes_needed = 2;
    1770           0 :             buf->hot.bytes_received = 0;
    1771             :         }
    1772             : 
    1773          27 :         return 1;  /* Header complete */
    1774             :     }
    1775             : 
    1776           0 :     return 0;  /* Waiting for more header bytes */
    1777             : }
    1778             : 
    1779             : /**
    1780             :  * Receive payload bytes
    1781             :  *
    1782             :  * @return 1 = payload complete, 0 = waiting for more data, -1 = error
    1783             :  */
    1784          42 : static int pt_recv_payload(struct pt_context *ctx, struct pt_peer *peer,
    1785             :                              pt_recv_buffer *buf, int sock) {
    1786             :     ssize_t ret;
    1787             :     size_t remaining;
    1788             : 
    1789          42 :     remaining = buf->hot.bytes_needed - buf->hot.bytes_received;
    1790             : 
    1791          42 :     ret = recv(sock, buf->cold.payload_buf + buf->hot.bytes_received,
    1792             :                remaining, 0);
    1793             : 
    1794          42 :     if (ret < 0) {
    1795           0 :         if (errno == EWOULDBLOCK || errno == EAGAIN)
    1796           0 :             return 0;
    1797           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
    1798             :             "Payload recv error for peer %u: %s", peer->hot.id, strerror(errno));
    1799           0 :         return -1;
    1800             :     }
    1801             : 
    1802          42 :     if (ret == 0) {
    1803           0 :         PT_CTX_INFO(ctx, PT_LOG_CAT_PROTOCOL,
    1804             :             "Peer %u closed connection during payload", peer->hot.id);
    1805           0 :         return -1;
    1806             :     }
    1807             : 
    1808          42 :     buf->hot.bytes_received += ret;
    1809             : 
    1810          42 :     if (buf->hot.bytes_received >= buf->hot.bytes_needed) {
    1811          41 :         if (buf->hot.is_compact) {
    1812             :             /* Compact header - no CRC, message complete */
    1813          14 :             return 2;  /* Signal message complete (no CRC needed) */
    1814             :         }
    1815             :         /* Full header - move to CRC */
    1816          27 :         buf->hot.state = PT_RECV_CRC;
    1817          27 :         buf->hot.bytes_needed = 2;
    1818          27 :         buf->hot.bytes_received = 0;
    1819          27 :         return 1;
    1820             :     }
    1821             : 
    1822           1 :     return 0;  /* Waiting for more payload bytes */
    1823             : }
    1824             : 
    1825             : /**
    1826             :  * Receive CRC bytes
    1827             :  *
    1828             :  * @return 1 = CRC complete, 0 = waiting for more data, -1 = error
    1829             :  */
    1830          27 : static int pt_recv_crc(struct pt_context *ctx, struct pt_peer *peer,
    1831             :                         pt_recv_buffer *buf, int sock) {
    1832             :     ssize_t ret;
    1833             :     size_t remaining;
    1834             : 
    1835          27 :     remaining = buf->hot.bytes_needed - buf->hot.bytes_received;
    1836             : 
    1837          27 :     ret = recv(sock, buf->cold.crc_buf + buf->hot.bytes_received,
    1838             :                remaining, 0);
    1839             : 
    1840          27 :     if (ret < 0) {
    1841           0 :         if (errno == EWOULDBLOCK || errno == EAGAIN)
    1842           0 :             return 0;
    1843           0 :         PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
    1844             :             "CRC recv error for peer %u: %s", peer->hot.id, strerror(errno));
    1845           0 :         return -1;
    1846             :     }
    1847             : 
    1848          27 :     if (ret == 0) {
    1849           0 :         PT_CTX_INFO(ctx, PT_LOG_CAT_PROTOCOL,
    1850             :             "Peer %u closed connection during CRC", peer->hot.id);
    1851           0 :         return -1;
    1852             :     }
    1853             : 
    1854          27 :     buf->hot.bytes_received += ret;
    1855             : 
    1856          27 :     if (buf->hot.bytes_received >= buf->hot.bytes_needed) {
    1857             :         /* CRC complete - message fully received */
    1858          27 :         return 1;
    1859             :     }
    1860             : 
    1861           0 :     return 0;  /* Waiting for more CRC bytes */
    1862             : }
    1863             : 
    1864             : /**
    1865             :  * Process complete message after CRC validation
    1866             :  *
    1867             :  * Verifies CRC (for full headers only), updates statistics, and dispatches by message type.
    1868             :  * Compact headers have no CRC - skip validation for those.
    1869             :  *
    1870             :  * @return 0 on success, -1 on error or disconnect
    1871             :  */
    1872          41 : static int pt_recv_process_message(struct pt_context *ctx, struct pt_peer *peer,
    1873             :                                      pt_recv_buffer *buf) {
    1874             :     uint16_t header_size;
    1875             : 
    1876             :     /* Compact headers have no CRC - skip validation */
    1877          41 :     if (!buf->hot.is_compact) {
    1878             :         uint16_t expected_crc;
    1879             :         uint16_t received_crc;
    1880             : 
    1881             :         /* Calculate expected CRC */
    1882          27 :         expected_crc = pt_crc16(buf->cold.header_buf, PT_MESSAGE_HEADER_SIZE);
    1883          27 :         if (buf->cold.hdr.payload_len > 0) {
    1884          27 :             expected_crc = pt_crc16_update(expected_crc, buf->cold.payload_buf,
    1885          27 :                                             buf->cold.hdr.payload_len);
    1886             :         }
    1887             : 
    1888             :         /* Extract received CRC */
    1889          27 :         received_crc = ((uint16_t)buf->cold.crc_buf[0] << 8) | buf->cold.crc_buf[1];
    1890             : 
    1891          27 :         if (expected_crc != received_crc) {
    1892           1 :             PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
    1893             :                 "CRC mismatch from peer %u: expected 0x%04X, got 0x%04X",
    1894             :                 peer->hot.id, expected_crc, received_crc);
    1895           1 :             return -1;
    1896             :         }
    1897             :     }
    1898             : 
    1899             :     /* Debug: Log what we're processing */
    1900          40 :     PT_CTX_INFO(ctx, PT_LOG_CAT_PROTOCOL,
    1901             :         "Processing msg from peer %u: type=%u flags=0x%02X len=%u compact=%u",
    1902             :         peer->hot.id, buf->cold.hdr.type, buf->cold.hdr.flags,
    1903             :         buf->cold.hdr.payload_len, buf->hot.is_compact);
    1904             : 
    1905             :     /* Update statistics - use correct header size */
    1906          40 :     header_size = buf->hot.is_compact ? PT_COMPACT_HEADER_SIZE : PT_MESSAGE_HEADER_SIZE;
    1907          80 :     peer->cold.stats.bytes_received += header_size +
    1908          80 :                                         buf->cold.hdr.payload_len +
    1909          40 :                                         (buf->hot.is_compact ? 0 : 2);
    1910          40 :     peer->cold.stats.messages_received++;
    1911          80 :     ctx->global_stats.total_bytes_received += header_size +
    1912          80 :                                                buf->cold.hdr.payload_len +
    1913          40 :                                                (buf->hot.is_compact ? 0 : 2);
    1914          40 :     ctx->global_stats.total_messages_received++;
    1915             : 
    1916             :     /* Dispatch by message type */
    1917          40 :     switch (buf->cold.hdr.type) {
    1918          23 :         case PT_MSG_TYPE_DATA:
    1919             :             /* Check for fragmented message */
    1920          23 :             if (buf->cold.hdr.flags & PT_MSG_FLAG_FRAGMENT) {
    1921             :                 /* Fragment - process through reassembly */
    1922             :                 pt_fragment_header frag_hdr;
    1923           0 :                 const uint8_t *complete_data = NULL;
    1924           0 :                 uint16_t complete_len = 0;
    1925             :                 int reassembly_result;
    1926             : 
    1927             :                 /* Decode fragment header from payload */
    1928           0 :                 if (pt_fragment_decode(buf->cold.payload_buf,
    1929           0 :                                        buf->cold.hdr.payload_len,
    1930             :                                        &frag_hdr) != 0) {
    1931           0 :                     PT_CTX_WARN(ctx, PT_LOG_CAT_PROTOCOL,
    1932             :                         "Failed to decode fragment header from peer %u",
    1933             :                         peer->hot.id);
    1934           0 :                     break;
    1935             :                 }
    1936             : 
    1937             :                 /* Process fragment through reassembly */
    1938           0 :                 reassembly_result = pt_reassembly_process(ctx, peer,
    1939           0 :                     buf->cold.payload_buf, buf->cold.hdr.payload_len,
    1940             :                     &frag_hdr, &complete_data, &complete_len);
    1941             : 
    1942           0 :                 if (reassembly_result == 1 && complete_data != NULL) {
    1943             :                     /* Complete message reassembled - deliver to app callback */
    1944           0 :                     if (ctx->callbacks.on_message_received) {
    1945           0 :                         ctx->callbacks.on_message_received((PeerTalk_Context *)ctx,
    1946           0 :                             peer->hot.id,
    1947             :                             complete_data,
    1948             :                             complete_len,
    1949             :                             ctx->callbacks.user_data);
    1950             :                     }
    1951           0 :                 } else if (reassembly_result < 0) {
    1952           0 :                     PT_CTX_WARN(ctx, PT_LOG_CAT_PROTOCOL,
    1953             :                         "Fragment reassembly error: %d", reassembly_result);
    1954             :                 }
    1955             :                 /* If 0, more fragments expected - nothing to do yet */
    1956             :             } else {
    1957             :                 /* Non-fragmented - fire callback directly */
    1958          23 :                 if (ctx->callbacks.on_message_received) {
    1959          22 :                     ctx->callbacks.on_message_received((PeerTalk_Context *)ctx,
    1960          22 :                         peer->hot.id,
    1961          22 :                         buf->cold.payload_buf,
    1962          22 :                         buf->cold.hdr.payload_len,
    1963             :                         ctx->callbacks.user_data);
    1964             :                 }
    1965             :             }
    1966          23 :             break;
    1967             : 
    1968           0 :         case PT_MSG_TYPE_PING:
    1969             :             /* Send PONG response */
    1970           0 :             pt_posix_send_control(ctx, peer, PT_MSG_TYPE_PONG);
    1971           0 :             PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
    1972             :                 "Received PING from peer %u, sent PONG", peer->hot.id);
    1973           0 :             break;
    1974             : 
    1975           0 :         case PT_MSG_TYPE_PONG:
    1976             :             /* Update latency with rolling average and quality calculation */
    1977           0 :             update_peer_latency(ctx, peer);
    1978           0 :             break;
    1979             : 
    1980           0 :         case PT_MSG_TYPE_DISCONNECT:
    1981           0 :             PT_CTX_INFO(ctx, PT_LOG_CAT_PROTOCOL,
    1982             :                 "Received DISCONNECT from peer %u", peer->hot.id);
    1983           0 :             return -1;  /* Trigger disconnect */
    1984             : 
    1985           0 :         case PT_MSG_TYPE_ACK:
    1986             :             /* Handle reliable message ACK - TODO in future session */
    1987           0 :             PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
    1988             :                 "Received ACK from peer %u", peer->hot.id);
    1989           0 :             break;
    1990             : 
    1991          17 :         case PT_MSG_TYPE_CAPABILITY:
    1992             :             /* Process capability message */
    1993             :             {
    1994             :                 pt_capability_msg caps;
    1995             :                 uint16_t effective_max;
    1996             : 
    1997          17 :                 if (pt_capability_decode(ctx, buf->cold.payload_buf,
    1998          17 :                                          buf->cold.hdr.payload_len, &caps) == 0) {
    1999             :                     /* Store peer capabilities */
    2000          17 :                     peer->cold.caps.max_message_size = caps.max_message_size;
    2001          17 :                     peer->cold.caps.preferred_chunk = caps.preferred_chunk;
    2002          17 :                     peer->cold.caps.capability_flags = caps.capability_flags;
    2003          17 :                     peer->cold.caps.buffer_pressure = caps.buffer_pressure;
    2004          17 :                     peer->cold.caps.caps_exchanged = 1;
    2005             : 
    2006             :                     /* Store receive buffer size and optimal chunk for tuning */
    2007          17 :                     peer->cold.caps.recv_buffer_size = caps.recv_buffer_size;
    2008          17 :                     peer->cold.caps.optimal_chunk = caps.optimal_chunk;
    2009             : 
    2010             :                     /* Calculate flow control window based on peer's buffer capacity
    2011             :                      *
    2012             :                      * Window = peer_recv_buffer / our_max_message
    2013             :                      * This limits how many messages we queue to avoid flooding.
    2014             :                      */
    2015             :                     {
    2016             :                         uint16_t window;
    2017          17 :                         uint16_t our_max = ctx->local_max_message;
    2018          17 :                         uint16_t peer_buf = caps.recv_buffer_size;
    2019             : 
    2020          17 :                         if (peer_buf > 0 && our_max > 0) {
    2021          17 :                             window = peer_buf / our_max;
    2022             :                         } else {
    2023           0 :                             window = PT_FLOW_WINDOW_DEFAULT;
    2024             :                         }
    2025             : 
    2026             :                         /* Clamp to min/max */
    2027          17 :                         if (window < PT_FLOW_WINDOW_MIN) {
    2028           0 :                             window = PT_FLOW_WINDOW_MIN;
    2029             :                         }
    2030          17 :                         if (window > PT_FLOW_WINDOW_MAX) {
    2031           1 :                             window = PT_FLOW_WINDOW_MAX;
    2032             :                         }
    2033             : 
    2034          17 :                         peer->cold.caps.send_window = window;
    2035             :                     }
    2036             : 
    2037             :                     /* Negotiate compact header mode - both must support it */
    2038          17 :                     if ((caps.capability_flags & PT_CAPFLAG_COMPACT_HEADER) &&
    2039          17 :                         (ctx->local_capability_flags & PT_CAPFLAG_COMPACT_HEADER)) {
    2040          17 :                         peer->cold.caps.compact_mode = 1;
    2041             :                     } else {
    2042           0 :                         peer->cold.caps.compact_mode = 0;
    2043             :                     }
    2044             : 
    2045             :                     /* Check if peer needs push for performance */
    2046          17 :                     peer->cold.caps.push_preferred =
    2047          17 :                         (caps.capability_flags & PT_CAPFLAG_PUSH_PREFERRED) ? 1 : 0;
    2048             : 
    2049             :                     /* Calculate effective max = min(ours, theirs) */
    2050          17 :                     effective_max = ctx->local_max_message;
    2051          17 :                     if (caps.max_message_size < effective_max) {
    2052           1 :                         effective_max = caps.max_message_size;
    2053             :                     }
    2054          17 :                     peer->hot.effective_max_msg = effective_max;
    2055             : 
    2056          17 :                     PT_CTX_INFO(ctx, PT_LOG_CAT_PROTOCOL,
    2057             :                         "Received capabilities from peer %u: max=%u chunk=%u pressure=%u compact=%u recv_buf=%u optimal=%u push=%u",
    2058             :                         peer->hot.id, caps.max_message_size, caps.preferred_chunk,
    2059             :                         caps.buffer_pressure, peer->cold.caps.compact_mode,
    2060             :                         caps.recv_buffer_size, caps.optimal_chunk, peer->cold.caps.push_preferred);
    2061             :                 } else {
    2062           0 :                     PT_CTX_WARN(ctx, PT_LOG_CAT_PROTOCOL,
    2063             :                         "Failed to decode capabilities from peer %u", peer->hot.id);
    2064             :                 }
    2065             :             }
    2066          17 :             break;
    2067             : 
    2068           0 :         default:
    2069           0 :             PT_CTX_WARN(ctx, PT_LOG_CAT_PROTOCOL,
    2070             :                 "Unknown message type %u from peer %u",
    2071             :                 buf->cold.hdr.type, peer->hot.id);
    2072           0 :             break;
    2073             :     }
    2074             : 
    2075          40 :     return 0;
    2076             : }
    2077             : 
    2078             : /**
    2079             :  * Main receive function - drives state machine
    2080             :  *
    2081             :  * Non-blocking receive with state machine for partial reads.
    2082             :  *
    2083             :  * @param ctx PeerTalk context
    2084             :  * @param peer Target peer
    2085             :  * @return 0 on success, -1 on error or disconnect
    2086             :  */
    2087          81 : int pt_posix_recv(struct pt_context *ctx, struct pt_peer *peer) {
    2088          81 :     pt_posix_data *pd = pt_posix_get(ctx);
    2089             :     pt_recv_buffer *buf;
    2090             :     int peer_idx;
    2091             :     int sock;
    2092             :     int ret;
    2093             : 
    2094          81 :     if (!peer || peer->hot.magic != PT_PEER_MAGIC)
    2095           0 :         return -1;
    2096             : 
    2097          81 :     peer_idx = peer->hot.id - 1;
    2098          81 :     sock = pd->tcp_socks[peer_idx];
    2099          81 :     buf = &pd->recv_bufs[peer_idx];
    2100             : 
    2101          81 :     if (sock < 0)
    2102           0 :         return -1;
    2103             : 
    2104             :     /* Drive state machine */
    2105             :     while (1) {
    2106         149 :         switch (buf->hot.state) {
    2107          80 :             case PT_RECV_HEADER:
    2108          80 :                 ret = pt_recv_header(ctx, peer, buf, sock);
    2109          80 :                 if (ret <= 0) return ret;
    2110          41 :                 if (ret == 2) {
    2111             :                     /* Compact header with no payload - message complete */
    2112           0 :                     ret = pt_recv_process_message(ctx, peer, buf);
    2113           0 :                     pt_recv_reset(buf);
    2114           0 :                     return ret;
    2115             :                 }
    2116          41 :                 break;
    2117             : 
    2118          42 :             case PT_RECV_PAYLOAD:
    2119          42 :                 ret = pt_recv_payload(ctx, peer, buf, sock);
    2120          42 :                 if (ret <= 0) return ret;
    2121          41 :                 if (ret == 2) {
    2122             :                     /* Compact header - skip CRC, message complete */
    2123          14 :                     ret = pt_recv_process_message(ctx, peer, buf);
    2124          14 :                     pt_recv_reset(buf);
    2125          14 :                     return ret;
    2126             :                 }
    2127          27 :                 break;
    2128             : 
    2129          27 :             case PT_RECV_CRC:
    2130          27 :                 ret = pt_recv_crc(ctx, peer, buf, sock);
    2131          27 :                 if (ret <= 0) return ret;
    2132             : 
    2133             :                 /* Message complete - process it */
    2134          27 :                 ret = pt_recv_process_message(ctx, peer, buf);
    2135          27 :                 pt_recv_reset(buf);  /* Reset for next message */
    2136          27 :                 return ret;
    2137             : 
    2138           0 :             default:
    2139           0 :                 PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
    2140             :                     "Invalid recv state %u for peer %u",
    2141             :                     buf->hot.state, peer->hot.id);
    2142           0 :                 pt_recv_reset(buf);
    2143           0 :                 return -1;
    2144             :         }
    2145             :     }
    2146             : }
    2147             : 
    2148             : /* ========================================================================== */
    2149             : /* UDP Messaging (Session 4.4 - Stub for now)                                */
    2150             : /* ========================================================================== */
    2151             : 
    2152             : /**
    2153             :  * Initialize UDP messaging socket
    2154             :  *
    2155             :  * Creates dedicated UDP socket for unreliable messaging (separate from discovery).
    2156             :  * Binds to DEFAULT_UDP_PORT (7355) or user-configured port.
    2157             :  *
    2158             :  * Returns: 0 on success, -1 on failure
    2159             :  */
    2160         230 : int pt_posix_udp_init(struct pt_context *ctx) {
    2161         230 :     pt_posix_data *pd = pt_posix_get(ctx);
    2162             :     struct sockaddr_in addr;
    2163             :     int sock;
    2164         230 :     uint16_t port = UDP_PORT(ctx);
    2165             : 
    2166             :     /* Create UDP socket */
    2167         230 :     sock = socket(AF_INET, SOCK_DGRAM, 0);
    2168         230 :     if (sock < 0) {
    2169           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK,
    2170             :                     "Failed to create UDP messaging socket: %s", strerror(errno));
    2171           0 :         return -1;
    2172             :     }
    2173             : 
    2174             :     /* Set non-blocking */
    2175         230 :     if (set_nonblocking(ctx, sock) < 0) {
    2176           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK,
    2177             :                     "Failed to set UDP messaging socket non-blocking");
    2178           0 :         close(sock);
    2179           0 :         return -1;
    2180             :     }
    2181             : 
    2182             :     /* Set SO_REUSEADDR */
    2183         230 :     if (set_reuseaddr(ctx, sock) < 0) {
    2184           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK,
    2185             :                     "Failed to set SO_REUSEADDR on UDP messaging socket");
    2186           0 :         close(sock);
    2187           0 :         return -1;
    2188             :     }
    2189             : 
    2190             :     /* Bind to port */
    2191         230 :     memset(&addr, 0, sizeof(addr));
    2192         230 :     addr.sin_family = AF_INET;
    2193         230 :     addr.sin_addr.s_addr = INADDR_ANY;
    2194         230 :     addr.sin_port = htons(port);
    2195             : 
    2196         230 :     if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
    2197           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK,
    2198             :                     "Failed to bind UDP messaging socket to port %u: %s",
    2199             :                     port, strerror(errno));
    2200           0 :         close(sock);
    2201           0 :         return -1;
    2202             :     }
    2203             : 
    2204         230 :     pd->udp_msg_sock = sock;
    2205         230 :     pd->udp_msg_port = port;
    2206             : 
    2207             :     /* Update max_fd for select() */
    2208         230 :     if (sock > pd->max_fd) {
    2209         230 :         pd->max_fd = sock;
    2210             :     }
    2211         230 :     pd->fd_dirty = 1;  /* Rebuild fd_sets */
    2212             : 
    2213         230 :     PT_CTX_INFO(ctx, PT_LOG_CAT_NETWORK,
    2214             :                 "UDP messaging socket initialized on port %u", port);
    2215         230 :     return 0;
    2216             : }
    2217             : 
    2218             : /**
    2219             :  * Shutdown UDP messaging socket
    2220             :  *
    2221             :  * Closes dedicated UDP messaging socket and cleans up resources.
    2222             :  */
    2223           0 : void pt_posix_udp_shutdown(struct pt_context *ctx) {
    2224           0 :     pt_posix_data *pd = pt_posix_get(ctx);
    2225             : 
    2226           0 :     if (pd->udp_msg_sock >= 0) {
    2227           0 :         close(pd->udp_msg_sock);
    2228           0 :         pd->udp_msg_sock = -1;
    2229           0 :         pd->fd_dirty = 1;
    2230           0 :         PT_CTX_DEBUG(ctx, PT_LOG_CAT_NETWORK, "UDP messaging socket closed");
    2231             :     }
    2232           0 : }
    2233             : 
    2234             : /**
    2235             :  * Send UDP message to peer
    2236             :  *
    2237             :  * Sends unreliable UDP datagram with 8-byte header (magic, sender_port, payload_len).
    2238             :  * No CRC - UDP has its own checksum at transport layer.
    2239             :  *
    2240             :  * Returns: PT_OK on success, PT_ERR_* on failure
    2241             :  */
    2242           1 : int pt_posix_send_udp(struct pt_context *ctx, struct pt_peer *peer,
    2243             :                       const void *data, uint16_t len) {
    2244           1 :     pt_posix_data *pd = pt_posix_get(ctx);
    2245             :     uint8_t packet_buf[PT_MAX_UDP_MESSAGE_SIZE];
    2246             :     struct sockaddr_in dest_addr;
    2247             :     int packet_len;
    2248             :     ssize_t sent;
    2249             : 
    2250             :     /* Validate socket */
    2251           1 :     if (pd->udp_msg_sock < 0) {
    2252           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK, "UDP messaging socket not initialized");
    2253           0 :         return PT_ERR_NOT_INITIALIZED;
    2254             :     }
    2255             : 
    2256             :     /* Validate peer state */
    2257           1 :     if (peer->hot.state == PT_PEER_UNUSED) {
    2258           0 :         PT_CTX_DEBUG(ctx, PT_LOG_CAT_NETWORK,
    2259             :                      "Attempted to send UDP to peer %u (not connected)", peer->hot.id);
    2260           0 :         return PT_ERR_PEER_NOT_FOUND;
    2261             :     }
    2262             : 
    2263             :     /* Validate message size (PT_MAX_UDP_MESSAGE_SIZE minus 8-byte header) */
    2264           1 :     if (len > PT_MAX_UDP_MESSAGE_SIZE - 8) {
    2265           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK,
    2266             :                     "UDP message too large: %u bytes (max %u)",
    2267             :                     len, PT_MAX_UDP_MESSAGE_SIZE - 8);
    2268           0 :         return PT_ERR_MESSAGE_TOO_LARGE;
    2269             :     }
    2270             : 
    2271             :     /* Encode UDP packet */
    2272           1 :     packet_len = pt_udp_encode(data, len, pd->udp_msg_port,
    2273             :                                packet_buf, sizeof(packet_buf));
    2274           1 :     if (packet_len < 0) {
    2275           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK,
    2276             :                     "Failed to encode UDP packet: %d", packet_len);
    2277           0 :         return packet_len;
    2278             :     }
    2279             : 
    2280             :     /* Build destination address */
    2281           1 :     memset(&dest_addr, 0, sizeof(dest_addr));
    2282           1 :     dest_addr.sin_family = AF_INET;
    2283           1 :     dest_addr.sin_addr.s_addr = htonl(peer->cold.addresses[0].address);
    2284           1 :     dest_addr.sin_port = htons(pd->udp_msg_port);  /* Use same UDP port */
    2285             : 
    2286             :     /* Send datagram */
    2287           1 :     sent = sendto(pd->udp_msg_sock, packet_buf, packet_len, 0,
    2288             :                   (struct sockaddr *)&dest_addr, sizeof(dest_addr));
    2289             : 
    2290           1 :     if (sent < 0) {
    2291           0 :         if (errno == EWOULDBLOCK || errno == EAGAIN) {
    2292           0 :             PT_CTX_DEBUG(ctx, PT_LOG_CAT_NETWORK, "UDP send would block");
    2293           0 :             return PT_ERR_WOULD_BLOCK;
    2294             :         }
    2295           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK,
    2296             :                     "UDP sendto failed: %s", strerror(errno));
    2297           0 :         return PT_ERR_NETWORK;
    2298             :     }
    2299             : 
    2300             :     /* Update statistics */
    2301           1 :     peer->cold.stats.bytes_sent += sent;
    2302           1 :     peer->cold.stats.messages_sent++;
    2303           1 :     ctx->global_stats.total_bytes_sent += sent;
    2304           1 :     ctx->global_stats.total_messages_sent++;
    2305             : 
    2306           1 :     PT_CTX_DEBUG(ctx, PT_LOG_CAT_NETWORK,
    2307             :                  "Sent UDP message to peer %u (%u bytes payload, %zd bytes total)",
    2308             :                  peer->hot.id, len, sent);
    2309             : 
    2310           1 :     return PT_OK;
    2311             : }
    2312             : 
    2313             : /**
    2314             :  * Receive UDP messages from any peer
    2315             :  *
    2316             :  * Non-blocking recvfrom() on UDP messaging socket. Validates magic "PTUD",
    2317             :  * finds peer by source IP, and fires on_message_received callback.
    2318             :  *
    2319             :  * Returns: 1 if message received, 0 if no data, -1 on error
    2320             :  */
    2321           1 : int pt_posix_recv_udp(struct pt_context *ctx) {
    2322           1 :     pt_posix_data *pd = pt_posix_get(ctx);
    2323             :     uint8_t packet_buf[PT_MAX_UDP_MESSAGE_SIZE];
    2324             :     struct sockaddr_in from_addr;
    2325           1 :     socklen_t from_len = sizeof(from_addr);
    2326             :     ssize_t received;
    2327             :     uint16_t sender_port;
    2328             :     const void *payload;
    2329             :     uint16_t payload_len;
    2330             :     uint32_t sender_ip;
    2331             :     struct pt_peer *peer;
    2332             :     int ret;
    2333             : 
    2334             :     /* Check socket */
    2335           1 :     if (pd->udp_msg_sock < 0) {
    2336           0 :         return 0;  /* Not initialized yet */
    2337             :     }
    2338             : 
    2339             :     /* Non-blocking receive */
    2340           1 :     received = recvfrom(pd->udp_msg_sock, packet_buf, sizeof(packet_buf), 0,
    2341             :                         (struct sockaddr *)&from_addr, &from_len);
    2342             : 
    2343           1 :     if (received < 0) {
    2344           0 :         if (errno == EWOULDBLOCK || errno == EAGAIN) {
    2345           0 :             return 0;  /* No data available */
    2346             :         }
    2347           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK,
    2348             :                     "UDP recvfrom failed: %s", strerror(errno));
    2349           0 :         return -1;
    2350             :     }
    2351             : 
    2352             :     /* Decode packet */
    2353           1 :     ret = pt_udp_decode(ctx, packet_buf, received, &sender_port, &payload, &payload_len);
    2354           1 :     if (ret < 0) {
    2355           0 :         PT_CTX_DEBUG(ctx, PT_LOG_CAT_NETWORK,
    2356             :                      "Failed to decode UDP packet: %d", ret);
    2357           0 :         return 0;  /* Ignore malformed packets */
    2358             :     }
    2359             : 
    2360             :     /* Find peer by source IP */
    2361           1 :     sender_ip = ntohl(from_addr.sin_addr.s_addr);
    2362           1 :     peer = pt_peer_find_by_addr(ctx, sender_ip, 0);  /* Match any port */
    2363             : 
    2364           1 :     if (!peer) {
    2365           1 :         PT_CTX_DEBUG(ctx, PT_LOG_CAT_NETWORK,
    2366             :                      "Received UDP from unknown peer %u.%u.%u.%u",
    2367             :                      (sender_ip >> 24) & 0xFF,
    2368             :                      (sender_ip >> 16) & 0xFF,
    2369             :                      (sender_ip >> 8) & 0xFF,
    2370             :                      sender_ip & 0xFF);
    2371           1 :         return 0;  /* Ignore unknown peers */
    2372             :     }
    2373             : 
    2374             :     /* Update peer statistics and timestamp */
    2375           0 :     peer->cold.stats.bytes_received += received;
    2376           0 :     peer->cold.stats.messages_received++;
    2377           0 :     peer->hot.last_seen = ctx->plat->get_ticks();
    2378             : 
    2379             :     /* Update global statistics */
    2380           0 :     ctx->global_stats.total_bytes_received += received;
    2381           0 :     ctx->global_stats.total_messages_received++;
    2382             : 
    2383           0 :     PT_CTX_DEBUG(ctx, PT_LOG_CAT_NETWORK,
    2384             :                  "Received UDP from peer %u (%u bytes payload, %zd bytes total)",
    2385             :                  peer->hot.id, payload_len, received);
    2386             : 
    2387             :     /* Fire callback */
    2388           0 :     if (ctx->callbacks.on_message_received) {
    2389           0 :         ctx->callbacks.on_message_received((PeerTalk_Context *)ctx,
    2390           0 :                                           peer->hot.id,
    2391             :                                           payload,
    2392             :                                           payload_len,
    2393             :                                           ctx->callbacks.user_data);
    2394             :     }
    2395             : 
    2396           0 :     return 1;  /* Message processed */
    2397             : }
    2398             : 
    2399             : /* ========================================================================== */
    2400             : /* Connection Completion Checking (Session 4.2)                              */
    2401             : /* ========================================================================== */
    2402             : 
    /**
     * Legacy check for completion of non-blocking connects (disabled).
     *
     * NOTE: This routine is compiled out. Connect-completion handling now
     * lives in the pt_posix_poll() main loop (Session 4.6); the code below is
     * retained purely as reference.
     *
     * Collects the TCP sockets of all peers in PT_PEER_CONNECTING into a write
     * fd_set, runs select() with a zero timeout, and for every socket reported
     * writable reads SO_ERROR to decide between success (peer moves to
     * PT_PEER_CONNECTED and on_peer_connected fires) and failure (peer moves
     * to PT_PEER_FAILED, socket is closed and removed from the active list).
     *
     * Returns: Number of connections completed (0 when no peer is connecting
     *          or when select() fails)
     */
    #if 0  /* Integrated into pt_posix_poll() in Session 4.6 */
    static int pt_posix_connect_poll(struct pt_context *ctx) {
        pt_posix_data *posix = pt_posix_get(ctx);
        fd_set writable;
        struct timeval no_wait;
        int highest_fd = -1;
        int n_completed = 0;

        FD_ZERO(&writable);

        /* Pass 1: gather sockets of peers whose connect is still in flight */
        for (size_t idx = 0; idx < ctx->max_peers; idx++) {
            int fd;

            if (ctx->peers[idx].hot.state != PT_PEER_CONNECTING)
                continue;

            fd = posix->tcp_socks[idx];
            if (fd < 0)
                continue;

            FD_SET(fd, &writable);
            if (fd > highest_fd)
                highest_fd = fd;
        }

        if (highest_fd < 0)
            return 0;  /* Nothing is connecting */

        /* Zero timeout: select() only reports current state, never waits */
        no_wait.tv_sec = 0;
        no_wait.tv_usec = 0;

        if (select(highest_fd + 1, NULL, &writable, NULL, &no_wait) < 0) {
            if (errno != EINTR) {
                PT_CTX_WARN(ctx, PT_LOG_CAT_CONNECT,
                    "Select failed in connect poll: %s", strerror(errno));
            }
            return 0;
        }

        /* Pass 2: a writable socket means the connect finished (ok or not) */
        for (size_t idx = 0; idx < ctx->max_peers; idx++) {
            struct pt_peer *peer = &ctx->peers[idx];
            int fd;
            int so_error = 0;
            socklen_t so_error_len = sizeof(so_error);
            int connect_failed = 0;

            if (peer->hot.state != PT_PEER_CONNECTING)
                continue;

            fd = posix->tcp_socks[idx];
            if (fd < 0 || !FD_ISSET(fd, &writable))
                continue;

            /* SO_ERROR carries the outcome of the asynchronous connect */
            if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_len) < 0) {
                PT_CTX_ERR(ctx, PT_LOG_CAT_CONNECT,
                    "getsockopt failed for peer %u: %s",
                    peer->hot.id, strerror(errno));
                connect_failed = 1;
            } else if (so_error != 0) {
                PT_CTX_WARN(ctx, PT_LOG_CAT_CONNECT,
                    "Connection failed for peer %u (%s): %s",
                    peer->hot.id, peer->cold.name, strerror(so_error));
                connect_failed = 1;
            }

            if (connect_failed) {
                /* Shared teardown for both failure flavours */
                pt_peer_set_state(ctx, peer, PT_PEER_FAILED);
                close(fd);
                posix->tcp_socks[idx] = -1;
                pt_posix_remove_active_peer(posix, idx);
                continue;
            }

            /* Connect succeeded */
            pt_peer_set_state(ctx, peer, PT_PEER_CONNECTED);
            peer->hot.last_seen = ctx->plat->get_ticks();

            PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
                "Connection established to peer %u (%s)",
                peer->hot.id, peer->cold.name);

            /* Notify the application */
            if (ctx->callbacks.on_peer_connected) {
                ctx->callbacks.on_peer_connected((PeerTalk_Context *)ctx,
                                                 peer->hot.id,
                                                 ctx->callbacks.user_data);
            }

            n_completed++;
        }

        return n_completed;
    }
    #endif  /* End of unused pt_posix_connect_poll() */
    2511             : 
    2512             : /* ========================================================================== */
    2513             : /* Main Poll (Session 4.6 - Integration)                                     */
    2514             : /* ========================================================================== */
    2515             : 
    2516         741 : int pt_posix_poll(struct pt_context *ctx) {
    2517             :     pt_posix_data *pd;
    2518             :     pt_tick_t poll_time;
    2519             :     fd_set read_fds, write_fds;
    2520             :     struct timeval tv;
    2521             :     int select_result;
    2522             : 
    2523         741 :     if (!ctx) {
    2524           0 :         return -1;
    2525             :     }
    2526             : 
    2527         741 :     pd = pt_posix_get(ctx);
    2528             : 
    2529             :     /* Cache poll time at start (avoids repeated trap calls on Classic Mac) */
    2530         741 :     poll_time = ctx->plat->get_ticks();
    2531             : 
    2532             :     /* Reset batch count for this poll cycle */
    2533         741 :     pd->batch_count = 0;
    2534             : 
    2535             :     /* Rebuild fd_sets only if dirty flag set (connections changed) */
    2536         741 :     if (pd->fd_dirty) {
    2537          87 :         pt_posix_rebuild_fd_sets(ctx);
    2538             :     }
    2539             : 
    2540             :     /* Copy cached fd_sets (select modifies them) */
    2541         741 :     read_fds = pd->cached_read_fds;
    2542         741 :     write_fds = pd->cached_write_fds;
    2543             : 
    2544             :     /* Set timeout to 10ms for responsive polling */
    2545         741 :     tv.tv_sec = 0;
    2546         741 :     tv.tv_usec = 10000;
    2547             : 
    2548             :     /* Call select() with all sockets */
    2549         741 :     select_result = select(pd->max_fd + 1, &read_fds, &write_fds, NULL, &tv);
    2550             : 
    2551         741 :     if (select_result < 0) {
    2552           0 :         if (errno != EINTR) {
    2553           0 :             PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK,
    2554             :                 "Select failed in main poll: %s", strerror(errno));
    2555             :         }
    2556           0 :         return 0;
    2557             :     }
    2558             : 
    2559         741 :     if (select_result == 0) {
    2560             :         /* Hot path: no events - just periodic work */
    2561         612 :         goto periodic_work;
    2562             :     }
    2563             : 
    2564             :     /* Process discovery packets if discovery_sock readable */
    2565         129 :     if (pd->discovery_sock >= 0 && FD_ISSET(pd->discovery_sock, &read_fds)) {
    2566          29 :         while (pt_posix_discovery_poll(ctx) > 0) {
    2567             :             /* Process all pending discovery packets */
    2568             :         }
    2569             :     }
    2570             : 
    2571             :     /* Process UDP messages if udp_msg_sock readable */
    2572         129 :     if (pd->udp_msg_sock >= 0 && FD_ISSET(pd->udp_msg_sock, &read_fds)) {
    2573           1 :         while (pt_posix_recv_udp(ctx) > 0) {
    2574             :             /* Process all pending UDP messages */
    2575             :         }
    2576             :     }
    2577             : 
    2578             :     /* Process incoming connections if listen_sock readable */
    2579         129 :     if (pd->listen_sock >= 0 && FD_ISSET(pd->listen_sock, &read_fds)) {
    2580          40 :         while (pt_posix_listen_poll(ctx) > 0) {
    2581             :             /* Process all pending incoming connections */
    2582             :         }
    2583             :     }
    2584             : 
    2585             :     /* For each active peer socket, check for events */
    2586         231 :     for (uint8_t i = 0; i < pd->active_count; i++) {
    2587         102 :         uint8_t peer_idx = pd->active_peers[i];
    2588         102 :         struct pt_peer *peer = &ctx->peers[peer_idx];
    2589         102 :         int sock = pd->tcp_socks[peer_idx];
    2590             : 
    2591         102 :         if (sock < 0)
    2592           0 :             continue;
    2593             : 
    2594             :         /* Check connect completion (if state is CONNECTING and socket is writable) */
    2595         102 :         if (peer->hot.state == PT_PEER_CONNECTING && FD_ISSET(sock, &write_fds)) {
    2596          11 :             int error = 0;
    2597          11 :             socklen_t len = sizeof(error);
    2598             : 
    2599          11 :             if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
    2600           0 :                 PT_CTX_ERR(ctx, PT_LOG_CAT_CONNECT,
    2601             :                     "getsockopt failed for peer %u: %s",
    2602             :                     peer->hot.id, strerror(errno));
    2603           0 :                 pt_peer_set_state(ctx, peer, PT_PEER_FAILED);
    2604           0 :                 close(sock);
    2605           0 :                 pd->tcp_socks[peer_idx] = -1;
    2606           0 :                 pt_posix_remove_active_peer(pd, peer_idx);
    2607           0 :                 continue;
    2608             :             }
    2609             : 
    2610          11 :             if (error != 0) {
    2611             :                 /* Connection failed */
    2612           0 :                 PT_CTX_WARN(ctx, PT_LOG_CAT_CONNECT,
    2613             :                     "Connection failed for peer %u (%s): %s",
    2614             :                     peer->hot.id, peer->cold.name, strerror(error));
    2615           0 :                 pt_peer_set_state(ctx, peer, PT_PEER_FAILED);
    2616           0 :                 close(sock);
    2617           0 :                 pd->tcp_socks[peer_idx] = -1;
    2618           0 :                 pt_posix_remove_active_peer(pd, peer_idx);
    2619           0 :                 continue;
    2620             :             }
    2621             : 
    2622             :             /* Connection successful - allocate queues */
    2623          11 :             peer->send_queue = pt_alloc_peer_queue(ctx);
    2624          11 :             peer->recv_queue = pt_alloc_peer_queue(ctx);
    2625             : 
    2626          11 :             if (!peer->send_queue || !peer->recv_queue) {
    2627             :                 /* Allocation failed - disconnect */
    2628           0 :                 PT_CTX_ERR(ctx, PT_LOG_CAT_MEMORY,
    2629             :                     "Failed to allocate queues for peer %u", peer->hot.id);
    2630             : 
    2631           0 :                 pt_free_peer_queue(peer->send_queue);
    2632           0 :                 pt_free_peer_queue(peer->recv_queue);
    2633           0 :                 peer->send_queue = NULL;
    2634           0 :                 peer->recv_queue = NULL;
    2635             : 
    2636           0 :                 pt_peer_set_state(ctx, peer, PT_PEER_FAILED);
    2637           0 :                 close(sock);
    2638           0 :                 pd->tcp_socks[peer_idx] = -1;
    2639           0 :                 pt_posix_remove_active_peer(pd, peer_idx);
    2640           0 :                 continue;
    2641             :             }
    2642             : 
    2643             :             /* Transition to CONNECTED */
    2644          11 :             pt_peer_set_state(ctx, peer, PT_PEER_CONNECTED);
    2645          11 :             peer->hot.last_seen = poll_time;
    2646             : 
    2647          11 :             PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
    2648             :                 "Connection established to peer %u (%s)",
    2649             :                 peer->hot.id, peer->cold.name);
    2650             : 
    2651             :             /* Fire on_peer_connected callback */
    2652          11 :             if (ctx->callbacks.on_peer_connected) {
    2653           9 :                 ctx->callbacks.on_peer_connected((PeerTalk_Context *)ctx,
    2654           9 :                                                  peer->hot.id,
    2655             :                                                  ctx->callbacks.user_data);
    2656             :             }
    2657             : 
    2658             :             /* Send capabilities for negotiation */
    2659          11 :             pt_posix_send_capability(ctx, peer);
    2660             : 
    2661             :             /* Mark fd_sets dirty to move socket from write set to read-only */
    2662          11 :             pd->fd_dirty = 1;
    2663             :         }
    2664             : 
    2665             :         /* Check for incoming data (if socket readable) */
    2666         102 :         if (FD_ISSET(sock, &read_fds)) {
    2667             :             /* Process messages via pt_posix_recv() loop */
    2668             :             int recv_ret;
    2669          81 :             while ((recv_ret = pt_posix_recv(ctx, peer)) > 0) {
    2670             :                 /* Keep receiving until no more complete messages */
    2671             :             }
    2672             : 
    2673             :             /* If recv returned -1 (error/close), mark peer for disconnection */
    2674          81 :             if (recv_ret < 0 && peer->hot.state == PT_PEER_CONNECTED) {
    2675           9 :                 peer->hot.state = PT_PEER_DISCONNECTING;
    2676             :             }
    2677             : 
    2678             :             /* On connection error, close socket and remove from active */
    2679          81 :             if (peer->hot.state == PT_PEER_DISCONNECTING ||
    2680          72 :                 peer->hot.state == PT_PEER_FAILED) {
    2681           9 :                 PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
    2682             :                     "Closing connection to peer %u", peer->hot.id);
    2683             : 
    2684             :                 /* Free queues before closing */
    2685           9 :                 pt_free_peer_queue(peer->send_queue);
    2686           9 :                 pt_free_peer_queue(peer->recv_queue);
    2687           9 :                 peer->send_queue = NULL;
    2688           9 :                 peer->recv_queue = NULL;
    2689             : 
    2690           9 :                 close(sock);
    2691           9 :                 pd->tcp_socks[peer_idx] = -1;
    2692           9 :                 pt_posix_remove_active_peer(pd, peer_idx);
    2693             : 
    2694             :                 /* Fire on_peer_disconnected callback */
    2695           9 :                 if (ctx->callbacks.on_peer_disconnected) {
    2696           3 :                     ctx->callbacks.on_peer_disconnected((PeerTalk_Context *)ctx,
    2697           3 :                                                         peer->hot.id,
    2698             :                                                         PT_OK,
    2699             :                                                         ctx->callbacks.user_data);
    2700             :                 }
    2701             : 
    2702             :                 /* Transition back to DISCOVERED state for reconnection support.
    2703             :                  * Don't destroy the peer - it may reconnect or continue sending
    2704             :                  * discovery announcements. The peer will timeout after 30s if
    2705             :                  * no further announcements are received. */
    2706           9 :                 pt_peer_set_state(ctx, peer, PT_PEER_STATE_DISCOVERED);
    2707           9 :                 peer->hot.last_seen = ctx->plat->get_ticks();
    2708             :             }
    2709             :         }
    2710             :     }
    2711             : 
    2712         129 : periodic_work:
    2713             :     /* Drain send buffers for all connected peers
    2714             :      * Two-tier system: Tier 2 (large messages) first, then Tier 1 (small messages)
    2715             :      */
    2716       12597 :     for (uint8_t i = 0; i < ctx->max_peers; i++) {
    2717       11856 :         struct pt_peer *peer = &ctx->peers[i];
    2718             : 
    2719       11856 :         if (peer->hot.state != PT_PEER_CONNECTED) {
    2720       11539 :             continue;
    2721             :         }
    2722             : 
    2723             :         /* Tier 2: Send large message from direct buffer first (priority) */
    2724         317 :         if (pt_direct_buffer_ready(&peer->send_direct)) {
    2725           2 :             pt_direct_buffer *buf = &peer->send_direct;
    2726             : 
    2727             :             /* Mark as sending */
    2728           2 :             pt_direct_buffer_mark_sending(buf);
    2729             : 
    2730             :             /* Send via TCP with message flags (supports fragmentation) */
    2731           2 :             int result = pt_posix_send_with_flags(ctx, peer, buf->data, buf->length, buf->msg_flags);
    2732             : 
    2733             :             /* Complete the send (success or fail, buffer becomes available) */
    2734           2 :             pt_direct_buffer_complete(buf);
    2735             : 
    2736           2 :             if (result == PT_OK) {
    2737           2 :                 PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
    2738             :                     "Tier 2: Sent %u bytes to peer %u", buf->length, peer->hot.id);
    2739           0 :             } else if (result != PT_ERR_WOULD_BLOCK) {
    2740           0 :                 PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
    2741             :                     "Tier 2: Failed to send to peer %u: error %d",
    2742             :                     peer->hot.id, result);
    2743             :             }
    2744             :         }
    2745             : 
    2746             :         /* Tier 1: Drain queue (small messages and fragments)
    2747             :          *
    2748             :          * CRITICAL: Drain MULTIPLE messages per poll iteration, not just one!
    2749             :          * At 60Hz poll rate, draining one message = 60 msg/sec max.
    2750             :          * With fragmentation (17 chunks per 4KB message), this causes queue
    2751             :          * overflow. Drain up to 16 messages or until WOULD_BLOCK.
    2752             :          */
    2753         317 :         if (peer->send_queue) {
    2754             :             const void *data;
    2755             :             uint16_t len;
    2756         317 :             pt_queue *q = peer->send_queue;
    2757         317 :             int drain_count = 0;
    2758         317 :             const int max_drain = 16;  /* Drain up to 16 messages per poll */
    2759             : 
    2760         672 :             while (drain_count < max_drain &&
    2761         336 :                    pt_queue_pop_priority_direct(q, &data, &len) == 0) {
    2762             :                 int result;
    2763          19 :                 uint8_t slot_flags = q->slots[q->pending_pop_slot].flags;
    2764             : 
    2765             :                 /* Check if this is a fragment - needs PT_MSG_FLAG_FRAGMENT */
    2766          19 :                 if (slot_flags & PT_SLOT_FRAGMENT) {
    2767           0 :                     result = pt_posix_send_with_flags(ctx, peer, data, len,
    2768             :                                                        PT_MSG_FLAG_FRAGMENT);
    2769             :                 } else {
    2770          19 :                     result = pt_posix_send(ctx, peer, data, len);
    2771             :                 }
    2772             : 
    2773          19 :                 if (result == PT_ERR_WOULD_BLOCK) {
    2774             :                     /* Socket buffer full - DON'T commit, retry next poll */
    2775           0 :                     pt_queue_pop_priority_rollback(q);
    2776           0 :                     break;
    2777             :                 }
    2778             : 
    2779             :                 /* Commit the pop to remove message from queue */
    2780          19 :                 pt_queue_pop_priority_commit(q);
    2781          19 :                 drain_count++;
    2782             : 
    2783          19 :                 if (result != PT_OK) {
    2784             :                     /* Send failed (network error) - message lost, continue draining */
    2785           0 :                     PT_CTX_WARN(ctx, PT_LOG_CAT_PROTOCOL,
    2786             :                         "Failed to drain message for peer %u: error %d",
    2787             :                         peer->hot.id, result);
    2788             :                 }
    2789             :             }
    2790             :         }
    2791             : 
    2792             :         /* Stream: Process active stream transfers */
    2793         317 :         pt_stream_poll(ctx, peer, pt_posix_send);
    2794             : 
    2795             :         /* Flow control: Check for pressure updates to send
    2796             :          *
    2797             :          * When our recv queue pressure crosses a threshold (25%, 50%, 75%),
    2798             :          * we need to inform the peer so they can throttle their sends.
    2799             :          * This implements receiver-driven flow control - SDK handles it
    2800             :          * transparently so app developers don't need to manage it.
    2801             :          */
    2802         634 :         if (peer->cold.caps.pressure_update_pending ||
    2803         317 :             pt_peer_check_pressure_update(ctx, peer)) {
    2804             :             /* Send updated capabilities with new pressure value */
    2805           0 :             pt_posix_send_capability(ctx, peer);
    2806             :         }
    2807             :     }
    2808             : 
    2809             :     /* Periodic discovery announce every 10 seconds */
    2810         741 :     if (pd->discovery_sock >= 0 && (poll_time - pd->last_announce >= 10000)) {
    2811           2 :         pt_posix_discovery_send(ctx, PT_DISC_TYPE_ANNOUNCE);
    2812           2 :         pd->last_announce = poll_time;
    2813             :     }
    2814             : 
    2815             :     /* Check for peer timeouts (30 second discovery timeout) */
    2816       12597 :     for (uint8_t i = 0; i < ctx->max_peers; i++) {
    2817       11856 :         struct pt_peer *peer = &ctx->peers[i];
    2818             : 
    2819             :         /* Skip peers updated in this poll iteration (last_seen >= poll_time)
    2820             :          * to avoid false timeouts when discovery packet arrived this iteration */
    2821       11856 :         if (peer->hot.state == PT_PEER_DISCOVERED &&
    2822         102 :             peer->hot.last_seen < poll_time &&
    2823          92 :             (poll_time - peer->hot.last_seen >= 30000)) {
    2824           0 :             PT_CTX_INFO(ctx, PT_LOG_CAT_DISCOVERY,
    2825             :                 "Peer %u (%s) timed out after 30 seconds",
    2826             :                 peer->hot.id, ctx->peer_names[peer->hot.name_idx]);
    2827             : 
    2828             :             /* Fire on_peer_lost callback before destroying */
    2829           0 :             if (ctx->callbacks.on_peer_lost) {
    2830           0 :                 ctx->callbacks.on_peer_lost((PeerTalk_Context *)ctx,
    2831           0 :                                            peer->hot.id,
    2832             :                                            ctx->callbacks.user_data);
    2833             :             }
    2834             : 
    2835           0 :             pt_peer_destroy(ctx, peer);
    2836             :         }
    2837             :     }
    2838             : 
    2839         741 :     return 0;
    2840             : }
    2841             : 
    2842             : /* ========================================================================== */
    2843             : /* Fast Poll (TCP I/O Only)                                                   */
    2844             : /* ==========================================================================
                     :  * pt_posix_poll_fast() services TCP traffic only:
                     :  *   1. zero-timeout select() over the sockets of peers listed in
                     :  *      pd->active_peers that are in PT_PEER_CONNECTED state;
                     :  *   2. pt_posix_recv() on each readable socket, tearing the connection
                     :  *      down if the peer ends up DISCONNECTING or FAILED;
                     :  *   3. a bounded drain of every connected peer's direct buffer and
                     :  *      send queue.
                     :  * Nothing else is done in this function (no stream poll, capability
                     :  * update, discovery announce or peer timeout scan).
                     :  *
                     :  * Returns -1 when ctx is NULL and 0 otherwise; a select() failure is only
                     :  * logged (EINTR is not logged) and still falls through to the drain step.
                     :  */
    2845             : 
    2846         101 : int pt_posix_poll_fast(struct pt_context *ctx) {
    2847             :     pt_posix_data *pd;
    2848             :     fd_set read_fds;
    2849             :     struct timeval tv;
    2850             :     int select_result;
    2851             : 
    2852         101 :     if (!ctx) {
    2853           0 :         return -1;
    2854             :     }
    2855             : 
    2856         101 :     pd = pt_posix_get(ctx);
    2857             : 
    2858             :     /* Build fd_set with only active TCP peer sockets.
                     :      * NOTE(review): pd is dereferenced without a NULL check - presumably
                     :      * pt_posix_get() cannot fail for a non-NULL ctx; confirm.
                     :      * NOTE(review): sock is not compared against FD_SETSIZE before
                     :      * FD_SET(); POSIX leaves FD_SET() undefined for fd >= FD_SETSIZE. */
    2859        1717 :     FD_ZERO(&read_fds);
    2860         101 :     int max_fd = -1;
    2861             : 
    2862         101 :     for (uint8_t i = 0; i < pd->active_count; i++) {
    2863           0 :         uint8_t peer_idx = pd->active_peers[i];
    2864           0 :         int sock = pd->tcp_socks[peer_idx];
    2865           0 :         struct pt_peer *peer = &ctx->peers[peer_idx];
    2866             : 
    2867             :         /* Only include connected peers (not connecting) */
    2868           0 :         if (sock >= 0 && peer->hot.state == PT_PEER_CONNECTED) {
    2869           0 :             FD_SET(sock, &read_fds);
    2870           0 :             if (sock > max_fd)
    2871           0 :                 max_fd = sock;
    2872             :         }
    2873             :     }
    2874             : 
    2875         101 :     if (max_fd < 0) {
    2876             :         /* No connected socket to select on - skip the receive phase */
    2877         101 :         goto drain_queues;
    2878             :     }
    2879             : 
    2880             :     /* Zero timeout - non-blocking check */
    2881           0 :     tv.tv_sec = 0;
    2882           0 :     tv.tv_usec = 0;
    2883             : 
    2884           0 :     select_result = select(max_fd + 1, &read_fds, NULL, NULL, &tv);
    2885             : 
    2886           0 :     if (select_result < 0) {
    2887           0 :         if (errno != EINTR) {
    2888           0 :             PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK,
    2889             :                 "Select failed in fast poll: %s", strerror(errno));
    2890             :         }
    2891           0 :         goto drain_queues;
    2892             :     }
    2893             : 
    2894           0 :     if (select_result == 0) {
    2895             :         /* No incoming data - just drain queues */
    2896           0 :         goto drain_queues;
    2897             :     }
    2898             : 
    2899             :     /* Process incoming TCP data for connected peers.
                     :      * NOTE(review): pt_posix_remove_active_peer() below edits the active
                     :      * list while this loop indexes it; if removal compacts the array, the
                     :      * entry that lands in slot i is not examined in this pass - confirm. */
    2900           0 :     for (uint8_t i = 0; i < pd->active_count; i++) {
    2901           0 :         uint8_t peer_idx = pd->active_peers[i];
    2902           0 :         struct pt_peer *peer = &ctx->peers[peer_idx];
    2903           0 :         int sock = pd->tcp_socks[peer_idx];
    2904             : 
    2905           0 :         if (sock < 0 || peer->hot.state != PT_PEER_CONNECTED)
    2906           0 :             continue;
    2907             : 
    2908           0 :         if (FD_ISSET(sock, &read_fds)) {
    2909             :             int recv_ret;
    2910           0 :             while ((recv_ret = pt_posix_recv(ctx, peer)) > 0) {
    2911             :                 /* Keep receiving until no more complete messages */
    2912             :             }
    2913             : 
    2914             :             /* If recv returned a negative value, mark for disconnection */
    2915           0 :             if (recv_ret < 0 && peer->hot.state == PT_PEER_CONNECTED) {
    2916           0 :                 peer->hot.state = PT_PEER_DISCONNECTING;
    2917             :             }
    2918             : 
    2919             :             /* Handle disconnection: release both queues, close the socket,
                     :              * drop the peer from the active list, then notify the
                     :              * application. This check only runs for sockets that select()
                     :              * reported readable in this call. */
    2920           0 :             if (peer->hot.state == PT_PEER_DISCONNECTING ||
    2921           0 :                 peer->hot.state == PT_PEER_FAILED) {
    2922           0 :                 PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
    2923             :                     "Closing connection to peer %u (fast poll)", peer->hot.id);
    2924             : 
    2925           0 :                 pt_free_peer_queue(peer->send_queue);
    2926           0 :                 pt_free_peer_queue(peer->recv_queue);
    2927           0 :                 peer->send_queue = NULL;
    2928           0 :                 peer->recv_queue = NULL;
    2929             : 
    2930           0 :                 close(sock);
    2931           0 :                 pd->tcp_socks[peer_idx] = -1;
    2932           0 :                 pt_posix_remove_active_peer(pd, peer_idx);
    2933             : 
    2934           0 :                 if (ctx->callbacks.on_peer_disconnected) {
    2935           0 :                     ctx->callbacks.on_peer_disconnected((PeerTalk_Context *)ctx,
    2936           0 :                                                         peer->hot.id,
    2937             :                                                         PT_OK, /* reason is always PT_OK here, even after a recv error */
    2938             :                                                         ctx->callbacks.user_data);
    2939             :                 }
    2940             : 
    2941             :                 /* Transition back to DISCOVERED state for reconnection support.
    2942             :                  * Don't destroy the peer - it may reconnect or continue sending
    2943             :                  * discovery announcements. The peer will timeout after 30s if
    2944             :                  * no further announcements are received. */
    2945           0 :                 pt_peer_set_state(ctx, peer, PT_PEER_STATE_DISCOVERED);
    2946           0 :                 peer->hot.last_seen = ctx->plat->get_ticks();
    2947             :             }
    2948             :         }
    2949             :     }
    2950             : 
    2951           0 : drain_queues:
    2952             :     /* Drain send queues for all connected peers (scans every peer slot,
                     :      * not just the active list) */
    2953         505 :     for (uint8_t i = 0; i < ctx->max_peers; i++) {
    2954         404 :         struct pt_peer *peer = &ctx->peers[i];
    2955             : 
    2956         404 :         if (peer->hot.state != PT_PEER_CONNECTED) {
    2957         404 :             continue;
    2958             :         }
    2959             : 
    2960             :         /* Tier 2: Direct buffer first (priority path) */
    2961           0 :         if (pt_direct_buffer_ready(&peer->send_direct)) {
    2962           0 :             pt_direct_buffer *buf = &peer->send_direct;
    2963           0 :             pt_direct_buffer_mark_sending(buf);
    2964           0 :             pt_posix_send_with_flags(ctx, peer, buf->data, buf->length, buf->msg_flags);
    2965           0 :             pt_direct_buffer_complete(buf);
    2966             :             /* Errors logged by regular poll, not fast poll.
                     :              * NOTE(review): the send result is discarded and the buffer is
                     :              * marked complete unconditionally, so a would-block or failed
                     :              * send is not retried from here - confirm this is intended. */
    2967             :         }
    2968             : 
    2969             :         /* Tier 1: Queue drain - at most max_drain messages per peer per call */
    2970           0 :         if (peer->send_queue) {
    2971             :             const void *data;
    2972             :             uint16_t len;
    2973           0 :             pt_queue *q = peer->send_queue;
    2974           0 :             int drain_count = 0;
    2975           0 :             const int max_drain = 16;
    2976             : 
    2977           0 :             while (drain_count < max_drain &&
    2978           0 :                    pt_queue_pop_priority_direct(q, &data, &len) == 0) {
    2979             :                 int result;
    2980           0 :                 uint8_t slot_flags = q->slots[q->pending_pop_slot].flags;
    2981             : 
    2982           0 :                 if (slot_flags & PT_SLOT_FRAGMENT) {
    2983           0 :                     result = pt_posix_send_with_flags(ctx, peer, data, len,
    2984             :                                                        PT_MSG_FLAG_FRAGMENT);
    2985             :                 } else {
    2986           0 :                     result = pt_posix_send(ctx, peer, data, len);
    2987             :                 }
    2988             : 
    2989           0 :                 if (result == PT_ERR_WOULD_BLOCK) {
    2990           0 :                     pt_queue_pop_priority_rollback(q); /* keep the message queued for the next poll */
    2991           0 :                     break;
    2992             :                 }
    2993             : 
    2994           0 :                 pt_queue_pop_priority_commit(q); /* committed for every other result, send errors included (not logged here) */
    2995           0 :                 drain_count++;
    2996             :             }
    2997             :         }
    2998             : 
    2999             :         /* NOTE: Stream poll skipped in PollFast - use regular Poll for streams */
    3000             :     }
    3001             : 
    3002         101 :     return 0;
    3003             : }
    3004             : 
    3005             : /* ========================================================================== */
    3006             : /* Public API (Session 4.4)                                                  */
    3007             : /* ========================================================================== */
    3008             : 
    3009             : /**
    3010             :  * Send unreliable UDP message to peer
    3011             :  *
    3012             :  * Public API wrapper for pt_posix_send_udp(). Validates the context magic,
    3013             :  * the data/length pair and the peer ID, then forwards the call unchanged.
                     :  * No upper bound on length is enforced in this wrapper.
                     :  * NOTE(review): the earlier comment described an 8-byte header without
                     :  * CRC; framing happens inside pt_posix_send_udp() and is not visible
                     :  * here - confirm against that function.
    3014             :  *
    3015             :  * Returns: PT_ERR_INVALID_STATE if ctx is NULL or its magic does not match,
                     :  *          PT_ERR_INVALID_PARAM if data is NULL while length > 0,
                     :  *          PT_ERR_PEER_NOT_FOUND if peer_id matches no peer,
                     :  *          otherwise whatever pt_posix_send_udp() returns.
    3016             :  */
    3017           1 : PeerTalk_Error PeerTalk_SendUDP(PeerTalk_Context *ctx, PeerTalk_PeerID peer_id,
    3018             :                                 const void *data, uint16_t length) {
    3019           1 :     struct pt_context *ictx = (struct pt_context *)ctx;
    3020             :     struct pt_peer *peer;
    3021             : 
    3022             :     /* Validate context */
    3023           1 :     if (!ictx || ictx->magic != PT_CONTEXT_MAGIC) {
    3024           0 :         return PT_ERR_INVALID_STATE;
    3025             :     }
    3026             : 
    3027             :     /* Validate data and length (a NULL pointer is accepted when length is 0) */
    3028           1 :     if (!data && length > 0) {
    3029           0 :         return PT_ERR_INVALID_PARAM;
    3030             :     }
    3031             : 
    3032             :     /* Find peer by ID */
    3033           1 :     peer = pt_peer_find_by_id(ictx, peer_id);
    3034           1 :     if (!peer) {
    3035           1 :         return PT_ERR_PEER_NOT_FOUND;
    3036             :     }
    3037             : 
    3038             :     /* Call internal send function */
    3039           0 :     return pt_posix_send_udp(ictx, peer, data, length);
    3040             : }
    3041             : 
    3042             : /**
    3043             :  * Send UDP message with zero-queue semantics (fast path)
    3044             :  *
    3045             :  * Same validation and send path as PeerTalk_SendUDP() - UDP already has no
    3046             :  * queuing - with one addition: a length above PT_MAX_UDP_MESSAGE_SIZE is
    3047             :  * rejected (and logged as a warning) before the peer lookup.
                     :  * This function makes the zero-queue semantics explicit for documentation.
    3048             :  *
    3049             :  * Returns: PT_ERR_INVALID_STATE if ctx is NULL or its magic does not match,
                     :  *          PT_ERR_INVALID_PARAM if data is NULL while length > 0,
                     :  *          PT_ERR_MESSAGE_TOO_LARGE if length > PT_MAX_UDP_MESSAGE_SIZE,
                     :  *          PT_ERR_PEER_NOT_FOUND if peer_id matches no peer,
                     :  *          otherwise whatever pt_posix_send_udp() returns.
    3050             :  */
    3051           3 : PeerTalk_Error PeerTalk_SendUDPFast(PeerTalk_Context *ctx, PeerTalk_PeerID peer_id,
    3052             :                                      const void *data, uint16_t length) {
    3053           3 :     struct pt_context *ictx = (struct pt_context *)ctx;
    3054             :     struct pt_peer *peer;
    3055             : 
    3056             :     /* Validate context */
    3057           3 :     if (!ictx || ictx->magic != PT_CONTEXT_MAGIC) {
    3058           1 :         return PT_ERR_INVALID_STATE;
    3059             :     }
    3060             : 
    3061             :     /* Validate data and length (a NULL pointer is accepted when length is 0) */
    3062           2 :     if (!data && length > 0) {
    3063           0 :         return PT_ERR_INVALID_PARAM;
    3064             :     }
    3065             : 
    3066             :     /* Check max size for UDP fast path */
    3067           2 :     if (length > PT_MAX_UDP_MESSAGE_SIZE) {
    3068           1 :         PT_CTX_WARN(ictx, PT_LOG_CAT_SEND,
    3069             :             "UDP fast path: message too large (%u > %u)",
    3070             :             length, PT_MAX_UDP_MESSAGE_SIZE);
    3071           1 :         return PT_ERR_MESSAGE_TOO_LARGE;
    3072             :     }
    3073             : 
    3074             :     /* Find peer by ID */
    3075           1 :     peer = pt_peer_find_by_id(ictx, peer_id);
    3076           1 :     if (!peer) {
    3077           1 :         return PT_ERR_PEER_NOT_FOUND;
    3078             :     }
    3079             : 
    3080             :     /* Call internal send function - no queuing, direct to network */
    3081           0 :     return pt_posix_send_udp(ictx, peer, data, length);
    3082             : }
    3083             : 
    3084             : /* ========================================================================== */
    3085             : /* Statistics API (Session 4.5)                                              */
    3086             : /* ========================================================================== */
    3087             : 
    3088             : /**
    3089             :  * Get global network statistics
    3090             :  *
    3091             :  * Copies the context's running counters (bytes, messages, discovery packets,
    3092             :  * accepted/rejected connections) into *stats and recomputes the peer counts
                     :  * from the peer table at call time. memory_used and streams_active are not
                     :  * tracked by this backend and are always reported as 0.
    3093             :  *
    3094             :  * Returns: PT_ERR_INVALID_STATE if ctx is NULL or its magic does not match,
                     :  *          PT_ERR_INVALID_PARAM if stats is NULL,
                     :  *          PT_OK otherwise.
    3095             :  */
    3096           5 : PeerTalk_Error PeerTalk_GetGlobalStats(PeerTalk_Context *ctx, PeerTalk_GlobalStats *stats) {
    3097           5 :     struct pt_context *ictx = (struct pt_context *)ctx;
    3098           5 :     uint16_t connected_count = 0;
    3099             : 
    3100             :     /* Validate parameters */
    3101           5 :     if (!ictx || ictx->magic != PT_CONTEXT_MAGIC) {
    3102           1 :         return PT_ERR_INVALID_STATE;
    3103             :     }
    3104           4 :     if (!stats) {
    3105           1 :         return PT_ERR_INVALID_PARAM;
    3106             :     }
    3107             : 
    3108             :     /* Copy global statistics */
    3109           3 :     stats->total_bytes_sent = ictx->global_stats.total_bytes_sent;
    3110           3 :     stats->total_bytes_received = ictx->global_stats.total_bytes_received;
    3111           3 :     stats->total_messages_sent = ictx->global_stats.total_messages_sent;
    3112           3 :     stats->total_messages_received = ictx->global_stats.total_messages_received;
    3113           3 :     stats->discovery_packets_sent = ictx->global_stats.discovery_packets_sent;
    3114           3 :     stats->discovery_packets_received = ictx->global_stats.discovery_packets_received;
    3115           3 :     stats->connections_accepted = ictx->global_stats.connections_accepted;
    3116           3 :     stats->connections_rejected = ictx->global_stats.connections_rejected;
    3117             : 
    3118             :     /* Count peers by state: peers_discovered covers every slot that is not
                     :      * PT_PEER_UNUSED (connected peers included); peers_connected is the
                     :      * subset in PT_PEER_CONNECTED. */
    3119           3 :     stats->peers_discovered = 0;
    3120          51 :     for (uint8_t i = 0; i < ictx->max_peers; i++) {
    3121          48 :         if (ictx->peers[i].hot.state != PT_PEER_UNUSED) {
    3122           0 :             stats->peers_discovered++;
    3123           0 :             if (ictx->peers[i].hot.state == PT_PEER_CONNECTED) {
    3124           0 :                 connected_count++;
    3125             :             }
    3126             :         }
    3127             :     }
    3128           3 :     stats->peers_connected = connected_count;
    3129             : 
    3130             :     /* Memory and streams (not yet tracked - set to 0).
                     :      * streams_active must not mirror the connected-peer count: that value
                     :      * is already reported through peers_connected. */
    3131           3 :     stats->memory_used = 0;
    3132           3 :     stats->streams_active = 0;
    3133           3 :     stats->reserved = 0;
    3134             : 
    3135           3 :     return PT_OK;
    3136             : }
    3137             : 
    3138             : /**
    3139             :  * Get per-peer statistics
    3140             :  *
    3141             :  * Copies the peer's stored PeerTalk_PeerStats record (peer->cold.stats) into
    3142             :  * *stats by value. *stats is left untouched on any error return.
    3143             :  *
    3144             :  * Returns: PT_ERR_INVALID_STATE if ctx is NULL or its magic does not match,
                     :  *          PT_ERR_INVALID_PARAM if stats is NULL,
                     :  *          PT_ERR_PEER_NOT_FOUND if peer_id matches no peer,
                     :  *          PT_OK otherwise.
    3145             :  */
    3146           1 : PeerTalk_Error PeerTalk_GetPeerStats(PeerTalk_Context *ctx, PeerTalk_PeerID peer_id,
    3147             :                                      PeerTalk_PeerStats *stats) {
    3148           1 :     struct pt_context *ictx = (struct pt_context *)ctx;
    3149             :     struct pt_peer *peer;
    3150             : 
    3151             :     /* Validate parameters */
    3152           1 :     if (!ictx || ictx->magic != PT_CONTEXT_MAGIC) {
    3153           0 :         return PT_ERR_INVALID_STATE;
    3154             :     }
    3155           1 :     if (!stats) {
    3156           0 :         return PT_ERR_INVALID_PARAM;
    3157             :     }
    3158             : 
    3159             :     /* Find peer by ID */
    3160           1 :     peer = pt_peer_find_by_id(ictx, peer_id);
    3161           1 :     if (!peer) {
    3162           1 :         return PT_ERR_PEER_NOT_FOUND;
    3163             :     }
    3164             : 
    3165             :     /* Copy peer statistics (whole-struct assignment) */
    3166           0 :     *stats = peer->cold.stats;
    3167             : 
    3168           0 :     return PT_OK;
    3169             : }
    3170             : 
    3171             : /**
    3172             :  * Reset statistics for peer (or all peers if peer_id == 0)
    3173             :  *
    3174             :  * Zeroes the stats record of the specified peer, or the global stats plus
    3175             :  * the stats of every peer slot not in PT_PEER_UNUSED if peer_id is 0.
                     :  * For each peer that is reset, the latency tracking state is cleared as
                     :  * well: hot.latency_ms, cold.rtt_index, cold.rtt_count and cold.rtt_samples.
                     :  * Because 0 selects the reset-all path, a peer cannot be addressed
                     :  * individually with ID 0 through this call.
    3176             :  *
    3177             :  * Returns: PT_ERR_INVALID_STATE if ctx is NULL or its magic does not match,
                     :  *          PT_ERR_PEER_NOT_FOUND if a non-zero peer_id matches no peer,
                     :  *          PT_OK otherwise.
    3178             :  */
    3179           3 : PeerTalk_Error PeerTalk_ResetStats(PeerTalk_Context *ctx, PeerTalk_PeerID peer_id) {
    3180           3 :     struct pt_context *ictx = (struct pt_context *)ctx;
    3181             :     struct pt_peer *peer;
    3182             :     /* cppcheck-suppress variableScope ; C89 style for cross-platform consistency */
    3183             :     uint16_t i;
    3184             : 
    3185             :     /* Validate parameters */
    3186           3 :     if (!ictx || ictx->magic != PT_CONTEXT_MAGIC) {
    3187           1 :         return PT_ERR_INVALID_STATE;
    3188             :     }
    3189             : 
    3190           2 :     if (peer_id == 0) {
    3191             :         /* Reset global statistics */
    3192           0 :         pt_memset(&ictx->global_stats, 0, sizeof(PeerTalk_GlobalStats));
    3193             : 
    3194             :         /* Reset all peer statistics (slots in PT_PEER_UNUSED are skipped) */
    3195           0 :         for (i = 0; i < ictx->max_peers; i++) {
    3196           0 :             peer = &ictx->peers[i];
    3197           0 :             if (peer->hot.state != PT_PEER_UNUSED) {
    3198           0 :                 pt_memset(&peer->cold.stats, 0, sizeof(PeerTalk_PeerStats));
    3199             :                 /* Reset latency tracking (same fields as the single-peer path below) */
    3200           0 :                 peer->hot.latency_ms = 0;
    3201           0 :                 peer->cold.rtt_index = 0;
    3202           0 :                 peer->cold.rtt_count = 0;
    3203           0 :                 pt_memset(peer->cold.rtt_samples, 0, sizeof(peer->cold.rtt_samples));
    3204             :             }
    3205             :         }
    3206             : 
    3207           0 :         PT_CTX_INFO(ictx, PT_LOG_CAT_PERF, "Reset all statistics");
    3208             :     } else {
    3209             :         /* Reset single peer statistics; global stats are left untouched */
    3210           2 :         peer = pt_peer_find_by_id(ictx, peer_id);
    3211           2 :         if (!peer) {
    3212           1 :             return PT_ERR_PEER_NOT_FOUND;
    3213             :         }
    3214             : 
    3215           1 :         pt_memset(&peer->cold.stats, 0, sizeof(PeerTalk_PeerStats));
    3216             :         /* Reset latency tracking */
    3217           1 :         peer->hot.latency_ms = 0;
    3218           1 :         peer->cold.rtt_index = 0;
    3219           1 :         peer->cold.rtt_count = 0;
    3220           1 :         pt_memset(peer->cold.rtt_samples, 0, sizeof(peer->cold.rtt_samples));
    3221             : 
    3222           1 :         PT_CTX_INFO(ictx, PT_LOG_CAT_PERF,
    3223             :                    "Reset statistics for peer %u", peer_id);
    3224             :     }
    3225             : 
    3226           1 :     return PT_OK;
    3227             : }
    3228             : 

Generated by: LCOV version 1.14