Back to Site
Loading...
Searching...
No Matches
file_transfer.h
Go to the documentation of this file.
1#pragma once
2
43#include "node/peer_network.h"
44#include "peer/peer.h"
45#include "core/bytes.h"
46#include "peer/peer_id.h"
47
48extern "C" {
49#include "sha256.h"
50}
51
52#include <atomic>
53#include <chrono>
54#include <condition_variable>
55#include <cstdint>
56#include <functional>
57#include <memory>
58#include <mutex>
59#include <queue>
60#include <string>
61#include <thread>
62#include <unordered_map>
63#include <vector>
64
65namespace librats {
66
67class FileTransfer final : public Subsystem {
68public:
/// Settings accepted by the `FileTransfer(Config)` constructor.
/// (Rendered-listing line numbers that were fused onto each line are removed
/// so the definition is valid C++ again.)
struct Config {
    uint32_t chunk_size = 64 * 1024;          ///< payload bytes per chunk
    uint32_t window_bytes = 4 * 1024 * 1024;  ///< max un-acked bytes in flight
    uint32_t progress_interval = 256 * 1024;  ///< receiver acks every N bytes
    uint32_t transfer_timeout_secs = 60;      ///< abort a transfer idle this long
    uint32_t worker_threads = 4;              ///< concurrent outgoing transfers
    bool verify_integrity = true;             ///< per-chunk CRC32 + whole-file SHA-256
    std::string temp_directory = ".";         ///< holds in-progress downloads
};
78
/// Direction of a transfer: sending or receiving.
enum class Direction { Sending, Receiving };
81
/// One file inside a transfer (a single-file transfer has exactly one).
struct FileEntry {
    std::string relative_path;  ///< POSIX path relative to the transfer root
    uint64_t size = 0;          ///< presumably this file's size in bytes — TODO confirm
};
87
89 struct Offer {
91 uint64_t id = 0;
92 std::string name;
93 uint64_t size = 0;
94 bool is_directory = false;
95 std::vector<FileEntry> files;
96 };
97
99 struct Progress {
100 uint64_t id = 0;
104 uint64_t bytes_transferred = 0;
105 uint64_t total_bytes = 0;
106 uint32_t files_completed = 0;
107 uint32_t total_files = 0;
108
109 double transfer_rate_bps = 0.0;
110 double average_rate_bps = 0.0;
111 std::chrono::milliseconds elapsed{0};
112 std::chrono::milliseconds estimated_time_remaining{0};
113
115 double percent() const {
116 if (total_bytes == 0) return status == Status::Completed ? 100.0 : 0.0;
117 return static_cast<double>(bytes_transferred) / static_cast<double>(total_bytes) * 100.0;
118 }
119 };
120
/// Aggregate counters returned by stats().
/// Field order is unchanged, so positional aggregate initialization still works.
struct Stats {
    uint64_t bytes_sent = 0;
    uint64_t bytes_received = 0;
    uint64_t completed = 0;  ///< presumably number of transfers that finished OK — TODO confirm
    uint64_t failed = 0;     ///< presumably number of transfers that failed — TODO confirm
};
126
127 using OfferHandler = std::function<void(const Offer&)>;
128 using ProgressHandler = std::function<void(const Progress&)>;
129 using CompleteHandler = std::function<void(uint64_t id, bool success, const std::string& path)>;
130
131 explicit FileTransfer(std::string temp_dir = ".");
132 explicit FileTransfer(Config config);
133 ~FileTransfer() override;
134
135 void on_offer(OfferHandler handler) { offer_handler_ = std::move(handler); }
136 void on_progress(ProgressHandler handler) { progress_handler_ = std::move(handler); }
137 void on_complete(CompleteHandler handler) { complete_handler_ = std::move(handler); }
138
140 uint64_t send_file(const PeerId& to, const std::string& path);
142 uint64_t send_directory(const PeerId& to, const std::string& dir_path);
143
146 void accept(const PeerId& from, uint64_t id, const std::string& dest_path);
147 void reject(const PeerId& from, uint64_t id);
148
150 bool cancel(const PeerId& peer, uint64_t id);
151 bool pause(const PeerId& peer, uint64_t id);
152 bool resume(const PeerId& peer, uint64_t id);
153
154 Stats stats() const;
155
159 static bool is_safe_relative_path(const std::string& p);
160
161 void attach(NodeContext& ctx) override;
162 void start() override;
163 void stop() override;
164
165private:
166 // Smoothed throughput + elapsed/ETA tracking for one transfer. Accessed only
167 // under the owning transfer's mutex. The clock starts lazily on the first
168 // sample (first byte activity); a long gap (pause/idle) is treated as a
169 // discontinuity so it doesn't register as a throughput dip.
170 struct RateTracker {
171 using clock = std::chrono::steady_clock;
172 clock::time_point start{};
173 clock::time_point mark{};
174 uint64_t mark_bytes = 0;
175 double rate_bps = 0.0;
176
177 void sample(uint64_t bytes, clock::time_point now) {
178 constexpr int64_t kMinMs = 250, kMaxMs = 2000;
179 constexpr double kAlpha = 0.4;
180 if (start == clock::time_point{}) { start = mark = now; mark_bytes = bytes; return; }
181 const int64_t dt = std::chrono::duration_cast<std::chrono::milliseconds>(now - mark).count();
182 if (dt < kMinMs) return; // too soon: hold the prior rate
183 if (dt > kMaxMs) { mark = now; mark_bytes = bytes; return; } // gap: reset, don't pollute
184 const double inst = static_cast<double>(bytes - mark_bytes) * 1000.0 / static_cast<double>(dt);
185 rate_bps = rate_bps <= 0.0 ? inst : kAlpha * inst + (1.0 - kAlpha) * rate_bps;
186 mark = now; mark_bytes = bytes;
187 }
188
189 void fill(Progress& p, uint64_t bytes, uint64_t total, clock::time_point now) const {
190 if (start == clock::time_point{}) return; // not live yet
191 const auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now - start);
192 p.elapsed = ms;
193 p.transfer_rate_bps = rate_bps;
194 if (ms.count() > 0)
195 p.average_rate_bps = static_cast<double>(bytes) * 1000.0 / static_cast<double>(ms.count());
196 if (rate_bps > 1.0 && total > bytes)
197 p.estimated_time_remaining = std::chrono::milliseconds(
198 static_cast<int64_t>(static_cast<double>(total - bytes) / rate_bps * 1000.0));
199 }
200 };
201
202 // ── Per-transfer state (held via shared_ptr so workers/handlers keep it
203 // alive past map removal) ──────────────────────────────────────────────
204 struct Outgoing {
205 uint64_t id = 0;
206 PeerId peer;
207 std::string name;
208 std::string root;
209 bool is_directory = false;
210 std::vector<FileEntry> files;
211 std::vector<std::string> sources;
212 uint64_t total_bytes = 0;
213
214 std::mutex mtx;
215 std::condition_variable cv;
216 size_t cur_file = 0;
217 uint64_t cur_offset = 0;
218 uint64_t bytes_done = 0;
219 uint64_t acked = 0;
220 uint32_t files_done = 0;
221 Status status = Status::Pending;
222 bool worker_active = false;
223 bool finished = false;
224 sha256_context_t hash{};
225 std::chrono::steady_clock::time_point last_activity{};
226 RateTracker rate;
227 };
228
229 struct IncomingFile {
230 std::string relative_path;
231 uint64_t size = 0;
232 std::string final_path;
233 std::string temp_path;
234 uint64_t received = 0;
235 bool temp_created = false;
236 bool sha_known = false;
237 bool finalized = false;
238 uint8_t expected_sha[SHA256_HASH_SIZE]{};
239 };
240 struct Incoming {
241 uint64_t id = 0;
242 PeerId peer;
243 std::string name;
244 bool is_directory = false;
245 std::string dest_root;
246 std::vector<IncomingFile> files;
247
248 std::mutex mtx;
249 size_t recv_file = 0;
250 uint64_t bytes_done = 0;
251 uint64_t last_ack = 0;
252 uint32_t files_done = 0;
253 Status status = Status::Pending;
254 bool finished = false;
255 sha256_context_t hash{};
256 uint8_t computed_sha[SHA256_HASH_SIZE]{};
257 std::chrono::steady_clock::time_point last_activity{};
258 RateTracker rate;
259 };
260
261 // ── message handling (reactor thread) ─────────────────────────────────────
262 void on_message(const Peer& peer, ByteView payload);
263 void handle_offer(const PeerId& from, uint64_t id, bool is_dir, uint64_t total,
264 std::string name, std::vector<FileEntry> files);
265 void handle_chunk(const PeerId& from, uint64_t id, uint32_t fidx, uint64_t offset,
266 uint32_t crc, ByteView data);
267 void handle_file_end(const PeerId& from, uint64_t id, uint32_t fidx, const uint8_t* sha);
268
269 // ── sending ──────────────────────────────────────────────────────────────
270 uint64_t start_send(std::shared_ptr<Outgoing> t);
271 void queue_send(uint64_t id);
272 void worker_loop();
273 void run_send(const std::shared_ptr<Outgoing>& t);
274
275 // ── receiving ────────────────────────────────────────────────────────────
276 void try_finalize_file(const std::shared_ptr<Incoming>& t, size_t file_index);
277
278 // ── lifecycle helpers ─────────────────────────────────────────────────────
279 void maintenance_loop();
280 void finish_outgoing(const std::shared_ptr<Outgoing>& t, bool success);
281 void finish_incoming(const std::shared_ptr<Incoming>& t, bool success, const std::string& error);
282 void emit_progress(const std::shared_ptr<Outgoing>& t);
283 void emit_progress(const std::shared_ptr<Incoming>& t);
284
285 std::shared_ptr<Outgoing> find_outgoing(uint64_t id) const;
286 std::shared_ptr<Incoming> find_incoming(const PeerId& peer, uint64_t id) const;
287
288 void send_to(const PeerId& peer, const Bytes& msg) {
289 if (network_) network_->send(peer, MessageType::FileChunk, ByteView(msg));
290 }
291 void send_simple(const PeerId& peer, uint8_t op, uint64_t id);
292 void send_complete(const PeerId& peer, uint64_t id, bool ok);
293
294 PeerNetwork* network_ = nullptr;
295 Config config_;
296 std::atomic<uint64_t> next_id_{1};
297 std::atomic<bool> running_{false};
298
299 OfferHandler offer_handler_;
300 ProgressHandler progress_handler_;
301 CompleteHandler complete_handler_;
302
303 mutable std::mutex mutex_;
304 std::unordered_map<uint64_t, std::shared_ptr<Outgoing>> outgoing_;
305 std::unordered_map<PeerId, std::unordered_map<uint64_t, std::shared_ptr<Incoming>>,
306 PeerId::Hash> incoming_;
307
308 // worker pool + send queue
309 std::vector<std::thread> workers_;
310 std::mutex queue_mutex_;
311 std::condition_variable queue_cv_;
312 std::queue<uint64_t> send_queue_;
313
314 // maintenance (idle timeout / purge)
315 std::thread maintenance_thread_;
316 std::mutex maintenance_mutex_;
317 std::condition_variable maintenance_cv_;
318
319 mutable std::mutex stats_mutex_;
320 Stats stats_;
321};
322
323} // namespace librats
FileTransfer(std::string temp_dir=".")
Stats stats() const
void start() override
static bool is_safe_relative_path(const std::string &p)
A relative path from a peer's directory manifest is safe only if it stays inside the destination: non...
uint64_t send_directory(const PeerId &to, const std::string &dir_path)
Offer a directory tree. Returns the transfer id (0 if the directory is unusable).
uint64_t send_file(const PeerId &to, const std::string &path)
Offer a single file. Returns the transfer id (0 if the file is unusable).
void on_progress(ProgressHandler handler)
void stop() override
bool cancel(const PeerId &peer, uint64_t id)
Control a live transfer (works from either side); the pair (peer, id) identifies it.
std::function< void(const Offer &)> OfferHandler
bool resume(const PeerId &peer, uint64_t id)
std::function< void(uint64_t id, bool success, const std::string &path)> CompleteHandler
bool pause(const PeerId &peer, uint64_t id)
void on_offer(OfferHandler handler)
~FileTransfer() override
std::function< void(const Progress &)> ProgressHandler
void accept(const PeerId &from, uint64_t id, const std::string &dest_path)
Accept an offered transfer.
void on_complete(CompleteHandler handler)
void reject(const PeerId &from, uint64_t id)
void attach(NodeContext &ctx) override
FileTransfer(Config config)
virtual void send(const PeerId &to, MessageType type, ByteView payload)=0
A pluggable network subsystem.
Definition node.h:65
A lightweight handle to a connected peer.
Self-certifying peer identity.
The narrow contract a subsystem needs from the node — and nothing more.
uint32_t progress_interval
receiver acks every N bytes
bool verify_integrity
per-chunk CRC32 + whole-file SHA-256
std::string temp_directory
holds in-progress downloads
uint32_t worker_threads
concurrent outgoing transfers
uint32_t chunk_size
payload bytes per chunk
uint32_t transfer_timeout_secs
abort a transfer idle this long
uint32_t window_bytes
max un-acked bytes in flight
One file inside a transfer (a single-file transfer has exactly one).
std::string relative_path
POSIX path relative to the transfer root.
Delivered to the offer callback so the app can accept() or reject().
std::vector< FileEntry > files
full manifest
std::string name
file or directory name
uint64_t size
total size across all files
Snapshot passed to the progress callback (both directions).
double average_rate_bps
mean throughput since the transfer went live
double percent() const
Completion in [0, 100].
std::chrono::milliseconds elapsed
time since it went live
double transfer_rate_bps
recent (smoothed) throughput, bytes/sec
std::chrono::milliseconds estimated_time_remaining
ETA at the recent rate (0 = unknown)