LCOV - code coverage report
Current view: top level - core - stream.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 26 83 31.3 %
Date: 2026-02-22 12:14:12 Functions: 4 4 100.0 %

          Line data    Source code
       1             : /**
       2             :  * @file stream.c
       3             :  * @brief PeerTalk Streaming API Implementation
       4             :  *
       5             :  * Implements large data transfer bypassing the normal message queue.
       6             :  * Useful for log file transfers, state synchronization, etc.
       7             :  */
       8             : 
       9             : #include "pt_internal.h"
      10             : #include "peer.h"
      11             : #include "protocol.h"
      12             : #include "../../include/peertalk.h"
      13             : 
      14             : /* ========================================================================== */
      15             : /* Stream Send                                                                */
      16             : /* ========================================================================== */
      17             : 
      18           4 : PeerTalk_Error PeerTalk_StreamSend(
      19             :     PeerTalk_Context *ctx_pub,
      20             :     PeerTalk_PeerID peer_id,
      21             :     const void *data,
      22             :     uint32_t length,
      23             :     PeerTalk_StreamCompleteCB on_complete,
      24             :     void *user_data)
      25             : {
      26           4 :     struct pt_context *ctx = (struct pt_context *)ctx_pub;
      27             :     struct pt_peer *peer;
      28             : 
      29             :     /* Validate context */
      30           4 :     if (!ctx || ctx->magic != PT_CONTEXT_MAGIC) {
      31           1 :         return PT_ERR_INVALID_STATE;
      32             :     }
      33             : 
      34             :     /* Validate data */
      35           3 :     if (!data || length == 0) {
      36           2 :         return PT_ERR_INVALID_PARAM;
      37             :     }
      38             : 
      39             :     /* Check max size */
      40           1 :     if (length > PT_MAX_STREAM_SIZE) {
      41           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
      42             :             "Stream too large: %u > %u", length, PT_MAX_STREAM_SIZE);
      43           0 :         return PT_ERR_MESSAGE_TOO_LARGE;
      44             :     }
      45             : 
      46             :     /* Find peer */
      47           1 :     peer = pt_peer_find_by_id(ctx, peer_id);
      48           1 :     if (!peer) {
      49           1 :         return PT_ERR_PEER_NOT_FOUND;
      50             :     }
      51             : 
      52             :     /* Check peer is connected */
      53           0 :     if (peer->hot.state != PT_PEER_CONNECTED) {
      54           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
      55             :             "Cannot stream to peer %u: not connected", peer_id);
      56           0 :         return PT_ERR_NOT_CONNECTED;
      57             :     }
      58             : 
      59             :     /* Check if stream already active */
      60           0 :     if (peer->stream.active) {
      61           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
      62             :             "Stream already active for peer %u", peer_id);
      63           0 :         return PT_ERR_BUSY;
      64             :     }
      65             : 
      66             :     /* Initialize stream state */
      67           0 :     peer->stream.data = (const uint8_t *)data;
      68           0 :     peer->stream.total_length = length;
      69           0 :     peer->stream.bytes_sent = 0;
      70           0 :     peer->stream.on_complete = (void *)on_complete;
      71           0 :     peer->stream.user_data = user_data;
      72           0 :     peer->stream.cancelled = 0;
      73           0 :     peer->stream.active = 1;
      74             : 
      75           0 :     PT_CTX_INFO(ctx, PT_LOG_CAT_SEND,
      76             :         "Stream started for peer %u: %u bytes", peer_id, length);
      77             : 
      78           0 :     return PT_OK;
      79             : }
      80             : 
      81             : /* ========================================================================== */
      82             : /* Stream Cancel                                                              */
      83             : /* ========================================================================== */
      84             : 
      85           2 : PeerTalk_Error PeerTalk_StreamCancel(
      86             :     PeerTalk_Context *ctx_pub,
      87             :     PeerTalk_PeerID peer_id)
      88             : {
      89           2 :     struct pt_context *ctx = (struct pt_context *)ctx_pub;
      90             :     struct pt_peer *peer;
      91             : 
      92             :     /* Validate context */
      93           2 :     if (!ctx || ctx->magic != PT_CONTEXT_MAGIC) {
      94           0 :         return PT_ERR_INVALID_STATE;
      95             :     }
      96             : 
      97             :     /* Find peer */
      98           2 :     peer = pt_peer_find_by_id(ctx, peer_id);
      99           2 :     if (!peer) {
     100           2 :         return PT_ERR_PEER_NOT_FOUND;
     101             :     }
     102             : 
     103             :     /* Check if stream is active */
     104           0 :     if (!peer->stream.active) {
     105           0 :         return PT_ERR_NOT_FOUND;
     106             :     }
     107             : 
     108             :     /* Mark as cancelled - will be processed in next poll */
     109           0 :     peer->stream.cancelled = 1;
     110             : 
     111           0 :     PT_CTX_INFO(ctx, PT_LOG_CAT_SEND,
     112             :         "Stream cancel requested for peer %u", peer_id);
     113             : 
     114           0 :     return PT_OK;
     115             : }
     116             : 
     117             : /* ========================================================================== */
     118             : /* Stream Active Check                                                        */
     119             : /* ========================================================================== */
     120             : 
     121           2 : int PeerTalk_StreamActive(
     122             :     PeerTalk_Context *ctx_pub,
     123             :     PeerTalk_PeerID peer_id)
     124             : {
     125           2 :     struct pt_context *ctx = (struct pt_context *)ctx_pub;
     126             :     struct pt_peer *peer;
     127             : 
     128             :     /* Validate context */
     129           2 :     if (!ctx || ctx->magic != PT_CONTEXT_MAGIC) {
     130           0 :         return 0;
     131             :     }
     132             : 
     133             :     /* Find peer */
     134           2 :     peer = pt_peer_find_by_id(ctx, peer_id);
     135           2 :     if (!peer) {
     136           2 :         return 0;
     137             :     }
     138             : 
     139           0 :     return peer->stream.active ? 1 : 0;
     140             : }
     141             : 
     142             : /* ========================================================================== */
     143             : /* Stream Poll (Internal)                                                     */
     144             : /* ========================================================================== */
     145             : 
     146             : /**
     147             :  * Process active stream for a peer
     148             :  *
     149             :  * Called from the poll loop to send the next chunk of stream data.
     150             :  * Uses the peer's effective_chunk size for optimal throughput.
     151             :  *
     152             :  * @param ctx   PeerTalk context
     153             :  * @param peer  Peer with active stream
     154             :  * @param send_func Platform-specific send function
     155             :  * @return 0 on success, negative on error
     156             :  */
     157         317 : int pt_stream_poll(struct pt_context *ctx, struct pt_peer *peer,
     158             :                    int (*send_func)(struct pt_context *, struct pt_peer *,
     159             :                                     const void *, size_t))
     160             : {
     161         317 :     pt_peer_stream *stream = &peer->stream;
     162             :     PeerTalk_StreamCompleteCB on_complete;
     163             :     uint32_t chunk_size;
     164             :     uint32_t remaining;
     165             :     int result;
     166             : 
     167         317 :     if (!stream->active) {
     168         317 :         return 0;  /* No active stream */
     169             :     }
     170             : 
     171             :     /* Check for cancellation */
     172           0 :     if (stream->cancelled) {
     173           0 :         on_complete = (PeerTalk_StreamCompleteCB)stream->on_complete;
     174           0 :         stream->active = 0;
     175             : 
     176           0 :         PT_CTX_INFO(ctx, PT_LOG_CAT_SEND,
     177             :             "Stream cancelled for peer %u at %u/%u bytes",
     178             :             peer->hot.id, stream->bytes_sent, stream->total_length);
     179             : 
     180           0 :         if (on_complete) {
     181           0 :             on_complete((PeerTalk_Context *)ctx, peer->hot.id,
     182             :                        stream->bytes_sent, PT_ERR_CANCELLED, stream->user_data);
     183             :         }
     184           0 :         return 0;
     185             :     }
     186             : 
     187             :     /* Calculate chunk size - use adaptive chunk or default */
     188           0 :     chunk_size = peer->hot.effective_chunk;
     189           0 :     if (chunk_size == 0) {
     190           0 :         chunk_size = 1024;  /* Default if not set */
     191             :     }
     192             : 
     193             :     /* Calculate remaining bytes */
     194           0 :     remaining = stream->total_length - stream->bytes_sent;
     195           0 :     if (chunk_size > remaining) {
     196           0 :         chunk_size = remaining;
     197             :     }
     198             : 
     199             :     /* Send next chunk */
     200           0 :     result = send_func(ctx, peer,
     201           0 :                        stream->data + stream->bytes_sent,
     202             :                        chunk_size);
     203             : 
     204           0 :     if (result == PT_ERR_WOULD_BLOCK) {
     205             :         /* Socket buffer full - try again next poll */
     206           0 :         return 0;
     207             :     }
     208             : 
     209           0 :     if (result < 0) {
     210             :         /* Send failed - abort stream */
     211           0 :         on_complete = (PeerTalk_StreamCompleteCB)stream->on_complete;
     212           0 :         stream->active = 0;
     213             : 
     214           0 :         PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
     215             :             "Stream failed for peer %u: error %d at %u/%u bytes",
     216             :             peer->hot.id, result, stream->bytes_sent, stream->total_length);
     217             : 
     218           0 :         if (on_complete) {
     219           0 :             on_complete((PeerTalk_Context *)ctx, peer->hot.id,
     220             :                        stream->bytes_sent, (PeerTalk_Error)result, stream->user_data);
     221             :         }
     222           0 :         return result;
     223             :     }
     224             : 
     225             :     /* Update bytes sent */
     226           0 :     stream->bytes_sent += chunk_size;
     227             : 
     228             :     /* Check if complete */
     229           0 :     if (stream->bytes_sent >= stream->total_length) {
     230           0 :         on_complete = (PeerTalk_StreamCompleteCB)stream->on_complete;
     231           0 :         stream->active = 0;
     232             : 
     233           0 :         PT_CTX_INFO(ctx, PT_LOG_CAT_SEND,
     234             :             "Stream complete for peer %u: %u bytes",
     235             :             peer->hot.id, stream->bytes_sent);
     236             : 
     237           0 :         if (on_complete) {
     238           0 :             on_complete((PeerTalk_Context *)ctx, peer->hot.id,
     239             :                        stream->bytes_sent, PT_OK, stream->user_data);
     240             :         }
     241             :     }
     242             : 
     243           0 :     return 0;
     244             : }

Generated by: LCOV version 1.14