Line data Source code
1 : /* peer.c - Peer management implementation */
2 :
3 : #include "peer.h"
4 : #include "queue.h"
5 : #include "pt_compat.h"
6 : #include "direct_buffer.h"
7 : #include "protocol.h"
8 : #include "../../include/peertalk.h"
9 :
10 : /* ========================================================================
11 : * Peer List Operations
12 : * ======================================================================== */
13 :
14 241 : int pt_peer_list_init(struct pt_context *ctx, uint16_t max_peers)
15 : {
16 : size_t alloc_size;
17 : uint16_t i;
18 :
19 241 : if (!ctx || ctx->magic != PT_CONTEXT_MAGIC) {
20 0 : return PT_ERR_INVALID_PARAM;
21 : }
22 :
23 : /* Allocate peer array */
24 241 : alloc_size = sizeof(struct pt_peer) * max_peers;
25 241 : ctx->peers = (struct pt_peer *)pt_alloc_clear(alloc_size);
26 241 : if (!ctx->peers) {
27 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_MEMORY,
28 : "Failed to allocate %zu bytes for %u peers",
29 : alloc_size, max_peers);
30 0 : return PT_ERR_NO_MEMORY;
31 : }
32 :
33 : /* Initialize all peer slots */
34 3801 : for (i = 0; i < max_peers; i++) {
35 3560 : struct pt_peer *peer = &ctx->peers[i];
36 3560 : peer->hot.id = (PeerTalk_PeerID)(i + 1); /* IDs start at 1 */
37 3560 : peer->hot.state = PT_PEER_STATE_UNUSED;
38 3560 : peer->hot.magic = 0; /* Not valid until created */
39 3560 : peer->hot.name_idx = (uint8_t)i; /* Index into centralized name table */
40 : }
41 :
42 241 : ctx->max_peers = max_peers;
43 241 : ctx->peer_count = 0;
44 :
45 241 : PT_CTX_INFO(ctx, PT_LOG_CAT_INIT,
46 : "Peer list initialized: %u slots, %zu bytes",
47 : max_peers, alloc_size);
48 :
49 241 : return 0;
50 : }
51 :
52 242 : void pt_peer_list_free(struct pt_context *ctx)
53 : {
54 242 : if (!ctx || !ctx->peers) {
55 1 : return;
56 : }
57 :
58 241 : pt_free(ctx->peers);
59 241 : ctx->peers = NULL;
60 241 : ctx->max_peers = 0;
61 241 : ctx->peer_count = 0;
62 : }
63 :
64 : /* ========================================================================
65 : * Peer Lookup Functions
66 : * ======================================================================== */
67 :
68 306 : struct pt_peer *pt_peer_find_by_id(struct pt_context *ctx, PeerTalk_PeerID id)
69 : {
70 : uint8_t index;
71 : struct pt_peer *peer;
72 :
73 306 : if (!ctx || ctx->magic != PT_CONTEXT_MAGIC) {
74 0 : return NULL;
75 : }
76 :
77 : /* ID 0 is invalid, IDs start at 1 */
78 306 : if (id == 0 || id > ctx->max_peers) {
79 19 : return NULL;
80 : }
81 :
82 : /* Convert ID to array index (IDs are 1-based) */
83 287 : index = (uint8_t)(id - 1);
84 287 : peer = &ctx->peers[index];
85 :
86 : /* Check if peer is valid */
87 287 : if (peer->hot.state == PT_PEER_STATE_UNUSED ||
88 285 : peer->hot.magic != PT_PEER_MAGIC) {
89 2 : return NULL;
90 : }
91 :
92 285 : return peer;
93 : }
94 :
95 92 : struct pt_peer *pt_peer_find_by_addr(struct pt_context *ctx,
96 : uint32_t ip, uint16_t port)
97 : {
98 : uint16_t i;
99 :
100 : /* DOD PERFORMANCE NOTE:
101 : * This function is called on EVERY incoming packet to identify which peer
102 : * it belongs to. Currently it accesses peer->info.address and peer->info.port
103 : * which are in cold storage (~1.4KB per peer). On 68030 with 256-byte cache,
104 : * scanning 16 peers touches 22KB+ causing severe cache thrashing.
105 : *
106 : * RECOMMENDED OPTIMIZATIONS (Phase 1 modifications):
107 : * 1. Move address/port to pt_peer_hot struct (adds 6 bytes to hot data)
108 : * 2. OR: Add peer_addr_hash[] lookup table to pt_context (similar to
109 : * peer_id_to_index[]) using simple hash: (ip ^ port) & (PT_MAX_PEERS-1)
110 : *
111 : * For low peer counts (<8), the current linear scan is acceptable.
112 : * For higher counts or on 68000/68020 (no cache), optimization is critical.
113 : *
114 : * CRITICAL: This function is called on EVERY incoming packet - must access
115 : * only hot data
116 : */
117 :
118 92 : if (!ctx || ctx->magic != PT_CONTEXT_MAGIC) {
119 0 : return NULL;
120 : }
121 :
122 1404 : for (i = 0; i < ctx->max_peers; i++) {
123 1314 : struct pt_peer *peer = &ctx->peers[i];
124 :
125 1314 : if (peer->hot.state != PT_PEER_STATE_UNUSED &&
126 10 : peer->cold.info.address == ip &&
127 9 : peer->cold.info.port == port) {
128 2 : return peer;
129 : }
130 : }
131 :
132 90 : return NULL;
133 : }
134 :
135 7 : struct pt_peer *pt_peer_find_by_name(struct pt_context *ctx, const char *name)
136 : {
137 : uint16_t i;
138 :
139 : /* DOD Optimization: Use centralized peer_names[] table from Phase 1.
140 : * This avoids accessing cold storage (peer->info.name) which is ~1.4KB
141 : * per peer. On 68030 with 256-byte cache, this prevents severe cache
142 : * thrashing when scanning multiple peers.
143 : *
144 : * Phase 1 stores names in ctx->peer_names[name_idx] and the index
145 : * is stored in peer->hot.name_idx (hot storage, 32 bytes per peer).
146 : */
147 :
148 7 : if (!ctx || ctx->magic != PT_CONTEXT_MAGIC) {
149 0 : return NULL;
150 : }
151 :
152 7 : if (!name || name[0] == '\0') {
153 2 : return NULL;
154 : }
155 :
156 45 : for (i = 0; i < ctx->max_peers; i++) {
157 42 : struct pt_peer *peer = &ctx->peers[i];
158 : const char *peer_name;
159 :
160 42 : if (peer->hot.state == PT_PEER_STATE_UNUSED) {
161 39 : continue;
162 : }
163 :
164 : /* Get peer name from centralized table */
165 3 : peer_name = ctx->peer_names[peer->hot.name_idx];
166 :
167 : /* Compare strings manually (no pt_strcmp available) */
168 : {
169 3 : const char *a = peer_name;
170 3 : const char *b = name;
171 17 : while (*a && *b && *a == *b) {
172 14 : a++;
173 14 : b++;
174 : }
175 3 : if (*a == *b) {
176 2 : return peer;
177 : }
178 : }
179 : }
180 :
181 3 : return NULL;
182 : }
183 :
184 65 : struct pt_peer *pt_peer_find_unused(struct pt_context *ctx)
185 : {
186 : uint16_t i;
187 :
188 65 : if (!ctx || ctx->magic != PT_CONTEXT_MAGIC) {
189 0 : return NULL;
190 : }
191 :
192 68 : for (i = 0; i < ctx->max_peers; i++) {
193 68 : struct pt_peer *peer = &ctx->peers[i];
194 68 : if (peer->hot.state == PT_PEER_STATE_UNUSED) {
195 65 : return peer;
196 : }
197 : }
198 :
199 0 : return NULL;
200 : }
201 :
202 : /* ========================================================================
203 : * Peer Lifecycle
204 : * ======================================================================== */
205 :
206 65 : struct pt_peer *pt_peer_create(struct pt_context *ctx,
207 : const char *name,
208 : uint32_t ip, uint16_t port)
209 : {
210 : struct pt_peer *peer;
211 :
212 65 : if (!ctx || ctx->magic != PT_CONTEXT_MAGIC) {
213 0 : return NULL;
214 : }
215 :
216 : /* Check if peer already exists by address */
217 65 : peer = pt_peer_find_by_addr(ctx, ip, port);
218 65 : if (peer) {
219 : /* Update last_seen and name */
220 0 : peer->hot.last_seen = ctx->plat->get_ticks();
221 :
222 0 : if (name && name[0] != '\0') {
223 0 : size_t name_len = pt_strlen(name);
224 0 : if (name_len > PT_MAX_PEER_NAME) {
225 0 : name_len = PT_MAX_PEER_NAME;
226 : }
227 0 : pt_memcpy(ctx->peer_names[peer->hot.name_idx], name, name_len);
228 0 : ctx->peer_names[peer->hot.name_idx][name_len] = '\0';
229 : }
230 :
231 0 : return peer;
232 : }
233 :
234 : /* Find unused slot */
235 65 : peer = pt_peer_find_unused(ctx);
236 65 : if (!peer) {
237 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_CONNECT,
238 : "No available peer slots (max %u)", ctx->max_peers);
239 0 : return NULL;
240 : }
241 :
242 : /* Initialize peer */
243 65 : pt_memset(&peer->cold, 0, sizeof(peer->cold));
244 :
245 : /* Clear buffer lengths */
246 65 : peer->cold.obuflen = 0;
247 65 : peer->cold.ibuflen = 0;
248 :
249 : #ifdef PT_DEBUG
250 : /* Set canaries in debug mode */
251 : peer->cold.obuf_canary = PT_CANARY_OBUF;
252 : peer->cold.ibuf_canary = PT_CANARY_IBUF;
253 : #endif
254 :
255 : /* Set magic */
256 65 : peer->hot.magic = PT_PEER_MAGIC;
257 :
258 : /* Set address and port */
259 65 : peer->cold.info.address = ip;
260 65 : peer->cold.info.port = port;
261 :
262 : /* Initialize addresses array for PeerTalk_Connect() */
263 65 : peer->hot.address_count = 1;
264 65 : peer->cold.addresses[0].address = ip;
265 65 : peer->cold.addresses[0].port = port;
266 65 : peer->cold.addresses[0].transport = 0; /* TCPIP transport */
267 :
268 : /* Clear connection state */
269 65 : peer->cold.info.connected = 0;
270 65 : peer->hot.latency_ms = 0;
271 65 : peer->cold.info.queue_pressure = 0;
272 :
273 : /* Set initial state */
274 65 : peer->hot.state = PT_PEER_STATE_DISCOVERED;
275 :
276 : /* Update last_seen */
277 65 : peer->hot.last_seen = ctx->plat->get_ticks();
278 :
279 : /* Clear connection timing and sequence numbers */
280 65 : peer->cold.ping_sent_time = 0;
281 65 : peer->hot.send_seq = 0;
282 65 : peer->hot.recv_seq = 0;
283 :
284 : /* Initialize capabilities with conservative defaults for pre-exchange.
285 : * These are used if the peer doesn't support capability negotiation
286 : * (legacy peer) or until the capability message arrives. Values update
287 : * when PT_MSG_TYPE_CAPABILITY is received in poll loop. */
288 65 : peer->cold.caps.max_message_size = PT_CAP_DEFAULT_MAX_MSG; /* 512 = conservative */
289 65 : peer->cold.caps.preferred_chunk = PT_CAP_DEFAULT_CHUNK; /* 1024 */
290 65 : peer->cold.caps.capability_flags = 0;
291 65 : peer->cold.caps.buffer_pressure = 0;
292 65 : peer->cold.caps.caps_exchanged = 0;
293 65 : peer->cold.caps.send_window = PT_FLOW_WINDOW_DEFAULT; /* Updated after cap exchange */
294 65 : peer->hot.effective_max_msg = PT_CAP_DEFAULT_MAX_MSG; /* Conservative default */
295 65 : peer->hot.effective_chunk = 1024; /* Default chunk for unknown RTT */
296 65 : peer->hot.pipeline_depth = 2; /* Conservative pipeline depth */
297 :
298 : /* Initialize reassembly state */
299 65 : peer->cold.reassembly.message_id = 0;
300 65 : peer->cold.reassembly.total_length = 0;
301 65 : peer->cold.reassembly.received_length = 0;
302 65 : peer->cold.reassembly.active = 0;
303 65 : peer->cold.reassembly.reserved = 0;
304 :
305 : /* Initialize stream state */
306 65 : peer->stream.data = NULL;
307 65 : peer->stream.user_data = NULL;
308 65 : peer->stream.on_complete = NULL;
309 65 : peer->stream.total_length = 0;
310 65 : peer->stream.bytes_sent = 0;
311 65 : peer->stream.active = 0;
312 65 : peer->stream.cancelled = 0;
313 :
314 : /* Clear connection handle */
315 65 : peer->hot.connection = NULL;
316 :
317 : /* Copy name */
318 65 : if (name && name[0] != '\0') {
319 44 : size_t name_len = pt_strlen(name);
320 44 : if (name_len > PT_MAX_PEER_NAME) {
321 0 : name_len = PT_MAX_PEER_NAME;
322 : }
323 44 : pt_memcpy(ctx->peer_names[peer->hot.name_idx], name, name_len);
324 44 : ctx->peer_names[peer->hot.name_idx][name_len] = '\0';
325 : } else {
326 21 : ctx->peer_names[peer->hot.name_idx][0] = '\0';
327 : }
328 :
329 : /* Initialize Tier 2 direct buffers for large messages */
330 : {
331 65 : uint16_t buf_size = ctx->direct_buffer_size;
332 65 : if (buf_size == 0) {
333 11 : buf_size = PT_DIRECT_DEFAULT_SIZE;
334 : }
335 65 : if (pt_direct_buffer_init(&peer->send_direct, buf_size) != 0) {
336 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_MEMORY,
337 : "Failed to allocate send direct buffer for peer %u",
338 : peer->hot.id);
339 : /* Non-fatal: peer can still use Tier 1 queue */
340 : }
341 65 : if (pt_direct_buffer_init(&peer->recv_direct, buf_size) != 0) {
342 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_MEMORY,
343 : "Failed to allocate recv direct buffer for peer %u",
344 : peer->hot.id);
345 : /* Non-fatal: small messages still work */
346 : }
347 : }
348 :
349 : /* Increment peer count */
350 65 : ctx->peer_count++;
351 :
352 65 : PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
353 : "Peer created: id=%u name='%s' addr=0x%08X port=%u",
354 : peer->hot.id, ctx->peer_names[peer->hot.name_idx], ip, port);
355 :
356 65 : return peer;
357 : }
358 :
359 2 : void pt_peer_destroy(struct pt_context *ctx, struct pt_peer *peer)
360 : {
361 2 : if (!ctx || !peer || peer->hot.magic != PT_PEER_MAGIC) {
362 0 : return;
363 : }
364 :
365 2 : PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
366 : "Peer destroyed: id=%u name='%s'",
367 : peer->hot.id, ctx->peer_names[peer->hot.name_idx]);
368 :
369 : /* Cleanup async send pipeline */
370 2 : pt_pipeline_cleanup(ctx, peer);
371 :
372 : /* Free Tier 2 direct buffers */
373 2 : pt_direct_buffer_free(&peer->send_direct);
374 2 : pt_direct_buffer_free(&peer->recv_direct);
375 :
376 : /* Clear sensitive data */
377 2 : peer->hot.magic = 0;
378 2 : peer->hot.state = PT_PEER_STATE_UNUSED;
379 2 : ctx->peer_names[peer->hot.name_idx][0] = '\0';
380 2 : peer->cold.info.address = 0;
381 2 : peer->cold.info.port = 0;
382 2 : peer->cold.info.connected = 0;
383 2 : peer->hot.connection = NULL;
384 :
385 : /* Decrement peer count */
386 2 : if (ctx->peer_count > 0) {
387 2 : ctx->peer_count--;
388 : }
389 : }
390 :
391 : /* ========================================================================
392 : * State Management
393 : * ======================================================================== */
394 :
395 80 : int pt_peer_set_state(struct pt_context *ctx, struct pt_peer *peer,
396 : pt_peer_state new_state)
397 : {
398 : pt_peer_state old_state;
399 80 : int valid_transition = 0;
400 :
401 80 : if (!peer || peer->hot.magic != PT_PEER_MAGIC) {
402 0 : return PT_ERR_INVALID_PARAM;
403 : }
404 :
405 80 : old_state = peer->hot.state;
406 :
407 : /* Check if transition is valid */
408 80 : switch (old_state) {
409 2 : case PT_PEER_STATE_UNUSED:
410 2 : valid_transition = (new_state == PT_PEER_STATE_DISCOVERED);
411 2 : break;
412 :
413 44 : case PT_PEER_STATE_DISCOVERED:
414 27 : valid_transition = (new_state == PT_PEER_STATE_CONNECTING ||
415 6 : new_state == PT_PEER_STATE_CONNECTED ||
416 71 : new_state == PT_PEER_STATE_DISCOVERED ||
417 : new_state == PT_PEER_STATE_UNUSED);
418 44 : break;
419 :
420 16 : case PT_PEER_STATE_CONNECTING:
421 2 : valid_transition = (new_state == PT_PEER_STATE_CONNECTED ||
422 18 : new_state == PT_PEER_STATE_FAILED ||
423 : new_state == PT_PEER_STATE_UNUSED);
424 16 : break;
425 :
426 2 : case PT_PEER_STATE_CONNECTED:
427 0 : valid_transition = (new_state == PT_PEER_STATE_DISCONNECTING ||
428 2 : new_state == PT_PEER_STATE_FAILED ||
429 : new_state == PT_PEER_STATE_UNUSED);
430 2 : break;
431 :
432 11 : case PT_PEER_STATE_DISCONNECTING:
433 : /* Allow transition to DISCOVERED for reconnection support */
434 11 : valid_transition = (new_state == PT_PEER_STATE_UNUSED ||
435 : new_state == PT_PEER_STATE_DISCOVERED);
436 11 : break;
437 :
438 5 : case PT_PEER_STATE_FAILED:
439 5 : valid_transition = (new_state == PT_PEER_STATE_UNUSED ||
440 : new_state == PT_PEER_STATE_DISCOVERED);
441 5 : break;
442 :
443 0 : default:
444 0 : valid_transition = 0;
445 0 : break;
446 : }
447 :
448 80 : if (!valid_transition) {
449 8 : if (ctx) {
450 8 : PT_CTX_WARN(ctx, PT_LOG_CAT_CONNECT,
451 : "Invalid state transition: %s → %s (peer id=%u)",
452 : pt_peer_state_str(old_state),
453 : pt_peer_state_str(new_state),
454 : peer->hot.id);
455 : }
456 8 : return PT_ERR_INVALID_STATE;
457 : }
458 :
459 : /* Perform transition */
460 72 : peer->hot.state = new_state;
461 :
462 : /* Log transition */
463 72 : if (ctx) {
464 72 : if (new_state == PT_PEER_STATE_CONNECTED) {
465 : /* Operational visibility for connections */
466 35 : PT_CTX_INFO(ctx, PT_LOG_CAT_CONNECT,
467 : "Peer state: %s → %s (peer id=%u)",
468 : pt_peer_state_str(old_state),
469 : pt_peer_state_str(new_state),
470 : peer->hot.id);
471 : } else {
472 : /* Verbose diagnostics for other transitions */
473 37 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_CONNECT,
474 : "Peer state: %s → %s (peer id=%u)",
475 : pt_peer_state_str(old_state),
476 : pt_peer_state_str(new_state),
477 : peer->hot.id);
478 : }
479 : }
480 :
481 72 : return 0;
482 : }
483 :
484 160 : const char *pt_peer_state_str(pt_peer_state state)
485 : {
486 160 : switch (state) {
487 3 : case PT_PEER_STATE_UNUSED: return "UNUSED";
488 59 : case PT_PEER_STATE_DISCOVERED: return "DISCOVERED";
489 33 : case PT_PEER_STATE_CONNECTING: return "CONNECTING";
490 39 : case PT_PEER_STATE_CONNECTED: return "CONNECTED";
491 15 : case PT_PEER_STATE_DISCONNECTING: return "DISCONNECTING";
492 11 : case PT_PEER_STATE_FAILED: return "FAILED";
493 0 : default: return "UNKNOWN";
494 : }
495 : }
496 :
497 : /* ========================================================================
498 : * Timeout & Validation
499 : * ======================================================================== */
500 :
501 11 : int pt_peer_is_timed_out(struct pt_peer *peer, pt_tick_t now,
502 : pt_tick_t timeout_ticks)
503 : {
504 : pt_tick_t elapsed;
505 :
506 11 : if (!peer || peer->hot.last_seen == 0) {
507 2 : return 0;
508 : }
509 :
510 9 : elapsed = now - peer->hot.last_seen;
511 9 : return (elapsed > timeout_ticks) ? 1 : 0;
512 : }
513 :
514 : /* cppcheck-suppress constParameter ; peer could be const but keeping non-const for API consistency */
515 1 : int pt_peer_check_canaries(struct pt_context *ctx, struct pt_peer *peer)
516 : {
517 1 : int corrupted = 0;
518 :
519 1 : if (!peer) {
520 0 : return -1;
521 : }
522 :
523 : #ifdef PT_DEBUG
524 : /* Check output buffer canary */
525 : if (peer->cold.obuf_canary != PT_CANARY_OBUF) {
526 : if (ctx) {
527 : PT_CTX_ERR(ctx, PT_LOG_CAT_MEMORY,
528 : "Output buffer overflow detected (peer id=%u): "
529 : "expected 0x%08X, got 0x%08X",
530 : peer->hot.id, PT_CANARY_OBUF, peer->cold.obuf_canary);
531 : }
532 : corrupted = 1;
533 : }
534 :
535 : /* Check input buffer canary */
536 : if (peer->cold.ibuf_canary != PT_CANARY_IBUF) {
537 : if (ctx) {
538 : PT_CTX_ERR(ctx, PT_LOG_CAT_MEMORY,
539 : "Input buffer overflow detected (peer id=%u): "
540 : "expected 0x%08X, got 0x%08X",
541 : peer->hot.id, PT_CANARY_IBUF, peer->cold.ibuf_canary);
542 : }
543 : corrupted = 1;
544 : }
545 : #else
546 : /* In release builds, canaries are not present - always return valid */
547 : (void)ctx; /* Suppress unused warning */
548 : #endif
549 :
550 1 : return corrupted ? -1 : 0;
551 : }
552 :
553 3 : void pt_peer_get_info(struct pt_peer *peer, PeerTalk_PeerInfo *info)
554 : {
555 3 : if (!peer || !info) {
556 0 : return;
557 : }
558 :
559 : /* Copy peer info */
560 3 : pt_memcpy(info, &peer->cold.info, sizeof(PeerTalk_PeerInfo));
561 :
562 : /* Update fields from hot data */
563 3 : info->id = peer->hot.id;
564 3 : info->latency_ms = peer->hot.latency_ms;
565 3 : info->name_idx = peer->hot.name_idx;
566 :
567 : /* Update connected field based on current state */
568 3 : info->connected = (peer->hot.state == PT_PEER_STATE_CONNECTED) ? 1 : 0;
569 : }
570 :
571 : /* ========================================================================
572 : * Flow Control
573 : * ======================================================================== */
574 :
/**
 * Quantize a 0-100 pressure percentage to its threshold band
 * (0, 25, 50, or 75) for hysteresis in pressure reporting.
 */
static uint8_t pt_pressure_level(uint8_t pressure)
{
    uint8_t level = 0;

    if (pressure >= 25) {
        level = 25;
    }
    if (pressure >= 50) {
        level = 50;
    }
    if (pressure >= 75) {
        level = 75;
    }
    return level;
}
585 :
586 320 : int pt_peer_check_pressure_update(struct pt_context *ctx, struct pt_peer *peer)
587 : {
588 : uint8_t send_pressure;
589 : uint8_t recv_pressure;
590 : uint8_t current_pressure;
591 : uint8_t current_level;
592 : uint8_t last_level;
593 :
594 320 : if (!ctx || !peer || peer->hot.magic != PT_PEER_MAGIC) {
595 0 : return 0;
596 : }
597 :
598 : /* Only check for connected peers */
599 320 : if (peer->hot.state != PT_PEER_STATE_CONNECTED) {
600 0 : return 0;
601 : }
602 :
603 : /* Get pressure from BOTH queues - report the worse one.
604 : *
605 : * This captures the actual constraint regardless of where it is:
606 : * - High send_pressure: "I can't transmit fast enough" (echo backlog)
607 : * - High recv_pressure: "I can't receive fast enough" (processing backlog)
608 : *
609 : * On MacTCP, recv uses zero-copy so recv_queue is often empty.
610 : * The real bottleneck shows up in send_queue when echoing back.
611 : * By reporting MAX, we capture whichever is the actual constraint.
612 : */
613 320 : send_pressure = peer->send_queue ? pt_queue_pressure(peer->send_queue) : 0;
614 320 : recv_pressure = peer->recv_queue ? pt_queue_pressure(peer->recv_queue) : 0;
615 320 : current_pressure = (send_pressure > recv_pressure) ? send_pressure : recv_pressure;
616 :
617 : /* Quantize to threshold levels for hysteresis */
618 320 : current_level = pt_pressure_level(current_pressure);
619 320 : last_level = pt_pressure_level(peer->cold.caps.last_reported_pressure);
620 :
621 : /* Check if we crossed a threshold */
622 320 : if (current_level != last_level) {
623 2 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
624 : "Pressure threshold crossed for peer %u: %u%% -> %u%% (level %u -> %u)",
625 : peer->hot.id, peer->cold.caps.last_reported_pressure,
626 : current_pressure, last_level, current_level);
627 :
628 : /* Mark update pending - poll loop will send capability message */
629 2 : peer->cold.caps.pressure_update_pending = 1;
630 2 : peer->cold.caps.last_reported_pressure = current_pressure;
631 :
632 2 : return 1;
633 : }
634 :
635 318 : return 0;
636 : }
637 :
638 279 : int pt_peer_should_throttle(struct pt_peer *peer, uint8_t priority)
639 : {
640 : /* NOTE: This version uses hardcoded thresholds for backward compatibility.
641 : * The context-aware version pt_peer_should_throttle_ctx() uses configurable
642 : * thresholds from ctx->pressure_*. The send path uses this simpler version
643 : * which relies on the PT_PRESSURE_* constants. */
644 : uint8_t peer_pressure;
645 :
646 279 : if (!peer || peer->hot.magic != PT_PEER_MAGIC) {
647 0 : return 0; /* Don't throttle on invalid peer */
648 : }
649 :
650 : /* Get peer's reported buffer pressure */
651 279 : peer_pressure = peer->cold.caps.buffer_pressure;
652 :
653 : /* Decision thresholds based on peer's reported pressure:
654 : *
655 : * 0-49: No throttle (send normally)
656 : * 50-84: Light throttle (skip LOW priority)
657 : * 85-94: Heavy throttle (skip NORMAL and LOW)
658 : * 95+: Blocking (only CRITICAL passes)
659 : *
660 : * This implements sender-side flow control based on receiver feedback.
661 : * The receiver reports its queue pressure via capability updates,
662 : * and we back off accordingly to prevent overwhelming it.
663 : */
664 :
665 279 : if (peer_pressure >= PT_PRESSURE_CRITICAL) {
666 : /* Blocking: only CRITICAL priority passes */
667 7 : if (priority < PT_PRIORITY_CRITICAL) {
668 4 : return 1; /* Throttle */
669 : }
670 272 : } else if (peer_pressure >= PT_PRESSURE_HIGH) {
671 : /* Heavy throttle: skip NORMAL and LOW */
672 5 : if (priority < PT_PRIORITY_HIGH) {
673 3 : return 1; /* Throttle */
674 : }
675 267 : } else if (peer_pressure >= PT_PRESSURE_MEDIUM) {
676 : /* Light throttle: skip LOW priority */
677 5 : if (priority < PT_PRIORITY_NORMAL) {
678 2 : return 1; /* Throttle */
679 : }
680 : }
681 :
682 270 : return 0; /* Don't throttle - send normally */
683 : }
684 :
685 264 : int pt_peer_check_rate_limit(struct pt_context *ctx, struct pt_peer *peer,
686 : uint16_t bytes)
687 : {
688 : pt_tick_t now, elapsed;
689 : uint32_t tokens_to_add;
690 :
691 264 : if (!peer || peer->hot.magic != PT_PEER_MAGIC) {
692 1 : return 0; /* Don't rate limit invalid peer */
693 : }
694 :
695 : /* No rate limit if disabled */
696 263 : if (peer->cold.caps.rate_limit_bytes_per_sec == 0) {
697 260 : return 0;
698 : }
699 :
700 : /* Get current time */
701 3 : if (ctx && ctx->plat && ctx->plat->get_ticks) {
702 2 : now = ctx->plat->get_ticks();
703 : } else {
704 1 : return 0; /* Can't rate limit without timer */
705 : }
706 :
707 : /* Refill tokens based on elapsed time
708 : * tokens_to_add = elapsed_ms * bytes_per_sec / 1000
709 : * Avoid overflow by doing division first for large values */
710 2 : elapsed = now - peer->cold.caps.rate_last_update;
711 2 : if (elapsed > 0) {
712 : /* Calculate tokens to add (milliseconds * bytes/sec / 1000)
713 : * Use 64-bit intermediate to avoid overflow */
714 0 : uint32_t rate = peer->cold.caps.rate_limit_bytes_per_sec;
715 0 : tokens_to_add = (uint32_t)(((uint32_t)elapsed * rate) / 1000);
716 :
717 0 : peer->cold.caps.rate_bucket_tokens += tokens_to_add;
718 :
719 : /* Cap at bucket max */
720 0 : if (peer->cold.caps.rate_bucket_tokens > peer->cold.caps.rate_bucket_max) {
721 0 : peer->cold.caps.rate_bucket_tokens = peer->cold.caps.rate_bucket_max;
722 : }
723 :
724 0 : peer->cold.caps.rate_last_update = now;
725 : }
726 :
727 : /* Check if we have enough tokens */
728 2 : if (peer->cold.caps.rate_bucket_tokens < bytes) {
729 1 : return 1; /* Rate limited - not enough tokens */
730 : }
731 :
732 : /* Consume tokens */
733 1 : peer->cold.caps.rate_bucket_tokens -= bytes;
734 1 : return 0; /* OK to send */
735 : }
736 :
737 : /* ========================================================================== */
738 : /* Async Send Pipeline Management */
739 : /* ========================================================================== */
740 :
741 : /**
742 : * Calculate buffer size for a pipeline slot
743 : */
744 0 : static size_t pt_pipeline_buf_size(void)
745 : {
746 0 : return (size_t)PT_PIPELINE_MAX_PAYLOAD + PT_MESSAGE_HEADER_SIZE + 4;
747 : }
748 :
/**
 * Initialize the async send pipeline for a peer.
 *
 * Resets metadata on all PT_SEND_PIPELINE_DEPTH slots and, in non-lowmem
 * builds, eagerly allocates each slot's buffer. PT_LOWMEM builds defer
 * buffer allocation to pt_pipeline_get_slot() (first use).
 * Idempotent: returns PT_OK immediately if already initialized.
 *
 * @param ctx  Valid context (used for logging; required).
 * @param peer Peer whose pipeline to set up (checked via magic).
 * @return PT_OK on success, PT_ERR_INVALID_PARAM on bad arguments,
 *         PT_ERR_NO_MEMORY if any eager allocation fails (partial
 *         allocations are rolled back via pt_pipeline_cleanup()).
 */
int pt_pipeline_init(struct pt_context *ctx, struct pt_peer *peer)
{
    int i;
    size_t buf_size;

    if (!ctx || !peer || peer->hot.magic != PT_PEER_MAGIC) {
        return PT_ERR_INVALID_PARAM;
    }

    /* Already initialized? */
    if (peer->pipeline.initialized) {
        return PT_OK;
    }

    buf_size = pt_pipeline_buf_size();

    /* Initialize slot metadata */
    for (i = 0; i < PT_SEND_PIPELINE_DEPTH; i++) {
        peer->pipeline.slots[i].in_use = 0;
        peer->pipeline.slots[i].completed = 0;
        peer->pipeline.slots[i].platform_data = NULL;

        /* Initialize WDS sentinel (wds[1] terminates the write-data list) */
        peer->pipeline.slots[i].wds[1].length = 0;
        peer->pipeline.slots[i].wds[1].ptr = NULL;

#ifdef PT_LOWMEM
        /* Lazy allocation: defer buffer allocation until first use.
         * This saves ~2KB per peer on Mac SE (4MB RAM) when peers
         * aren't actively sending data. */
        peer->pipeline.slots[i].buffer = NULL;
        peer->pipeline.slots[i].buffer_size = (uint16_t)buf_size; /* Remember target size */
#else
        /* Eager allocation: allocate all buffers upfront.
         * Better for high-throughput scenarios. */
        peer->pipeline.slots[i].buffer = (uint8_t *)pt_alloc(buf_size);
        if (!peer->pipeline.slots[i].buffer) {
            PT_CTX_WARN(ctx, PT_LOG_CAT_MEMORY,
                "Pipeline init failed: slot %d alloc (%zu bytes)", i, buf_size);
            /* Roll back slots allocated so far before failing */
            pt_pipeline_cleanup(ctx, peer);
            return PT_ERR_NO_MEMORY;
        }
        peer->pipeline.slots[i].buffer_size = (uint16_t)buf_size;
#endif
    }

    peer->pipeline.pending_count = 0;
    peer->pipeline.next_slot = 0;
    peer->pipeline.initialized = 1;

#ifdef PT_LOWMEM
    PT_CTX_DEBUG(ctx, PT_LOG_CAT_MEMORY,
        "Pipeline init (lazy): peer=%u depth=%d buf_size=%zu",
        peer->hot.id, PT_SEND_PIPELINE_DEPTH, buf_size);
#else
    PT_CTX_DEBUG(ctx, PT_LOG_CAT_MEMORY,
        "Pipeline init: peer=%u depth=%d buf_size=%zu",
        peer->hot.id, PT_SEND_PIPELINE_DEPTH, buf_size);
#endif

    return PT_OK;
}
811 :
/**
 * Find a free pipeline slot for an async send.
 *
 * In PT_LOWMEM builds the slot's buffer is lazily allocated here on first
 * use (pt_pipeline_init() defers it). The returned slot is NOT marked
 * in_use by this function — presumably the caller claims it; verify
 * against the send path.
 *
 * @param ctx  Context for logging (unused in non-lowmem builds).
 * @param peer Peer whose pipeline to search.
 * @return A free slot, or NULL if the pipeline is uninitialized, all
 *         slots are busy, or (PT_LOWMEM) lazy allocation fails.
 */
pt_send_slot *pt_pipeline_get_slot(struct pt_context *ctx, struct pt_peer *peer)
{
    int i;

    if (!peer || !peer->pipeline.initialized) {
        return NULL;
    }

    /* Find a free slot */
    for (i = 0; i < PT_SEND_PIPELINE_DEPTH; i++) {
        pt_send_slot *slot = &peer->pipeline.slots[i];

        if (!slot->in_use) {
#ifdef PT_LOWMEM
            /* Lazy allocation: allocate buffer on first use */
            if (!slot->buffer) {
                size_t buf_size = pt_pipeline_buf_size();
                slot->buffer = (uint8_t *)pt_alloc(buf_size);
                if (!slot->buffer) {
                    PT_CTX_WARN(ctx, PT_LOG_CAT_MEMORY,
                        "Pipeline lazy alloc failed: slot %d (%zu bytes)",
                        i, buf_size);
                    return NULL; /* Allocation failed */
                }
                PT_CTX_DEBUG(ctx, PT_LOG_CAT_MEMORY,
                    "Pipeline lazy alloc: peer=%u slot=%d", peer->hot.id, i);
            }
#else
            (void)ctx; /* Unused in non-lowmem builds */
#endif
            return slot;
        }
    }

    return NULL; /* All slots busy */
}
848 :
849 2 : void pt_pipeline_cleanup(struct pt_context *ctx, struct pt_peer *peer)
850 : {
851 : int i;
852 : uint8_t pending;
853 :
854 2 : if (!peer) {
855 0 : return;
856 : }
857 :
858 2 : pending = peer->pipeline.pending_count;
859 :
860 : /* Free buffers */
861 10 : for (i = 0; i < PT_SEND_PIPELINE_DEPTH; i++) {
862 8 : if (peer->pipeline.slots[i].buffer) {
863 0 : pt_free(peer->pipeline.slots[i].buffer);
864 0 : peer->pipeline.slots[i].buffer = NULL;
865 : }
866 : /* Note: platform_data cleanup is platform's responsibility */
867 8 : peer->pipeline.slots[i].in_use = 0;
868 8 : peer->pipeline.slots[i].completed = 0;
869 : }
870 :
871 2 : peer->pipeline.pending_count = 0;
872 2 : peer->pipeline.next_slot = 0;
873 2 : peer->pipeline.initialized = 0;
874 :
875 2 : if (ctx && pending > 0) {
876 0 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_MEMORY,
877 : "Pipeline cleanup: peer=%u pending=%u",
878 : peer->hot.id, pending);
879 : }
880 : }
881 :
882 : /* ========================================================================== */
883 : /* Adaptive Performance Tuning */
884 : /* ========================================================================== */
885 :
/**
 * Recompute a peer's adaptive transfer parameters from measured RTT and
 * the peer's advertised capabilities / reported buffer pressure.
 *
 * Adjusts, logging only the values that actually change:
 *  - hot.effective_chunk / hot.pipeline_depth (RTT-based, then capped by
 *    the peer's advertised optimal_chunk)
 *  - cold.caps.send_window (clamped to PT_FLOW_WINDOW_MIN..MAX)
 *  - cold.caps.rate_limit_* (automatic rate limiting from pressure)
 *
 * @param ctx  Optional context; when NULL, logging is skipped and the
 *             PT_PRESSURE_* constants are used instead of ctx thresholds.
 * @param peer Peer to tune (checked via magic).
 */
void pt_peer_update_adaptive_params(struct pt_context *ctx, struct pt_peer *peer)
{
    uint16_t rtt;
    uint16_t new_chunk;
    uint16_t peer_optimal;
    uint8_t new_pipeline;
    uint16_t new_window;
    uint32_t new_rate_limit;
    uint8_t peer_pressure;

    if (!peer || peer->hot.magic != PT_PEER_MAGIC) {
        return;
    }

    rtt = peer->hot.latency_ms;
    peer_optimal = peer->cold.caps.optimal_chunk;
    peer_pressure = peer->cold.caps.buffer_pressure;

    /* Tuning logic based on measured RTT
     *
     * Larger chunks reduce per-message overhead but increase latency.
     * Smaller chunks are better for slow/lossy links.
     * Pipeline depth controls how many messages in flight.
     */
    if (rtt < 50) {
        /* Fast LAN - maximize throughput */
        new_chunk = 4096;
        new_pipeline = 4;
        new_window = 6; /* Larger window for better utilization */
    } else if (rtt < 100) {
        /* Good connection */
        new_chunk = 2048;
        new_pipeline = 3;
        new_window = 4; /* Normal */
    } else if (rtt < 200) {
        /* Moderate latency */
        new_chunk = 1024;
        new_pipeline = 2;
        new_window = 3; /* Moderate latency */
    } else {
        /* Slow/lossy - minimize in-flight data */
        new_chunk = 512;
        new_pipeline = 1;
        new_window = 2; /* High latency - smaller window */
    }

    /* Clamp window to protocol limits */
    if (new_window < PT_FLOW_WINDOW_MIN) new_window = PT_FLOW_WINDOW_MIN;
    if (new_window > PT_FLOW_WINDOW_MAX) new_window = PT_FLOW_WINDOW_MAX;

    /* Incorporate peer's optimal_chunk from capability exchange.
     * The peer advertises their 25% threshold - the ideal chunk size
     * that triggers efficient receive completion on their end.
     * Use the smaller of RTT-based and peer-optimal to be safe. */
    if (peer_optimal > 0 && peer_optimal < new_chunk) {
        uint16_t rtt_based_chunk = new_chunk; /* Save RTT-based value for logging */
        new_chunk = peer_optimal;
        if (ctx) {
            PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
                "Using peer %u optimal_chunk=%u (smaller than RTT-based %u)",
                peer->hot.id, peer_optimal, rtt_based_chunk);
        }
    }

    /* ================================================================
     * AUTO RATE LIMITING: Adjust rate based on peer's reported pressure
     *
     * When peer reports high buffer pressure, we automatically enable
     * rate limiting to prevent overwhelming them. This replaces manual
     * rate limiting in application code (e.g., perf_partner's sleep).
     *
     * Uses configurable thresholds from ctx->pressure_* if available,
     * otherwise falls back to PT_PRESSURE_* constants.
     * ================================================================ */
    {
        uint8_t thresh_high = ctx ? ctx->pressure_high : PT_PRESSURE_HIGH;
        uint8_t thresh_med = ctx ? ctx->pressure_medium : PT_PRESSURE_MEDIUM;

        if (peer_pressure >= thresh_high) {
            /* High pressure: aggressive rate limiting */
            new_rate_limit = 50 * 1024; /* 50 KB/s */
        } else if (peer_pressure >= thresh_med) {
            /* Medium pressure: moderate rate limiting */
            new_rate_limit = 100 * 1024; /* 100 KB/s */
        } else {
            /* Low/no pressure: unlimited */
            new_rate_limit = 0;
        }
    }

    /* Update rate limit if changed */
    if (peer->cold.caps.rate_limit_bytes_per_sec != new_rate_limit) {
        peer->cold.caps.rate_limit_bytes_per_sec = new_rate_limit;

        /* Initialize/reset token bucket when rate limit changes */
        if (new_rate_limit > 0) {
            /* Bucket max = 2x rate (allows small bursts) */
            peer->cold.caps.rate_bucket_max = new_rate_limit * 2;
            /* Start with full bucket */
            peer->cold.caps.rate_bucket_tokens = peer->cold.caps.rate_bucket_max;
            /* Initialize timer */
            if (ctx && ctx->plat && ctx->plat->get_ticks) {
                peer->cold.caps.rate_last_update = ctx->plat->get_ticks();
            }
        }

        if (ctx) {
            if (new_rate_limit > 0) {
                PT_CTX_INFO(ctx, PT_LOG_CAT_PROTOCOL,
                    "Rate limit enabled for peer %u: %u KB/s (pressure=%u%%)",
                    peer->hot.id, new_rate_limit / 1024, peer_pressure);
            } else {
                PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
                    "Rate limit disabled for peer %u (pressure=%u%%)",
                    peer->hot.id, peer_pressure);
            }
        }
    }

    /* Update send window if changed */
    if (peer->cold.caps.send_window != new_window) {
        if (ctx) {
            PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
                "Adaptive window for peer %u: %u -> %u (RTT=%ums)",
                peer->hot.id, peer->cold.caps.send_window, new_window, rtt);
        }
        peer->cold.caps.send_window = new_window;
    }

    /* Only log chunk/pipeline if they actually changed */
    if (peer->hot.effective_chunk != new_chunk ||
        peer->hot.pipeline_depth != new_pipeline) {

        peer->hot.effective_chunk = new_chunk;
        peer->hot.pipeline_depth = new_pipeline;

        if (ctx) {
            PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
                "Adaptive tuning for peer %u: RTT=%ums chunk=%u pipeline=%u peer_optimal=%u",
                peer->hot.id, rtt, new_chunk, new_pipeline, peer_optimal);
        }
    }
}
|