Line data Source code
1 : /*
2 : * PeerTalk POSIX Networking Implementation
3 : *
4 : * Session 4.1: UDP Discovery
5 : * - Non-blocking UDP broadcast for peer discovery
6 : * - Local IP detection with fallback strategies
7 : * - Discovery packet handling (ANNOUNCE, QUERY, GOODBYE)
8 : */
9 :
10 : #include "net_posix.h"
11 : #include "protocol.h"
12 : #include "peer.h"
13 : #include "queue.h"
14 : #include "direct_buffer.h"
15 : #include "stream.h"
16 : #include "pt_compat.h"
17 : #include <sys/socket.h>
18 : #include <netinet/in.h>
19 : #include <netinet/tcp.h>
20 : #include <arpa/inet.h>
21 : #include <fcntl.h>
22 : #include <unistd.h>
23 : #include <errno.h>
24 : #include <string.h>
25 : #include <ifaddrs.h>
26 : #include <stdlib.h>
27 : #include <poll.h>
28 : #include <sys/uio.h>
29 :
30 : /* ========================================================================== */
31 : /* Port Configuration */
32 : /* ========================================================================== */
33 :
34 : #define DEFAULT_DISCOVERY_PORT 7353
35 : #define DEFAULT_TCP_PORT 7354
36 : #define DEFAULT_UDP_PORT 7355
37 :
/*
 * Port accessor macros.
 *
 * Each one yields the port configured in ctx->config when that value is
 * greater than zero, and the matching DEFAULT_* constant otherwise.
 * Note that the configured value is evaluated twice.
 */
#define PT_PORT_OR_DEFAULT(configured, fallback) \
    ((configured) > 0 ? (configured) : fallback)

#define DISCOVERY_PORT(ctx) \
    PT_PORT_OR_DEFAULT((ctx)->config.discovery_port, DEFAULT_DISCOVERY_PORT)
#define TCP_PORT(ctx) \
    PT_PORT_OR_DEFAULT((ctx)->config.tcp_port, DEFAULT_TCP_PORT)
#define UDP_PORT(ctx) \
    PT_PORT_OR_DEFAULT((ctx)->config.udp_port, DEFAULT_UDP_PORT)
45 :
46 : /* ========================================================================== */
47 : /* Helper Functions */
48 : /* ========================================================================== */
49 :
50 : /**
51 : * Set socket to non-blocking mode
52 : *
53 : * Returns: 0 on success, -1 on failure
54 : */
55 308 : static int set_nonblocking(struct pt_context *ctx, int fd) {
56 308 : int flags = fcntl(fd, F_GETFL, 0);
57 308 : if (flags < 0) {
58 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_NETWORK,
59 : "Failed to get socket flags: %s", strerror(errno));
60 0 : return -1;
61 : }
62 :
63 308 : if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
64 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_NETWORK,
65 : "Failed to set non-blocking: %s", strerror(errno));
66 0 : return -1;
67 : }
68 :
69 308 : return 0;
70 : }
71 :
72 : /**
73 : * Enable SO_BROADCAST on socket
74 : *
75 : * Returns: 0 on success, -1 on failure
76 : */
77 19 : static int set_broadcast(struct pt_context *ctx, int fd) {
78 19 : int opt = 1;
79 19 : if (setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &opt, sizeof(opt)) < 0) {
80 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_NETWORK,
81 : "Failed to enable broadcast: %s", strerror(errno));
82 0 : return -1;
83 : }
84 19 : return 0;
85 : }
86 :
87 : /**
88 : * Enable SO_REUSEADDR on socket
89 : *
90 : * Allows fast restart without "Address already in use" error.
91 : *
92 : * Returns: 0 on success, -1 on failure
93 : */
94 276 : static int set_reuseaddr(struct pt_context *ctx, int fd) {
95 276 : int opt = 1;
96 276 : if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
97 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_NETWORK,
98 : "Failed to set SO_REUSEADDR: %s", strerror(errno));
99 0 : return -1;
100 : }
101 276 : return 0;
102 : }
103 :
104 : /**
105 : * Get local IP address using three-tier fallback strategy
106 : *
107 : * 1. Try getifaddrs() - works on air-gapped networks and normal LANs
108 : * 2. Fall back to "connect to 8.8.8.8" trick - works in containers with NAT
109 : * 3. Return loopback (127.0.0.1) as last resort
110 : *
111 : * Returns: Local IP in host byte order
112 : */
113 230 : static uint32_t get_local_ip(struct pt_context *ctx) {
114 : struct ifaddrs *ifaddr, *ifa;
115 230 : uint32_t local_ip = 0;
116 :
117 : /* Strategy 1: getifaddrs() - preferred method */
118 230 : if (getifaddrs(&ifaddr) == 0) {
119 920 : for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) {
120 920 : if (ifa->ifa_addr == NULL)
121 0 : continue;
122 :
123 920 : if (ifa->ifa_addr->sa_family == AF_INET) {
124 460 : struct sockaddr_in *addr = (struct sockaddr_in *)ifa->ifa_addr;
125 460 : uint32_t ip = ntohl(addr->sin_addr.s_addr);
126 :
127 : /* Skip loopback */
128 460 : if ((ip >> 24) == 127)
129 230 : continue;
130 :
131 : /* Found valid interface */
132 230 : local_ip = ip;
133 230 : PT_CTX_INFO(ctx, PT_LOG_CAT_NETWORK,
134 : "Local IP detected via getifaddrs: %u.%u.%u.%u",
135 : (ip >> 24) & 0xFF, (ip >> 16) & 0xFF,
136 : (ip >> 8) & 0xFF, ip & 0xFF);
137 230 : break;
138 : }
139 : }
140 230 : freeifaddrs(ifaddr);
141 : }
142 :
143 : /* Strategy 2: Connect to 8.8.8.8 (Google DNS) - works in containers */
144 230 : if (local_ip == 0) {
145 0 : int sock = socket(AF_INET, SOCK_DGRAM, 0);
146 0 : if (sock >= 0) {
147 : struct sockaddr_in addr;
148 0 : pt_memset(&addr, 0, sizeof(addr));
149 0 : addr.sin_family = AF_INET;
150 0 : addr.sin_addr.s_addr = htonl(0x08080808); /* 8.8.8.8 */
151 0 : addr.sin_port = htons(53);
152 :
153 0 : if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) == 0) {
154 : struct sockaddr_in local_addr;
155 0 : socklen_t len = sizeof(local_addr);
156 0 : if (getsockname(sock, (struct sockaddr *)&local_addr, &len) == 0) {
157 0 : local_ip = ntohl(local_addr.sin_addr.s_addr);
158 0 : PT_CTX_INFO(ctx, PT_LOG_CAT_NETWORK,
159 : "Local IP detected via 8.8.8.8 trick: %u.%u.%u.%u",
160 : (local_ip >> 24) & 0xFF, (local_ip >> 16) & 0xFF,
161 : (local_ip >> 8) & 0xFF, local_ip & 0xFF);
162 : }
163 : }
164 0 : close(sock);
165 : }
166 : }
167 :
168 : /* Strategy 3: Fallback to loopback */
169 230 : if (local_ip == 0) {
170 0 : local_ip = 0x7F000001; /* 127.0.0.1 */
171 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK,
172 : "Could not detect local IP, using loopback 127.0.0.1");
173 : }
174 :
175 230 : return local_ip;
176 : }
177 :
178 : /* ========================================================================== */
179 : /* Platform Size/Init */
180 : /* ========================================================================== */
181 :
182 230 : size_t pt_posix_extra_size(void) {
183 230 : return sizeof(pt_posix_data);
184 : }
185 :
186 230 : int pt_posix_net_init(struct pt_context *ctx) {
187 : pt_posix_data *pd;
188 : size_t i;
189 :
190 230 : if (!ctx) {
191 0 : return -1;
192 : }
193 :
194 230 : pd = pt_posix_get(ctx);
195 :
196 : /* Initialize HOT fields */
197 230 : pd->max_fd = -1;
198 230 : pd->active_count = 0;
199 230 : pd->fd_dirty = 1; /* Initial build needed */
200 230 : pd->batch_count = 0;
201 230 : pd->last_announce = 0;
202 230 : pd->local_ip = get_local_ip(ctx);
203 :
204 : /* Clear active peers tracking */
205 230 : pt_memset(pd->active_peers, 0, sizeof(pd->active_peers));
206 230 : pt_memset(pd->active_position, 0xFF, sizeof(pd->active_position)); /* -1 */
207 :
208 : /* Initialize WARM fields - sockets to -1 */
209 230 : pd->discovery_sock = -1;
210 230 : pd->listen_sock = -1;
211 230 : pd->udp_msg_sock = -1;
212 230 : pd->broadcast_addr = INADDR_BROADCAST; /* 255.255.255.255 */
213 230 : pd->discovery_port = DISCOVERY_PORT(ctx);
214 230 : pd->listen_port = TCP_PORT(ctx);
215 230 : pd->udp_msg_port = UDP_PORT(ctx);
216 :
217 : /* Initialize fd_set cache */
218 3910 : FD_ZERO(&pd->cached_read_fds);
219 3910 : FD_ZERO(&pd->cached_write_fds);
220 :
221 : /* Initialize COLD fields - TCP sockets */
222 3910 : for (i = 0; i < PT_MAX_PEERS; i++) {
223 3680 : pd->tcp_socks[i] = -1;
224 : }
225 :
226 : /* CRITICAL: Allocate recv_bufs separately for cache efficiency */
227 230 : pd->recv_bufs = (pt_recv_buffer *)pt_alloc_clear(
228 : sizeof(pt_recv_buffer) * PT_MAX_PEERS);
229 230 : if (!pd->recv_bufs) {
230 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_MEMORY,
231 : "Failed to allocate receive buffers");
232 0 : return -1;
233 : }
234 :
235 : /* Initialize receive buffer states */
236 3910 : for (i = 0; i < PT_MAX_PEERS; i++) {
237 3680 : pd->recv_bufs[i].hot.state = PT_RECV_HEADER;
238 3680 : pd->recv_bufs[i].hot.bytes_needed = PT_COMPACT_HEADER_SIZE;
239 3680 : pd->recv_bufs[i].hot.bytes_received = 0;
240 : }
241 :
242 : /* Initialize UDP messaging socket (Session 4.4) */
243 230 : if (pt_posix_udp_init(ctx) < 0) {
244 0 : pt_free(pd->recv_bufs);
245 0 : pd->recv_bufs = NULL;
246 0 : return -1;
247 : }
248 :
249 230 : PT_CTX_INFO(ctx, PT_LOG_CAT_NETWORK,
250 : "POSIX networking initialized (discovery=%u, tcp=%u, udp=%u)",
251 : pd->discovery_port, pd->listen_port, pd->udp_msg_port);
252 :
253 230 : return 0;
254 : }
255 :
256 230 : void pt_posix_net_shutdown(struct pt_context *ctx) {
257 : pt_posix_data *pd;
258 : size_t i;
259 :
260 230 : if (!ctx) {
261 0 : return;
262 : }
263 :
264 230 : pd = pt_posix_get(ctx);
265 :
266 : /* Close server sockets */
267 230 : if (pd->discovery_sock >= 0) {
268 4 : close(pd->discovery_sock);
269 4 : pd->discovery_sock = -1;
270 : }
271 :
272 230 : if (pd->listen_sock >= 0) {
273 4 : close(pd->listen_sock);
274 4 : pd->listen_sock = -1;
275 : }
276 :
277 230 : if (pd->udp_msg_sock >= 0) {
278 230 : close(pd->udp_msg_sock);
279 230 : pd->udp_msg_sock = -1;
280 : }
281 :
282 : /* Close peer TCP sockets */
283 3910 : for (i = 0; i < PT_MAX_PEERS; i++) {
284 3680 : if (pd->tcp_socks[i] >= 0) {
285 23 : close(pd->tcp_socks[i]);
286 23 : pd->tcp_socks[i] = -1;
287 : }
288 : }
289 :
290 : /* Free receive buffers */
291 230 : if (pd->recv_bufs) {
292 230 : pt_free(pd->recv_bufs);
293 230 : pd->recv_bufs = NULL;
294 : }
295 :
296 230 : PT_CTX_INFO(ctx, PT_LOG_CAT_NETWORK, "POSIX networking shut down");
297 : }
298 :
299 : /* ========================================================================== */
300 : /* Queue Lifecycle Helpers (Session 4.3.5) */
301 : /* ========================================================================== */
302 :
303 : /**
304 : * Allocate and initialize a peer queue
305 : *
306 : * @param ctx PeerTalk context
307 : * @return Allocated queue or NULL on failure
308 : */
309 64 : static pt_queue *pt_alloc_peer_queue(struct pt_context *ctx) {
310 : pt_queue *q;
311 : int result;
312 :
313 64 : q = (pt_queue *)pt_alloc(sizeof(pt_queue));
314 64 : if (!q) {
315 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_MEMORY,
316 : "Failed to allocate queue: out of memory");
317 0 : return NULL;
318 : }
319 :
320 64 : result = pt_queue_init(ctx, q, 16);
321 64 : if (result != 0) {
322 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_MEMORY,
323 : "Failed to initialize queue: error %d", result);
324 0 : pt_free(q);
325 0 : return NULL;
326 : }
327 :
328 : /* Phase 3 extensions initialized automatically by pt_queue_init() */
329 :
330 64 : return q;
331 : }
332 :
333 : /**
334 : * Free a peer queue
335 : *
336 : * @param q Queue to free (can be NULL)
337 : */
338 24 : static void pt_free_peer_queue(pt_queue *q) {
339 24 : if (q) {
340 18 : pt_queue_free(q);
341 18 : pt_free(q);
342 : }
343 24 : }
344 :
345 : /* ========================================================================== */
346 : /* Discovery */
347 : /* ========================================================================== */
348 :
349 19 : int pt_posix_discovery_start(struct pt_context *ctx) {
350 : pt_posix_data *pd;
351 : int sock;
352 : struct sockaddr_in addr;
353 :
354 19 : if (!ctx) {
355 0 : return -1;
356 : }
357 :
358 19 : pd = pt_posix_get(ctx);
359 :
360 : /* Create UDP socket */
361 19 : sock = socket(AF_INET, SOCK_DGRAM, 0);
362 19 : if (sock < 0) {
363 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_NETWORK,
364 : "Failed to create discovery socket: %s", strerror(errno));
365 0 : return -1;
366 : }
367 :
368 : /* Set non-blocking mode */
369 19 : if (set_nonblocking(ctx, sock) < 0) {
370 0 : close(sock);
371 0 : return -1;
372 : }
373 :
374 : /* Enable SO_BROADCAST */
375 19 : if (set_broadcast(ctx, sock) < 0) {
376 0 : close(sock);
377 0 : return -1;
378 : }
379 :
380 : /* Enable SO_REUSEADDR for fast restart */
381 19 : if (set_reuseaddr(ctx, sock) < 0) {
382 0 : close(sock);
383 0 : return -1;
384 : }
385 :
386 : /* Bind to INADDR_ANY on discovery port */
387 19 : pt_memset(&addr, 0, sizeof(addr));
388 19 : addr.sin_family = AF_INET;
389 19 : addr.sin_addr.s_addr = INADDR_ANY;
390 19 : addr.sin_port = htons(pd->discovery_port);
391 :
392 19 : if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
393 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_NETWORK,
394 : "Failed to bind discovery socket to port %u: %s",
395 : pd->discovery_port, strerror(errno));
396 0 : close(sock);
397 0 : return -1;
398 : }
399 :
400 19 : pd->discovery_sock = sock;
401 :
402 : /* Update max_fd for select() */
403 19 : if (sock > pd->max_fd) {
404 17 : pd->max_fd = sock;
405 : }
406 19 : pd->fd_dirty = 1; /* Rebuild fd_sets */
407 :
408 19 : PT_CTX_INFO(ctx, PT_LOG_CAT_DISCOVERY,
409 : "Discovery started on UDP port %u", pd->discovery_port);
410 :
411 : /* Send initial announcement */
412 19 : pt_posix_discovery_send(ctx, PT_DISC_TYPE_ANNOUNCE);
413 19 : pd->last_announce = ctx->plat->get_ticks();
414 :
415 19 : return 0;
416 : }
417 :
418 16 : void pt_posix_discovery_stop(struct pt_context *ctx) {
419 : pt_posix_data *pd;
420 :
421 16 : if (!ctx) {
422 0 : return;
423 : }
424 :
425 16 : pd = pt_posix_get(ctx);
426 :
427 16 : if (pd->discovery_sock >= 0) {
428 : /* Send goodbye before closing */
429 15 : pt_posix_discovery_send(ctx, PT_DISC_TYPE_GOODBYE);
430 :
431 15 : close(pd->discovery_sock);
432 15 : pd->discovery_sock = -1;
433 15 : pd->fd_dirty = 1;
434 :
435 15 : PT_CTX_INFO(ctx, PT_LOG_CAT_DISCOVERY, "Discovery stopped");
436 : }
437 : }
438 :
439 39 : int pt_posix_discovery_send(struct pt_context *ctx, uint8_t type) {
440 : pt_posix_data *pd;
441 : pt_discovery_packet pkt;
442 : uint8_t buf[PT_DISCOVERY_MAX_SIZE];
443 : ssize_t encoded_len;
444 : struct sockaddr_in dest;
445 : ssize_t sent;
446 :
447 39 : if (!ctx) {
448 0 : return -1;
449 : }
450 :
451 39 : pd = pt_posix_get(ctx);
452 :
453 39 : if (pd->discovery_sock < 0) {
454 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_DISCOVERY,
455 : "Discovery socket not initialized");
456 0 : return -1;
457 : }
458 :
459 : /* Build discovery packet */
460 39 : pt_memset(&pkt, 0, sizeof(pkt));
461 39 : pkt.version = PT_PROTOCOL_VERSION;
462 39 : pkt.type = type;
463 39 : pkt.flags = 0;
464 39 : pkt.sender_port = pd->listen_port;
465 39 : pkt.transports = PT_TRANSPORT_TCP | PT_TRANSPORT_UDP; /* POSIX supports both */
466 :
467 : /* Copy local name */
468 39 : if (ctx->config.local_name[0] != '\0') {
469 39 : pt_strncpy(pkt.name, ctx->config.local_name, PT_MAX_PEER_NAME);
470 : } else {
471 0 : pt_strncpy(pkt.name, "PeerTalk", PT_MAX_PEER_NAME);
472 : }
473 :
474 : /* Set name length */
475 39 : pkt.name_len = pt_strlen(pkt.name);
476 :
477 : /* Encode packet */
478 39 : encoded_len = pt_discovery_encode(&pkt, buf, sizeof(buf));
479 39 : if (encoded_len < 0) {
480 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_DISCOVERY,
481 : "Failed to encode discovery packet");
482 0 : return -1;
483 : }
484 :
485 : /* Send to broadcast address */
486 39 : pt_memset(&dest, 0, sizeof(dest));
487 39 : dest.sin_family = AF_INET;
488 39 : dest.sin_addr.s_addr = htonl(pd->broadcast_addr);
489 39 : dest.sin_port = htons(pd->discovery_port);
490 :
491 39 : sent = sendto(pd->discovery_sock, buf, encoded_len, 0,
492 : (struct sockaddr *)&dest, sizeof(dest));
493 :
494 39 : if (sent < 0) {
495 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_DISCOVERY,
496 : "Discovery send failed: %s", strerror(errno));
497 0 : return -1;
498 : }
499 :
500 39 : if (sent != encoded_len) {
501 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_DISCOVERY,
502 : "Discovery send incomplete: %zd/%zd bytes", sent, encoded_len);
503 0 : return -1;
504 : }
505 :
506 : /* Update statistics */
507 39 : ctx->global_stats.discovery_packets_sent++;
508 :
509 39 : PT_CTX_INFO(ctx, PT_LOG_CAT_DISCOVERY,
510 : "Discovery %s sent to %u.%u.%u.%u:%u (%zd bytes)",
511 : type == PT_DISC_TYPE_ANNOUNCE ? "ANNOUNCE" :
512 : type == PT_DISC_TYPE_QUERY ? "QUERY" : "GOODBYE",
513 : (pd->broadcast_addr >> 24) & 0xFF,
514 : (pd->broadcast_addr >> 16) & 0xFF,
515 : (pd->broadcast_addr >> 8) & 0xFF,
516 : pd->broadcast_addr & 0xFF,
517 : pd->discovery_port,
518 : sent);
519 :
520 39 : return 0;
521 : }
522 :
523 29 : int pt_posix_discovery_poll(struct pt_context *ctx) {
524 : pt_posix_data *pd;
525 : uint8_t buf[PT_DISCOVERY_MAX_SIZE];
526 : struct sockaddr_in from_addr;
527 29 : socklen_t from_len = sizeof(from_addr);
528 : ssize_t ret;
529 : uint32_t sender_ip;
530 : pt_discovery_packet pkt;
531 : struct pt_peer *peer;
532 :
533 29 : if (!ctx) {
534 0 : return -1;
535 : }
536 :
537 29 : pd = pt_posix_get(ctx);
538 :
539 29 : if (pd->discovery_sock < 0) {
540 0 : return 0; /* Not initialized yet */
541 : }
542 :
543 : /* Non-blocking receive */
544 29 : ret = recvfrom(pd->discovery_sock, buf, sizeof(buf), 0,
545 : (struct sockaddr *)&from_addr, &from_len);
546 :
547 29 : if (ret < 0) {
548 0 : if (errno == EWOULDBLOCK || errno == EAGAIN) {
549 0 : return 0; /* No data available - not an error */
550 : }
551 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_DISCOVERY,
552 : "Discovery recv error: %s", strerror(errno));
553 0 : return -1;
554 : }
555 :
556 : /* Extract sender IP */
557 29 : sender_ip = ntohl(from_addr.sin_addr.s_addr);
558 :
559 : /* CRITICAL: Ignore broadcasts from our own IP */
560 29 : if (sender_ip == pd->local_ip) {
561 21 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_DISCOVERY,
562 : "Ignoring packet from our own IP %u.%u.%u.%u",
563 : (sender_ip >> 24) & 0xFF, (sender_ip >> 16) & 0xFF,
564 : (sender_ip >> 8) & 0xFF, sender_ip & 0xFF);
565 21 : return 0; /* Our own broadcast, ignore */
566 : }
567 :
568 : /* Decode packet */
569 8 : if (pt_discovery_decode(ctx, buf, ret, &pkt) < 0) {
570 8 : PT_CTX_WARN(ctx, PT_LOG_CAT_DISCOVERY,
571 : "Failed to decode discovery packet from %u.%u.%u.%u",
572 : (sender_ip >> 24) & 0xFF, (sender_ip >> 16) & 0xFF,
573 : (sender_ip >> 8) & 0xFF, sender_ip & 0xFF);
574 8 : return 0; /* Ignore invalid packets */
575 : }
576 :
577 : /* Update statistics */
578 0 : ctx->global_stats.discovery_packets_received++;
579 :
580 0 : PT_CTX_INFO(ctx, PT_LOG_CAT_DISCOVERY,
581 : "Discovery %s received from %u.%u.%u.%u:%u (%s)",
582 : pkt.type == PT_DISC_TYPE_ANNOUNCE ? "ANNOUNCE" :
583 : pkt.type == PT_DISC_TYPE_QUERY ? "QUERY" : "GOODBYE",
584 : (sender_ip >> 24) & 0xFF, (sender_ip >> 16) & 0xFF,
585 : (sender_ip >> 8) & 0xFF, sender_ip & 0xFF,
586 : pkt.sender_port, pkt.name);
587 :
588 : /* Handle packet type */
589 0 : PT_CTX_INFO(ctx, PT_LOG_CAT_DISCOVERY,
590 : "Processing packet type %u", pkt.type);
591 0 : switch (pkt.type) {
592 0 : case PT_DISC_TYPE_ANNOUNCE:
593 0 : PT_CTX_INFO(ctx, PT_LOG_CAT_DISCOVERY,
594 : "Handling ANNOUNCE packet");
595 : /* Find or create peer */
596 0 : peer = pt_peer_find_by_addr(ctx, sender_ip, pkt.sender_port);
597 0 : PT_CTX_INFO(ctx, PT_LOG_CAT_DISCOVERY,
598 : "pt_peer_find_by_addr returned: %p", (void*)peer);
599 0 : if (!peer) {
600 : /* Create new peer */
601 0 : PT_CTX_INFO(ctx, PT_LOG_CAT_DISCOVERY,
602 : "Creating new peer: %s at %u.%u.%u.%u:%u",
603 : pkt.name,
604 : (sender_ip >> 24) & 0xFF, (sender_ip >> 16) & 0xFF,
605 : (sender_ip >> 8) & 0xFF, sender_ip & 0xFF,
606 : pkt.sender_port);
607 0 : peer = pt_peer_create(ctx, pkt.name, sender_ip, pkt.sender_port);
608 0 : if (!peer) {
609 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_DISCOVERY,
610 : "Failed to create peer for %s", pkt.name);
611 0 : return 0;
612 : }
613 :
614 0 : PT_CTX_INFO(ctx, PT_LOG_CAT_DISCOVERY,
615 : "Peer created successfully, firing callback");
616 :
617 : /* Fire on_peer_discovered callback */
618 0 : if (ctx->callbacks.on_peer_discovered) {
619 : PeerTalk_PeerInfo info;
620 0 : pt_peer_get_info(peer, &info);
621 0 : PT_CTX_INFO(ctx, PT_LOG_CAT_DISCOVERY,
622 : "Calling on_peer_discovered callback");
623 0 : ctx->callbacks.on_peer_discovered((PeerTalk_Context *)ctx,
624 : &info, ctx->callbacks.user_data);
625 : } else {
626 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_DISCOVERY,
627 : "No on_peer_discovered callback registered!");
628 : }
629 : } else {
630 : /* Update existing peer */
631 0 : peer->hot.last_seen = ctx->plat->get_ticks();
632 : }
633 0 : break;
634 :
635 0 : case PT_DISC_TYPE_QUERY:
636 : /* Respond with ANNOUNCE */
637 0 : pt_posix_discovery_send(ctx, PT_DISC_TYPE_ANNOUNCE);
638 0 : break;
639 :
640 0 : case PT_DISC_TYPE_GOODBYE:
641 : /* Remove peer */
642 0 : peer = pt_peer_find_by_addr(ctx, sender_ip, pkt.sender_port);
643 0 : if (peer) {
644 0 : PeerTalk_PeerID peer_id = peer->hot.id;
645 :
646 : /* Fire on_peer_lost callback before destroying */
647 0 : if (ctx->callbacks.on_peer_lost) {
648 0 : ctx->callbacks.on_peer_lost((PeerTalk_Context *)ctx,
649 : peer_id, ctx->callbacks.user_data);
650 : }
651 :
652 0 : pt_peer_destroy(ctx, peer);
653 : }
654 0 : break;
655 :
656 0 : default:
657 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_DISCOVERY,
658 : "Unknown discovery packet type: %u", pkt.type);
659 0 : break;
660 : }
661 :
662 0 : return 1; /* Packet processed */
663 : }
664 :
665 : /* ========================================================================== */
666 : /* Active Peer Tracking */
667 : /* ========================================================================== */
668 :
669 : /**
670 : * Add peer to active tracking list (O(1))
671 : *
672 : * Used when accepting connection or initiating connect.
673 : * Marks fd_sets dirty for rebuild.
674 : */
675 32 : static void pt_posix_add_active_peer(pt_posix_data *pd, uint8_t peer_idx) {
676 : /* Skip if already in list */
677 32 : if (pd->active_position[peer_idx] != 0xFF) {
678 0 : return;
679 : }
680 :
681 : /* Add to end of active list */
682 32 : pd->active_peers[pd->active_count] = peer_idx;
683 32 : pd->active_position[peer_idx] = pd->active_count;
684 32 : pd->active_count++;
685 32 : pd->fd_dirty = 1;
686 : }
687 :
688 : /**
689 : * Remove peer from active tracking list (O(1) swap-back)
690 : *
691 : * Used when closing connection. Swaps last element into removed position.
692 : * Marks fd_sets dirty for rebuild.
693 : */
694 9 : static void pt_posix_remove_active_peer(pt_posix_data *pd, uint8_t peer_idx) {
695 9 : uint8_t pos = pd->active_position[peer_idx];
696 :
697 : /* Not in list */
698 9 : if (pos == 0xFF) {
699 0 : return;
700 : }
701 :
702 : /* Swap last element into this position */
703 9 : if (pos < pd->active_count - 1) {
704 1 : uint8_t last_idx = pd->active_peers[pd->active_count - 1];
705 1 : pd->active_peers[pos] = last_idx;
706 1 : pd->active_position[last_idx] = pos;
707 : }
708 :
709 : /* Clear removed peer's position */
710 9 : pd->active_position[peer_idx] = 0xFF;
711 9 : pd->active_count--;
712 9 : pd->fd_dirty = 1;
713 : }
714 :
715 : /**
716 : * Rebuild cached fd_sets when connections change (Session 4.6)
717 : *
718 : * Only called when pd->fd_dirty is set. Rebuilds read and write fd_sets
719 : * from scratch by adding server sockets and iterating active peers.
720 : *
721 : * For connecting peers, also adds socket to write_fds to detect completion.
722 : */
723 87 : static void pt_posix_rebuild_fd_sets(struct pt_context *ctx) {
724 87 : pt_posix_data *pd = pt_posix_get(ctx);
725 :
726 : /* Clear both fd_sets */
727 1479 : FD_ZERO(&pd->cached_read_fds);
728 1479 : FD_ZERO(&pd->cached_write_fds);
729 87 : pd->max_fd = -1;
730 :
731 : /* Add server sockets to read set */
732 87 : if (pd->discovery_sock >= 0) {
733 18 : FD_SET(pd->discovery_sock, &pd->cached_read_fds);
734 18 : if (pd->discovery_sock > pd->max_fd)
735 18 : pd->max_fd = pd->discovery_sock;
736 : }
737 :
738 87 : if (pd->listen_sock >= 0) {
739 49 : FD_SET(pd->listen_sock, &pd->cached_read_fds);
740 49 : if (pd->listen_sock > pd->max_fd)
741 49 : pd->max_fd = pd->listen_sock;
742 : }
743 :
744 87 : if (pd->udp_msg_sock >= 0) {
745 87 : FD_SET(pd->udp_msg_sock, &pd->cached_read_fds);
746 87 : if (pd->udp_msg_sock > pd->max_fd)
747 23 : pd->max_fd = pd->udp_msg_sock;
748 : }
749 :
750 : /* Iterate active peers only (not all PT_MAX_PEERS slots) */
751 126 : for (uint8_t i = 0; i < pd->active_count; i++) {
752 39 : uint8_t peer_idx = pd->active_peers[i];
753 39 : int sock = pd->tcp_socks[peer_idx];
754 :
755 39 : if (sock >= 0) {
756 39 : struct pt_peer *peer = &ctx->peers[peer_idx];
757 :
758 : /* All active sockets go in read set */
759 39 : FD_SET(sock, &pd->cached_read_fds);
760 :
761 : /* Connecting sockets also go in write set */
762 39 : if (peer->hot.state == PT_PEER_CONNECTING) {
763 11 : FD_SET(sock, &pd->cached_write_fds);
764 : }
765 :
766 39 : if (sock > pd->max_fd)
767 39 : pd->max_fd = sock;
768 : }
769 : }
770 :
771 : /* Clear dirty flag */
772 87 : pd->fd_dirty = 0;
773 87 : }
774 :
775 : /* ========================================================================== */
776 : /* TCP Server */
777 : /* ========================================================================== */
778 :
779 27 : int pt_posix_listen_start(struct pt_context *ctx) {
780 27 : pt_posix_data *pd = pt_posix_get(ctx);
781 : struct sockaddr_in addr;
782 : int sock;
783 : uint16_t port;
784 :
785 27 : port = ctx->config.tcp_port > 0 ?
786 : ctx->config.tcp_port : TCP_PORT(ctx);
787 :
788 27 : sock = socket(AF_INET, SOCK_STREAM, 0);
789 27 : if (sock < 0) {
790 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_CONNECT,
791 : "Failed to create listen socket: %s", strerror(errno));
792 0 : return -1;
793 : }
794 :
795 27 : if (set_nonblocking(ctx, sock) < 0) {
796 : /* Error already logged by helper */
797 0 : close(sock);
798 0 : return -1;
799 : }
800 :
801 27 : if (set_reuseaddr(ctx, sock) < 0) {
802 : /* Error already logged by helper */
803 0 : close(sock);
804 0 : return -1;
805 : }
806 :
807 27 : pt_memset(&addr, 0, sizeof(addr));
808 27 : addr.sin_family = AF_INET;
809 27 : addr.sin_addr.s_addr = INADDR_ANY;
810 27 : addr.sin_port = htons(port);
811 :
812 27 : if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
813 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_CONNECT,
814 : "Failed to bind listen socket: %s", strerror(errno));
815 0 : close(sock);
816 0 : return -1;
817 : }
818 :
819 27 : if (listen(sock, 8) < 0) {
820 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_CONNECT,
821 : "Listen failed: %s", strerror(errno));
822 0 : close(sock);
823 0 : return -1;
824 : }
825 :
826 27 : pd->listen_sock = sock;
827 27 : pd->listen_port = port;
828 :
829 27 : if (sock > pd->max_fd)
830 25 : pd->max_fd = sock;
831 :
832 27 : PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
833 : "Listening on port %u", port);
834 :
835 27 : return 0;
836 : }
837 :
838 25 : void pt_posix_listen_stop(struct pt_context *ctx) {
839 25 : pt_posix_data *pd = pt_posix_get(ctx);
840 :
841 25 : if (pd->listen_sock >= 0) {
842 23 : close(pd->listen_sock);
843 23 : pd->listen_sock = -1;
844 23 : PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT, "Listen stopped");
845 : }
846 25 : }
847 :
848 40 : int pt_posix_listen_poll(struct pt_context *ctx) {
849 40 : pt_posix_data *pd = pt_posix_get(ctx);
850 : struct sockaddr_in addr;
851 40 : socklen_t addr_len = sizeof(addr);
852 : int client_sock;
853 : uint32_t client_ip;
854 : struct pt_peer *peer;
855 :
856 40 : if (pd->listen_sock < 0)
857 0 : return 0;
858 :
859 : /* Non-blocking accept */
860 40 : client_sock = accept(pd->listen_sock, (struct sockaddr *)&addr, &addr_len);
861 40 : if (client_sock < 0) {
862 19 : if (errno == EWOULDBLOCK || errno == EAGAIN)
863 19 : return 0;
864 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_CONNECT,
865 : "Accept error: %s", strerror(errno));
866 0 : return -1;
867 : }
868 :
869 21 : set_nonblocking(ctx, client_sock);
870 :
871 : /* Disable Nagle - ensure immediate send for Classic Mac receivers */
872 : {
873 21 : int flag = 1;
874 21 : setsockopt(client_sock, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
875 : }
876 :
877 21 : client_ip = ntohl(addr.sin_addr.s_addr);
878 :
879 21 : PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
880 : "Incoming connection from 0x%08X", client_ip);
881 :
882 : /* Find or create peer */
883 21 : peer = pt_peer_find_by_addr(ctx, client_ip, 0);
884 21 : if (!peer) {
885 : /* Unknown peer - create with empty name */
886 21 : peer = pt_peer_create(ctx, "", client_ip, ntohs(addr.sin_port));
887 : }
888 :
889 21 : if (!peer) {
890 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_CONNECT,
891 : "No peer slot for incoming connection");
892 0 : close(client_sock);
893 0 : return 0;
894 : }
895 :
896 : /* Store socket and reset receive state */
897 21 : int peer_idx = peer->hot.id - 1;
898 21 : pd->tcp_socks[peer_idx] = client_sock;
899 21 : pd->recv_bufs[peer_idx].hot.state = PT_RECV_HEADER;
900 21 : pd->recv_bufs[peer_idx].hot.bytes_needed = PT_COMPACT_HEADER_SIZE;
901 21 : pd->recv_bufs[peer_idx].hot.bytes_received = 0;
902 :
903 : /* Add to active peers list and mark fd_sets dirty */
904 21 : pt_posix_add_active_peer(pd, peer_idx);
905 :
906 : /* Allocate send and receive queues */
907 21 : peer->send_queue = pt_alloc_peer_queue(ctx);
908 21 : peer->recv_queue = pt_alloc_peer_queue(ctx);
909 :
910 21 : if (!peer->send_queue || !peer->recv_queue) {
911 : /* Allocation failed - clean up and reject connection */
912 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_MEMORY,
913 : "Failed to allocate queues for peer %u, rejecting connection",
914 : peer->hot.id);
915 :
916 : /* Free any partially allocated queues */
917 0 : pt_free_peer_queue(peer->send_queue);
918 0 : pt_free_peer_queue(peer->recv_queue);
919 0 : peer->send_queue = NULL;
920 0 : peer->recv_queue = NULL;
921 :
922 : /* Close socket and clean up peer */
923 0 : close(client_sock);
924 0 : pd->tcp_socks[peer_idx] = -1;
925 0 : pt_posix_remove_active_peer(pd, peer_idx);
926 0 : return 0;
927 : }
928 :
929 : /* Update state with logging */
930 21 : pt_peer_set_state(ctx, peer, PT_PEER_CONNECTED);
931 21 : peer->hot.last_seen = ctx->plat->get_ticks();
932 :
933 21 : PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
934 : "Accepted connection from peer %u at 0x%08X (assigned to slot %u)",
935 : peer->hot.id, client_ip, peer_idx);
936 :
937 : /* Fire callback */
938 21 : if (ctx->callbacks.on_peer_connected) {
939 14 : ctx->callbacks.on_peer_connected((PeerTalk_Context *)ctx,
940 14 : peer->hot.id, ctx->callbacks.user_data);
941 : }
942 :
943 : /* Send capabilities for negotiation */
944 21 : pt_posix_send_capability(ctx, peer);
945 :
946 21 : return 1;
947 : }
948 :
949 : /* ========================================================================== */
950 : /* TCP Client */
951 : /* ========================================================================== */
952 :
953 12 : int pt_posix_connect(struct pt_context *ctx, struct pt_peer *peer) {
954 12 : pt_posix_data *pd = pt_posix_get(ctx);
955 : struct sockaddr_in addr;
956 : int sock;
957 : int result;
958 : uint32_t peer_ip;
959 : uint16_t peer_port;
960 :
961 12 : if (!peer || peer->hot.magic != PT_PEER_MAGIC)
962 0 : return PT_ERR_INVALID_PARAM;
963 :
964 12 : if (peer->hot.state != PT_PEER_DISCOVERED)
965 1 : return PT_ERR_INVALID_STATE;
966 :
967 : /* Get peer address from first address entry */
968 11 : if (peer->hot.address_count == 0)
969 0 : return PT_ERR_INVALID_STATE;
970 :
971 11 : peer_ip = peer->cold.addresses[0].address;
972 11 : peer_port = peer->cold.addresses[0].port;
973 :
974 11 : sock = socket(AF_INET, SOCK_STREAM, 0);
975 11 : if (sock < 0) {
976 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_CONNECT,
977 : "Failed to create socket: %s", strerror(errno));
978 0 : return PT_ERR_NETWORK;
979 : }
980 :
981 11 : set_nonblocking(ctx, sock);
982 :
983 : /* Disable Nagle's algorithm - send immediately rather than coalescing.
984 : * This ensures TCP PSH flag is set promptly, triggering immediate
985 : * TCPNoCopyRcv completion on Classic Mac receivers. */
986 : {
987 11 : int flag = 1;
988 11 : setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
989 : }
990 :
991 11 : pt_memset(&addr, 0, sizeof(addr));
992 11 : addr.sin_family = AF_INET;
993 11 : addr.sin_addr.s_addr = htonl(peer_ip);
994 11 : addr.sin_port = htons(peer_port);
995 :
996 11 : PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
997 : "Connecting to peer %u (%s) at 0x%08X:%u",
998 : peer->hot.id, peer->cold.name, peer_ip, peer_port);
999 :
1000 11 : result = connect(sock, (struct sockaddr *)&addr, sizeof(addr));
1001 :
1002 11 : if (result < 0) {
1003 11 : if (errno == EINPROGRESS) {
1004 : /* Connection in progress - this is expected for non-blocking */
1005 11 : int peer_idx = peer->hot.id - 1;
1006 11 : pd->tcp_socks[peer_idx] = sock;
1007 11 : pd->recv_bufs[peer_idx].hot.state = PT_RECV_HEADER;
1008 11 : pd->recv_bufs[peer_idx].hot.bytes_needed = PT_COMPACT_HEADER_SIZE;
1009 11 : pd->recv_bufs[peer_idx].hot.bytes_received = 0;
1010 :
1011 : /* Add to active peers list and mark fd_sets dirty */
1012 11 : pt_posix_add_active_peer(pd, peer_idx);
1013 :
1014 11 : pt_peer_set_state(ctx, peer, PT_PEER_CONNECTING);
1015 11 : peer->cold.ping_sent_time = ctx->plat->get_ticks();
1016 11 : return PT_OK;
1017 : }
1018 :
1019 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_CONNECT,
1020 : "Connect failed to peer %u (%s) at 0x%08X:%u: %s",
1021 : peer->hot.id, peer->cold.name, peer_ip, peer_port, strerror(errno));
1022 0 : close(sock);
1023 0 : pt_peer_set_state(ctx, peer, PT_PEER_FAILED);
1024 0 : return PT_ERR_NETWORK;
1025 : }
1026 :
1027 : /* Immediate connection (unlikely but possible on localhost) */
1028 0 : int peer_idx = peer->hot.id - 1;
1029 0 : pd->tcp_socks[peer_idx] = sock;
1030 0 : pd->recv_bufs[peer_idx].hot.state = PT_RECV_HEADER;
1031 0 : pd->recv_bufs[peer_idx].hot.bytes_needed = PT_COMPACT_HEADER_SIZE;
1032 0 : pd->recv_bufs[peer_idx].hot.bytes_received = 0;
1033 :
1034 : /* Add to active peers list and mark fd_sets dirty */
1035 0 : pt_posix_add_active_peer(pd, peer_idx);
1036 :
1037 : /* Allocate send and receive queues */
1038 0 : peer->send_queue = pt_alloc_peer_queue(ctx);
1039 0 : peer->recv_queue = pt_alloc_peer_queue(ctx);
1040 :
1041 0 : if (!peer->send_queue || !peer->recv_queue) {
1042 : /* Allocation failed - clean up */
1043 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_MEMORY,
1044 : "Failed to allocate queues for peer %u",
1045 : peer->hot.id);
1046 :
1047 0 : pt_free_peer_queue(peer->send_queue);
1048 0 : pt_free_peer_queue(peer->recv_queue);
1049 0 : peer->send_queue = NULL;
1050 0 : peer->recv_queue = NULL;
1051 :
1052 0 : close(sock);
1053 0 : pd->tcp_socks[peer_idx] = -1;
1054 0 : pt_posix_remove_active_peer(pd, peer_idx);
1055 0 : pt_peer_set_state(ctx, peer, PT_PEER_FAILED);
1056 0 : return PT_ERR_NO_MEMORY;
1057 : }
1058 :
1059 0 : pt_peer_set_state(ctx, peer, PT_PEER_CONNECTED);
1060 0 : peer->hot.last_seen = ctx->plat->get_ticks();
1061 :
1062 0 : PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
1063 : "Connected to peer %u (%s) - immediate connect on localhost",
1064 : peer->hot.id, peer->cold.name);
1065 :
1066 0 : if (ctx->callbacks.on_peer_connected) {
1067 0 : ctx->callbacks.on_peer_connected((PeerTalk_Context *)ctx,
1068 0 : peer->hot.id, ctx->callbacks.user_data);
1069 : }
1070 :
1071 : /* Send capabilities for negotiation */
1072 0 : pt_posix_send_capability(ctx, peer);
1073 :
1074 0 : return PT_OK;
1075 : }
1076 :
1077 3 : int pt_posix_disconnect(struct pt_context *ctx, struct pt_peer *peer) {
1078 3 : pt_posix_data *pd = pt_posix_get(ctx);
1079 : int peer_idx;
1080 : int sock;
1081 :
1082 3 : if (!peer || peer->hot.magic != PT_PEER_MAGIC)
1083 0 : return PT_ERR_INVALID_PARAM;
1084 :
1085 3 : peer_idx = peer->hot.id - 1;
1086 3 : sock = pd->tcp_socks[peer_idx];
1087 :
1088 3 : if (sock >= 0) {
1089 0 : PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
1090 : "Disconnecting peer %u (%s)", peer->hot.id, peer->cold.name);
1091 :
1092 : /* Send disconnect message if connected */
1093 0 : if (peer->hot.state == PT_PEER_CONNECTED) {
1094 : pt_message_header hdr;
1095 : uint8_t buf[PT_MESSAGE_HEADER_SIZE + 2];
1096 :
1097 0 : hdr.version = PT_PROTOCOL_VERSION;
1098 0 : hdr.type = PT_MSG_TYPE_DISCONNECT;
1099 0 : hdr.flags = 0;
1100 0 : hdr.sequence = peer->hot.send_seq++;
1101 0 : hdr.payload_len = 0;
1102 :
1103 0 : pt_message_encode_header(&hdr, buf);
1104 : /* Add CRC for empty payload */
1105 0 : uint16_t crc = pt_crc16(buf, PT_MESSAGE_HEADER_SIZE);
1106 0 : buf[PT_MESSAGE_HEADER_SIZE] = (crc >> 8) & 0xFF;
1107 0 : buf[PT_MESSAGE_HEADER_SIZE + 1] = crc & 0xFF;
1108 :
1109 0 : send(sock, buf, sizeof(buf), 0);
1110 : }
1111 :
1112 0 : close(sock);
1113 0 : pd->tcp_socks[peer_idx] = -1;
1114 :
1115 : /* Remove from active peers list */
1116 0 : pt_posix_remove_active_peer(pd, peer_idx);
1117 : }
1118 :
1119 3 : pt_peer_set_state(ctx, peer, PT_PEER_DISCONNECTING);
1120 :
1121 3 : if (ctx->callbacks.on_peer_disconnected) {
1122 0 : ctx->callbacks.on_peer_disconnected((PeerTalk_Context *)ctx,
1123 0 : peer->hot.id, PT_OK,
1124 : ctx->callbacks.user_data);
1125 : }
1126 :
1127 : /* Free queues */
1128 3 : pt_free_peer_queue(peer->send_queue);
1129 3 : pt_free_peer_queue(peer->recv_queue);
1130 3 : peer->send_queue = NULL;
1131 3 : peer->recv_queue = NULL;
1132 :
1133 : /* Back to DISCOVERED so reconnection is possible */
1134 3 : pt_peer_set_state(ctx, peer, PT_PEER_DISCOVERED);
1135 :
1136 3 : return PT_OK;
1137 : }
1138 :
1139 : /* ========================================================================== */
1140 : /* TCP Message I/O (Session 4.3) */
1141 : /* ========================================================================== */
1142 :
1143 : /**
1144 : * Send framed message to a connected peer
1145 : *
1146 : * Uses writev() for atomic transmission of header + payload + CRC in a single
1147 : * syscall. This is more efficient than multiple send() calls and avoids TCP
1148 : * Nagle algorithm issues.
1149 : *
1150 : * @param ctx PeerTalk context
1151 : * @param peer Target peer (must be in CONNECTED state)
1152 : * @param data Message payload
1153 : * @param len Payload length (max PT_MAX_MESSAGE_SIZE)
1154 : * @return PT_OK on success, error code on failure
1155 : */
1156 : /**
1157 : * Send data message to peer with specified flags
1158 : *
1159 : * Internal function that allows setting message flags (e.g., for fragments).
1160 : * Regular data messages use flags=0, fragments use PT_MSG_FLAG_FRAGMENT.
1161 : */
1162 21 : static int pt_posix_send_with_flags(struct pt_context *ctx, struct pt_peer *peer,
1163 : const void *data, size_t len, uint8_t msg_flags) {
1164 21 : pt_posix_data *pd = pt_posix_get(ctx);
1165 : pt_message_header hdr;
1166 : pt_compact_header compact_hdr;
1167 : uint8_t header_buf[PT_MESSAGE_HEADER_SIZE]; /* Large enough for either format */
1168 : uint8_t crc_buf[2];
1169 : struct iovec iov[3];
1170 : ssize_t total_len;
1171 : ssize_t sent;
1172 : int peer_idx;
1173 : int sock;
1174 : uint16_t crc;
1175 : int use_compact;
1176 : uint16_t header_size;
1177 :
1178 : /* Validation */
1179 21 : if (!peer || peer->hot.magic != PT_PEER_MAGIC) {
1180 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL, "Invalid peer");
1181 0 : return PT_ERR_INVALID_PARAM;
1182 : }
1183 :
1184 21 : if (peer->hot.state != PT_PEER_CONNECTED) {
1185 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_PROTOCOL,
1186 : "Cannot send to peer %u: not connected (state=%d)",
1187 : peer->hot.id, peer->hot.state);
1188 0 : return PT_ERR_INVALID_STATE;
1189 : }
1190 :
1191 21 : if (len > PT_MAX_MESSAGE_SIZE) {
1192 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
1193 : "Message too large: %zu bytes (max %d)",
1194 : len, PT_MAX_MESSAGE_SIZE);
1195 0 : return PT_ERR_MESSAGE_TOO_LARGE;
1196 : }
1197 :
1198 21 : peer_idx = peer->hot.id - 1;
1199 21 : sock = pd->tcp_socks[peer_idx];
1200 :
1201 21 : if (sock < 0) {
1202 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL, "Invalid socket for peer %u", peer->hot.id);
1203 0 : return PT_ERR_INVALID_STATE;
1204 : }
1205 :
1206 : /* Check if socket is writable BEFORE attempting send.
1207 : * This avoids starting a partial write that requires blocking to complete.
1208 : * If socket buffer is full, return WOULD_BLOCK so caller can retry later.
1209 : */
1210 : {
1211 : struct pollfd pfd;
1212 21 : pfd.fd = sock;
1213 21 : pfd.events = POLLOUT;
1214 21 : pfd.revents = 0;
1215 21 : if (poll(&pfd, 1, 0) <= 0 || !(pfd.revents & POLLOUT)) {
1216 : /* Socket not ready for writing - avoid partial write */
1217 0 : return PT_ERR_WOULD_BLOCK;
1218 : }
1219 : }
1220 :
1221 : /* Check if compact headers are negotiated with this peer.
1222 : * IMPORTANT: Don't use compact headers for fragments because
1223 : * PT_MSG_FLAG_FRAGMENT (0x10) doesn't fit in the 4-bit flags field.
1224 : * The compact header format only supports flags 0x00-0x0F. */
1225 40 : use_compact = peer->cold.caps.compact_mode &&
1226 19 : !(msg_flags & PT_MSG_FLAG_FRAGMENT);
1227 :
1228 21 : if (use_compact) {
1229 : /* Encode compact header (4 bytes, no CRC) */
1230 19 : compact_hdr.type = PT_MSG_TYPE_DATA;
1231 19 : compact_hdr.flags = msg_flags;
1232 19 : compact_hdr.payload_len = (uint16_t)len;
1233 19 : pt_message_encode_compact(&compact_hdr, header_buf);
1234 19 : header_size = PT_COMPACT_HEADER_SIZE;
1235 :
1236 : /* Prepare iovec for atomic send - no CRC */
1237 19 : iov[0].iov_base = header_buf;
1238 19 : iov[0].iov_len = header_size;
1239 19 : iov[1].iov_base = (void *)data;
1240 19 : iov[1].iov_len = len;
1241 :
1242 19 : total_len = header_size + len;
1243 : } else {
1244 : /* Encode full header (10 bytes + 2 CRC) */
1245 2 : hdr.version = PT_PROTOCOL_VERSION;
1246 2 : hdr.type = PT_MSG_TYPE_DATA;
1247 2 : hdr.flags = msg_flags;
1248 2 : hdr.sequence = peer->hot.send_seq++;
1249 2 : hdr.payload_len = (uint16_t)len;
1250 2 : pt_message_encode_header(&hdr, header_buf);
1251 2 : header_size = PT_MESSAGE_HEADER_SIZE;
1252 :
1253 : /* Calculate CRC over header + payload */
1254 2 : crc = pt_crc16(header_buf, header_size);
1255 2 : crc = pt_crc16_update(crc, data, len);
1256 2 : crc_buf[0] = (crc >> 8) & 0xFF;
1257 2 : crc_buf[1] = crc & 0xFF;
1258 :
1259 : /* Prepare iovec for atomic send */
1260 2 : iov[0].iov_base = header_buf;
1261 2 : iov[0].iov_len = header_size;
1262 2 : iov[1].iov_base = (void *)data;
1263 2 : iov[1].iov_len = len;
1264 2 : iov[2].iov_base = crc_buf;
1265 2 : iov[2].iov_len = 2;
1266 :
1267 2 : total_len = header_size + len + 2;
1268 : }
1269 :
1270 : /* Send with writev - handles partial writes */
1271 21 : sent = writev(sock, iov, use_compact ? 2 : 3);
1272 :
1273 21 : if (sent < 0) {
1274 0 : if (errno == EWOULDBLOCK || errno == EAGAIN) {
1275 0 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
1276 : "Send would block for peer %u", peer->hot.id);
1277 0 : return PT_ERR_WOULD_BLOCK;
1278 : }
1279 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
1280 : "Send failed for peer %u: %s", peer->hot.id, strerror(errno));
1281 0 : return PT_ERR_NETWORK;
1282 : }
1283 :
1284 : /* Handle partial writes by continuing with write() */
1285 21 : if (sent < total_len) {
1286 : /* Build combined buffer for remaining data */
1287 : uint8_t *sendbuf;
1288 0 : size_t buflen = (size_t)total_len;
1289 0 : size_t offset = (size_t)sent;
1290 0 : int max_retries = 20; /* 200ms max block (down from 1s) */
1291 0 : int retry_count = 0;
1292 :
1293 0 : sendbuf = (uint8_t *)malloc(buflen);
1294 0 : if (!sendbuf) {
1295 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL, "Failed to allocate send buffer");
1296 0 : return PT_ERR_NO_MEMORY;
1297 : }
1298 :
1299 : /* Copy header + payload (+ crc if not compact) into buffer */
1300 0 : memcpy(sendbuf, header_buf, header_size);
1301 0 : memcpy(sendbuf + header_size, data, len);
1302 0 : if (!use_compact) {
1303 0 : memcpy(sendbuf + header_size + len, crc_buf, 2);
1304 : }
1305 :
1306 : /* Continue sending from where we left off */
1307 0 : while (offset < buflen && retry_count < max_retries) {
1308 0 : ssize_t n = write(sock, sendbuf + offset, buflen - offset);
1309 0 : if (n < 0) {
1310 0 : if (errno == EWOULDBLOCK || errno == EAGAIN) {
1311 : struct timeval tv;
1312 : fd_set wfds;
1313 0 : FD_ZERO(&wfds);
1314 0 : FD_SET(sock, &wfds);
1315 0 : tv.tv_sec = 0;
1316 0 : tv.tv_usec = 10000; /* 10ms */
1317 0 : select(sock + 1, NULL, &wfds, NULL, &tv);
1318 0 : retry_count++;
1319 0 : continue;
1320 : }
1321 0 : free(sendbuf);
1322 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
1323 : "Send failed for peer %u: %s", peer->hot.id, strerror(errno));
1324 0 : return PT_ERR_NETWORK;
1325 : }
1326 0 : offset += (size_t)n;
1327 0 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
1328 : "Partial send to peer %u: %zu/%zu bytes, continuing...",
1329 : peer->hot.id, offset, buflen);
1330 : }
1331 :
1332 0 : free(sendbuf);
1333 :
1334 0 : if (offset < buflen) {
1335 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
1336 : "Send incomplete after %d retries: %zu/%zu bytes",
1337 : retry_count, offset, buflen);
1338 0 : return PT_ERR_NETWORK;
1339 : }
1340 :
1341 0 : sent = (ssize_t)offset;
1342 : }
1343 :
1344 : /* Update statistics */
1345 21 : peer->cold.stats.bytes_sent += sent;
1346 21 : peer->cold.stats.messages_sent++;
1347 21 : ctx->global_stats.total_bytes_sent += sent;
1348 21 : ctx->global_stats.total_messages_sent++;
1349 :
1350 21 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
1351 : "Sent %zd bytes to peer %u (seq=%u, flags=0x%02X)",
1352 : sent, peer->hot.id, hdr.sequence, msg_flags);
1353 :
1354 21 : return PT_OK;
1355 : }
1356 :
/**
 * Send an ordinary data message to a peer
 *
 * Public entry point that forwards to pt_posix_send_with_flags() with no
 * message flags set.
 *
 * @param ctx PeerTalk context
 * @param peer Target peer
 * @param data Message payload
 * @param len Payload length in bytes
 * @return Whatever pt_posix_send_with_flags() returns
 */
int pt_posix_send(struct pt_context *ctx, struct pt_peer *peer,
                  const void *data, size_t len) {
    const uint8_t no_flags = 0;

    return pt_posix_send_with_flags(ctx, peer, data, len, no_flags);
}
1366 :
1367 : /**
1368 : * Send control message (PING, PONG, DISCONNECT)
1369 : *
1370 : * Control messages have zero-length payload and use sequence=0 to distinguish
1371 : * from user data messages.
1372 : *
1373 : * @param ctx PeerTalk context
1374 : * @param peer Target peer
1375 : * @param msg_type PT_MSG_TYPE_PING, PT_MSG_TYPE_PONG, or PT_MSG_TYPE_DISCONNECT
1376 : * @return PT_OK on success, error code on failure
1377 : */
1378 0 : int pt_posix_send_control(struct pt_context *ctx, struct pt_peer *peer,
1379 : uint8_t msg_type) {
1380 0 : pt_posix_data *pd = pt_posix_get(ctx);
1381 : pt_message_header hdr;
1382 : uint8_t buf[PT_MESSAGE_HEADER_SIZE + 2];
1383 : uint16_t crc;
1384 : ssize_t sent;
1385 : int peer_idx;
1386 : int sock;
1387 :
1388 0 : if (!peer || peer->hot.magic != PT_PEER_MAGIC)
1389 0 : return PT_ERR_INVALID_PARAM;
1390 :
1391 0 : if (peer->hot.state != PT_PEER_CONNECTED)
1392 0 : return PT_ERR_INVALID_STATE;
1393 :
1394 0 : peer_idx = peer->hot.id - 1;
1395 0 : sock = pd->tcp_socks[peer_idx];
1396 :
1397 0 : if (sock < 0)
1398 0 : return PT_ERR_INVALID_STATE;
1399 :
1400 : /* Encode header (sequence=0 for control messages) */
1401 0 : hdr.version = PT_PROTOCOL_VERSION;
1402 0 : hdr.type = msg_type;
1403 0 : hdr.flags = 0;
1404 0 : hdr.sequence = 0; /* Control messages use seq=0 intentionally */
1405 0 : hdr.payload_len = 0;
1406 0 : pt_message_encode_header(&hdr, buf);
1407 :
1408 : /* Calculate CRC */
1409 0 : crc = pt_crc16(buf, PT_MESSAGE_HEADER_SIZE);
1410 0 : buf[PT_MESSAGE_HEADER_SIZE] = (crc >> 8) & 0xFF;
1411 0 : buf[PT_MESSAGE_HEADER_SIZE + 1] = crc & 0xFF;
1412 :
1413 : /* Send */
1414 0 : sent = send(sock, buf, sizeof(buf), 0);
1415 :
1416 0 : if (sent < 0) {
1417 0 : if (errno == EWOULDBLOCK || errno == EAGAIN)
1418 0 : return PT_ERR_WOULD_BLOCK;
1419 0 : return PT_ERR_NETWORK;
1420 : }
1421 :
1422 0 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
1423 : "Sent control message type=%u to peer %u", msg_type, peer->hot.id);
1424 :
1425 0 : return PT_OK;
1426 : }
1427 :
1428 : /**
1429 : * Send capability message to peer
1430 : *
1431 : * Called after connection is established to exchange capability information.
1432 : * This enables automatic fragmentation for constrained peers.
1433 : *
1434 : * @param ctx PeerTalk context
1435 : * @param peer Target peer
1436 : * @return PT_OK on success, error code on failure
1437 : */
1438 32 : int pt_posix_send_capability(struct pt_context *ctx, struct pt_peer *peer) {
1439 32 : pt_posix_data *pd = pt_posix_get(ctx);
1440 : pt_message_header hdr;
1441 : pt_capability_msg caps;
1442 : uint8_t header_buf[PT_MESSAGE_HEADER_SIZE];
1443 : uint8_t payload_buf[32]; /* Capability TLV max ~15 bytes */
1444 : uint8_t crc_buf[2];
1445 : struct iovec iov[3];
1446 : uint16_t crc;
1447 : ssize_t sent;
1448 : int payload_len;
1449 : int peer_idx;
1450 : int sock;
1451 :
1452 32 : if (!peer || peer->hot.magic != PT_PEER_MAGIC)
1453 0 : return PT_ERR_INVALID_PARAM;
1454 :
1455 32 : if (peer->hot.state != PT_PEER_CONNECTED)
1456 0 : return PT_ERR_INVALID_STATE;
1457 :
1458 32 : peer_idx = peer->hot.id - 1;
1459 32 : sock = pd->tcp_socks[peer_idx];
1460 :
1461 32 : if (sock < 0)
1462 0 : return PT_ERR_INVALID_STATE;
1463 :
1464 : /* Rate-limit capability sends to prevent flooding the peer's receive buffer.
1465 : *
1466 : * When the peer (e.g., a Classic Mac) is under heavy receive load, sending
1467 : * many capability messages in rapid succession causes small TCP segments that
1468 : * pile up in an already-congested ibuf. This causes "dropping incomplete
1469 : * reassembly" errors and can crash MacTCP.
1470 : *
1471 : * We enforce PT_CAP_MIN_INTERVAL_MS between sends. If rate-limited, we keep
1472 : * pressure_update_pending=1 so the next poll retries. The first send
1473 : * (cap_last_sent=0) is always allowed.
1474 : */
1475 : {
1476 32 : pt_tick_t now_ms = ctx->plat->get_ticks();
1477 32 : if (peer->cold.caps.cap_last_sent != 0 &&
1478 0 : (now_ms - peer->cold.caps.cap_last_sent) < PT_CAP_MIN_INTERVAL_MS) {
1479 0 : peer->cold.caps.pressure_update_pending = 1; /* Retry later */
1480 0 : return PT_OK;
1481 : }
1482 32 : peer->cold.caps.cap_last_sent = now_ms;
1483 : }
1484 :
1485 : /* Fill in our capabilities */
1486 32 : caps.max_message_size = ctx->local_max_message;
1487 32 : caps.preferred_chunk = ctx->local_preferred_chunk;
1488 32 : caps.capability_flags = ctx->local_capability_flags;
1489 :
1490 : /* POSIX: Report standard socket buffer sizes.
1491 : * POSIX doesn't have MacTCP's 25% threshold issue, so we report
1492 : * typical defaults. The optimal_chunk is our preferred chunk since
1493 : * there's no completion threshold to optimize for. */
1494 32 : caps.recv_buffer_size = 65535; /* Typical socket buffer size */
1495 32 : caps.optimal_chunk = ctx->local_preferred_chunk;
1496 :
1497 : /* Calculate current buffer pressure from BOTH queues - report the worse one.
1498 : * This captures the actual constraint regardless of where it is:
1499 : * - High send_pressure: "I can't transmit fast enough"
1500 : * - High recv_pressure: "I can't receive fast enough"
1501 : */
1502 : {
1503 32 : uint8_t send_pressure = peer->send_queue ? pt_queue_pressure(peer->send_queue) : 0;
1504 32 : uint8_t recv_pressure = peer->recv_queue ? pt_queue_pressure(peer->recv_queue) : 0;
1505 32 : caps.buffer_pressure = (send_pressure > recv_pressure) ? send_pressure : recv_pressure;
1506 : }
1507 32 : caps.reserved = 0;
1508 :
1509 : /* Track what we reported for flow control threshold detection */
1510 32 : peer->cold.caps.last_reported_pressure = caps.buffer_pressure;
1511 32 : peer->cold.caps.pressure_update_pending = 0;
1512 :
1513 : /* Encode capability TLV payload */
1514 32 : payload_len = pt_capability_encode(&caps, payload_buf, sizeof(payload_buf));
1515 32 : if (payload_len < 0) {
1516 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
1517 : "Failed to encode capabilities for peer %u", peer->hot.id);
1518 0 : return PT_ERR_INTERNAL;
1519 : }
1520 :
1521 : /* Encode header */
1522 32 : hdr.version = PT_PROTOCOL_VERSION;
1523 32 : hdr.type = PT_MSG_TYPE_CAPABILITY;
1524 32 : hdr.flags = 0;
1525 32 : hdr.sequence = 0; /* Capability messages use seq=0 */
1526 32 : hdr.payload_len = (uint16_t)payload_len;
1527 32 : pt_message_encode_header(&hdr, header_buf);
1528 :
1529 : /* Calculate CRC over header + payload */
1530 32 : crc = pt_crc16(header_buf, PT_MESSAGE_HEADER_SIZE);
1531 32 : crc = pt_crc16_update(crc, payload_buf, (size_t)payload_len);
1532 32 : crc_buf[0] = (crc >> 8) & 0xFF;
1533 32 : crc_buf[1] = crc & 0xFF;
1534 :
1535 : /* Prepare iovec for atomic send */
1536 32 : iov[0].iov_base = header_buf;
1537 32 : iov[0].iov_len = PT_MESSAGE_HEADER_SIZE;
1538 32 : iov[1].iov_base = payload_buf;
1539 32 : iov[1].iov_len = (size_t)payload_len;
1540 32 : iov[2].iov_base = crc_buf;
1541 32 : iov[2].iov_len = 2;
1542 :
1543 : /* Send with writev */
1544 32 : sent = writev(sock, iov, 3);
1545 :
1546 32 : if (sent < 0) {
1547 0 : if (errno == EWOULDBLOCK || errno == EAGAIN)
1548 0 : return PT_ERR_WOULD_BLOCK;
1549 0 : return PT_ERR_NETWORK;
1550 : }
1551 :
1552 32 : PT_CTX_INFO(ctx, PT_LOG_CAT_PROTOCOL,
1553 : "Sent capabilities to peer %u: max=%u chunk=%u",
1554 : peer->hot.id, caps.max_message_size, caps.preferred_chunk);
1555 :
1556 32 : return PT_OK;
1557 : }
1558 :
1559 : /* ========================================================================== */
1560 : /* Statistics Helpers (Session 4.5) */
1561 : /* ========================================================================== */
1562 :
/**
 * Map a latency figure to a connection quality score
 *
 * The score is picked from fixed LAN-oriented latency tiers:
 *   latency <  5ms -> 100 (excellent - typical wired LAN)
 *   latency < 10ms ->  90 (very good - good WiFi or loaded LAN)
 *   latency < 20ms ->  75 (good - congested WiFi or slower network)
 *   latency < 50ms ->  50 (fair - problematic connection for LAN)
 *   otherwise      ->  25 (poor - very bad for LAN, investigate)
 *
 * @param latency_ms Latency in milliseconds
 * @return Quality score, one of 100, 90, 75, 50 or 25
 */
static uint8_t calculate_quality(uint16_t latency_ms) {
    /* Tiers in ascending order; the first upper bound that exceeds the
     * latency decides the score. */
    static const struct {
        uint16_t below_ms;
        uint8_t score;
    } tiers[] = {
        { 5, 100 },
        { 10, 90 },
        { 20, 75 },
        { 50, 50 },
    };
    unsigned int i;

    for (i = 0; i < sizeof(tiers) / sizeof(tiers[0]); i++) {
        if (latency_ms < tiers[i].below_ms) {
            return tiers[i].score;
        }
    }

    return 25;
}
1588 :
1589 : /**
1590 : * Update peer latency from PONG response
1591 : *
1592 : * Called when PONG message is received. Calculates RTT, updates rolling average,
1593 : * and calculates connection quality score.
1594 : */
1595 0 : static void update_peer_latency(struct pt_context *ctx, struct pt_peer *peer) {
1596 : pt_tick_t now;
1597 : uint16_t rtt;
1598 :
1599 0 : if (peer->cold.ping_sent_time == 0) {
1600 0 : return; /* No pending ping */
1601 : }
1602 :
1603 0 : now = ctx->plat->get_ticks();
1604 0 : rtt = (uint16_t)(now - peer->cold.ping_sent_time);
1605 0 : peer->cold.ping_sent_time = 0;
1606 :
1607 : /* Rolling average: new = (old * 3 + sample) / 4 */
1608 0 : if (peer->cold.stats.latency_ms == 0) {
1609 0 : peer->cold.stats.latency_ms = rtt;
1610 : } else {
1611 0 : peer->cold.stats.latency_ms = (peer->cold.stats.latency_ms * 3 + rtt) / 4;
1612 : }
1613 :
1614 : /* Update hot latency for adaptive tuning */
1615 0 : peer->hot.latency_ms = peer->cold.stats.latency_ms;
1616 :
1617 : /* Update adaptive chunk size and pipeline depth based on RTT */
1618 0 : pt_peer_update_adaptive_params(ctx, peer);
1619 :
1620 : /* Calculate quality */
1621 0 : peer->cold.stats.quality = calculate_quality(peer->cold.stats.latency_ms);
1622 :
1623 0 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
1624 : "Peer %u latency: %u ms (quality: %u%%)",
1625 : peer->hot.id, peer->cold.stats.latency_ms, peer->cold.stats.quality);
1626 : }
1627 :
1628 : /* ========================================================================== */
1629 : /* TCP Receive State Machine (Session 4.3) */
1630 : /* ========================================================================== */
1631 :
1632 : /**
1633 : * Reset receive buffer to initial state
1634 : *
1635 : * @param buf Receive buffer to reset
1636 : */
1637 41 : static void pt_recv_reset(pt_recv_buffer *buf) {
1638 41 : buf->hot.state = PT_RECV_HEADER;
1639 41 : buf->hot.bytes_needed = PT_COMPACT_HEADER_SIZE; /* Start with minimum for format detection */
1640 41 : buf->hot.bytes_received = 0;
1641 41 : buf->hot.is_compact = 0;
1642 41 : }
1643 :
1644 : /**
1645 : * Receive header bytes
1646 : *
1647 : * Accumulates header bytes until complete, then transitions to payload or CRC state.
1648 : *
1649 : * @param ctx PeerTalk context
1650 : * @param peer Target peer
1651 : * @param buf Receive buffer
1652 : * @param sock TCP socket
1653 : * @return 1 = header complete, 0 = waiting for more data, -1 = error
1654 : */
1655 80 : static int pt_recv_header(struct pt_context *ctx, struct pt_peer *peer,
1656 : pt_recv_buffer *buf, int sock) {
1657 : ssize_t ret;
1658 : size_t remaining;
1659 :
1660 80 : remaining = buf->hot.bytes_needed - buf->hot.bytes_received;
1661 :
1662 80 : ret = recv(sock, buf->cold.header_buf + buf->hot.bytes_received,
1663 : remaining, 0);
1664 :
1665 80 : if (ret < 0) {
1666 7 : if (errno == EWOULDBLOCK || errno == EAGAIN)
1667 0 : return 0; /* No data yet */
1668 7 : PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
1669 : "Header recv error for peer %u: %s", peer->hot.id, strerror(errno));
1670 7 : return -1;
1671 : }
1672 :
1673 73 : if (ret == 0) {
1674 0 : PT_CTX_INFO(ctx, PT_LOG_CAT_PROTOCOL,
1675 : "Peer %u closed connection during header", peer->hot.id);
1676 0 : return -1;
1677 : }
1678 :
1679 73 : buf->hot.bytes_received += ret;
1680 :
1681 73 : if (buf->hot.bytes_received >= buf->hot.bytes_needed) {
1682 : /* Check if we've received enough to detect header format */
1683 73 : if (buf->hot.bytes_needed == PT_COMPACT_HEADER_SIZE) {
1684 : /* First 4 bytes received - detect format */
1685 45 : PT_CTX_INFO(ctx, PT_LOG_CAT_PROTOCOL,
1686 : "Header bytes [%02X %02X %02X %02X], received=%u",
1687 : buf->cold.header_buf[0], buf->cold.header_buf[1],
1688 : buf->cold.header_buf[2], buf->cold.header_buf[3],
1689 : buf->hot.bytes_received);
1690 :
1691 45 : if (pt_message_is_compact(buf->cold.header_buf, buf->hot.bytes_received)) {
1692 : /* Compact header (4 bytes, no CRC) */
1693 : pt_compact_header compact_hdr;
1694 14 : if (pt_message_decode_compact(buf->cold.header_buf,
1695 14 : buf->hot.bytes_received,
1696 : &compact_hdr) != PT_OK) {
1697 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
1698 : "Invalid compact header from peer %u", peer->hot.id);
1699 0 : return -1;
1700 : }
1701 :
1702 : /* Convert to standard header struct */
1703 14 : buf->cold.hdr.version = PT_PROTOCOL_VERSION;
1704 14 : buf->cold.hdr.type = compact_hdr.type;
1705 14 : buf->cold.hdr.flags = compact_hdr.flags;
1706 14 : buf->cold.hdr.sequence = 0; /* Compact has no sequence */
1707 14 : buf->cold.hdr.payload_len = compact_hdr.payload_len;
1708 14 : buf->hot.is_compact = 1;
1709 :
1710 : /* Validate payload size */
1711 14 : if (buf->cold.hdr.payload_len > PT_MAX_MESSAGE_SIZE) {
1712 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
1713 : "Payload too large from peer %u: %u bytes (max %d)",
1714 : peer->hot.id, buf->cold.hdr.payload_len, PT_MAX_MESSAGE_SIZE);
1715 0 : return -1;
1716 : }
1717 :
1718 : /* Transition to payload (skip CRC for compact) */
1719 14 : if (buf->cold.hdr.payload_len > 0) {
1720 14 : buf->hot.state = PT_RECV_PAYLOAD;
1721 14 : buf->hot.bytes_needed = buf->cold.hdr.payload_len;
1722 14 : buf->hot.bytes_received = 0;
1723 : } else {
1724 : /* No payload and no CRC - message complete */
1725 0 : return 2; /* Signal message complete (no CRC needed) */
1726 : }
1727 :
1728 14 : return 1; /* Header complete */
1729 : } else {
1730 : /* Full header - need 6 more bytes total (10 bytes header) */
1731 31 : buf->hot.bytes_needed = PT_MESSAGE_HEADER_SIZE;
1732 31 : buf->hot.is_compact = 0;
1733 :
1734 : /* Check if we already received enough bytes in first recv.
1735 : * This can happen when all header bytes arrive together.
1736 : * If so, fall through to decode. Otherwise continue receiving. */
1737 31 : if (buf->hot.bytes_received < PT_MESSAGE_HEADER_SIZE) {
1738 31 : return 0; /* Continue receiving more header bytes */
1739 : }
1740 : /* Fall through to decode full header */
1741 : }
1742 : }
1743 :
1744 : /* Full header (10 bytes) - decode it */
1745 28 : if (pt_message_decode_header(ctx, buf->cold.header_buf,
1746 : PT_MESSAGE_HEADER_SIZE,
1747 : &buf->cold.hdr) != PT_OK) {
1748 1 : PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
1749 : "Invalid message header from peer %u", peer->hot.id);
1750 1 : return -1;
1751 : }
1752 :
1753 : /* Validate payload size */
1754 27 : if (buf->cold.hdr.payload_len > PT_MAX_MESSAGE_SIZE) {
1755 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
1756 : "Payload too large from peer %u: %u bytes (max %d)",
1757 : peer->hot.id, buf->cold.hdr.payload_len, PT_MAX_MESSAGE_SIZE);
1758 0 : return -1;
1759 : }
1760 :
1761 : /* Transition to next state */
1762 27 : if (buf->cold.hdr.payload_len > 0) {
1763 27 : buf->hot.state = PT_RECV_PAYLOAD;
1764 27 : buf->hot.bytes_needed = buf->cold.hdr.payload_len;
1765 27 : buf->hot.bytes_received = 0;
1766 : } else {
1767 : /* No payload - go straight to CRC */
1768 0 : buf->hot.state = PT_RECV_CRC;
1769 0 : buf->hot.bytes_needed = 2;
1770 0 : buf->hot.bytes_received = 0;
1771 : }
1772 :
1773 27 : return 1; /* Header complete */
1774 : }
1775 :
1776 0 : return 0; /* Waiting for more header bytes */
1777 : }
1778 :
1779 : /**
1780 : * Receive payload bytes
1781 : *
1782 : * @return 1 = payload complete, 0 = waiting for more data, -1 = error
1783 : */
1784 42 : static int pt_recv_payload(struct pt_context *ctx, struct pt_peer *peer,
1785 : pt_recv_buffer *buf, int sock) {
1786 : ssize_t ret;
1787 : size_t remaining;
1788 :
1789 42 : remaining = buf->hot.bytes_needed - buf->hot.bytes_received;
1790 :
1791 42 : ret = recv(sock, buf->cold.payload_buf + buf->hot.bytes_received,
1792 : remaining, 0);
1793 :
1794 42 : if (ret < 0) {
1795 0 : if (errno == EWOULDBLOCK || errno == EAGAIN)
1796 0 : return 0;
1797 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
1798 : "Payload recv error for peer %u: %s", peer->hot.id, strerror(errno));
1799 0 : return -1;
1800 : }
1801 :
1802 42 : if (ret == 0) {
1803 0 : PT_CTX_INFO(ctx, PT_LOG_CAT_PROTOCOL,
1804 : "Peer %u closed connection during payload", peer->hot.id);
1805 0 : return -1;
1806 : }
1807 :
1808 42 : buf->hot.bytes_received += ret;
1809 :
1810 42 : if (buf->hot.bytes_received >= buf->hot.bytes_needed) {
1811 41 : if (buf->hot.is_compact) {
1812 : /* Compact header - no CRC, message complete */
1813 14 : return 2; /* Signal message complete (no CRC needed) */
1814 : }
1815 : /* Full header - move to CRC */
1816 27 : buf->hot.state = PT_RECV_CRC;
1817 27 : buf->hot.bytes_needed = 2;
1818 27 : buf->hot.bytes_received = 0;
1819 27 : return 1;
1820 : }
1821 :
1822 1 : return 0; /* Waiting for more payload bytes */
1823 : }
1824 :
1825 : /**
1826 : * Receive CRC bytes
1827 : *
1828 : * @return 1 = CRC complete, 0 = waiting for more data, -1 = error
1829 : */
1830 27 : static int pt_recv_crc(struct pt_context *ctx, struct pt_peer *peer,
1831 : pt_recv_buffer *buf, int sock) {
1832 : ssize_t ret;
1833 : size_t remaining;
1834 :
1835 27 : remaining = buf->hot.bytes_needed - buf->hot.bytes_received;
1836 :
1837 27 : ret = recv(sock, buf->cold.crc_buf + buf->hot.bytes_received,
1838 : remaining, 0);
1839 :
1840 27 : if (ret < 0) {
1841 0 : if (errno == EWOULDBLOCK || errno == EAGAIN)
1842 0 : return 0;
1843 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
1844 : "CRC recv error for peer %u: %s", peer->hot.id, strerror(errno));
1845 0 : return -1;
1846 : }
1847 :
1848 27 : if (ret == 0) {
1849 0 : PT_CTX_INFO(ctx, PT_LOG_CAT_PROTOCOL,
1850 : "Peer %u closed connection during CRC", peer->hot.id);
1851 0 : return -1;
1852 : }
1853 :
1854 27 : buf->hot.bytes_received += ret;
1855 :
1856 27 : if (buf->hot.bytes_received >= buf->hot.bytes_needed) {
1857 : /* CRC complete - message fully received */
1858 27 : return 1;
1859 : }
1860 :
1861 0 : return 0; /* Waiting for more CRC bytes */
1862 : }
1863 :
1864 : /**
1865 : * Process complete message after CRC validation
1866 : *
1867 : * Verifies CRC (for full headers only), updates statistics, and dispatches by message type.
1868 : * Compact headers have no CRC - skip validation for those.
1869 : *
1870 : * @return 0 on success, -1 on error or disconnect
1871 : */
1872 41 : static int pt_recv_process_message(struct pt_context *ctx, struct pt_peer *peer,
1873 : pt_recv_buffer *buf) {
1874 : uint16_t header_size;
1875 :
1876 : /* Compact headers have no CRC - skip validation */
1877 41 : if (!buf->hot.is_compact) {
1878 : uint16_t expected_crc;
1879 : uint16_t received_crc;
1880 :
1881 : /* Calculate expected CRC */
1882 27 : expected_crc = pt_crc16(buf->cold.header_buf, PT_MESSAGE_HEADER_SIZE);
1883 27 : if (buf->cold.hdr.payload_len > 0) {
1884 27 : expected_crc = pt_crc16_update(expected_crc, buf->cold.payload_buf,
1885 27 : buf->cold.hdr.payload_len);
1886 : }
1887 :
1888 : /* Extract received CRC */
1889 27 : received_crc = ((uint16_t)buf->cold.crc_buf[0] << 8) | buf->cold.crc_buf[1];
1890 :
1891 27 : if (expected_crc != received_crc) {
1892 1 : PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
1893 : "CRC mismatch from peer %u: expected 0x%04X, got 0x%04X",
1894 : peer->hot.id, expected_crc, received_crc);
1895 1 : return -1;
1896 : }
1897 : }
1898 :
1899 : /* Debug: Log what we're processing */
1900 40 : PT_CTX_INFO(ctx, PT_LOG_CAT_PROTOCOL,
1901 : "Processing msg from peer %u: type=%u flags=0x%02X len=%u compact=%u",
1902 : peer->hot.id, buf->cold.hdr.type, buf->cold.hdr.flags,
1903 : buf->cold.hdr.payload_len, buf->hot.is_compact);
1904 :
1905 : /* Update statistics - use correct header size */
1906 40 : header_size = buf->hot.is_compact ? PT_COMPACT_HEADER_SIZE : PT_MESSAGE_HEADER_SIZE;
1907 80 : peer->cold.stats.bytes_received += header_size +
1908 80 : buf->cold.hdr.payload_len +
1909 40 : (buf->hot.is_compact ? 0 : 2);
1910 40 : peer->cold.stats.messages_received++;
1911 80 : ctx->global_stats.total_bytes_received += header_size +
1912 80 : buf->cold.hdr.payload_len +
1913 40 : (buf->hot.is_compact ? 0 : 2);
1914 40 : ctx->global_stats.total_messages_received++;
1915 :
1916 : /* Dispatch by message type */
1917 40 : switch (buf->cold.hdr.type) {
1918 23 : case PT_MSG_TYPE_DATA:
1919 : /* Check for fragmented message */
1920 23 : if (buf->cold.hdr.flags & PT_MSG_FLAG_FRAGMENT) {
1921 : /* Fragment - process through reassembly */
1922 : pt_fragment_header frag_hdr;
1923 0 : const uint8_t *complete_data = NULL;
1924 0 : uint16_t complete_len = 0;
1925 : int reassembly_result;
1926 :
1927 : /* Decode fragment header from payload */
1928 0 : if (pt_fragment_decode(buf->cold.payload_buf,
1929 0 : buf->cold.hdr.payload_len,
1930 : &frag_hdr) != 0) {
1931 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_PROTOCOL,
1932 : "Failed to decode fragment header from peer %u",
1933 : peer->hot.id);
1934 0 : break;
1935 : }
1936 :
1937 : /* Process fragment through reassembly */
1938 0 : reassembly_result = pt_reassembly_process(ctx, peer,
1939 0 : buf->cold.payload_buf, buf->cold.hdr.payload_len,
1940 : &frag_hdr, &complete_data, &complete_len);
1941 :
1942 0 : if (reassembly_result == 1 && complete_data != NULL) {
1943 : /* Complete message reassembled - deliver to app callback */
1944 0 : if (ctx->callbacks.on_message_received) {
1945 0 : ctx->callbacks.on_message_received((PeerTalk_Context *)ctx,
1946 0 : peer->hot.id,
1947 : complete_data,
1948 : complete_len,
1949 : ctx->callbacks.user_data);
1950 : }
1951 0 : } else if (reassembly_result < 0) {
1952 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_PROTOCOL,
1953 : "Fragment reassembly error: %d", reassembly_result);
1954 : }
1955 : /* If 0, more fragments expected - nothing to do yet */
1956 : } else {
1957 : /* Non-fragmented - fire callback directly */
1958 23 : if (ctx->callbacks.on_message_received) {
1959 22 : ctx->callbacks.on_message_received((PeerTalk_Context *)ctx,
1960 22 : peer->hot.id,
1961 22 : buf->cold.payload_buf,
1962 22 : buf->cold.hdr.payload_len,
1963 : ctx->callbacks.user_data);
1964 : }
1965 : }
1966 23 : break;
1967 :
1968 0 : case PT_MSG_TYPE_PING:
1969 : /* Send PONG response */
1970 0 : pt_posix_send_control(ctx, peer, PT_MSG_TYPE_PONG);
1971 0 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
1972 : "Received PING from peer %u, sent PONG", peer->hot.id);
1973 0 : break;
1974 :
1975 0 : case PT_MSG_TYPE_PONG:
1976 : /* Update latency with rolling average and quality calculation */
1977 0 : update_peer_latency(ctx, peer);
1978 0 : break;
1979 :
1980 0 : case PT_MSG_TYPE_DISCONNECT:
1981 0 : PT_CTX_INFO(ctx, PT_LOG_CAT_PROTOCOL,
1982 : "Received DISCONNECT from peer %u", peer->hot.id);
1983 0 : return -1; /* Trigger disconnect */
1984 :
1985 0 : case PT_MSG_TYPE_ACK:
1986 : /* Handle reliable message ACK - TODO in future session */
1987 0 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
1988 : "Received ACK from peer %u", peer->hot.id);
1989 0 : break;
1990 :
1991 17 : case PT_MSG_TYPE_CAPABILITY:
1992 : /* Process capability message */
1993 : {
1994 : pt_capability_msg caps;
1995 : uint16_t effective_max;
1996 :
1997 17 : if (pt_capability_decode(ctx, buf->cold.payload_buf,
1998 17 : buf->cold.hdr.payload_len, &caps) == 0) {
1999 : /* Store peer capabilities */
2000 17 : peer->cold.caps.max_message_size = caps.max_message_size;
2001 17 : peer->cold.caps.preferred_chunk = caps.preferred_chunk;
2002 17 : peer->cold.caps.capability_flags = caps.capability_flags;
2003 17 : peer->cold.caps.buffer_pressure = caps.buffer_pressure;
2004 17 : peer->cold.caps.caps_exchanged = 1;
2005 :
2006 : /* Store receive buffer size and optimal chunk for tuning */
2007 17 : peer->cold.caps.recv_buffer_size = caps.recv_buffer_size;
2008 17 : peer->cold.caps.optimal_chunk = caps.optimal_chunk;
2009 :
2010 : /* Calculate flow control window based on peer's buffer capacity
2011 : *
2012 : * Window = peer_recv_buffer / our_max_message
2013 : * This limits how many messages we queue to avoid flooding.
2014 : */
2015 : {
2016 : uint16_t window;
2017 17 : uint16_t our_max = ctx->local_max_message;
2018 17 : uint16_t peer_buf = caps.recv_buffer_size;
2019 :
2020 17 : if (peer_buf > 0 && our_max > 0) {
2021 17 : window = peer_buf / our_max;
2022 : } else {
2023 0 : window = PT_FLOW_WINDOW_DEFAULT;
2024 : }
2025 :
2026 : /* Clamp to min/max */
2027 17 : if (window < PT_FLOW_WINDOW_MIN) {
2028 0 : window = PT_FLOW_WINDOW_MIN;
2029 : }
2030 17 : if (window > PT_FLOW_WINDOW_MAX) {
2031 1 : window = PT_FLOW_WINDOW_MAX;
2032 : }
2033 :
2034 17 : peer->cold.caps.send_window = window;
2035 : }
2036 :
2037 : /* Negotiate compact header mode - both must support it */
2038 17 : if ((caps.capability_flags & PT_CAPFLAG_COMPACT_HEADER) &&
2039 17 : (ctx->local_capability_flags & PT_CAPFLAG_COMPACT_HEADER)) {
2040 17 : peer->cold.caps.compact_mode = 1;
2041 : } else {
2042 0 : peer->cold.caps.compact_mode = 0;
2043 : }
2044 :
2045 : /* Check if peer needs push for performance */
2046 17 : peer->cold.caps.push_preferred =
2047 17 : (caps.capability_flags & PT_CAPFLAG_PUSH_PREFERRED) ? 1 : 0;
2048 :
2049 : /* Calculate effective max = min(ours, theirs) */
2050 17 : effective_max = ctx->local_max_message;
2051 17 : if (caps.max_message_size < effective_max) {
2052 1 : effective_max = caps.max_message_size;
2053 : }
2054 17 : peer->hot.effective_max_msg = effective_max;
2055 :
2056 17 : PT_CTX_INFO(ctx, PT_LOG_CAT_PROTOCOL,
2057 : "Received capabilities from peer %u: max=%u chunk=%u pressure=%u compact=%u recv_buf=%u optimal=%u push=%u",
2058 : peer->hot.id, caps.max_message_size, caps.preferred_chunk,
2059 : caps.buffer_pressure, peer->cold.caps.compact_mode,
2060 : caps.recv_buffer_size, caps.optimal_chunk, peer->cold.caps.push_preferred);
2061 : } else {
2062 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_PROTOCOL,
2063 : "Failed to decode capabilities from peer %u", peer->hot.id);
2064 : }
2065 : }
2066 17 : break;
2067 :
2068 0 : default:
2069 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_PROTOCOL,
2070 : "Unknown message type %u from peer %u",
2071 : buf->cold.hdr.type, peer->hot.id);
2072 0 : break;
2073 : }
2074 :
2075 40 : return 0;
2076 : }
2077 :
2078 : /**
2079 : * Main receive function - drives state machine
2080 : *
2081 : * Non-blocking receive with state machine for partial reads.
2082 : *
2083 : * @param ctx PeerTalk context
2084 : * @param peer Target peer
2085 : * @return 0 on success, -1 on error or disconnect
2086 : */
2087 81 : int pt_posix_recv(struct pt_context *ctx, struct pt_peer *peer) {
2088 81 : pt_posix_data *pd = pt_posix_get(ctx);
2089 : pt_recv_buffer *buf;
2090 : int peer_idx;
2091 : int sock;
2092 : int ret;
2093 :
2094 81 : if (!peer || peer->hot.magic != PT_PEER_MAGIC)
2095 0 : return -1;
2096 :
2097 81 : peer_idx = peer->hot.id - 1;
2098 81 : sock = pd->tcp_socks[peer_idx];
2099 81 : buf = &pd->recv_bufs[peer_idx];
2100 :
2101 81 : if (sock < 0)
2102 0 : return -1;
2103 :
2104 : /* Drive state machine */
2105 : while (1) {
2106 149 : switch (buf->hot.state) {
2107 80 : case PT_RECV_HEADER:
2108 80 : ret = pt_recv_header(ctx, peer, buf, sock);
2109 80 : if (ret <= 0) return ret;
2110 41 : if (ret == 2) {
2111 : /* Compact header with no payload - message complete */
2112 0 : ret = pt_recv_process_message(ctx, peer, buf);
2113 0 : pt_recv_reset(buf);
2114 0 : return ret;
2115 : }
2116 41 : break;
2117 :
2118 42 : case PT_RECV_PAYLOAD:
2119 42 : ret = pt_recv_payload(ctx, peer, buf, sock);
2120 42 : if (ret <= 0) return ret;
2121 41 : if (ret == 2) {
2122 : /* Compact header - skip CRC, message complete */
2123 14 : ret = pt_recv_process_message(ctx, peer, buf);
2124 14 : pt_recv_reset(buf);
2125 14 : return ret;
2126 : }
2127 27 : break;
2128 :
2129 27 : case PT_RECV_CRC:
2130 27 : ret = pt_recv_crc(ctx, peer, buf, sock);
2131 27 : if (ret <= 0) return ret;
2132 :
2133 : /* Message complete - process it */
2134 27 : ret = pt_recv_process_message(ctx, peer, buf);
2135 27 : pt_recv_reset(buf); /* Reset for next message */
2136 27 : return ret;
2137 :
2138 0 : default:
2139 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL,
2140 : "Invalid recv state %u for peer %u",
2141 : buf->hot.state, peer->hot.id);
2142 0 : pt_recv_reset(buf);
2143 0 : return -1;
2144 : }
2145 : }
2146 : }
2147 :
2148 : /* ========================================================================== */
2149 : /* UDP Messaging (Session 4.4) */
2150 : /* ========================================================================== */
2151 :
2152 : /**
2153 : * Initialize UDP messaging socket
2154 : *
2155 : * Creates dedicated UDP socket for unreliable messaging (separate from discovery).
2156 : * Binds to DEFAULT_UDP_PORT (7355) or user-configured port.
2157 : *
2158 : * Returns: 0 on success, -1 on failure
2159 : */
2160 230 : int pt_posix_udp_init(struct pt_context *ctx) {
2161 230 : pt_posix_data *pd = pt_posix_get(ctx);
2162 : struct sockaddr_in addr;
2163 : int sock;
2164 230 : uint16_t port = UDP_PORT(ctx);
2165 :
2166 : /* Create UDP socket */
2167 230 : sock = socket(AF_INET, SOCK_DGRAM, 0);
2168 230 : if (sock < 0) {
2169 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK,
2170 : "Failed to create UDP messaging socket: %s", strerror(errno));
2171 0 : return -1;
2172 : }
2173 :
2174 : /* Set non-blocking */
2175 230 : if (set_nonblocking(ctx, sock) < 0) {
2176 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK,
2177 : "Failed to set UDP messaging socket non-blocking");
2178 0 : close(sock);
2179 0 : return -1;
2180 : }
2181 :
2182 : /* Set SO_REUSEADDR */
2183 230 : if (set_reuseaddr(ctx, sock) < 0) {
2184 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK,
2185 : "Failed to set SO_REUSEADDR on UDP messaging socket");
2186 0 : close(sock);
2187 0 : return -1;
2188 : }
2189 :
2190 : /* Bind to port */
2191 230 : memset(&addr, 0, sizeof(addr));
2192 230 : addr.sin_family = AF_INET;
2193 230 : addr.sin_addr.s_addr = INADDR_ANY;
2194 230 : addr.sin_port = htons(port);
2195 :
2196 230 : if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
2197 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK,
2198 : "Failed to bind UDP messaging socket to port %u: %s",
2199 : port, strerror(errno));
2200 0 : close(sock);
2201 0 : return -1;
2202 : }
2203 :
2204 230 : pd->udp_msg_sock = sock;
2205 230 : pd->udp_msg_port = port;
2206 :
2207 : /* Update max_fd for select() */
2208 230 : if (sock > pd->max_fd) {
2209 230 : pd->max_fd = sock;
2210 : }
2211 230 : pd->fd_dirty = 1; /* Rebuild fd_sets */
2212 :
2213 230 : PT_CTX_INFO(ctx, PT_LOG_CAT_NETWORK,
2214 : "UDP messaging socket initialized on port %u", port);
2215 230 : return 0;
2216 : }
2217 :
2218 : /**
2219 : * Shutdown UDP messaging socket
2220 : *
2221 : * Closes dedicated UDP messaging socket and cleans up resources.
2222 : */
2223 0 : void pt_posix_udp_shutdown(struct pt_context *ctx) {
2224 0 : pt_posix_data *pd = pt_posix_get(ctx);
2225 :
2226 0 : if (pd->udp_msg_sock >= 0) {
2227 0 : close(pd->udp_msg_sock);
2228 0 : pd->udp_msg_sock = -1;
2229 0 : pd->fd_dirty = 1;
2230 0 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_NETWORK, "UDP messaging socket closed");
2231 : }
2232 0 : }
2233 :
2234 : /**
2235 : * Send UDP message to peer
2236 : *
2237 : * Sends unreliable UDP datagram with 8-byte header (magic, sender_port, payload_len).
2238 : * No CRC - UDP has its own checksum at transport layer.
2239 : *
2240 : * Returns: PT_OK on success, PT_ERR_* on failure
2241 : */
2242 1 : int pt_posix_send_udp(struct pt_context *ctx, struct pt_peer *peer,
2243 : const void *data, uint16_t len) {
2244 1 : pt_posix_data *pd = pt_posix_get(ctx);
2245 : uint8_t packet_buf[PT_MAX_UDP_MESSAGE_SIZE];
2246 : struct sockaddr_in dest_addr;
2247 : int packet_len;
2248 : ssize_t sent;
2249 :
2250 : /* Validate socket */
2251 1 : if (pd->udp_msg_sock < 0) {
2252 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK, "UDP messaging socket not initialized");
2253 0 : return PT_ERR_NOT_INITIALIZED;
2254 : }
2255 :
2256 : /* Validate peer state */
2257 1 : if (peer->hot.state == PT_PEER_UNUSED) {
2258 0 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_NETWORK,
2259 : "Attempted to send UDP to peer %u (not connected)", peer->hot.id);
2260 0 : return PT_ERR_PEER_NOT_FOUND;
2261 : }
2262 :
2263 : /* Validate message size (PT_MAX_UDP_MESSAGE_SIZE minus 8-byte header) */
2264 1 : if (len > PT_MAX_UDP_MESSAGE_SIZE - 8) {
2265 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK,
2266 : "UDP message too large: %u bytes (max %u)",
2267 : len, PT_MAX_UDP_MESSAGE_SIZE - 8);
2268 0 : return PT_ERR_MESSAGE_TOO_LARGE;
2269 : }
2270 :
2271 : /* Encode UDP packet */
2272 1 : packet_len = pt_udp_encode(data, len, pd->udp_msg_port,
2273 : packet_buf, sizeof(packet_buf));
2274 1 : if (packet_len < 0) {
2275 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK,
2276 : "Failed to encode UDP packet: %d", packet_len);
2277 0 : return packet_len;
2278 : }
2279 :
2280 : /* Build destination address */
2281 1 : memset(&dest_addr, 0, sizeof(dest_addr));
2282 1 : dest_addr.sin_family = AF_INET;
2283 1 : dest_addr.sin_addr.s_addr = htonl(peer->cold.addresses[0].address);
2284 1 : dest_addr.sin_port = htons(pd->udp_msg_port); /* Use same UDP port */
2285 :
2286 : /* Send datagram */
2287 1 : sent = sendto(pd->udp_msg_sock, packet_buf, packet_len, 0,
2288 : (struct sockaddr *)&dest_addr, sizeof(dest_addr));
2289 :
2290 1 : if (sent < 0) {
2291 0 : if (errno == EWOULDBLOCK || errno == EAGAIN) {
2292 0 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_NETWORK, "UDP send would block");
2293 0 : return PT_ERR_WOULD_BLOCK;
2294 : }
2295 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK,
2296 : "UDP sendto failed: %s", strerror(errno));
2297 0 : return PT_ERR_NETWORK;
2298 : }
2299 :
2300 : /* Update statistics */
2301 1 : peer->cold.stats.bytes_sent += sent;
2302 1 : peer->cold.stats.messages_sent++;
2303 1 : ctx->global_stats.total_bytes_sent += sent;
2304 1 : ctx->global_stats.total_messages_sent++;
2305 :
2306 1 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_NETWORK,
2307 : "Sent UDP message to peer %u (%u bytes payload, %zd bytes total)",
2308 : peer->hot.id, len, sent);
2309 :
2310 1 : return PT_OK;
2311 : }
2312 :
2313 : /**
2314 : * Receive UDP messages from any peer
2315 : *
2316 : * Non-blocking recvfrom() on UDP messaging socket. Validates magic "PTUD",
2317 : * finds peer by source IP, and fires on_message_received callback.
2318 : *
2319 : * Returns: 1 if message received, 0 if no data, -1 on error
2320 : */
2321 1 : int pt_posix_recv_udp(struct pt_context *ctx) {
2322 1 : pt_posix_data *pd = pt_posix_get(ctx);
2323 : uint8_t packet_buf[PT_MAX_UDP_MESSAGE_SIZE];
2324 : struct sockaddr_in from_addr;
2325 1 : socklen_t from_len = sizeof(from_addr);
2326 : ssize_t received;
2327 : uint16_t sender_port;
2328 : const void *payload;
2329 : uint16_t payload_len;
2330 : uint32_t sender_ip;
2331 : struct pt_peer *peer;
2332 : int ret;
2333 :
2334 : /* Check socket */
2335 1 : if (pd->udp_msg_sock < 0) {
2336 0 : return 0; /* Not initialized yet */
2337 : }
2338 :
2339 : /* Non-blocking receive */
2340 1 : received = recvfrom(pd->udp_msg_sock, packet_buf, sizeof(packet_buf), 0,
2341 : (struct sockaddr *)&from_addr, &from_len);
2342 :
2343 1 : if (received < 0) {
2344 0 : if (errno == EWOULDBLOCK || errno == EAGAIN) {
2345 0 : return 0; /* No data available */
2346 : }
2347 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK,
2348 : "UDP recvfrom failed: %s", strerror(errno));
2349 0 : return -1;
2350 : }
2351 :
2352 : /* Decode packet */
2353 1 : ret = pt_udp_decode(ctx, packet_buf, received, &sender_port, &payload, &payload_len);
2354 1 : if (ret < 0) {
2355 0 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_NETWORK,
2356 : "Failed to decode UDP packet: %d", ret);
2357 0 : return 0; /* Ignore malformed packets */
2358 : }
2359 :
2360 : /* Find peer by source IP */
2361 1 : sender_ip = ntohl(from_addr.sin_addr.s_addr);
2362 1 : peer = pt_peer_find_by_addr(ctx, sender_ip, 0); /* Match any port */
2363 :
2364 1 : if (!peer) {
2365 1 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_NETWORK,
2366 : "Received UDP from unknown peer %u.%u.%u.%u",
2367 : (sender_ip >> 24) & 0xFF,
2368 : (sender_ip >> 16) & 0xFF,
2369 : (sender_ip >> 8) & 0xFF,
2370 : sender_ip & 0xFF);
2371 1 : return 0; /* Ignore unknown peers */
2372 : }
2373 :
2374 : /* Update peer statistics and timestamp */
2375 0 : peer->cold.stats.bytes_received += received;
2376 0 : peer->cold.stats.messages_received++;
2377 0 : peer->hot.last_seen = ctx->plat->get_ticks();
2378 :
2379 : /* Update global statistics */
2380 0 : ctx->global_stats.total_bytes_received += received;
2381 0 : ctx->global_stats.total_messages_received++;
2382 :
2383 0 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_NETWORK,
2384 : "Received UDP from peer %u (%u bytes payload, %zd bytes total)",
2385 : peer->hot.id, payload_len, received);
2386 :
2387 : /* Fire callback */
2388 0 : if (ctx->callbacks.on_message_received) {
2389 0 : ctx->callbacks.on_message_received((PeerTalk_Context *)ctx,
2390 0 : peer->hot.id,
2391 : payload,
2392 : payload_len,
2393 : ctx->callbacks.user_data);
2394 : }
2395 :
2396 0 : return 1; /* Message processed */
2397 : }
2398 :
2399 : /* ========================================================================== */
2400 : /* Connection Completion Checking (Session 4.2) */
2401 : /* ========================================================================== */
2402 :
/**
 * Check for async connection completion
 *
 * NOTE: This function is no longer used - connection checking is now
 * integrated into the pt_posix_poll() main loop (Session 4.6).
 * It is compiled out with #if 0 and kept for reference only.
 *
 * Collects the sockets of all peers in CONNECTING state, asks select()
 * with a zero timeout which of them are writable, and for each writable
 * one reads SO_ERROR to tell a completed connection from a failed one.
 *
 * Returns: Number of connections completed
 */
#if 0 /* Integrated into pt_posix_poll() in Session 4.6 */
static int pt_posix_connect_poll(struct pt_context *ctx) {
    pt_posix_data *pd = pt_posix_get(ctx);
    fd_set writable;
    struct timeval no_wait;
    int highest_fd = -1;
    int completed = 0;
    size_t i;

    FD_ZERO(&writable);

    /* Build fd_set of CONNECTING sockets */
    for (i = 0; i < ctx->max_peers; i++) {
        int sock;

        if (ctx->peers[i].hot.state != PT_PEER_CONNECTING)
            continue;

        sock = pd->tcp_socks[i];
        if (sock < 0)
            continue;

        FD_SET(sock, &writable);
        if (sock > highest_fd)
            highest_fd = sock;
    }

    if (highest_fd < 0)
        return 0;  /* No connecting sockets */

    /* Zero timeout for non-blocking check */
    no_wait.tv_sec = 0;
    no_wait.tv_usec = 0;

    if (select(highest_fd + 1, NULL, &writable, NULL, &no_wait) < 0) {
        if (errno != EINTR) {
            PT_CTX_WARN(ctx, PT_LOG_CAT_CONNECT,
                "Select failed in connect poll: %s", strerror(errno));
        }
        return 0;
    }

    /* Check which sockets are now writable */
    for (i = 0; i < ctx->max_peers; i++) {
        struct pt_peer *peer = &ctx->peers[i];
        int sock;
        int so_err = 0;
        socklen_t so_len = sizeof(so_err);
        int failed = 0;

        if (peer->hot.state != PT_PEER_CONNECTING)
            continue;

        sock = pd->tcp_socks[i];
        if (sock < 0 || !FD_ISSET(sock, &writable))
            continue;

        /* Socket is writable - connection complete or failed */
        if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &so_err, &so_len) < 0) {
            PT_CTX_ERR(ctx, PT_LOG_CAT_CONNECT,
                "getsockopt failed for peer %u: %s",
                peer->hot.id, strerror(errno));
            failed = 1;
        } else if (so_err != 0) {
            /* Connection failed */
            PT_CTX_WARN(ctx, PT_LOG_CAT_CONNECT,
                "Connection failed for peer %u (%s): %s",
                peer->hot.id, peer->cold.name, strerror(so_err));
            failed = 1;
        }

        if (failed) {
            /* Same teardown for both failure causes */
            pt_peer_set_state(ctx, peer, PT_PEER_FAILED);
            close(sock);
            pd->tcp_socks[i] = -1;
            pt_posix_remove_active_peer(pd, i);
            continue;
        }

        /* Connection successful */
        pt_peer_set_state(ctx, peer, PT_PEER_CONNECTED);
        peer->hot.last_seen = ctx->plat->get_ticks();

        PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
            "Connection established to peer %u (%s)",
            peer->hot.id, peer->cold.name);

        /* Fire callback */
        if (ctx->callbacks.on_peer_connected) {
            ctx->callbacks.on_peer_connected((PeerTalk_Context *)ctx,
                                             peer->hot.id,
                                             ctx->callbacks.user_data);
        }

        completed++;
    }

    return completed;
}
#endif /* End of unused pt_posix_connect_poll() */
2511 :
2512 : /* ========================================================================== */
2513 : /* Main Poll (Session 4.6 - Integration) */
2514 : /* ========================================================================== */
2515 :
2516 741 : int pt_posix_poll(struct pt_context *ctx) {
2517 : pt_posix_data *pd;
2518 : pt_tick_t poll_time;
2519 : fd_set read_fds, write_fds;
2520 : struct timeval tv;
2521 : int select_result;
2522 :
2523 741 : if (!ctx) {
2524 0 : return -1;
2525 : }
2526 :
2527 741 : pd = pt_posix_get(ctx);
2528 :
2529 : /* Cache poll time at start (avoids repeated trap calls on Classic Mac) */
2530 741 : poll_time = ctx->plat->get_ticks();
2531 :
2532 : /* Reset batch count for this poll cycle */
2533 741 : pd->batch_count = 0;
2534 :
2535 : /* Rebuild fd_sets only if dirty flag set (connections changed) */
2536 741 : if (pd->fd_dirty) {
2537 87 : pt_posix_rebuild_fd_sets(ctx);
2538 : }
2539 :
2540 : /* Copy cached fd_sets (select modifies them) */
2541 741 : read_fds = pd->cached_read_fds;
2542 741 : write_fds = pd->cached_write_fds;
2543 :
2544 : /* Set timeout to 10ms for responsive polling */
2545 741 : tv.tv_sec = 0;
2546 741 : tv.tv_usec = 10000;
2547 :
2548 : /* Call select() with all sockets */
2549 741 : select_result = select(pd->max_fd + 1, &read_fds, &write_fds, NULL, &tv);
2550 :
2551 741 : if (select_result < 0) {
2552 0 : if (errno != EINTR) {
2553 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK,
2554 : "Select failed in main poll: %s", strerror(errno));
2555 : }
2556 0 : return 0;
2557 : }
2558 :
2559 741 : if (select_result == 0) {
2560 : /* Hot path: no events - just periodic work */
2561 612 : goto periodic_work;
2562 : }
2563 :
2564 : /* Process discovery packets if discovery_sock readable */
2565 129 : if (pd->discovery_sock >= 0 && FD_ISSET(pd->discovery_sock, &read_fds)) {
2566 29 : while (pt_posix_discovery_poll(ctx) > 0) {
2567 : /* Process all pending discovery packets */
2568 : }
2569 : }
2570 :
2571 : /* Process UDP messages if udp_msg_sock readable */
2572 129 : if (pd->udp_msg_sock >= 0 && FD_ISSET(pd->udp_msg_sock, &read_fds)) {
2573 1 : while (pt_posix_recv_udp(ctx) > 0) {
2574 : /* Process all pending UDP messages */
2575 : }
2576 : }
2577 :
2578 : /* Process incoming connections if listen_sock readable */
2579 129 : if (pd->listen_sock >= 0 && FD_ISSET(pd->listen_sock, &read_fds)) {
2580 40 : while (pt_posix_listen_poll(ctx) > 0) {
2581 : /* Process all pending incoming connections */
2582 : }
2583 : }
2584 :
2585 : /* For each active peer socket, check for events */
2586 231 : for (uint8_t i = 0; i < pd->active_count; i++) {
2587 102 : uint8_t peer_idx = pd->active_peers[i];
2588 102 : struct pt_peer *peer = &ctx->peers[peer_idx];
2589 102 : int sock = pd->tcp_socks[peer_idx];
2590 :
2591 102 : if (sock < 0)
2592 0 : continue;
2593 :
2594 : /* Check connect completion (if state is CONNECTING and socket is writable) */
2595 102 : if (peer->hot.state == PT_PEER_CONNECTING && FD_ISSET(sock, &write_fds)) {
2596 11 : int error = 0;
2597 11 : socklen_t len = sizeof(error);
2598 :
2599 11 : if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
2600 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_CONNECT,
2601 : "getsockopt failed for peer %u: %s",
2602 : peer->hot.id, strerror(errno));
2603 0 : pt_peer_set_state(ctx, peer, PT_PEER_FAILED);
2604 0 : close(sock);
2605 0 : pd->tcp_socks[peer_idx] = -1;
2606 0 : pt_posix_remove_active_peer(pd, peer_idx);
2607 0 : continue;
2608 : }
2609 :
2610 11 : if (error != 0) {
2611 : /* Connection failed */
2612 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_CONNECT,
2613 : "Connection failed for peer %u (%s): %s",
2614 : peer->hot.id, peer->cold.name, strerror(error));
2615 0 : pt_peer_set_state(ctx, peer, PT_PEER_FAILED);
2616 0 : close(sock);
2617 0 : pd->tcp_socks[peer_idx] = -1;
2618 0 : pt_posix_remove_active_peer(pd, peer_idx);
2619 0 : continue;
2620 : }
2621 :
2622 : /* Connection successful - allocate queues */
2623 11 : peer->send_queue = pt_alloc_peer_queue(ctx);
2624 11 : peer->recv_queue = pt_alloc_peer_queue(ctx);
2625 :
2626 11 : if (!peer->send_queue || !peer->recv_queue) {
2627 : /* Allocation failed - disconnect */
2628 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_MEMORY,
2629 : "Failed to allocate queues for peer %u", peer->hot.id);
2630 :
2631 0 : pt_free_peer_queue(peer->send_queue);
2632 0 : pt_free_peer_queue(peer->recv_queue);
2633 0 : peer->send_queue = NULL;
2634 0 : peer->recv_queue = NULL;
2635 :
2636 0 : pt_peer_set_state(ctx, peer, PT_PEER_FAILED);
2637 0 : close(sock);
2638 0 : pd->tcp_socks[peer_idx] = -1;
2639 0 : pt_posix_remove_active_peer(pd, peer_idx);
2640 0 : continue;
2641 : }
2642 :
2643 : /* Transition to CONNECTED */
2644 11 : pt_peer_set_state(ctx, peer, PT_PEER_CONNECTED);
2645 11 : peer->hot.last_seen = poll_time;
2646 :
2647 11 : PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
2648 : "Connection established to peer %u (%s)",
2649 : peer->hot.id, peer->cold.name);
2650 :
2651 : /* Fire on_peer_connected callback */
2652 11 : if (ctx->callbacks.on_peer_connected) {
2653 9 : ctx->callbacks.on_peer_connected((PeerTalk_Context *)ctx,
2654 9 : peer->hot.id,
2655 : ctx->callbacks.user_data);
2656 : }
2657 :
2658 : /* Send capabilities for negotiation */
2659 11 : pt_posix_send_capability(ctx, peer);
2660 :
2661 : /* Mark fd_sets dirty to move socket from write set to read-only */
2662 11 : pd->fd_dirty = 1;
2663 : }
2664 :
2665 : /* Check for incoming data (if socket readable) */
2666 102 : if (FD_ISSET(sock, &read_fds)) {
2667 : /* Process messages via pt_posix_recv() loop */
2668 : int recv_ret;
2669 81 : while ((recv_ret = pt_posix_recv(ctx, peer)) > 0) {
2670 : /* Keep receiving until no more complete messages */
2671 : }
2672 :
2673 : /* If recv returned -1 (error/close), mark peer for disconnection */
2674 81 : if (recv_ret < 0 && peer->hot.state == PT_PEER_CONNECTED) {
2675 9 : peer->hot.state = PT_PEER_DISCONNECTING;
2676 : }
2677 :
2678 : /* On connection error, close socket and remove from active */
2679 81 : if (peer->hot.state == PT_PEER_DISCONNECTING ||
2680 72 : peer->hot.state == PT_PEER_FAILED) {
2681 9 : PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
2682 : "Closing connection to peer %u", peer->hot.id);
2683 :
2684 : /* Free queues before closing */
2685 9 : pt_free_peer_queue(peer->send_queue);
2686 9 : pt_free_peer_queue(peer->recv_queue);
2687 9 : peer->send_queue = NULL;
2688 9 : peer->recv_queue = NULL;
2689 :
2690 9 : close(sock);
2691 9 : pd->tcp_socks[peer_idx] = -1;
2692 9 : pt_posix_remove_active_peer(pd, peer_idx);
2693 :
2694 : /* Fire on_peer_disconnected callback */
2695 9 : if (ctx->callbacks.on_peer_disconnected) {
2696 3 : ctx->callbacks.on_peer_disconnected((PeerTalk_Context *)ctx,
2697 3 : peer->hot.id,
2698 : PT_OK,
2699 : ctx->callbacks.user_data);
2700 : }
2701 :
2702 : /* Transition back to DISCOVERED state for reconnection support.
2703 : * Don't destroy the peer - it may reconnect or continue sending
2704 : * discovery announcements. The peer will timeout after 30s if
2705 : * no further announcements are received. */
2706 9 : pt_peer_set_state(ctx, peer, PT_PEER_STATE_DISCOVERED);
2707 9 : peer->hot.last_seen = ctx->plat->get_ticks();
2708 : }
2709 : }
2710 : }
2711 :
2712 129 : periodic_work:
2713 : /* Drain send buffers for all connected peers
2714 : * Two-tier system: Tier 2 (large messages) first, then Tier 1 (small messages)
2715 : */
2716 12597 : for (uint8_t i = 0; i < ctx->max_peers; i++) {
2717 11856 : struct pt_peer *peer = &ctx->peers[i];
2718 :
2719 11856 : if (peer->hot.state != PT_PEER_CONNECTED) {
2720 11539 : continue;
2721 : }
2722 :
2723 : /* Tier 2: Send large message from direct buffer first (priority) */
2724 317 : if (pt_direct_buffer_ready(&peer->send_direct)) {
2725 2 : pt_direct_buffer *buf = &peer->send_direct;
2726 :
2727 : /* Mark as sending */
2728 2 : pt_direct_buffer_mark_sending(buf);
2729 :
2730 : /* Send via TCP with message flags (supports fragmentation) */
2731 2 : int result = pt_posix_send_with_flags(ctx, peer, buf->data, buf->length, buf->msg_flags);
2732 :
2733 : /* Complete the send (success or fail, buffer becomes available) */
2734 2 : pt_direct_buffer_complete(buf);
2735 :
2736 2 : if (result == PT_OK) {
2737 2 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
2738 : "Tier 2: Sent %u bytes to peer %u", buf->length, peer->hot.id);
2739 0 : } else if (result != PT_ERR_WOULD_BLOCK) {
2740 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
2741 : "Tier 2: Failed to send to peer %u: error %d",
2742 : peer->hot.id, result);
2743 : }
2744 : }
2745 :
2746 : /* Tier 1: Drain queue (small messages and fragments)
2747 : *
2748 : * CRITICAL: Drain MULTIPLE messages per poll iteration, not just one!
2749 : * At 60Hz poll rate, draining one message = 60 msg/sec max.
2750 : * With fragmentation (17 chunks per 4KB message), this causes queue
2751 : * overflow. Drain up to 16 messages or until WOULD_BLOCK.
2752 : */
2753 317 : if (peer->send_queue) {
2754 : const void *data;
2755 : uint16_t len;
2756 317 : pt_queue *q = peer->send_queue;
2757 317 : int drain_count = 0;
2758 317 : const int max_drain = 16; /* Drain up to 16 messages per poll */
2759 :
2760 672 : while (drain_count < max_drain &&
2761 336 : pt_queue_pop_priority_direct(q, &data, &len) == 0) {
2762 : int result;
2763 19 : uint8_t slot_flags = q->slots[q->pending_pop_slot].flags;
2764 :
2765 : /* Check if this is a fragment - needs PT_MSG_FLAG_FRAGMENT */
2766 19 : if (slot_flags & PT_SLOT_FRAGMENT) {
2767 0 : result = pt_posix_send_with_flags(ctx, peer, data, len,
2768 : PT_MSG_FLAG_FRAGMENT);
2769 : } else {
2770 19 : result = pt_posix_send(ctx, peer, data, len);
2771 : }
2772 :
2773 19 : if (result == PT_ERR_WOULD_BLOCK) {
2774 : /* Socket buffer full - DON'T commit, retry next poll */
2775 0 : pt_queue_pop_priority_rollback(q);
2776 0 : break;
2777 : }
2778 :
2779 : /* Commit the pop to remove message from queue */
2780 19 : pt_queue_pop_priority_commit(q);
2781 19 : drain_count++;
2782 :
2783 19 : if (result != PT_OK) {
2784 : /* Send failed (network error) - message lost, continue draining */
2785 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_PROTOCOL,
2786 : "Failed to drain message for peer %u: error %d",
2787 : peer->hot.id, result);
2788 : }
2789 : }
2790 : }
2791 :
2792 : /* Stream: Process active stream transfers */
2793 317 : pt_stream_poll(ctx, peer, pt_posix_send);
2794 :
2795 : /* Flow control: Check for pressure updates to send
2796 : *
2797 : * When our recv queue pressure crosses a threshold (25%, 50%, 75%),
2798 : * we need to inform the peer so they can throttle their sends.
2799 : * This implements receiver-driven flow control - SDK handles it
2800 : * transparently so app developers don't need to manage it.
2801 : */
2802 634 : if (peer->cold.caps.pressure_update_pending ||
2803 317 : pt_peer_check_pressure_update(ctx, peer)) {
2804 : /* Send updated capabilities with new pressure value */
2805 0 : pt_posix_send_capability(ctx, peer);
2806 : }
2807 : }
2808 :
2809 : /* Periodic discovery announce every 10 seconds */
2810 741 : if (pd->discovery_sock >= 0 && (poll_time - pd->last_announce >= 10000)) {
2811 2 : pt_posix_discovery_send(ctx, PT_DISC_TYPE_ANNOUNCE);
2812 2 : pd->last_announce = poll_time;
2813 : }
2814 :
2815 : /* Check for peer timeouts (30 second discovery timeout) */
2816 12597 : for (uint8_t i = 0; i < ctx->max_peers; i++) {
2817 11856 : struct pt_peer *peer = &ctx->peers[i];
2818 :
2819 : /* Skip peers updated in this poll iteration (last_seen >= poll_time)
2820 : * to avoid false timeouts when discovery packet arrived this iteration */
2821 11856 : if (peer->hot.state == PT_PEER_DISCOVERED &&
2822 102 : peer->hot.last_seen < poll_time &&
2823 92 : (poll_time - peer->hot.last_seen >= 30000)) {
2824 0 : PT_CTX_INFO(ctx, PT_LOG_CAT_DISCOVERY,
2825 : "Peer %u (%s) timed out after 30 seconds",
2826 : peer->hot.id, ctx->peer_names[peer->hot.name_idx]);
2827 :
2828 : /* Fire on_peer_lost callback before destroying */
2829 0 : if (ctx->callbacks.on_peer_lost) {
2830 0 : ctx->callbacks.on_peer_lost((PeerTalk_Context *)ctx,
2831 0 : peer->hot.id,
2832 : ctx->callbacks.user_data);
2833 : }
2834 :
2835 0 : pt_peer_destroy(ctx, peer);
2836 : }
2837 : }
2838 :
2839 741 : return 0;
2840 : }
2841 :
2842 : /* ========================================================================== */
2843 : /* Fast Poll (TCP I/O Only) */
2844 : /* ========================================================================== */
2845 :
2846 101 : int pt_posix_poll_fast(struct pt_context *ctx) {
2847 : pt_posix_data *pd;
2848 : fd_set read_fds;
2849 : struct timeval tv;
2850 : int select_result;
2851 :
2852 101 : if (!ctx) {
2853 0 : return -1;
2854 : }
2855 :
2856 101 : pd = pt_posix_get(ctx);
2857 :
2858 : /* Build fd_set with only active TCP peer sockets */
2859 1717 : FD_ZERO(&read_fds);
2860 101 : int max_fd = -1;
2861 :
2862 101 : for (uint8_t i = 0; i < pd->active_count; i++) {
2863 0 : uint8_t peer_idx = pd->active_peers[i];
2864 0 : int sock = pd->tcp_socks[peer_idx];
2865 0 : struct pt_peer *peer = &ctx->peers[peer_idx];
2866 :
2867 : /* Only include connected peers (not connecting) */
2868 0 : if (sock >= 0 && peer->hot.state == PT_PEER_CONNECTED) {
2869 0 : FD_SET(sock, &read_fds);
2870 0 : if (sock > max_fd)
2871 0 : max_fd = sock;
2872 : }
2873 : }
2874 :
2875 101 : if (max_fd < 0) {
2876 : /* No connected peers - nothing to do */
2877 101 : goto drain_queues;
2878 : }
2879 :
2880 : /* Zero timeout - non-blocking check */
2881 0 : tv.tv_sec = 0;
2882 0 : tv.tv_usec = 0;
2883 :
2884 0 : select_result = select(max_fd + 1, &read_fds, NULL, NULL, &tv);
2885 :
2886 0 : if (select_result < 0) {
2887 0 : if (errno != EINTR) {
2888 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_NETWORK,
2889 : "Select failed in fast poll: %s", strerror(errno));
2890 : }
2891 0 : goto drain_queues;
2892 : }
2893 :
2894 0 : if (select_result == 0) {
2895 : /* No incoming data - just drain queues */
2896 0 : goto drain_queues;
2897 : }
2898 :
2899 : /* Process incoming TCP data for connected peers */
2900 0 : for (uint8_t i = 0; i < pd->active_count; i++) {
2901 0 : uint8_t peer_idx = pd->active_peers[i];
2902 0 : struct pt_peer *peer = &ctx->peers[peer_idx];
2903 0 : int sock = pd->tcp_socks[peer_idx];
2904 :
2905 0 : if (sock < 0 || peer->hot.state != PT_PEER_CONNECTED)
2906 0 : continue;
2907 :
2908 0 : if (FD_ISSET(sock, &read_fds)) {
2909 : int recv_ret;
2910 0 : while ((recv_ret = pt_posix_recv(ctx, peer)) > 0) {
2911 : /* Keep receiving until no more complete messages */
2912 : }
2913 :
2914 : /* If recv returned -1, mark for disconnection */
2915 0 : if (recv_ret < 0 && peer->hot.state == PT_PEER_CONNECTED) {
2916 0 : peer->hot.state = PT_PEER_DISCONNECTING;
2917 : }
2918 :
2919 : /* Handle disconnection */
2920 0 : if (peer->hot.state == PT_PEER_DISCONNECTING ||
2921 0 : peer->hot.state == PT_PEER_FAILED) {
2922 0 : PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
2923 : "Closing connection to peer %u (fast poll)", peer->hot.id);
2924 :
2925 0 : pt_free_peer_queue(peer->send_queue);
2926 0 : pt_free_peer_queue(peer->recv_queue);
2927 0 : peer->send_queue = NULL;
2928 0 : peer->recv_queue = NULL;
2929 :
2930 0 : close(sock);
2931 0 : pd->tcp_socks[peer_idx] = -1;
2932 0 : pt_posix_remove_active_peer(pd, peer_idx);
2933 :
2934 0 : if (ctx->callbacks.on_peer_disconnected) {
2935 0 : ctx->callbacks.on_peer_disconnected((PeerTalk_Context *)ctx,
2936 0 : peer->hot.id,
2937 : PT_OK,
2938 : ctx->callbacks.user_data);
2939 : }
2940 :
2941 : /* Transition back to DISCOVERED state for reconnection support.
2942 : * Don't destroy the peer - it may reconnect or continue sending
2943 : * discovery announcements. The peer will timeout after 30s if
2944 : * no further announcements are received. */
2945 0 : pt_peer_set_state(ctx, peer, PT_PEER_STATE_DISCOVERED);
2946 0 : peer->hot.last_seen = ctx->plat->get_ticks();
2947 : }
2948 : }
2949 : }
2950 :
2951 0 : drain_queues:
2952 : /* Drain send queues for all connected peers */
2953 505 : for (uint8_t i = 0; i < ctx->max_peers; i++) {
2954 404 : struct pt_peer *peer = &ctx->peers[i];
2955 :
2956 404 : if (peer->hot.state != PT_PEER_CONNECTED) {
2957 404 : continue;
2958 : }
2959 :
2960 : /* Tier 2: Direct buffer first (priority path) */
2961 0 : if (pt_direct_buffer_ready(&peer->send_direct)) {
2962 0 : pt_direct_buffer *buf = &peer->send_direct;
2963 0 : pt_direct_buffer_mark_sending(buf);
2964 0 : pt_posix_send_with_flags(ctx, peer, buf->data, buf->length, buf->msg_flags);
2965 0 : pt_direct_buffer_complete(buf);
2966 : /* Errors logged by regular poll, not fast poll */
2967 : }
2968 :
2969 : /* Tier 1: Queue drain */
2970 0 : if (peer->send_queue) {
2971 : const void *data;
2972 : uint16_t len;
2973 0 : pt_queue *q = peer->send_queue;
2974 0 : int drain_count = 0;
2975 0 : const int max_drain = 16;
2976 :
2977 0 : while (drain_count < max_drain &&
2978 0 : pt_queue_pop_priority_direct(q, &data, &len) == 0) {
2979 : int result;
2980 0 : uint8_t slot_flags = q->slots[q->pending_pop_slot].flags;
2981 :
2982 0 : if (slot_flags & PT_SLOT_FRAGMENT) {
2983 0 : result = pt_posix_send_with_flags(ctx, peer, data, len,
2984 : PT_MSG_FLAG_FRAGMENT);
2985 : } else {
2986 0 : result = pt_posix_send(ctx, peer, data, len);
2987 : }
2988 :
2989 0 : if (result == PT_ERR_WOULD_BLOCK) {
2990 0 : pt_queue_pop_priority_rollback(q);
2991 0 : break;
2992 : }
2993 :
2994 0 : pt_queue_pop_priority_commit(q);
2995 0 : drain_count++;
2996 : }
2997 : }
2998 :
2999 : /* NOTE: Stream poll skipped in PollFast - use regular Poll for streams */
3000 : }
3001 :
3002 101 : return 0;
3003 : }
3004 :
3005 : /* ========================================================================== */
3006 : /* Public API (Session 4.4) */
3007 : /* ========================================================================== */
3008 :
3009 : /**
3010 : * Send unreliable UDP message to peer
3011 : *
3012 : * Public API wrapper for pt_posix_send_udp(). Validates context and peer ID,
3013 : * then sends UDP datagram with 8-byte header (no CRC).
3014 : *
3015 : * Returns: PT_OK on success, PT_ERR_* on failure
3016 : */
3017 1 : PeerTalk_Error PeerTalk_SendUDP(PeerTalk_Context *ctx, PeerTalk_PeerID peer_id,
3018 : const void *data, uint16_t length) {
3019 1 : struct pt_context *ictx = (struct pt_context *)ctx;
3020 : struct pt_peer *peer;
3021 :
3022 : /* Validate context */
3023 1 : if (!ictx || ictx->magic != PT_CONTEXT_MAGIC) {
3024 0 : return PT_ERR_INVALID_STATE;
3025 : }
3026 :
3027 : /* Validate data and length */
3028 1 : if (!data && length > 0) {
3029 0 : return PT_ERR_INVALID_PARAM;
3030 : }
3031 :
3032 : /* Find peer by ID */
3033 1 : peer = pt_peer_find_by_id(ictx, peer_id);
3034 1 : if (!peer) {
3035 1 : return PT_ERR_PEER_NOT_FOUND;
3036 : }
3037 :
3038 : /* Call internal send function */
3039 0 : return pt_posix_send_udp(ictx, peer, data, length);
3040 : }
3041 :
3042 : /**
3043 : * Send UDP message with zero-queue semantics (fast path)
3044 : *
3045 : * Identical to PeerTalk_SendUDP() - UDP already has no queuing.
3046 : * This function makes the zero-queue semantics explicit for documentation
3047 : * and supports larger payloads up to PT_MAX_UDP_MESSAGE_SIZE (1400 bytes).
3048 : *
3049 : * Returns: PT_OK on success, PT_ERR_* on failure
3050 : */
3051 3 : PeerTalk_Error PeerTalk_SendUDPFast(PeerTalk_Context *ctx, PeerTalk_PeerID peer_id,
3052 : const void *data, uint16_t length) {
3053 3 : struct pt_context *ictx = (struct pt_context *)ctx;
3054 : struct pt_peer *peer;
3055 :
3056 : /* Validate context */
3057 3 : if (!ictx || ictx->magic != PT_CONTEXT_MAGIC) {
3058 1 : return PT_ERR_INVALID_STATE;
3059 : }
3060 :
3061 : /* Validate data and length */
3062 2 : if (!data && length > 0) {
3063 0 : return PT_ERR_INVALID_PARAM;
3064 : }
3065 :
3066 : /* Check max size for UDP fast path */
3067 2 : if (length > PT_MAX_UDP_MESSAGE_SIZE) {
3068 1 : PT_CTX_WARN(ictx, PT_LOG_CAT_SEND,
3069 : "UDP fast path: message too large (%u > %u)",
3070 : length, PT_MAX_UDP_MESSAGE_SIZE);
3071 1 : return PT_ERR_MESSAGE_TOO_LARGE;
3072 : }
3073 :
3074 : /* Find peer by ID */
3075 1 : peer = pt_peer_find_by_id(ictx, peer_id);
3076 1 : if (!peer) {
3077 1 : return PT_ERR_PEER_NOT_FOUND;
3078 : }
3079 :
3080 : /* Call internal send function - no queuing, direct to network */
3081 0 : return pt_posix_send_udp(ictx, peer, data, length);
3082 : }
3083 :
3084 : /* ========================================================================== */
3085 : /* Statistics API (Session 4.5) */
3086 : /* ========================================================================== */
3087 :
3088 : /**
3089 : * Get global network statistics
3090 : *
3091 : * Returns aggregate statistics for all network activity including bytes sent/received,
3092 : * message counts, peer counts, and connection statistics.
3093 : *
3094 : * Returns: PT_OK on success, PT_ERR_* on failure
3095 : */
3096 5 : PeerTalk_Error PeerTalk_GetGlobalStats(PeerTalk_Context *ctx, PeerTalk_GlobalStats *stats) {
3097 5 : struct pt_context *ictx = (struct pt_context *)ctx;
3098 5 : uint16_t connected_count = 0;
3099 :
3100 : /* Validate parameters */
3101 5 : if (!ictx || ictx->magic != PT_CONTEXT_MAGIC) {
3102 1 : return PT_ERR_INVALID_STATE;
3103 : }
3104 4 : if (!stats) {
3105 1 : return PT_ERR_INVALID_PARAM;
3106 : }
3107 :
3108 : /* Copy global statistics */
3109 3 : stats->total_bytes_sent = ictx->global_stats.total_bytes_sent;
3110 3 : stats->total_bytes_received = ictx->global_stats.total_bytes_received;
3111 3 : stats->total_messages_sent = ictx->global_stats.total_messages_sent;
3112 3 : stats->total_messages_received = ictx->global_stats.total_messages_received;
3113 3 : stats->discovery_packets_sent = ictx->global_stats.discovery_packets_sent;
3114 3 : stats->discovery_packets_received = ictx->global_stats.discovery_packets_received;
3115 3 : stats->connections_accepted = ictx->global_stats.connections_accepted;
3116 3 : stats->connections_rejected = ictx->global_stats.connections_rejected;
3117 :
3118 : /* Count peers by state */
3119 3 : stats->peers_discovered = 0;
3120 51 : for (uint8_t i = 0; i < ictx->max_peers; i++) {
3121 48 : if (ictx->peers[i].hot.state != PT_PEER_UNUSED) {
3122 0 : stats->peers_discovered++;
3123 0 : if (ictx->peers[i].hot.state == PT_PEER_CONNECTED) {
3124 0 : connected_count++;
3125 : }
3126 : }
3127 : }
3128 3 : stats->peers_connected = connected_count;
3129 :
3130 : /* Memory and streams (not yet tracked - set to 0) */
3131 3 : stats->memory_used = 0;
3132 3 : stats->streams_active = connected_count;
3133 3 : stats->reserved = 0;
3134 :
3135 3 : return PT_OK;
3136 : }
3137 :
3138 : /**
3139 : * Get per-peer statistics
3140 : *
3141 : * Returns statistics for a specific peer including bytes sent/received,
3142 : * message counts, latency, and connection quality.
3143 : *
3144 : * Returns: PT_OK on success, PT_ERR_* on failure
3145 : */
3146 1 : PeerTalk_Error PeerTalk_GetPeerStats(PeerTalk_Context *ctx, PeerTalk_PeerID peer_id,
3147 : PeerTalk_PeerStats *stats) {
3148 1 : struct pt_context *ictx = (struct pt_context *)ctx;
3149 : struct pt_peer *peer;
3150 :
3151 : /* Validate parameters */
3152 1 : if (!ictx || ictx->magic != PT_CONTEXT_MAGIC) {
3153 0 : return PT_ERR_INVALID_STATE;
3154 : }
3155 1 : if (!stats) {
3156 0 : return PT_ERR_INVALID_PARAM;
3157 : }
3158 :
3159 : /* Find peer by ID */
3160 1 : peer = pt_peer_find_by_id(ictx, peer_id);
3161 1 : if (!peer) {
3162 1 : return PT_ERR_PEER_NOT_FOUND;
3163 : }
3164 :
3165 : /* Copy peer statistics */
3166 0 : *stats = peer->cold.stats;
3167 :
3168 0 : return PT_OK;
3169 : }
3170 :
3171 : /**
3172 : * Reset statistics for peer (or all peers if peer_id == 0)
3173 : *
3174 : * Clears all counters for the specified peer, or for all peers and global
3175 : * stats if peer_id is 0.
3176 : *
3177 : * Returns: PT_OK on success, PT_ERR_* on failure
3178 : */
3179 3 : PeerTalk_Error PeerTalk_ResetStats(PeerTalk_Context *ctx, PeerTalk_PeerID peer_id) {
3180 3 : struct pt_context *ictx = (struct pt_context *)ctx;
3181 : struct pt_peer *peer;
3182 : /* cppcheck-suppress variableScope ; C89 style for cross-platform consistency */
3183 : uint16_t i;
3184 :
3185 : /* Validate parameters */
3186 3 : if (!ictx || ictx->magic != PT_CONTEXT_MAGIC) {
3187 1 : return PT_ERR_INVALID_STATE;
3188 : }
3189 :
3190 2 : if (peer_id == 0) {
3191 : /* Reset global statistics */
3192 0 : pt_memset(&ictx->global_stats, 0, sizeof(PeerTalk_GlobalStats));
3193 :
3194 : /* Reset all peer statistics */
3195 0 : for (i = 0; i < ictx->max_peers; i++) {
3196 0 : peer = &ictx->peers[i];
3197 0 : if (peer->hot.state != PT_PEER_UNUSED) {
3198 0 : pt_memset(&peer->cold.stats, 0, sizeof(PeerTalk_PeerStats));
3199 : /* Reset latency tracking */
3200 0 : peer->hot.latency_ms = 0;
3201 0 : peer->cold.rtt_index = 0;
3202 0 : peer->cold.rtt_count = 0;
3203 0 : pt_memset(peer->cold.rtt_samples, 0, sizeof(peer->cold.rtt_samples));
3204 : }
3205 : }
3206 :
3207 0 : PT_CTX_INFO(ictx, PT_LOG_CAT_PERF, "Reset all statistics");
3208 : } else {
3209 : /* Reset single peer statistics */
3210 2 : peer = pt_peer_find_by_id(ictx, peer_id);
3211 2 : if (!peer) {
3212 1 : return PT_ERR_PEER_NOT_FOUND;
3213 : }
3214 :
3215 1 : pt_memset(&peer->cold.stats, 0, sizeof(PeerTalk_PeerStats));
3216 : /* Reset latency tracking */
3217 1 : peer->hot.latency_ms = 0;
3218 1 : peer->cold.rtt_index = 0;
3219 1 : peer->cold.rtt_count = 0;
3220 1 : pt_memset(peer->cold.rtt_samples, 0, sizeof(peer->cold.rtt_samples));
3221 :
3222 1 : PT_CTX_INFO(ictx, PT_LOG_CAT_PERF,
3223 : "Reset statistics for peer %u", peer_id);
3224 : }
3225 :
3226 1 : return PT_OK;
3227 : }
3228 :
|