Line data Source code
1 : /**
2 : * @file stream.c
3 : * @brief PeerTalk Streaming API Implementation
4 : *
5 : * Implements large data transfer bypassing the normal message queue.
6 : * Useful for log file transfers, state synchronization, etc.
7 : */
8 :
9 : #include "pt_internal.h"
10 : #include "peer.h"
11 : #include "protocol.h"
12 : #include "../../include/peertalk.h"
13 :
14 : /* ========================================================================== */
15 : /* Stream Send */
16 : /* ========================================================================== */
17 :
18 4 : PeerTalk_Error PeerTalk_StreamSend(
19 : PeerTalk_Context *ctx_pub,
20 : PeerTalk_PeerID peer_id,
21 : const void *data,
22 : uint32_t length,
23 : PeerTalk_StreamCompleteCB on_complete,
24 : void *user_data)
25 : {
26 4 : struct pt_context *ctx = (struct pt_context *)ctx_pub;
27 : struct pt_peer *peer;
28 :
29 : /* Validate context */
30 4 : if (!ctx || ctx->magic != PT_CONTEXT_MAGIC) {
31 1 : return PT_ERR_INVALID_STATE;
32 : }
33 :
34 : /* Validate data */
35 3 : if (!data || length == 0) {
36 2 : return PT_ERR_INVALID_PARAM;
37 : }
38 :
39 : /* Check max size */
40 1 : if (length > PT_MAX_STREAM_SIZE) {
41 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
42 : "Stream too large: %u > %u", length, PT_MAX_STREAM_SIZE);
43 0 : return PT_ERR_MESSAGE_TOO_LARGE;
44 : }
45 :
46 : /* Find peer */
47 1 : peer = pt_peer_find_by_id(ctx, peer_id);
48 1 : if (!peer) {
49 1 : return PT_ERR_PEER_NOT_FOUND;
50 : }
51 :
52 : /* Check peer is connected */
53 0 : if (peer->hot.state != PT_PEER_CONNECTED) {
54 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
55 : "Cannot stream to peer %u: not connected", peer_id);
56 0 : return PT_ERR_NOT_CONNECTED;
57 : }
58 :
59 : /* Check if stream already active */
60 0 : if (peer->stream.active) {
61 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
62 : "Stream already active for peer %u", peer_id);
63 0 : return PT_ERR_BUSY;
64 : }
65 :
66 : /* Initialize stream state */
67 0 : peer->stream.data = (const uint8_t *)data;
68 0 : peer->stream.total_length = length;
69 0 : peer->stream.bytes_sent = 0;
70 0 : peer->stream.on_complete = (void *)on_complete;
71 0 : peer->stream.user_data = user_data;
72 0 : peer->stream.cancelled = 0;
73 0 : peer->stream.active = 1;
74 :
75 0 : PT_CTX_INFO(ctx, PT_LOG_CAT_SEND,
76 : "Stream started for peer %u: %u bytes", peer_id, length);
77 :
78 0 : return PT_OK;
79 : }
80 :
81 : /* ========================================================================== */
82 : /* Stream Cancel */
83 : /* ========================================================================== */
84 :
85 2 : PeerTalk_Error PeerTalk_StreamCancel(
86 : PeerTalk_Context *ctx_pub,
87 : PeerTalk_PeerID peer_id)
88 : {
89 2 : struct pt_context *ctx = (struct pt_context *)ctx_pub;
90 : struct pt_peer *peer;
91 :
92 : /* Validate context */
93 2 : if (!ctx || ctx->magic != PT_CONTEXT_MAGIC) {
94 0 : return PT_ERR_INVALID_STATE;
95 : }
96 :
97 : /* Find peer */
98 2 : peer = pt_peer_find_by_id(ctx, peer_id);
99 2 : if (!peer) {
100 2 : return PT_ERR_PEER_NOT_FOUND;
101 : }
102 :
103 : /* Check if stream is active */
104 0 : if (!peer->stream.active) {
105 0 : return PT_ERR_NOT_FOUND;
106 : }
107 :
108 : /* Mark as cancelled - will be processed in next poll */
109 0 : peer->stream.cancelled = 1;
110 :
111 0 : PT_CTX_INFO(ctx, PT_LOG_CAT_SEND,
112 : "Stream cancel requested for peer %u", peer_id);
113 :
114 0 : return PT_OK;
115 : }
116 :
117 : /* ========================================================================== */
118 : /* Stream Active Check */
119 : /* ========================================================================== */
120 :
121 2 : int PeerTalk_StreamActive(
122 : PeerTalk_Context *ctx_pub,
123 : PeerTalk_PeerID peer_id)
124 : {
125 2 : struct pt_context *ctx = (struct pt_context *)ctx_pub;
126 : struct pt_peer *peer;
127 :
128 : /* Validate context */
129 2 : if (!ctx || ctx->magic != PT_CONTEXT_MAGIC) {
130 0 : return 0;
131 : }
132 :
133 : /* Find peer */
134 2 : peer = pt_peer_find_by_id(ctx, peer_id);
135 2 : if (!peer) {
136 2 : return 0;
137 : }
138 :
139 0 : return peer->stream.active ? 1 : 0;
140 : }
141 :
142 : /* ========================================================================== */
143 : /* Stream Poll (Internal) */
144 : /* ========================================================================== */
145 :
146 : /**
147 : * Process active stream for a peer
148 : *
149 : * Called from the poll loop to send the next chunk of stream data.
150 : * Uses the peer's effective_chunk size for optimal throughput.
151 : *
152 : * @param ctx PeerTalk context
153 : * @param peer Peer with active stream
154 : * @param send_func Platform-specific send function
155 : * @return 0 on success, negative on error
156 : */
157 317 : int pt_stream_poll(struct pt_context *ctx, struct pt_peer *peer,
158 : int (*send_func)(struct pt_context *, struct pt_peer *,
159 : const void *, size_t))
160 : {
161 317 : pt_peer_stream *stream = &peer->stream;
162 : PeerTalk_StreamCompleteCB on_complete;
163 : uint32_t chunk_size;
164 : uint32_t remaining;
165 : int result;
166 :
167 317 : if (!stream->active) {
168 317 : return 0; /* No active stream */
169 : }
170 :
171 : /* Check for cancellation */
172 0 : if (stream->cancelled) {
173 0 : on_complete = (PeerTalk_StreamCompleteCB)stream->on_complete;
174 0 : stream->active = 0;
175 :
176 0 : PT_CTX_INFO(ctx, PT_LOG_CAT_SEND,
177 : "Stream cancelled for peer %u at %u/%u bytes",
178 : peer->hot.id, stream->bytes_sent, stream->total_length);
179 :
180 0 : if (on_complete) {
181 0 : on_complete((PeerTalk_Context *)ctx, peer->hot.id,
182 : stream->bytes_sent, PT_ERR_CANCELLED, stream->user_data);
183 : }
184 0 : return 0;
185 : }
186 :
187 : /* Calculate chunk size - use adaptive chunk or default */
188 0 : chunk_size = peer->hot.effective_chunk;
189 0 : if (chunk_size == 0) {
190 0 : chunk_size = 1024; /* Default if not set */
191 : }
192 :
193 : /* Calculate remaining bytes */
194 0 : remaining = stream->total_length - stream->bytes_sent;
195 0 : if (chunk_size > remaining) {
196 0 : chunk_size = remaining;
197 : }
198 :
199 : /* Send next chunk */
200 0 : result = send_func(ctx, peer,
201 0 : stream->data + stream->bytes_sent,
202 : chunk_size);
203 :
204 0 : if (result == PT_ERR_WOULD_BLOCK) {
205 : /* Socket buffer full - try again next poll */
206 0 : return 0;
207 : }
208 :
209 0 : if (result < 0) {
210 : /* Send failed - abort stream */
211 0 : on_complete = (PeerTalk_StreamCompleteCB)stream->on_complete;
212 0 : stream->active = 0;
213 :
214 0 : PT_CTX_WARN(ctx, PT_LOG_CAT_SEND,
215 : "Stream failed for peer %u: error %d at %u/%u bytes",
216 : peer->hot.id, result, stream->bytes_sent, stream->total_length);
217 :
218 0 : if (on_complete) {
219 0 : on_complete((PeerTalk_Context *)ctx, peer->hot.id,
220 : stream->bytes_sent, (PeerTalk_Error)result, stream->user_data);
221 : }
222 0 : return result;
223 : }
224 :
225 : /* Update bytes sent */
226 0 : stream->bytes_sent += chunk_size;
227 :
228 : /* Check if complete */
229 0 : if (stream->bytes_sent >= stream->total_length) {
230 0 : on_complete = (PeerTalk_StreamCompleteCB)stream->on_complete;
231 0 : stream->active = 0;
232 :
233 0 : PT_CTX_INFO(ctx, PT_LOG_CAT_SEND,
234 : "Stream complete for peer %u: %u bytes",
235 : peer->hot.id, stream->bytes_sent);
236 :
237 0 : if (on_complete) {
238 0 : on_complete((PeerTalk_Context *)ctx, peer->hot.id,
239 : stream->bytes_sent, PT_OK, stream->user_data);
240 : }
241 : }
242 :
243 0 : return 0;
244 : }
|