Back to Site
Loading...
Searching...
No Matches
dht.h
Go to the documentation of this file.
1#pragma once
2
3#include "socket.h"
4#include "krpc.h"
5#include <string>
6#include <vector>
7#include <array>
8#include <unordered_map>
9#include <unordered_set>
10#include <functional>
11#include <thread>
12#include <mutex>
13#include <atomic>
14#include <chrono>
15#include <memory>
16#include <condition_variable>
17
18// Hash specialization for Peer and NodeId (must be defined before use in unordered_map/set)
19namespace std {
20 template<>
21 struct hash<librats::Peer> {
22 std::size_t operator()(const librats::Peer& peer) const noexcept {
23 std::hash<std::string> hasher;
24 return hasher(peer.ip + ":" + std::to_string(peer.port));
25 }
26 };
27
28 template<>
29 struct hash<array<uint8_t, 20>> {
30 std::size_t operator()(const array<uint8_t, 20>& id) const noexcept {
31 std::size_t seed = 0;
32 std::hash<uint8_t> hasher;
33 for (const auto& byte : id) {
34 seed ^= hasher(byte) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
35 }
36 return seed;
37 }
38 };
39}
40
41namespace librats {
42
// ---------------------------------------------------------------------------
// Kademlia DHT parameters
// ---------------------------------------------------------------------------
constexpr std::size_t NODE_ID_SIZE{20};   // IDs are 160 bits = 20 bytes
constexpr std::size_t K_BUCKET_SIZE{8};   // Upper bound on nodes held by one k-bucket
constexpr std::size_t ALPHA{3};           // Lookup concurrency parameter
constexpr int DHT_PORT{6881};             // Standard BitTorrent DHT port

// A node identifier: NODE_ID_SIZE raw bytes.
using NodeId = std::array<uint8_t, NODE_ID_SIZE>;
// A torrent info hash; same representation as NodeId so the two can be
// compared directly (e.g. by xor_distance).
using InfoHash = NodeId;
51
// Per-node state bits used during an iterative search (the values stored in
// PendingSearch::node_states, which is a SearchNodeFlags bitfield).
namespace SearchNodeFlags {
    constexpr uint8_t QUERIED       = 0x01; // A query has been sent to this node
    constexpr uint8_t SHORT_TIMEOUT = 0x02; // Short timeout passed: slot released, reply still awaited
    constexpr uint8_t RESPONDED     = 0x04; // The node answered successfully
    constexpr uint8_t TIMED_OUT     = 0x08; // The node fully timed out (counted as failed)
    constexpr uint8_t ABANDONED     = 0x10; // The node was dropped when the search was truncated
}
63
68struct DhtNode {
71 std::chrono::steady_clock::time_point last_seen;
72
73 // Round-trip time in milliseconds (0xffff = unknown, lower is better)
74 uint16_t rtt = 0xffff;
75
76 // Number of consecutive failures (0xff = never pinged, 0 = confirmed good)
77 uint8_t fail_count = 0xff;
78
79 DhtNode() : id(), last_seen(std::chrono::steady_clock::now()) {}
80 DhtNode(const NodeId& id, const Peer& peer)
81 : id(id), peer(peer), last_seen(std::chrono::steady_clock::now()) {}
82
83 // Has this node ever responded to us?
84 bool pinged() const { return fail_count != 0xff; }
85
86 // Is this node confirmed good? (responded with no recent failures)
87 bool confirmed() const { return fail_count == 0; }
88
89 // Mark node as failed (timed out)
90 void mark_failed() {
91 if (pinged() && fail_count < 0xfe) ++fail_count;
92 }
93
94 // Mark node as successful (responded)
95 void mark_success() {
96 fail_count = 0;
97 last_seen = std::chrono::steady_clock::now();
98 }
99
100 // Update RTT with exponential moving average
101 void update_rtt(uint16_t new_rtt) {
102 if (new_rtt == 0xffff) return;
103 if (rtt == 0xffff) {
104 rtt = new_rtt;
105 } else {
106 // Weighted average: 2/3 old + 1/3 new
107 rtt = static_cast<uint16_t>(rtt * 2 / 3 + new_rtt / 3);
108 }
109 }
110
111 // Compare nodes: "less" means "better" node
112 // Priority: confirmed > not confirmed, then lower fail_count, then lower RTT
113 bool is_worse_than(const DhtNode& other) const {
114 // Nodes with failures are worse
115 if (fail_count != other.fail_count) {
116 return fail_count > other.fail_count;
117 }
118 // Higher RTT is worse
119 return rtt > other.rtt;
120 }
121};
122
123
124
128using PeerDiscoveryCallback = std::function<void(const std::vector<Peer>& peers, const InfoHash& info_hash)>;
129
130#ifdef RATS_SEARCH_FEATURES
137using SpiderAnnounceCallback = std::function<void(const InfoHash& info_hash, const Peer& peer)>;
138#endif // RATS_SEARCH_FEATURES
139
145 std::vector<PeerDiscoveryCallback> callbacks;
146 std::vector<Peer> peers;
148 bool should_invoke = false;
149
150 void invoke() {
151 if (should_invoke) {
152 for (const auto& cb : callbacks) {
153 if (cb) cb(peers, info_hash);
154 }
155 }
156 }
157};
158
163public:
169 DhtClient(int port = DHT_PORT, const std::string& bind_address = "", const std::string& data_directory = "");
170
175
180 bool start();
181
185 void stop();
186
191
197 bool bootstrap(const std::vector<Peer>& bootstrap_nodes);
198
205 bool find_peers(const InfoHash& info_hash, PeerDiscoveryCallback callback);
206
214 bool announce_peer(const InfoHash& info_hash, uint16_t port = 0, PeerDiscoveryCallback callback = nullptr);
215
221 void cancel_search(const InfoHash& info_hash);
222
227 const NodeId& get_node_id() const { return node_id_; }
228
234
240
246 bool is_search_active(const InfoHash& info_hash) const;
247
253 bool is_announce_active(const InfoHash& info_hash) const;
254
260
266
271 bool is_running() const { return running_; }
272
#ifdef RATS_SEARCH_FEATURES
// ============================================================================
// SPIDER MODE - aggressive node discovery and announce collection
// ============================================================================

// Enable or disable spider mode.
void set_spider_mode(bool enable);

// Is spider mode enabled?
bool is_spider_mode() const { return spider_mode_.load(); }

// Set spider ignore mode: while true, incoming requests are not processed.
void set_spider_ignore(bool ignore);

// Is spider ignore mode enabled?
bool is_spider_ignoring() const { return spider_ignore_.load(); }

// Number of nodes currently in the spider pool.
size_t get_spider_pool_size() const;
#endif // RATS_SEARCH_FEATURES
338
343 static std::vector<Peer> get_default_bootstrap_nodes();
344
350
356
361 void set_data_directory(const std::string& directory);
362
363private:
364 int port_;
365 std::string bind_address_;
366 std::string data_directory_;
367 NodeId node_id_;
368 socket_t socket_;
369 std::atomic<bool> running_;
370
371 // ============================================================================
372 // MUTEX LOCK ORDER - CRITICAL: Always acquire mutexes in this order to avoid deadlocks
373 // ============================================================================
374 // When acquiring multiple mutexes, ALWAYS follow this order:
375 //
376 // 1. pending_pings_mutex_ (Ping verification state, nodes_being_replaced_)
377 // 2. pending_searches_mutex_ (Search state and transaction mappings)
378 // 3. routing_table_mutex_ (core routing data)
379 // 4. spider_nodes_mutex_ (Spider mode: node pool and visited tracking) [RATS_SEARCH_FEATURES]
380 // 5. announced_peers_mutex_ (Stored peer data)
381 // 6. peer_tokens_mutex_ (Token validation data)
382 // 7. shutdown_mutex_ (Lowest priority - can be locked independently)
383 //
384 // Routing table (k-buckets)
385 std::vector<std::vector<DhtNode>> routing_table_;
386 mutable std::mutex routing_table_mutex_; // Lock order: 3
387
388 // Tokens for peers (use Peer directly as key for efficiency)
389 struct PeerToken {
390 std::string token;
391 std::chrono::steady_clock::time_point created_at;
392
393 PeerToken() : created_at(std::chrono::steady_clock::now()) {}
394 PeerToken(const std::string& t)
395 : token(t), created_at(std::chrono::steady_clock::now()) {}
396 };
397 std::unordered_map<Peer, PeerToken> peer_tokens_;
398 std::mutex peer_tokens_mutex_; // Lock order: 6
399
400
401
402 // Pending find_peers tracking (to map transaction IDs to info_hash)
403 struct PendingSearch {
404 InfoHash info_hash;
405 std::chrono::steady_clock::time_point created_at;
406
407 // Iterative search state - search_nodes is sorted by distance to info_hash (closest first)
408 std::vector<DhtNode> search_nodes;
409 std::vector<Peer> found_peers; // found peers for this search
410 // Single map tracking node states using SearchNodeFlags bitfield
411 // A node is "known" if it exists in this map (any flags set or value 0)
412 std::unordered_map<NodeId, uint8_t> node_states;
413
414 int invoke_count; // number of outstanding requests
415 int branch_factor; // adaptive concurrency limit (starts at ALPHA)
416 bool is_finished; // whether the search is finished
417
418 // Callbacks to invoke when peers are found (supports multiple concurrent searches for same info_hash)
419 std::vector<PeerDiscoveryCallback> callbacks;
420
421 // Announce support: tokens collected during traversal (BEP 5 compliant)
422 // Maps node_id -> write_token received from that node
423 std::unordered_map<NodeId, std::string> write_tokens;
424 bool is_announce; // true if this search is for announce_peer
425 uint16_t announce_port; // port to announce (only valid if is_announce)
426
427 PendingSearch(const InfoHash& hash)
428 : info_hash(hash), created_at(std::chrono::steady_clock::now()),
429 invoke_count(0), branch_factor(ALPHA), is_finished(false),
430 is_announce(false), announce_port(0) {}
431 };
432 std::unordered_map<std::string, PendingSearch> pending_searches_; // info_hash (hex) -> PendingSearch
433 mutable std::mutex pending_searches_mutex_; // Lock order: 2
434
435 // Transaction tracking with queried node info for proper responded_nodes tracking
436 struct SearchTransaction {
437 std::string info_hash_hex;
438 NodeId queried_node_id;
439 std::chrono::steady_clock::time_point sent_at;
440
441 SearchTransaction() = default;
442 SearchTransaction(const std::string& hash, const NodeId& id)
443 : info_hash_hex(hash), queried_node_id(id),
444 sent_at(std::chrono::steady_clock::now()) {}
445 };
446 std::unordered_map<std::string, SearchTransaction> transaction_to_search_; // transaction_id -> SearchTransaction
447
448 // Peer announcement storage (BEP 5 compliant)
449 struct AnnouncedPeer {
450 Peer peer;
451 std::chrono::steady_clock::time_point announced_at;
452
453 AnnouncedPeer(const Peer& p)
454 : peer(p), announced_at(std::chrono::steady_clock::now()) {}
455 };
456 // Map from info_hash (as hex string) to list of announced peers
457 std::unordered_map<std::string, std::vector<AnnouncedPeer>> announced_peers_;
458 std::mutex announced_peers_mutex_; // Lock order: 5
459
460 // Ping-before-replace eviction tracking (BEP 5 compliant)
461 // When a bucket is full and a new node wants to join:
462 // 1. We ping the WORST node in the bucket (highest RTT) to check if it's still alive
463 // 2. If old node responds -> keep it, discard candidate
464 // 3. If old node times out -> replace it with candidate
465 struct PingVerification {
466 DhtNode candidate_node; // The new node waiting to be added
467 DhtNode old_node; // The existing node we're pinging to verify it's alive
468 int bucket_index; // Which bucket this affects
469 std::chrono::steady_clock::time_point ping_sent_at;
470
471 PingVerification(const DhtNode& candidate, const DhtNode& old, int bucket_idx)
472 : candidate_node(candidate), old_node(old), bucket_index(bucket_idx),
473 ping_sent_at(std::chrono::steady_clock::now()) {}
474 };
475 std::unordered_map<std::string, PingVerification> pending_pings_; // transaction_id -> PingVerification
476 std::unordered_set<NodeId> nodes_being_replaced_; // Track old nodes that have pending ping verifications
477 mutable std::mutex pending_pings_mutex_; // Lock order: 1 (protects pending_pings_, nodes_being_replaced_)
478
// Background threads (presumably running network_loop() and maintenance_loop()).
std::thread network_thread_;
std::thread maintenance_thread_;

// Condition variable and its mutex, used to trigger an immediate shutdown.
std::condition_variable shutdown_cv_;
std::mutex shutdown_mutex_;  // Lock order: 7 (can be locked independently)
486
#ifdef RATS_SEARCH_FEATURES
// ---- Spider mode state ----
std::atomic<bool> spider_mode_{false};
std::atomic<bool> spider_ignore_{false};         // When true, incoming requests are ignored
SpiderAnnounceCallback spider_announce_callback_;
std::mutex spider_callbacks_mutex_;              // Guards the spider callbacks

// Spider node pool, kept apart from the routing table so the table stays stable.
std::vector<DhtNode> spider_nodes_;              // Nodes available for spider walking
std::unordered_set<NodeId> spider_visited_;      // Nodes already queried this session
// Lock order: 4. Guards spider_nodes_, spider_visited_, spider_transactions_
// and spider_contacted_ips_.
mutable std::mutex spider_nodes_mutex_;
static constexpr size_t MAX_SPIDER_NODES = 2000;          // Cap on the spider pool
static constexpr size_t MAX_SPIDER_VISITED = 10000;       // Cap on visited-node tracking
static constexpr size_t MAX_SPIDER_CONTACTED_IPS = 10000; // Cap on contacted-IP tracking

// Transactions sent while spidering, so their responses still go to the
// spider pool even if spider_mode_ is turned off between request and response.
std::unordered_map<std::string, std::chrono::steady_clock::time_point> spider_transactions_;

// IPs we have sent spider requests to. Decides whether a reply uses
// neighbor_id (spider contact) or our real node_id (organic DHT traffic).
std::unordered_set<std::string> spider_contacted_ips_;

// ---- Spider helpers ----
void add_spider_node(const DhtNode& node);
void add_spider_nodes(const std::vector<DhtNode>& nodes);
bool is_spider_node_visited(const NodeId& id) const;
void mark_spider_node_visited(const NodeId& id);
void cleanup_spider_state();
void cleanup_stale_spider_transactions();
void mark_spider_transaction(const std::string& transaction_id);
bool is_spider_transaction(const std::string& transaction_id);  // Removes the entry when found
void mark_spider_contacted_ip(const std::string& ip);
bool is_spider_contacted_ip(const std::string& ip) const;
#endif // RATS_SEARCH_FEATURES
522
523 // Helper functions
524 void network_loop();
525 void maintenance_loop();
526 void handle_message(const std::vector<uint8_t>& data, const Peer& sender);
527
528
529
530 // KRPC protocol handlers
531 void handle_krpc_message(const KrpcMessage& message, const Peer& sender);
532 void handle_krpc_ping(const KrpcMessage& message, const Peer& sender);
533 void handle_krpc_find_node(const KrpcMessage& message, const Peer& sender);
534 void handle_krpc_get_peers(const KrpcMessage& message, const Peer& sender);
535 void handle_krpc_announce_peer(const KrpcMessage& message, const Peer& sender);
536 void handle_krpc_response(const KrpcMessage& message, const Peer& sender);
537 void handle_krpc_error(const KrpcMessage& message, const Peer& sender);
538
539 // KRPC protocol sending
540 bool send_krpc_message(const KrpcMessage& message, const Peer& peer);
541 void send_krpc_ping(const Peer& peer);
542 void send_krpc_find_node(const Peer& peer, const NodeId& target);
543 void send_krpc_get_peers(const Peer& peer, const InfoHash& info_hash);
544 void send_krpc_announce_peer(const Peer& peer, const InfoHash& info_hash, uint16_t port, const std::string& token);
545
546 void add_node(const DhtNode& node, bool confirmed = true, bool no_verify = false);
547 std::vector<DhtNode> find_closest_nodes(const NodeId& target, size_t count = K_BUCKET_SIZE);
548 std::vector<DhtNode> find_closest_nodes_unlocked(const NodeId& target, size_t count = K_BUCKET_SIZE);
549 int get_bucket_index(const NodeId& id);
550
551 NodeId generate_node_id();
552 NodeId xor_distance(const NodeId& a, const NodeId& b);
553 bool is_closer(const NodeId& a, const NodeId& b, const NodeId& target);
554
562 NodeId neighbor_id(const NodeId& target) const;
563
564
565 std::string generate_token(const Peer& peer);
566 bool verify_token(const Peer& peer, const std::string& token);
567
568
569
570 void cleanup_stale_nodes();
571 void cleanup_stale_peer_tokens();
572 void refresh_buckets();
573 void print_statistics();
574
575 // Pending search management
576 void cleanup_stale_searches();
577 void cleanup_timed_out_search_requests();
578 void cleanup_search_node_states();
579 void handle_get_peers_response_for_search(const std::string& transaction_id, const Peer& responder, const std::vector<Peer>& peers);
580 void handle_get_peers_response_with_nodes(const std::string& transaction_id, const Peer& responder, const std::vector<KrpcNode>& nodes);
581 void handle_get_peers_empty_response(const std::string& transaction_id, const Peer& responder);
582 void save_write_token(PendingSearch& search, const NodeId& node_id, const std::string& token);
583 bool add_search_requests(PendingSearch& search, DeferredCallbacks& deferred);
584 void add_node_to_search(PendingSearch& search, const DhtNode& node);
585 void send_announce_to_closest_nodes(PendingSearch& search);
586
587 // Peer announcement storage management
588 void store_announced_peer(const InfoHash& info_hash, const Peer& peer);
589 std::vector<Peer> get_announced_peers(const InfoHash& info_hash);
590 void cleanup_stale_announced_peers();
591
592 // Ping-before-replace eviction management
593 void initiate_ping_verification(const DhtNode& candidate_node, const DhtNode& old_node, int bucket_index);
594 void handle_ping_verification_response(const std::string& transaction_id, const NodeId& responder_id, const Peer& responder);
595 void cleanup_stale_ping_verifications();
596 bool perform_replacement(const DhtNode& candidate_node, const DhtNode& node_to_replace, int bucket_index);
597
598 // Conversion utilities
599 static KrpcNode dht_node_to_krpc_node(const DhtNode& node);
600 static DhtNode krpc_node_to_dht_node(const KrpcNode& node);
601 static std::vector<KrpcNode> dht_nodes_to_krpc_nodes(const std::vector<DhtNode>& nodes);
602 static std::vector<DhtNode> krpc_nodes_to_dht_nodes(const std::vector<KrpcNode>& nodes);
603};
604
614NodeId string_to_node_id(const std::string& str);
615
621std::string node_id_to_string(const NodeId& id);
622
628NodeId hex_to_node_id(const std::string& hex);
629
635std::string node_id_to_hex(const NodeId& id);
636
637} // namespace librats
DHT Kademlia implementation.
Definition dht.h:162
bool save_routing_table()
Save routing table to disk.
void set_data_directory(const std::string &directory)
Set data directory for persistence.
size_t get_pending_ping_verifications_count() const
Get number of pending ping verifications.
void set_spider_mode(bool enable)
Enable spider mode. In spider mode:
bool bootstrap(const std::vector< Peer > &bootstrap_nodes)
Bootstrap the DHT with known nodes.
bool is_running() const
Check if DHT is running.
Definition dht.h:271
void spider_walk()
Trigger a single spider walk iteration. Sends find_node to a random node from the spider pool. Should b...
~DhtClient()
Destructor.
bool is_search_active(const InfoHash &info_hash) const
Check if a search is currently active for an info hash.
size_t get_active_searches_count() const
Get number of active searches.
static std::vector< Peer > get_default_bootstrap_nodes()
Get default BitTorrent DHT bootstrap nodes.
bool announce_peer(const InfoHash &info_hash, uint16_t port=0, PeerDiscoveryCallback callback=nullptr)
Announce that this node is a peer for a specific info hash.
size_t get_spider_visited_count() const
Get the number of visited nodes in spider mode.
size_t get_spider_pool_size() const
Get the size of the spider node pool.
void stop()
Stop the DHT client.
size_t get_active_announces_count() const
Get number of active announces.
void shutdown_immediate()
Trigger immediate shutdown of all background threads.
bool is_spider_ignoring() const
Check if spider ignore mode is enabled.
Definition dht.h:311
bool start()
Start the DHT client.
bool find_peers(const InfoHash &info_hash, PeerDiscoveryCallback callback)
Find peers for a specific info hash.
bool is_announce_active(const InfoHash &info_hash) const
Check if an announce is currently active for an info hash.
bool load_routing_table()
Load routing table from disk.
bool is_spider_mode() const
Check if spider mode is enabled.
Definition dht.h:291
DhtClient(int port=DHT_PORT, const std::string &bind_address="", const std::string &data_directory="")
Constructor.
const NodeId & get_node_id() const
Get our node ID.
Definition dht.h:227
void set_spider_announce_callback(SpiderAnnounceCallback callback)
Set callback for announce_peer requests (spider mode). Called when other peers announce they have a to...
void clear_spider_state()
Clear spider state (pool and visited nodes). Useful for resetting the spider walk.
size_t get_routing_table_size() const
Get number of nodes in routing table.
void cancel_search(const InfoHash &info_hash)
Cancel an active search or announce for a specific info hash. Should be called when a torrent is remov...
void set_spider_ignore(bool ignore)
Set spider ignore mode: when true, incoming requests are not processed. Similar to rate limiting in t...
constexpr uint8_t ABANDONED
Definition dht.h:61
constexpr uint8_t RESPONDED
Definition dht.h:59
constexpr uint8_t TIMED_OUT
Definition dht.h:60
constexpr uint8_t QUERIED
Definition dht.h:57
constexpr uint8_t SHORT_TIMEOUT
Definition dht.h:58
std::function< void(const std::vector< Peer > &peers, const InfoHash &info_hash)> PeerDiscoveryCallback
Peer discovery callback.
Definition dht.h:128
constexpr int DHT_PORT
Definition dht.h:47
constexpr size_t NODE_ID_SIZE
Definition dht.h:44
NodeId hex_to_node_id(const std::string &hex)
Convert hex string to NodeId.
NodeId string_to_node_id(const std::string &str)
Utility functions.
constexpr size_t ALPHA
Definition dht.h:46
std::array< uint8_t, NODE_ID_SIZE > InfoHash
Definition dht.h:50
std::string node_id_to_string(const NodeId &id)
Convert NodeId to string.
std::array< uint8_t, NODE_ID_SIZE > NodeId
Definition dht.h:49
std::function< void(const InfoHash &info_hash, const Peer &peer)> SpiderAnnounceCallback
Spider mode callback. Called when a peer announces they have a torrent (announce_peer request received...
Definition dht.h:137
std::string node_id_to_hex(const NodeId &id)
Convert NodeId to hex string.
constexpr size_t K_BUCKET_SIZE
Definition dht.h:45
STL namespace.
int socket_t
Definition socket.h:22
Deferred callbacks structure for avoiding deadlock. Callbacks are collected while holding the mutex,...
Definition dht.h:144
std::vector< PeerDiscoveryCallback > callbacks
Definition dht.h:145
std::vector< Peer > peers
Definition dht.h:146
DHT node information. Based on libtorrent's node_entry, with fail_count and RTT tracking.
Definition dht.h:68
uint8_t fail_count
Definition dht.h:77
Peer peer
Definition dht.h:70
bool is_worse_than(const DhtNode &other) const
Definition dht.h:113
NodeId id
Definition dht.h:69
void mark_success()
Definition dht.h:95
uint16_t rtt
Definition dht.h:74
bool pinged() const
Definition dht.h:84
void update_rtt(uint16_t new_rtt)
Definition dht.h:101
void mark_failed()
Definition dht.h:90
bool confirmed() const
Definition dht.h:87
DhtNode(const NodeId &id, const Peer &peer)
Definition dht.h:80
std::chrono::steady_clock::time_point last_seen
Definition dht.h:71
UDP peer information.
Definition socket.h:33
std::size_t operator()(const array< uint8_t, 20 > &id) const noexcept
Definition dht.h:30
std::size_t operator()(const librats::Peer &peer) const noexcept
Definition dht.h:22