Line data Source code
1 : /* send.c - Batch send operations for PeerTalk
2 : *
3 : * Implements batching to combine multiple small messages into one TCP packet
4 : * for improved efficiency. Common in games where many small control messages
5 : * (position updates, input events) would otherwise have high TCP/IP overhead.
6 : *
7 : * Also implements transparent fragmentation for large messages sent to
8 : * constrained peers. Applications call PeerTalk_Send() and the SDK handles
9 : * fragment splitting automatically based on negotiated capabilities.
10 : */
11 :
12 : #include "pt_internal.h"
13 : #include "send.h"
14 : #include "queue.h"
15 : #include "protocol.h"
16 : #include "peer.h"
17 : #include "pt_compat.h"
18 : #include "direct_buffer.h"
19 :
20 : /* ========================================================================
21 : * Fragment Send (Internal)
22 : *
23 : * Sends a single fragment of a larger message. Each fragment is framed
24 : * as a complete message with PT_MSG_FLAG_FRAGMENT set and a fragment
25 : * header prepended to the payload.
26 : * ======================================================================== */
27 :
28 0 : static int pt_send_fragment(struct pt_context *ctx, struct pt_peer *peer,
29 : const uint8_t *data, uint16_t frag_len,
30 : uint16_t msg_id, uint16_t total_len,
31 : uint16_t offset, uint8_t frag_flags,
32 : uint8_t priority) {
33 0 : pt_queue *q = peer->send_queue;
34 : pt_fragment_header frag_hdr;
35 : uint8_t frag_buf[PT_QUEUE_SLOT_SIZE];
36 : uint16_t total_frag_len;
37 : int result;
38 :
39 0 : if (!q) {
40 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_SEND,
41 : "Peer %u has no send queue", peer->hot.id);
42 0 : return PT_ERR_INVALID_STATE;
43 : }
44 :
45 : /* Build fragment header */
46 0 : frag_hdr.message_id = msg_id;
47 0 : frag_hdr.total_length = total_len;
48 0 : frag_hdr.fragment_offset = offset;
49 0 : frag_hdr.fragment_flags = frag_flags;
50 0 : frag_hdr.reserved = 0;
51 :
52 : /* Check fragment fits in queue slot */
53 0 : total_frag_len = PT_FRAGMENT_HEADER_SIZE + frag_len;
54 0 : if (total_frag_len > PT_QUEUE_SLOT_SIZE) {
55 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_SEND,
56 : "Fragment too large: %u + %u > %u",
57 : PT_FRAGMENT_HEADER_SIZE, frag_len, PT_QUEUE_SLOT_SIZE);
58 0 : return PT_ERR_MESSAGE_TOO_LARGE;
59 : }
60 :
61 : /* Build fragment: header + data */
62 0 : pt_fragment_encode(&frag_hdr, frag_buf);
63 0 : pt_memcpy(frag_buf + PT_FRAGMENT_HEADER_SIZE, data, frag_len);
64 :
65 : /* Queue fragment with PT_SLOT_FRAGMENT flag
66 : * The drain code checks this flag and sets PT_MSG_FLAG_FRAGMENT on send */
67 0 : result = pt_queue_push(ctx, q, frag_buf, total_frag_len,
68 : priority, PT_SLOT_FRAGMENT);
69 :
70 0 : if (result == 0) {
71 0 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
72 : "Queued fragment: id=%u offset=%u len=%u %s%s",
73 : msg_id, offset, frag_len,
74 : (frag_flags & PT_FRAGMENT_FLAG_FIRST) ? "FIRST " : "",
75 : (frag_flags & PT_FRAGMENT_FLAG_LAST) ? "LAST" : "");
76 : }
77 :
78 0 : return result;
79 : }
80 :
81 : /* ========================================================================
82 : * Batch Send Operations
83 : * ======================================================================== */
84 :
85 : /*
86 : * Batch send - combine multiple small messages into one TCP packet
87 : *
88 : * This improves efficiency by reducing TCP/IP overhead for
89 : * many small messages common in games.
90 : *
91 : * NOTE: The actual pt_batch_send() function requires a platform-specific
92 : * send function. This is provided by Phases 3/4/5:
93 : * - POSIX: pt_posix_send()
94 : * - MacTCP: pt_mactcp_send()
95 : * - Open Transport: pt_ot_send()
96 : *
97 : * The batch building (pt_batch_init, pt_batch_add) is platform-independent.
98 : */
99 :
100 : /* Batch constants and type are defined in send.h */
101 :
102 10 : void pt_batch_init(pt_batch *batch) {
103 10 : batch->used = 0;
104 10 : batch->count = 0;
105 10 : batch->is_fragment = 0;
106 10 : batch->reserved = 0;
107 10 : }
108 :
109 53 : int pt_batch_add(pt_batch *batch, const void *data, uint16_t len) {
110 53 : if (batch->used + PT_BATCH_HEADER + len > PT_BATCH_MAX_SIZE)
111 2 : return -1; /* Batch full */
112 :
113 : /* Add length prefix (big-endian) */
114 51 : batch->buffer[batch->used++] = (len >> 8) & 0xFF;
115 51 : batch->buffer[batch->used++] = len & 0xFF;
116 :
117 : /* Reserved bytes for flags/type */
118 51 : batch->buffer[batch->used++] = 0;
119 51 : batch->buffer[batch->used++] = 0;
120 :
121 : /* Add data */
122 51 : pt_memcpy(batch->buffer + batch->used, data, len);
123 51 : batch->used += len;
124 51 : batch->count++;
125 :
126 51 : return 0;
127 : }
128 :
129 : /*
130 : * Send a batch as a single framed message
131 : *
132 : * This function builds the complete framed message but delegates
133 : * the actual send to the platform layer. The caller (poll loop)
134 : * should use the appropriate platform send function.
135 : *
136 : * Returns: bytes to send (stored in batch->buffer with header prepended),
137 : * or 0 if batch is empty
138 : */
/* Prepare a batch for transmission as one framed DATA message.
 *
 * Builds a PT_MSG_FLAG_BATCH message header and consumes a sequence
 * number from the peer. Returns the payload length (batch->used) that
 * the caller must frame and send, or 0 if the batch holds no messages.
 *
 * NOTE(review): header_buf is a function-local buffer that goes out of
 * scope on return, so the caller cannot actually send it as the comment
 * below suggests - callers appear to rebuild the frame themselves via
 * the platform send path. Confirm against the send_fn implementations.
 * NOTE(review): peer->hot.send_seq is incremented here even though
 * nothing is transmitted; if the caller aborts after this call, a
 * sequence number is consumed without a frame on the wire - verify the
 * receiver tolerates sequence gaps. */
int pt_batch_prepare(struct pt_peer *peer, pt_batch *batch) {
    pt_message_header hdr;
    uint8_t header_buf[PT_MESSAGE_HEADER_SIZE];

    /* Empty batch: nothing to frame, no sequence number consumed */
    if (batch->count == 0)
        return 0;

    /* Build header */
    hdr.version = PT_PROTOCOL_VERSION;
    hdr.type = PT_MSG_TYPE_DATA;
    hdr.flags = PT_MSG_FLAG_BATCH;
    hdr.sequence = peer->hot.send_seq++;
    hdr.payload_len = batch->used;

    pt_message_encode_header(&hdr, header_buf);

    /*
     * The caller should send:
     *   1. header_buf (PT_MESSAGE_HEADER_SIZE bytes)
     *   2. batch->buffer (batch->used bytes)
     *   3. CRC-16 of (header + payload)
     *
     * Or use the platform's send_framed() function if available.
     */
    return batch->used;
}
165 :
166 : /*
167 : * NOTE: pt_queue_peek() and pt_queue_consume() were REMOVED.
168 : *
169 : * These functions bypassed the priority free-lists and coalesce hash,
170 : * causing data structure corruption. The batch send code now uses
171 : * pt_queue_pop_priority() which properly maintains all data structures.
172 : *
173 : * If you need peek-like functionality, pop into a local buffer instead:
174 : * uint8_t buf[PT_QUEUE_SLOT_SIZE];
175 : * uint16_t len;
176 : * if (pt_queue_pop_priority(q, buf, &len) == 0) {
177 : * // Process buf[0..len-1]
178 : * }
179 : */
180 :
181 : /* pt_batch_send_fn typedef is in send.h */
182 :
183 : /*
184 : * Drain send queue in batches
185 : *
186 : * Called from poll loop - combines queued messages into batches.
187 : * The send_fn callback is platform-specific and provided by the
188 : * networking layer (Phase 4 POSIX, Phase 5 MacTCP, Phase 6 OT).
189 : *
190 : * Uses pre-allocated batch buffer from ctx->send_batch to avoid
191 : * 1.4KB stack allocation per call.
192 : *
193 : * IMPORTANT: Uses pt_queue_pop_priority() which properly maintains
194 : * priority free-lists and coalesce hash. Messages are batched in
195 : * priority order (CRITICAL first, then HIGH, NORMAL, LOW).
196 : *
197 : * LOGGING: Logs DEBUG on successful batch send, ERR on failure, WARN
198 : * for oversized messages.
199 : */
/* Drain a peer's send queue, coalescing messages into batches.
 *
 * Returns the number of successful send_fn() calls ("batches sent"),
 * or 0 if there is nothing to do or parameters are missing.
 *
 * Pop/commit protocol: pt_queue_pop_priority_direct() exposes a
 * zero-copy pointer into the slot; the slot is only released by
 * pt_queue_pop_priority_commit(). A message is committed ONLY after
 * it has been safely copied into the batch (or knowingly dropped),
 * so a failed batch_add never loses data. */
int pt_drain_send_queue(struct pt_context *ctx, struct pt_peer *peer,
                        pt_batch_send_fn send_fn) {
    pt_batch *batch;
    pt_queue *q = peer->send_queue;
    const void *msg_data; /* Zero-copy pointer to slot data */
    uint16_t len;
    uint8_t slot_flags;
    int sent = 0;

    if (!ctx || !q || pt_queue_is_empty(q) || !send_fn)
        return 0;

    /* Use pre-allocated batch buffer from context (avoids ~1.4KB of
     * stack per call - matters on 68k with small stacks) */
    batch = &ctx->send_batch;
    pt_batch_init(batch);

    /* Pop messages in priority order using ZERO-COPY direct pop.
     * This avoids double-copy: instead of slot->temp->batch, we do slot->batch.
     * On 68k with 2-10 MB/s memory bandwidth, this saves significant time.
     *
     * FRAGMENT HANDLING: Fragments (PT_SLOT_FRAGMENT) must be sent individually
     * with PT_MSG_FLAG_FRAGMENT set, NOT batched with normal messages.
     *
     * PROTOCOL: Only commit after successful batch_add to avoid message loss. */
    while (pt_queue_pop_priority_direct(q, &msg_data, &len) == 0) {
        /* Check if this is a fragment - needs special handling.
         * NOTE(review): reads q->pending_pop_slot directly, which assumes
         * the queue keeps the pending slot index stable between direct-pop
         * and commit - confirm against queue.c if the queue internals change. */
        slot_flags = q->slots[q->pending_pop_slot].flags;

        if (slot_flags & PT_SLOT_FRAGMENT) {
            /* Fragment: Flush any pending batch first, then send fragment alone */
            if (batch->count > 0) {
                if (send_fn(ctx, peer, batch) == 0) {
                    sent++;
                    PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
                        "Batch sent: %u messages, %u bytes", batch->count, batch->used);
                } else {
                    PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL, "Batch send failed");
                }
                pt_batch_init(batch);
            }

            /* Send fragment as its own "batch" with is_fragment flag.
             * NOTE(review): if pt_batch_add fails here (fragment larger than
             * PT_BATCH_MAX_SIZE), the fragment is silently dropped and the
             * pop is committed anyway - consider logging a warning. */
            if (pt_batch_add(batch, msg_data, len) == 0) {
                batch->is_fragment = 1;
                if (send_fn(ctx, peer, batch) == 0) {
                    sent++;
                    PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
                        "Fragment sent: %u bytes", len);
                } else {
                    PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL, "Fragment send failed");
                }
            }
            pt_batch_init(batch);
            pt_queue_pop_priority_commit(q);
            continue;
        }

        /* Regular message: add to batch */
        if (pt_batch_add(batch, msg_data, len) < 0) {
            /* Batch full - DON'T commit yet, send current batch first */
            if (send_fn(ctx, peer, batch) == 0) {
                sent++;
                PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
                    "Batch sent: %u messages, %u bytes", batch->count, batch->used);
            } else {
                PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL, "Batch send failed");
            }
            pt_batch_init(batch);

            /* Now try adding the message to the fresh batch */
            if (pt_batch_add(batch, msg_data, len) < 0) {
                /* Message too large even for empty batch - should not happen
                 * if PT_BATCH_MAX_SIZE > PT_QUEUE_SLOT_SIZE */
                PT_CTX_WARN(ctx, PT_LOG_CAT_PROTOCOL,
                    "Message too large for batch (%u bytes), dropped", len);
                /* Commit the pop anyway - message is lost (config error) */
                pt_queue_pop_priority_commit(q);
                continue;
            }
        }
        /* Message added to batch - now commit the pop */
        pt_queue_pop_priority_commit(q);
    }

    /* Send remaining batch */
    if (batch->count > 0) {
        if (send_fn(ctx, peer, batch) == 0) {
            sent++;
            PT_CTX_DEBUG(ctx, PT_LOG_CAT_PROTOCOL,
                "Batch sent: %u messages, %u bytes", batch->count, batch->used);
        } else {
            PT_CTX_ERR(ctx, PT_LOG_CAT_PROTOCOL, "Batch send failed");
        }
    }

    return sent;
}
297 :
298 : /* ========================================================================
299 : * Tier 2: Direct Buffer Send
300 : * ======================================================================== */
301 :
302 0 : int pt_drain_direct_buffer(struct pt_context *ctx, struct pt_peer *peer,
303 : pt_direct_send_fn send_fn) {
304 : pt_direct_buffer *buf;
305 : int result;
306 :
307 0 : if (!ctx || !peer || !send_fn) {
308 0 : return 0;
309 : }
310 :
311 0 : buf = &peer->send_direct;
312 :
313 : /* Check if there's data queued to send */
314 0 : if (!pt_direct_buffer_ready(buf)) {
315 0 : return 0;
316 : }
317 :
318 : /* Mark as sending */
319 0 : if (pt_direct_buffer_mark_sending(buf) != 0) {
320 0 : return 0;
321 : }
322 :
323 : /* Send via platform callback */
324 0 : result = send_fn(ctx, peer, buf->data, buf->length);
325 :
326 0 : if (result == 0) {
327 0 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
328 : "Tier 2: Sent %u bytes to peer %u",
329 : buf->length, peer->hot.id);
330 0 : pt_direct_buffer_complete(buf);
331 0 : return 1;
332 : } else {
333 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_SEND,
334 : "Tier 2: Send failed for peer %u (%u bytes)",
335 : peer->hot.id, buf->length);
336 : /* Mark complete even on error to allow retry with new data */
337 0 : pt_direct_buffer_complete(buf);
338 0 : return -1;
339 : }
340 : }
341 :
342 : /* ========================================================================
343 : * Phase 3.5: SendEx API - Priority-based Sending with Coalescing
344 : * ======================================================================== */
345 :
346 : /**
347 : * Send message to peer with extended options
348 : *
349 : * Routes message to appropriate transport and queue based on flags:
350 : * - PT_SEND_UNRELIABLE: Uses UDP if available, else TCP
351 : * - PT_SEND_COALESCABLE: Allows message coalescing with matching key
352 : * - PT_SEND_NO_DELAY: Disables Nagle algorithm for this message
353 : *
354 : * Priority determines queue placement (CRITICAL > HIGH > NORMAL > LOW).
355 : * Coalesce key enables deduplication of repeated messages (e.g., position updates).
356 : *
357 : * FRAGMENTATION: If the message exceeds the peer's negotiated max_message_size
358 : * and fragmentation is enabled, the SDK automatically fragments the message.
359 : * The receiver reassembles fragments transparently before delivering to the
360 : * application callback. App developers never see fragments.
361 : *
362 : * @param ctx Valid PeerTalk context
363 : * @param peer_id Destination peer ID
364 : * @param data Message data (not null)
365 : * @param length Message length (1-PT_MAX_MESSAGE bytes)
366 : * @param priority PT_PRIORITY_* constant
367 : * @param flags Bitmask of PT_SEND_* flags
368 : * @param coalesce_key Key for message coalescing (0 = no coalescing)
369 : *
370 : * @return PT_OK on success, PT_ERR_* on failure
371 : */
372 269 : PeerTalk_Error PeerTalk_SendEx(PeerTalk_Context *ctx_pub,
373 : PeerTalk_PeerID peer_id,
374 : const void *data,
375 : uint16_t length,
376 : uint8_t priority,
377 : uint8_t flags,
378 : uint16_t coalesce_key) {
379 269 : struct pt_context *ctx = (struct pt_context *)ctx_pub;
380 : struct pt_peer *peer;
381 : pt_queue *q;
382 : int result;
383 :
384 : /* Validate parameters */
385 269 : if (!ctx || ctx->magic != PT_CONTEXT_MAGIC) {
386 1 : return PT_ERR_INVALID_STATE;
387 : }
388 268 : if (!data || length == 0 || length > PT_MAX_MESSAGE_SIZE) {
389 5 : return PT_ERR_INVALID_PARAM;
390 : }
391 263 : if (priority > PT_PRIORITY_CRITICAL) {
392 2 : return PT_ERR_INVALID_PARAM;
393 : }
394 :
395 : /* Find peer by ID */
396 261 : peer = pt_peer_find_by_id(ctx, peer_id);
397 261 : if (!peer) {
398 2 : PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
399 : "SendEx failed: Peer %u not found", peer_id);
400 2 : return PT_ERR_PEER_NOT_FOUND;
401 : }
402 :
403 : /* ================================================================
404 : * FLOW CONTROL: Check peer's reported buffer pressure
405 : *
406 : * The peer reports its receive queue pressure via capability updates.
407 : * When peer reports high pressure, we throttle outgoing sends based
408 : * on message priority. This prevents overwhelming slow receivers.
409 : *
410 : * The throttling is transparent to the app - we return WOULD_BLOCK
411 : * so they can retry later. The SDK handles all the complexity.
412 : * ================================================================ */
413 259 : if (pt_peer_should_throttle(peer, priority)) {
414 0 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
415 : "Throttled: peer %u pressure=%u%%, priority=%u",
416 : peer_id, peer->cold.caps.buffer_pressure, priority);
417 0 : return PT_ERR_WOULD_BLOCK;
418 : }
419 :
420 : /* ================================================================
421 : * RATE LIMITING: Token bucket based on peer pressure
422 : *
423 : * When peer reports high buffer pressure, we automatically enable
424 : * rate limiting to prevent flooding. The rate is auto-adjusted
425 : * based on pressure level (set in pt_peer_update_adaptive_params).
426 : *
427 : * Returns PT_ERR_RATE_LIMITED so apps know to back off - this is
428 : * different from WOULD_BLOCK (buffer busy) vs rate limited (pacing).
429 : * ================================================================ */
430 259 : if (pt_peer_check_rate_limit(ctx, peer, length)) {
431 0 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
432 : "Rate limited: peer %u at %u KB/s, msg=%u bytes",
433 : peer_id, peer->cold.caps.rate_limit_bytes_per_sec / 1024, length);
434 0 : return PT_ERR_RATE_LIMITED;
435 : }
436 :
437 : /* ================================================================
438 : * FLOW CONTROL: Check send window based on peer's buffer capacity
439 : *
440 : * After capability exchange, we know the peer's recv_buffer_size.
441 : * We calculate a send_window = recv_buffer / max_message to limit
442 : * how many messages can be queued/in-flight simultaneously.
443 : *
444 : * This prevents flooding slow receivers (e.g., 68k Mac) faster
445 : * than they can process, which would cause severe send/recv
446 : * asymmetry and poor throughput.
447 : *
448 : * The window is checked against: pipeline.pending_count + queue.count
449 : *
450 : * CRITICAL priority bypasses this check - control messages (ACKs,
451 : * capability updates) must always get through regardless of flow
452 : * control state. Without this, protocol messages could be blocked
453 : * by their own flow control feedback.
454 : * ================================================================ */
455 259 : if (priority < PT_PRIORITY_CRITICAL &&
456 257 : peer->cold.caps.caps_exchanged && peer->cold.caps.send_window > 0) {
457 0 : uint16_t in_flight = peer->pipeline.pending_count;
458 0 : q = peer->send_queue;
459 0 : if (q) {
460 0 : in_flight += q->count;
461 : }
462 :
463 0 : if (in_flight >= peer->cold.caps.send_window) {
464 0 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
465 : "Flow control: peer %u at window limit (in_flight=%u, window=%u)",
466 : peer_id, in_flight, peer->cold.caps.send_window);
467 0 : return PT_ERR_WOULD_BLOCK;
468 : }
469 : }
470 :
471 : /* ================================================================
472 : * AUTOMATIC FRAGMENTATION
473 : *
474 : * If message exceeds peer's negotiated max and fragmentation is enabled,
475 : * split into fragments. Each fragment is queued separately and
476 : * reassembled by the receiver before delivery to app callback.
477 : *
478 : * PRESSURE-TRIGGERED FRAGMENTATION: When peer reports high buffer
479 : * pressure (>= PT_PRESSURE_FRAG_THRESHOLD), we proactively fragment
480 : * even messages that would fit, using a reduced max size. This gives
481 : * the receiver smaller chunks to process, reducing buffer pressure.
482 : *
483 : * The app developer never sees this - they just call PeerTalk_Send()
484 : * and the SDK handles everything.
485 : * ================================================================ */
486 : /* Log effective_max on first send to this peer for debugging */
487 259 : if (!peer->cold.caps.first_send_logged) {
488 22 : PT_CTX_INFO(ctx, PT_LOG_CAT_SEND,
489 : "First send to peer %u: effective_max=%u, fragmentation=%s",
490 : peer_id, peer->hot.effective_max_msg,
491 : ctx->enable_fragmentation ? "enabled" : "disabled");
492 22 : peer->cold.caps.first_send_logged = 1;
493 : }
494 :
495 : /* Determine if we need to fragment */
496 : {
497 259 : int needs_fragmentation = 0;
498 259 : uint16_t frag_max = peer->hot.effective_max_msg;
499 :
500 259 : if (ctx->enable_fragmentation && frag_max > 0) {
501 : /* Always fragment if over effective max */
502 196 : if (length > frag_max) {
503 0 : needs_fragmentation = 1;
504 : }
505 : /* Fragment large messages when peer is under pressure
506 : * Use configurable threshold from ctx->pressure_frag if available */
507 196 : else if (peer->cold.caps.buffer_pressure >= ctx->pressure_frag &&
508 : length > PT_PRESSURE_REDUCED_MAX) {
509 0 : needs_fragmentation = 1;
510 0 : frag_max = PT_PRESSURE_REDUCED_MAX; /* Use reduced max for this send */
511 0 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
512 : "Pressure-triggered fragmentation: peer %u at %u%% pressure, msg %u bytes",
513 : peer_id, peer->cold.caps.buffer_pressure, length);
514 : }
515 : }
516 :
517 259 : if (needs_fragmentation) {
518 :
519 0 : const uint8_t *src = (const uint8_t *)data;
520 : uint16_t max_frag_data;
521 0 : uint16_t offset = 0;
522 0 : uint16_t remaining = length;
523 : uint16_t msg_id;
524 : uint8_t frag_flags;
525 : int frag_result;
526 :
527 : /* Allocate unique message ID for this fragmented message */
528 0 : msg_id = (uint16_t)(ctx->next_message_id++ & 0xFFFF);
529 :
530 : /* Determine optimal fragment size:
531 : * 1. Use peer's optimal_chunk if negotiated (fills 25% of MacTCP buffer)
532 : * 2. Fall back to frag_max minus overhead (may be reduced due to pressure)
533 : * 3. Cap by queue slot size */
534 0 : if (peer->cold.caps.caps_exchanged && peer->cold.caps.optimal_chunk > 0 &&
535 0 : peer->cold.caps.optimal_chunk < frag_max) {
536 : /* Use peer's advertised optimal chunk for best receive performance */
537 0 : max_frag_data = peer->cold.caps.optimal_chunk;
538 : } else {
539 : /* Default: frag_max minus fragment header overhead
540 : * frag_max may be reduced from effective_max_msg due to pressure */
541 0 : max_frag_data = frag_max - PT_FRAGMENT_HEADER_SIZE;
542 : }
543 :
544 : /* Cap by frag_max (may be pressure-reduced) */
545 : {
546 0 : uint16_t max_peer = frag_max - PT_FRAGMENT_HEADER_SIZE;
547 0 : if (max_frag_data > max_peer) {
548 0 : max_frag_data = max_peer;
549 : }
550 : }
551 :
552 : /* Limit to queue slot size (fragment header + data must fit) */
553 : {
554 0 : uint16_t max_slot_data = PT_QUEUE_SLOT_SIZE - PT_FRAGMENT_HEADER_SIZE;
555 0 : if (max_frag_data > max_slot_data) {
556 0 : max_frag_data = max_slot_data;
557 : }
558 : }
559 :
560 0 : if (max_frag_data < 64) {
561 : /* Peer's max is too small for practical fragmentation */
562 0 : PT_CTX_ERR(ctx, PT_LOG_CAT_SEND,
563 : "Frag max %u too small for fragmentation",
564 : frag_max);
565 0 : return PT_ERR_MESSAGE_TOO_LARGE;
566 : }
567 :
568 0 : PT_CTX_INFO(ctx, PT_LOG_CAT_SEND,
569 : "Fragmenting %u bytes for peer %u (chunk=%u, max=%u, count=%u)",
570 : length, peer_id, max_frag_data, frag_max,
571 : (length + max_frag_data - 1) / max_frag_data);
572 :
573 0 : while (remaining > 0) {
574 0 : uint16_t frag_len = (remaining > max_frag_data) ? max_frag_data : remaining;
575 :
576 0 : frag_flags = 0;
577 0 : if (offset == 0) {
578 0 : frag_flags |= PT_FRAGMENT_FLAG_FIRST;
579 : }
580 0 : if (remaining <= max_frag_data) {
581 0 : frag_flags |= PT_FRAGMENT_FLAG_LAST;
582 : }
583 :
584 0 : frag_result = pt_send_fragment(ctx, peer, src + offset, frag_len,
585 : msg_id, length, offset, frag_flags,
586 : priority);
587 :
588 0 : if (frag_result != 0) {
589 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
590 : "Fragment send failed at offset %u: %d", offset, frag_result);
591 0 : return frag_result;
592 : }
593 :
594 0 : offset += frag_len;
595 0 : remaining -= frag_len;
596 :
597 : /* NOTE: For synchronous fragmentation, we send one fragment at a time.
598 : * The poll loop will drain the direct buffer before we can queue the next.
599 : * For now, return after first fragment - caller may need to retry for more.
600 : * A full implementation would queue all fragments atomically. */
601 0 : if (remaining > 0) {
602 : /* More fragments pending - let poll loop drain first fragment */
603 0 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
604 : "Fragment %u/%u queued, %u bytes remaining",
605 : (offset / max_frag_data), ((length + max_frag_data - 1) / max_frag_data),
606 : remaining);
607 : /* For now, continue - real implementation would need state machine */
608 : }
609 : }
610 :
611 0 : return PT_OK;
612 : }
613 : } /* End of fragmentation decision block */
614 :
615 : /* Route unreliable sends to UDP if available */
616 259 : if (flags & PT_SEND_UNRELIABLE) {
617 : /* Check if UDP is available on this platform */
618 1 : if (ctx->plat && ctx->plat->send_udp) {
619 1 : int udp_result = ctx->plat->send_udp(ctx, peer, data, length);
620 1 : if (udp_result == 0) {
621 1 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
622 : "Sent %u bytes to peer %u via UDP",
623 : length, peer_id);
624 1 : return PT_OK;
625 : }
626 : /* UDP failed, fall back to TCP */
627 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
628 : "UDP send failed, falling back to TCP");
629 : }
630 : /* No UDP available, fall through to TCP */
631 : }
632 :
633 : /* ================================================================
634 : * ASYNC SEND PIPELINE (MacTCP optimization)
635 : *
636 : * Try async TCP send if available and slots are free. This bypasses
637 : * the queue for immediate sends, keeping multiple operations in
638 : * flight for improved throughput (200-400% on 68k Macs).
639 : *
640 : * Conditions for async path:
641 : * - Platform supports tcp_send_async (MacTCP)
642 : * - Peer pipeline is initialized (CONNECTED state)
643 : * - At least one slot is available
644 : * - Message fits in a single frame (not fragmented above)
645 : *
646 : * Falls back to queue-based approach if async unavailable or full.
647 : * ================================================================ */
648 258 : if (ctx->plat && ctx->plat->tcp_send_async &&
649 0 : peer->pipeline.initialized &&
650 0 : peer->pipeline.pending_count < PT_SEND_PIPELINE_DEPTH) {
651 :
652 : /* Convert PT_SEND_* flags to PT_MSG_FLAG_* for protocol layer.
653 : * The values are designed to match for the common flags:
654 : * - PT_SEND_UNRELIABLE (0x01) -> PT_MSG_FLAG_UNRELIABLE (0x01)
655 : * - PT_SEND_COALESCABLE (0x02) -> PT_MSG_FLAG_COALESCABLE (0x02)
656 : * - PT_SEND_NO_DELAY (0x04) -> PT_MSG_FLAG_NO_DELAY (0x04)
657 : * The PT_MSG_FLAG_NO_DELAY flag is used to set pushFlag=1 in MacTCP. */
658 0 : uint8_t msg_flags = flags & 0x0F; /* Mask to low nibble (protocol flags) */
659 :
660 0 : int async_err = ctx->plat->tcp_send_async(ctx, peer, data, length, msg_flags);
661 0 : if (async_err == PT_OK) {
662 0 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
663 : "Async send: %u bytes to peer %u (slots used: %d/%d)",
664 : length, peer_id, peer->pipeline.pending_count,
665 : PT_SEND_PIPELINE_DEPTH);
666 0 : return PT_OK;
667 : }
668 0 : if (async_err != PT_ERR_WOULD_BLOCK) {
669 : /* Fatal error - don't fallback */
670 0 : return async_err;
671 : }
672 : /* WOULD_BLOCK means slots full - fall through to queue */
673 0 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
674 : "Async pipeline full (%d pending), falling back to queue",
675 : peer->pipeline.pending_count);
676 : }
677 :
678 : /* Two-tier routing: large messages go to Tier 2, small to Tier 1 */
679 : {
680 258 : uint16_t threshold = ctx->direct_threshold;
681 258 : if (threshold == 0) {
682 0 : threshold = PT_DIRECT_THRESHOLD;
683 : }
684 :
685 258 : if (length > threshold) {
686 : /* Tier 2: Direct buffer for large messages */
687 82 : result = pt_direct_buffer_queue(&peer->send_direct, data, length, priority);
688 82 : if (result == PT_ERR_WOULD_BLOCK) {
689 1 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
690 : "Tier 2 buffer busy for peer %u, caller should retry",
691 : peer_id);
692 1 : return PT_ERR_WOULD_BLOCK;
693 : }
694 81 : if (result != 0) {
695 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
696 : "Tier 2 queue failed for peer %u: %d", peer_id, result);
697 0 : return result;
698 : }
699 :
700 81 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
701 : "Tier 2: Queued %u bytes to peer %u (pri=%u)",
702 : length, peer_id, priority);
703 81 : return PT_OK;
704 : }
705 : }
706 :
707 : /* Tier 1: Queue for small messages */
708 176 : q = peer->send_queue;
709 176 : if (!q) {
710 2 : PT_CTX_ERR(ctx, PT_LOG_CAT_SEND,
711 : "SendEx failed: Peer %u has no send queue", peer_id);
712 2 : return PT_ERR_INVALID_STATE;
713 : }
714 :
715 : /* Check backpressure before queuing
716 : * pt_queue_pressure() returns 0-100 (percentage), not 0.0-1.0 */
717 : {
718 174 : uint8_t pressure = pt_queue_pressure(q);
719 174 : if (pressure >= 90) {
720 : /* Queue >90% full - reject LOW priority messages */
721 5 : if (priority == PT_PRIORITY_LOW) {
722 5 : PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
723 : "Queue pressure %u%% - rejecting LOW priority message",
724 : (unsigned)pressure);
725 5 : return PT_ERR_BUFFER_FULL;
726 : }
727 : }
728 169 : if (pressure >= 75) {
729 : /* Queue >75% full - reject NORMAL priority messages */
730 91 : if (priority == PT_PRIORITY_NORMAL) {
731 88 : PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
732 : "Queue pressure %u%% - rejecting NORMAL priority message",
733 : (unsigned)pressure);
734 88 : return PT_ERR_BUFFER_FULL;
735 : }
736 : }
737 : }
738 :
739 : /* Enqueue message with priority and optional coalescing */
740 81 : if (flags & PT_SEND_COALESCABLE && coalesce_key != 0) {
741 : /* Coalescing enabled - replace existing message with same key */
742 5 : result = pt_queue_push_coalesce(q, data, length, priority, coalesce_key);
743 : } else {
744 : /* Normal enqueue */
745 76 : result = pt_queue_push(ctx, q, data, length, priority, 0);
746 : }
747 :
748 81 : if (result != 0) {
749 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
750 : "Queue full - message dropped (peer=%u, len=%u, pri=%u)",
751 : peer_id, length, priority);
752 0 : return PT_ERR_BUFFER_FULL;
753 : }
754 :
755 : /* Flags are now handled by tcp_send_async (MacTCP pushFlag control)
756 : * and protocol layer (PT_MSG_FLAG_* in message headers). */
757 :
758 81 : PT_CTX_DEBUG(ctx, PT_LOG_CAT_SEND,
759 : "Queued %u bytes to peer %u (pri=%u, flags=0x%02X, key=%u)",
760 : length, peer_id, priority, flags, coalesce_key);
761 :
762 81 : return PT_OK;
763 : }
764 :
765 : /**
766 : * Send message to peer (simple wrapper)
767 : *
768 : * Sends with default priority (NORMAL), no special flags, and no coalescing.
769 : * Most applications should use this for typical reliable messaging.
770 : *
771 : * @param ctx Valid PeerTalk context
772 : * @param peer_id Destination peer ID
773 : * @param data Message data (not null)
774 : * @param length Message length (1-PT_MAX_MESSAGE bytes)
775 : *
776 : * @return PT_OK on success, PT_ERR_* on failure
777 : */
778 228 : PeerTalk_Error PeerTalk_Send(PeerTalk_Context *ctx_pub,
779 : PeerTalk_PeerID peer_id,
780 : const void *data,
781 : uint16_t length) {
782 : /* Delegate to SendEx with default parameters */
783 228 : return PeerTalk_SendEx(ctx_pub, peer_id, data, length,
784 : PT_PRIORITY_NORMAL,
785 : PT_SEND_DEFAULT,
786 : 0); /* No coalescing */
787 : }
|